Uniswap v2 reading swaps real-time (minimal)

This is a minimal code example showing live swaps happening on Uniswap v2 compatible exchanges.

  • This example runs on a free Polygon JSON-RPC node.

  • It will print out live trade events for all Uniswap v2 compatible exchanges. This includes QuickSwap, Sushi and a couple of others. See the full DEX list here.

  • It demonstrates the chain reorganisation detection and event reader API.

  • See the more complete example

To run:

python scripts/uniswap-v2-swaps-live-minimal.py

Example output:

Swap at block:42,549,528 tx:0x2011d03b4f3d80992339eb6303b0b7b86ec77f629ce7f2508344e739c4536cc7
Swap at block:42,549,528 tx:0x67af6d9d28634747d83f14d48bdc3d56421df7b686055c4519850b97e863291d
.
Block 42,549,527 is 0x83fd3f8dfd6065fcc3406fed9e81b069a45cf0e823fe4863c89a5e9cef49bdc6
Block 42,549,528 is 0xb29ce58ad7267b5c9906eea32aeacf043965a7223a35e0aa3c495dcdf3815eac
.
.
Swap at block:42,549,529 tx:0xa9ba7e61c1bdedf53b657419874d528b4164b9e286c3baf162f20f8d1c428b80
.
Block 42,549,529 is 0x686ebaa6ac7fa5f32aedbdc05e1352f681072acb46c0c158b779afd8e0fce21f
.
.
"""Minimal example of a chain reader with chain reorganisation detection.

.. code-block:: shell

    python scripts/uniswap-v2-swaps-live-minimal.py

"""

import os
import time

from web3 import HTTPProvider, Web3

from eth_defi.abi import get_contract
from eth_defi.chain import install_chain_middleware
from eth_defi.event_reader.filter import Filter
from eth_defi.event_reader.reader import read_events, LogResult
from eth_defi.event_reader.reorganisation_monitor import JSONRPCReorganisationMonitor


def main():
    """Print Uniswap v2 compatible ``Swap`` events as new blocks arrive.

    The JSON-RPC endpoint is read from the ``JSON_RPC_POLYGON`` environment
    variable and falls back to ``https://polygon-rpc.com``.

    After loading a few initial block headers, the function polls forever.
    On each round it:

    1. asks the reorganisation monitor to update the chain and give the
       block range to read, printing a warning if a reorg was detected;
    2. reads ``Swap`` events in that range and prints each one that has not
       been printed before;
    3. prints a ``.`` to mark the end of the round;
    4. prints the block hash of every block number above the previously
       reported one, up to the end of the range.

    This function never returns on its own; it runs until the process is
    interrupted.
    """
    json_rpc_url = os.environ.get("JSON_RPC_POLYGON", "https://polygon-rpc.com")
    web3 = Web3(HTTPProvider(json_rpc_url))
    # Start from an empty middleware stack, then install eth_defi's chain middleware.
    web3.middleware_onion.clear()
    install_chain_middleware(web3)

    # Get contracts
    Pair = get_contract(web3, "sushi/UniswapV2Pair.json")

    # address=None: listen to Swap events from any smart contract.
    # Named swap_filter so the builtin filter() is not shadowed.
    swap_filter = Filter.create_filter(address=None, event_types=[Pair.events.Swap])

    reorg_mon = JSONRPCReorganisationMonitor(web3, check_depth=3)

    # Get the headers of last 5 blocks before starting
    reorg_mon.load_initial_block_headers(block_count=5)

    # Keys of the events that have already been printed.
    # NOTE(review): entries are never removed, so this set keeps growing for
    # as long as the script runs.
    processed_events = set()

    latest_block = reorg_mon.get_last_block_live()

    # Keep reading events as they land
    while True:
        chain_reorg_resolution = reorg_mon.update_chain()
        start, end = chain_reorg_resolution.get_read_range()

        if chain_reorg_resolution.reorg_detected:
            print("Chain reorg warning")

        evt: LogResult
        for evt in read_events(
            web3,
            start_block=start,
            end_block=end,
            filter=swap_filter,
        ):
            # How to uniquely identify EVM logs. A tuple keeps the three
            # fields separate and does not require them to be concatenable.
            key = (evt["blockHash"], evt["transactionHash"], evt["logIndex"])

            # The reader may cause duplicate events as the chain tip reorganises
            if key in processed_events:
                continue

            print(f"Swap at block:{evt['blockNumber']:,} tx:{evt['transactionHash']}")
            processed_events.add(key)

        # One dot per polling round, whether or not any events were read.
        print(".")

        if end != latest_block:
            # Report the hash of every block above the last one reported.
            for block_num in range(latest_block + 1, end + 1):
                block_data = reorg_mon.get_block_by_number(block_num)
                print(f"Block {block_num:,} is {block_data.block_hash}")

            latest_block = end

        time.sleep(0.5)


# Start the live reader only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()