Uniswap v2 reading swaps real-time (minimal)
This is a minimal example code for showing live swaps happening on Uniswap v2 compatible examples.
This example runs on a free Polygon JSON-RPC node.
It will print out live trade events for all Uniswap v2 compatible exchanges. This includes QuickSwap, Sushi and couple of others. See the full DEX list here.
It demonstrates the chain reorganisation detection and event reader API.
To run:
python scripts/uniswap-v2-swaps-live-minimal.py
Example output:
Swap at block:42,549,528 tx:0x2011d03b4f3d80992339eb6303b0b7b86ec77f629ce7f2508344e739c4536cc7
Swap at block:42,549,528 tx:0x67af6d9d28634747d83f14d48bdc3d56421df7b686055c4519850b97e863291d
.
Block 42,549,527 is 0x83fd3f8dfd6065fcc3406fed9e81b069a45cf0e823fe4863c89a5e9cef49bdc6
Block 42,549,528 is 0xb29ce58ad7267b5c9906eea32aeacf043965a7223a35e0aa3c495dcdf3815eac
.
.
Swap at block:42,549,529 tx:0xa9ba7e61c1bdedf53b657419874d528b4164b9e286c3baf162f20f8d1c428b80
.
Block 42,549,529 is 0x686ebaa6ac7fa5f32aedbdc05e1352f681072acb46c0c158b779afd8e0fce21f
.
.
"""Minimal example of a chain reader with chain reorganisation detection.
.. code-block:: shell
python scripts/read-uniswap-v2-pairs-and-swaps-live.py
"""
import os
import time
from web3 import HTTPProvider, Web3
from eth_defi.abi import get_contract
from eth_defi.chain import install_chain_middleware
from eth_defi.event_reader.filter import Filter
from eth_defi.event_reader.reader import read_events, LogResult
from eth_defi.event_reader.reorganisation_monitor import JSONRPCReorganisationMonitor
def main():
json_rpc_url = os.environ.get("JSON_RPC_POLYGON", "https://polygon-rpc.com")
web3 = Web3(HTTPProvider(json_rpc_url))
web3.middleware_onion.clear()
install_chain_middleware(web3)
# Get contracts
Pair = get_contract(web3, "sushi/UniswapV2Pair.json")
filter = Filter.create_filter(address=None, event_types=[Pair.events.Swap]) # Listen events from any smart contract
reorg_mon = JSONRPCReorganisationMonitor(web3, check_depth=3)
# Get the headers of last 5 blocks before starting
reorg_mon.load_initial_block_headers(block_count=5)
processed_events = set()
latest_block = reorg_mon.get_last_block_live()
# Keep reading events as they land
while True:
chain_reorg_resolution = reorg_mon.update_chain()
start, end = chain_reorg_resolution.get_read_range()
if chain_reorg_resolution.reorg_detected:
print("Chain reorg warning")
evt: LogResult
for evt in read_events(
web3,
start_block=start,
end_block=end,
filter=filter,
):
# How to uniquely identify EVM logs
key = evt["blockHash"] + evt["transactionHash"] + evt["logIndex"]
# The reader may cause duplicate events as the chain tip reorganises
if key not in processed_events:
print(f"Swap at block:{evt['blockNumber']:,} tx:{evt['transactionHash']}")
processed_events.add(key)
else:
print(".")
if end != latest_block:
for block_num in range(latest_block + 1, end + 1):
block_data = reorg_mon.get_block_by_number(block_num)
print(f"Block {block_num:,} is {block_data.block_hash}")
latest_block = end
time.sleep(0.5)
if __name__ == "__main__":
main()