Uniswap v2 live price with web3.py

Below is an example script that displays the real-time price of Uniswap v2 compatible trading pair in a terminal.

  • Display latest price

  • Time-weighted average price (TWAP)

  • Update the price for every new block in BNB Smart chain

  • Abort the application with CTRL+C

Uniswap v2 compatible DEXes include

  • PancakeSwap

  • TradeJoe

The example displays BNB/BUSD price from PancakeSwap.

Sample output:

Block 19,937,848 at 2022-07-28 06:16:16 current price:269.3162 WBNB/BUSD TWAP:269.3539 WBNB/BUSD
    Oracle data updates: Counter({'created': 6, 'discarded': 1, 'reorgs': 0}), trades in TWAP buffer:144, oldest:2022-07-28 06:11:16, newest:2022-07-28 06:16:13
"""Live Uniswap v2 swap event monitor with chain reorganisation detection.

- This example runs on free Polygon JSON-RPC nodes,
  you do not need any self-hosted or commercial node service providers.

- This is an modified example of `read-uniswap-v2-pairs-and-swaps.py` to support
  chain reorganisations, thus suitable for live event reading.

- It will print out live trade events for Uniswap v2 compatible exchange.

- This will also show how to track block headers on disk,
  so that next start up is faster.

- This is a dummy example just showing how to build the live loop,
  because how stores are constructed it is not good for processing
  actual data.

- Because pair and token details are dynamically fetched
  when a swap for a pair is encountered for the first time,
  the startup is a bit slow as the pair details cache
  is warming up.


To run for Polygon (and QuickSwap):

.. code-block:: shell

    # Need for nice output
    pip install coloredlogs

    # Switch between INFO and DEBUG
    export LOG_LEVEL=INFO
    # Your Ethereum node RPC
    export JSON_RPC_URL="https://polygon-rpc.com"
    python scripts/read-uniswap-v2-pairs-and-swaps-live.py


"""
import datetime
import os
import time
from pathlib import Path
import logging
from typing import Dict

import coloredlogs
import requests

from web3 import HTTPProvider, Web3

from eth_defi.abi import get_contract
from eth_defi.chain import install_chain_middleware
from eth_defi.event_reader.block_time import measure_block_time
from eth_defi.event_reader.conversion import decode_data, convert_int256_bytes_to_int, convert_jsonrpc_value_to_int
from eth_defi.event_reader.csv_block_data_store import CSVDatasetBlockDataStore
from eth_defi.event_reader.fast_json_rpc import patch_web3
from eth_defi.event_reader.logresult import LogContext
from eth_defi.event_reader.reader import read_events, LogResult, prepare_filter
from eth_defi.event_reader.reorganisation_monitor import ChainReorganisationDetected, JSONRPCReorganisationMonitor
from eth_defi.token import fetch_erc20_details, TokenDetails
from eth_defi.uniswap_v2.pair import PairDetails, fetch_pair_details


#: List of output columns to swaps.csv
SWAP_FIELD_NAMES = [
    "block_number",
    "timestamp",
    "tx_hash",
    "log_index",
    "pair_contract_address",
    "amount0_in",
    "amount1_in",
    "amount0_out",
    "amount1_out",
]


logger = logging.getLogger(__name__)


class BlockchainStateCache(LogContext):
    """Manage cache of token and pair data.

    - Read data from the chain state

    - Store process in-memory for the duration of the session
    """

    def __init__(self, web3: Web3):
        self.web3 = web3
        self.token_cache: Dict[str, TokenDetails] = {}
        self.pair_cache: Dict[str, PairDetails] = {}

    def get_token_details(self, address: str) -> TokenDetails:
        if address not in self.token_cache:
            self.token_cache[address] = fetch_erc20_details(self.web3, address, raise_on_error=False)
        return self.token_cache[address]

    def get_pair_details(self, address: str) -> PairDetails:
        if address not in self.pair_cache:
            self.pair_cache[address] = fetch_pair_details(self.web3, address)
        return self.pair_cache[address]


def decode_swap(web3: Web3, cache: BlockchainStateCache, log: LogResult) -> dict:
    """Process swap event.

    This function does manually optimised high speed decoding of the event.

    The event signature is:

    .. code-block::

        event Swap(
          address indexed sender,
          uint amount0In,
          uint amount1In,
          uint amount0Out,
          uint amount1Out,
          address indexed to
        );
    """

    # Raw example event
    # {'address': '0xb4e16d0168e52d35cacd2c6185b44281ec28c9dc', 'blockHash': '0x4ba33a650f9e3d8430f94b61a382e60490ec7a06c2f4441ecf225858ec748b78', 'blockNumber': '0x98b7f6', 'data': '0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000046ec814a2e900000000000000000000000000000000000000000000000000000000000003e80000000000000000000000000000000000000000000000000000000000000000', 'logIndex': '0x4', 'removed': False, 'topics': ['0xd78ad95fa46c994b6551d0da85fc275fe613ce37657fb8d5e3d130840159d822', '0x000000000000000000000000f164fc0ec4e93095b804a4795bbe1e041497b92a', '0x0000000000000000000000008688a84fcfd84d8f78020d0fc0b35987cc58911f'], 'transactionHash': '0x932cb88306450d481a0e43365a3ed832625b68f036e9887684ef6da594891366', 'transactionIndex': '0x1', 'context': <__main__.TokenCache object at 0x104ab7e20>, 'event': <class 'web3._utils.datatypes.Swap'>, 'timestamp': 1588712972}

    block_time = datetime.datetime.utcfromtimestamp(log["timestamp"])

    pair_contract_address = log["address"]

    pair_details = cache.get_pair_details(Web3.toChecksumAddress(pair_contract_address))

    # Chop data blob to byte32 entries
    data_entries = decode_data(log["data"])

    amount0_in, amount1_in, amount0_out, amount1_out = data_entries

    data = {
        "block_number": convert_jsonrpc_value_to_int(log["blockNumber"]),
        "timestamp": block_time.isoformat(),
        "tx_hash": log["transactionHash"],
        "log_index": int(log["logIndex"], 16),
        "pair_contract_address": pair_contract_address,
        "amount0_in": convert_int256_bytes_to_int(amount0_in),
        "amount1_in": convert_int256_bytes_to_int(amount1_in),
        "amount0_out": convert_int256_bytes_to_int(amount0_out),
        "amount1_out": convert_int256_bytes_to_int(amount1_out),
        "pair_details": pair_details,
    }
    return data


def format_swap(swap: dict) -> str:
    """Write swap in human readable format.

    - Two simplified format for swaps

    - Complex output format for more complex swaps
    """

    pair: PairDetails = swap["pair_details"]
    token0 = pair.token0
    token1 = pair.token1
    tx_hash = swap["tx_hash"]
    block_number = swap["block_number"]

    if swap["amount0_in"] and not swap["amount1_in"]:
        token_in = token0
        token_out = token1
        amount_in = token0.convert_to_decimals(swap["amount0_in"])
        amount_out = token1.convert_to_decimals(swap["amount1_out"])
        return f"{block_number:,} {tx_hash} {amount_in} {token_in.symbol} -> {amount_out} {token_out.symbol}"
    elif swap["amount1_in"] and not swap["amount0_in"]:
        token_in = token1
        token_out = token0
        amount_in = token1.convert_to_decimals(swap["amount1_in"])
        amount_out = token0.convert_to_decimals(swap["amount0_out"])
        return f"{block_number:,} {tx_hash} {amount_in} {token_in.symbol} -> {amount_out} {token_out.symbol}"
    else:
        amount0_in = token0.convert_to_decimals(swap["amount0_in"])
        amount1_in = token1.convert_to_decimals(swap["amount1_in"])
        amount0_out = token0.convert_to_decimals(swap["amount0_out"])
        amount1_out = token1.convert_to_decimals(swap["amount1_out"])
        return f"{block_number:,} {tx_hash} {amount0_in} {token0.symbol}, {amount1_in} {token1.symbol} -> {amount0_out} {token0.symbol}, {amount1_out} {token1.symbol}"


def setup_logging():
    level = os.environ.get("LOG_LEVEL", "info").upper()

    fmt = "%(asctime)s %(name)-44s %(message)s"
    date_fmt = "%H:%M:%S"
    coloredlogs.install(level=level, fmt=fmt, date_fmt=date_fmt)

    logging.basicConfig(level=level, handlers=[logging.StreamHandler()])

    # Mute noise
    logging.getLogger("web3.providers.HTTPProvider").setLevel(logging.WARNING)
    logging.getLogger("web3.RequestManager").setLevel(logging.WARNING)
    logging.getLogger("urllib3.connectionpool").setLevel(logging.WARNING)


def main():

    setup_logging()

    # HTTP 1.1 keep-alive
    session = requests.Session()

    json_rpc_url = os.environ["JSON_RPC_URL"]
    web3 = Web3(HTTPProvider(json_rpc_url, session=session))

    # Enable faster ujson reads
    patch_web3(web3)

    web3.middleware_onion.clear()

    # Support Polygon
    install_chain_middleware(web3)

    # Get contracts
    Factory = get_contract(web3, "UniswapV2Factory.json")
    Pair = get_contract(web3, "UniswapV2Pair.json")

    events = [Factory.events.PairCreated, Pair.events.Swap]  # https://etherscan.io/txs?ea=0x5c69bee701ef814a2b6a3edd4b1652cb9cc5aa6f&topic0=0x0d3648bd0f6ba80134a33ba9275ac585d9d315f0ad8355cddefde31afa28d0e9

    block_store = CSVDatasetBlockDataStore(Path("/tmp/uni-v2-last-block-state.csv"))

    reorg_mon = JSONRPCReorganisationMonitor(web3)
    if not block_store.is_virgin():
        block_header_df = block_store.load()
        reorg_mon.load_pandas(block_header_df)
        logger.info("Loaded %d existing blocks from %s", len(block_header_df), block_store.path)
    else:
        logger.info("Starting with fresh block header store at %s", block_store.path)

    # Block time can be between 3 seconds to 12 seconds depending on
    # the EVM chain
    block_time = measure_block_time(web3)

    token_cache = BlockchainStateCache(web3)

    total_reorgs = 0

    filter = prepare_filter(events)

    # Cache pair and token details read from the blockchain state
    cache = BlockchainStateCache(web3)

    stat_delay = 10
    next_stat_print = time.time() + stat_delay

    while True:

        try:

            # Figure out the next good unscanned block range,
            # and fetch block headers and timestamps for this block range
            chain_reorg_resolution = reorg_mon.update_chain()

            if chain_reorg_resolution.reorg_detected:
                logger.info(f"Chain reorganisation data updated: {chain_reorg_resolution}")

            # Read specified events in block range
            for log_result in read_events(
                web3,
                start_block=chain_reorg_resolution.latest_block_with_good_data + 1,
                end_block=chain_reorg_resolution.last_live_block,
                filter=filter,
                notify=None,
                chunk_size=100,
                context=token_cache,
                extract_timestamps=None,
                reorg_mon=reorg_mon,
            ):
                if log_result["event"].event_name == "PairCreated":
                    logger.info(f"New pair created: {log_result}")
                elif log_result["event"].event_name == "Swap":
                    swap = decode_swap(web3, cache, log_result)
                    swap_fmt = format_swap(swap)
                    logger.info("%s", swap_fmt)
                else:
                    raise NotImplementedError()

            # Dump stats to the output
            if time.time() > next_stat_print:
                logger.info("**STATS** Reorgs detected: %d, block headers buffered: %d, pairs cached: %d", total_reorgs, len(reorg_mon.block_map), len(cache.pair_cache))
                next_stat_print = time.time() + stat_delay

        except ChainReorganisationDetected as e:
            # Chain reorganisation was detected during reading the events.
            # reorg_mon.update_chain() will detect the fork and purge bad state
            total_reorgs += 1
            logger.warning("Chain reorg event raised: %s, we have now detected %d chain reorganisations.", e, total_reorgs)

        # Save the current block headers on disk
        df = reorg_mon.to_pandas()
        block_store.save(df)

        time.sleep(block_time)


if __name__ == "__main__":
    main()