ReorganisationMonitor

Documentation for eth_defi.event_reader.reorganisation_monitor.ReorganisationMonitor Python class.

class ReorganisationMonitor

Watch blockchain for reorgs.

Most EVM blockchains have several minor chain reorganisations per day, when your node switches from one chain tip to another due to block propagation issues. Any application reading blockchain event data must be able to detect such reorganisations and purge incorrect data from its data feeds.

  • Abstract base class for different ways to support chain reorganisations

  • Maintain the state where our blockchain read cursor is, using get_last_block_read()

  • Ingest and maintain the state of the last read blocks using update_chain()

  • Check block headers for chain reorganisations when reading events from the chain using check_block_reorg()

  • Manage the service for block timestamp lookups, using get_block_timestamp()

  • Save and load block header state to disk cache, because APIs are slow, using load_pandas() and to_pandas()

Example:

import os
import time

from web3 import HTTPProvider, Web3

from eth_defi.abi import get_contract
from eth_defi.chain import install_chain_middleware
from eth_defi.event_reader.filter import Filter
from eth_defi.event_reader.reader import read_events, LogResult
from eth_defi.event_reader.reorganisation_monitor import JSONRPCReorganisationMonitor


def main():

    json_rpc_url = os.environ.get("JSON_RPC_POLYGON", "https://polygon-rpc.com")
    web3 = Web3(HTTPProvider(json_rpc_url))
    web3.middleware_onion.clear()
    install_chain_middleware(web3)

    # Get contracts
    Pair = get_contract(web3, "sushi/UniswapV2Pair.json")

    filter = Filter.create_filter(
        address=None,  # Listen events from any smart contract
        event_types=[Pair.events.Swap]
    )

    reorg_mon = JSONRPCReorganisationMonitor(web3, check_depth=3)

    reorg_mon.load_initial_block_headers(block_count=5)

    processed_events = set()

    latest_block = None

    # Keep reading events as they land
    while True:
        chain_reorg_resolution = reorg_mon.update_chain()
        start, end = chain_reorg_resolution.get_read_range()

        if chain_reorg_resolution.reorg_detected:
            print("Chain reorg warning")

        evt: LogResult
        for evt in read_events(
            web3,
            start_block=start,
            end_block=end,
            filter=filter,
        ):
            # How to uniquely identify EVM logs
            key = (evt["blockHash"], evt["transactionHash"], evt["logIndex"])

            # The reader may cause duplicate events as the chain tip reorganises
            if key not in processed_events:
                print(f"Swap at block {evt['blockNumber']:,} tx: {evt['transactionHash']}")
                processed_events.add(key)

        if end != latest_block:
            print(f"Latest block is {end:,}")
            latest_block = end

        time.sleep(0.5)


if __name__ == "__main__":
    main()

Attributes summary

check_depth

How many blocks we replay from the blockchain to detect any chain reorganisations

last_block_read

Last block served by update_chain() in the duty cycle

max_cycle_tries

How many times we try to re-read data from the blockchain in the case of reorganisation.

reorg_wait_seconds

How long we allow our node to catch up in case there has been a change in the chain tip.

block_map

Internal buffer of our block data

Methods summary

__init__([block_map, last_block_read, ...])

add_block(record)

Add new block to header tracking.

check_block_reorg(block_number, block_hash)

Check that newly read block matches our record.

fetch_block_data(start_block, end_block)

Read the new block headers.

figure_reorganisation_and_new_blocks([max_range])

Compare the local block database against the live data from chain.

get_block_by_number(block_number)

Get block header data for a specific block number from our memory buffer.

get_block_timestamp(block_number)

Return UNIX UTC timestamp of a block.

get_block_timestamp_as_pandas(block_number)

Return the timestamp of a block as a pandas Timestamp.

get_last_block_live()

Get the last block number from the live chain.

get_last_block_read()

Get the number of the last block served by update_chain().

has_data()

Do we have any data available yet.

load_initial_block_headers([block_count, ...])

Get the initial block buffer filled up.

load_pandas(df)

Load block header data from Pandas data frame.

restore(block_map)

Restore the chain state from saved data.

skip_to_block(block_number)

Skip scanning initial chain and directly start from a certain block.

to_pandas([partition_size])

Convert the data to Pandas DataFrame format for storing.

truncate(latest_good_block)

Delete data after a block number because chain reorg happened.

update_chain()

Update the internal memory buffer of block headers from the blockchain node.

block_map: Dict[int, eth_defi.event_reader.block_header.BlockHeader]

Internal buffer of our block data

Block number -> Block header data

last_block_read: int = 0

Last block served by update_chain() in the duty cycle

check_depth: int = 20

How many blocks we replay from the blockchain to detect any chain reorganisations

Done by figure_reorganisation_and_new_blocks(). Adjust this for your EVM chain.

max_cycle_tries = 10

How many times we try to re-read data from the blockchain in the case of reorganisation.

If our node constantly feeds us changing data, give up.

reorg_wait_seconds = 5

How long we allow our node to catch up in case there has been a change in the chain tip.

If our node constantly feeds us changing data, give up.

has_data()

Do we have any data available yet.

Return type

bool

get_last_block_read()

Get the number of the last block served by update_chain().

Return type

int

get_block_by_number(block_number)

Get block header data for a specific block number from our memory buffer.

Parameters

block_number (int) –

Return type

eth_defi.event_reader.block_header.BlockHeader

skip_to_block(block_number)

Skip scanning initial chain and directly start from a certain block.

Parameters

block_number (int) –

load_initial_block_headers(block_count=None, start_block=None, tqdm=None, save_callable=None)

Get the initial block buffer filled up.

You can call this during application startup, or when you start the chain. This interface is designed to keep the application on hold until new blocks have been served.

Parameters
  • block_count (Optional[int]) –

    How many of the latest blocks to load.

    Give either start_block or block_count.

  • start_block (Optional[int]) –

    The first block to read.

    Give either start_block or block_count.

  • tqdm (Optional[Type[tqdm.std.tqdm]]) – To display a progress bar

  • save_callable (Optional[Callable]) –

    Called after every block to save the state.

    TODO: Hack. Design a better interface.

Returns

The initial block range to start working with

Return type

Tuple[int, int]
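The two mutually exclusive ways of choosing the starting point can be illustrated with a small helper. This is a hypothetical function, not part of eth_defi; it assumes block_count means "the N most recent blocks up to the live tip, inclusive" and returns an inclusive (start, end) tuple to mirror the documented Tuple[int, int] return type.

```python
from typing import Optional, Tuple


def initial_block_range(
    last_live_block: int,
    block_count: Optional[int] = None,
    start_block: Optional[int] = None,
) -> Tuple[int, int]:
    """Hypothetical helper: pick the initial (start, end) block range, both inclusive."""
    # Exactly one of the two options must be given
    if (block_count is None) == (start_block is None):
        raise ValueError("Give either start_block or block_count, not both or neither")

    if block_count is not None:
        # Assumption: load the block_count most recent blocks ending at the live tip,
        # clamped at the genesis block (block 0)
        start = max(0, last_live_block - block_count + 1)
    else:
        start = start_block

    return start, last_live_block


print(initial_block_range(1_000, block_count=5))    # (996, 1000)
print(initial_block_range(1_000, start_block=900))  # (900, 1000)
```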

add_block(record)

Add new block to header tracking.

Blocks must be added in order.

Parameters

record (eth_defi.event_reader.block_header.BlockHeader) –
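A minimal sketch of the "blocks must be added in order" rule, using a plain dict as the block map. The Header dataclass and add_block function here are illustrative stand-ins, not the eth_defi BlockHeader class, and the assumption is that "in order" means each new block number is exactly one higher than the last stored one.

```python
from dataclasses import dataclass
from typing import Dict


@dataclass
class Header:
    """Illustrative stand-in for a block header record."""
    block_number: int
    block_hash: str
    timestamp: int


def add_block(block_map: Dict[int, Header], record: Header) -> None:
    """Append a header, refusing gaps and out-of-order blocks (hypothetical rule)."""
    if block_map:
        last = max(block_map)
        # Assumption: the next block must directly follow the last stored one
        if record.block_number != last + 1:
            raise ValueError(f"Expected block {last + 1}, got {record.block_number}")
    block_map[record.block_number] = record


headers: Dict[int, Header] = {}
add_block(headers, Header(100, "0xaa", 1_700_000_000))
add_block(headers, Header(101, "0xbb", 1_700_000_002))
```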

check_block_reorg(block_number, block_hash)

Check that newly read block matches our record.

  • Called by the event reader

  • The event reader gets the block number and hash with the event

  • We have initial block_map in memory, previously buffered in

  • We check whether any of the blocks in the block map have different values than those our events report; if so, we know there has been a chain reorganisation

If we do not have records, ignore.

Raises

ChainReorganisationDetected – When any of the block data in our internal buffer does not match that provided by events.

Parameters
  • block_number (int) –

  • block_hash (str) –

Return type

Optional[int]
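The check itself is a lookup-and-compare against the in-memory buffer. The sketch below is a simplified, hypothetical version: it defines its own ChainReorganisationDetected exception and uses a plain {block_number: block_hash} dict, and it assumes the Optional[int] return value is the verified block number, or None when there is no record to compare against.

```python
from typing import Dict, Optional


class ChainReorganisationDetected(Exception):
    """Illustrative exception; eth_defi defines its own class of this name."""


def check_block_reorg(
    known_hashes: Dict[int, str], block_number: int, block_hash: str
) -> Optional[int]:
    """Compare the hash reported with an event against our buffered record."""
    recorded = known_hashes.get(block_number)
    if recorded is None:
        # No record for this block: nothing to compare against, so ignore it
        return None
    if recorded != block_hash:
        # The node now reports a different block at this height
        raise ChainReorganisationDetected(
            f"Block {block_number}: buffered {recorded}, event has {block_hash}"
        )
    return block_number
```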

truncate(latest_good_block)

Delete data after a block number because chain reorg happened.

Parameters

latest_good_block (int) – Delete all data starting after this block (exclusive)
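Since latest_good_block itself is kept and everything after it is removed, truncation over a dict-based buffer can be sketched as a hypothetical standalone function (not the library's code):

```python
from typing import Dict


def truncate(block_map: Dict[int, str], latest_good_block: int) -> None:
    """Drop every buffered block newer than latest_good_block, in place."""
    # Collect the keys first: a dict must not change size while iterating over it
    stale = [n for n in block_map if n > latest_good_block]
    for n in stale:
        del block_map[n]


buffer = {100: "0xaa", 101: "0xbb", 102: "0xcc", 103: "0xdd"}
truncate(buffer, 101)
print(sorted(buffer))  # [100, 101]
```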

figure_reorganisation_and_new_blocks(max_range=1000000)

Compare the local block database against the live data from chain.

Spot the differences in (block number, block header) tuples and determine a chain reorg.

Parameters

max_range (Optional[int]) –

Abort if we need to scan more than this number of blocks.

This is because scanning too long a block range is likely to take forever on non-GraphQL nodes.

Set to None to disable the limit.

Raises

ChainReorganisationDetected – When any of the block data in our internal buffer does not match that provided by events.
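The comparison can be pictured as replaying the last check_depth buffered blocks against what the node now reports, finding the first block whose hash changed, and treating the block before it as the latest good one. The function below is a simplified, hypothetical model of that idea over {block_number: block_hash} dicts; the real method also fetches new blocks, enforces max_range and raises ChainReorganisationDetected, none of which is modelled here.

```python
from typing import Dict, Optional


def find_latest_good_block(
    local: Dict[int, str], live: Dict[int, str], check_depth: int
) -> Optional[int]:
    """Return the last block before the first hash mismatch, or None if no reorg."""
    if not local:
        return None
    last_local = max(local)
    # Replay only the most recent check_depth blocks we have buffered
    first_checked = max(min(local), last_local - check_depth + 1)
    for n in range(first_checked, last_local + 1):
        # Blocks the node does not report (yet) are not treated as a mismatch here
        if n in local and n in live and live[n] != local[n]:
            # Block n changed: everything from n onwards must be purged
            return n - 1
    return None
```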

get_block_timestamp(block_number)

Return UNIX UTC timestamp of a block.

Parameters

block_number (int) –

Return type

int

get_block_timestamp_as_pandas(block_number)

Return the timestamp of a block as a pandas Timestamp.

Parameters

block_number (int) –

Return type

pandas._libs.tslibs.timestamps.Timestamp
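The two timestamp accessors differ only in their return type. Converting a UNIX UTC timestamp, such as the int returned by get_block_timestamp(), into a pandas Timestamp can be done with pandas.to_datetime(..., unit="s"); whether the library uses exactly this call is an assumption. The result is timezone-naive but represents UTC.

```python
import pandas as pd

# A UNIX UTC timestamp in seconds (example value)
unix_ts = 1_672_531_200

# Interpret the integer as seconds since the UNIX epoch
ts = pd.to_datetime(unix_ts, unit="s")
print(ts)  # 2023-01-01 00:00:00
```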

update_chain()

Update the internal memory buffer of block headers from the blockchain node.

  • Make several attempts to read data (as a fork can cause further forks)

  • Give up after some time if we detect the chain to be in a doom loop

Returns

What block range the consumer application should read.

What we think about the chain state.

Return type

eth_defi.event_reader.reorganisation_monitor.ChainReorganisationResolution
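The retry behaviour can be sketched as a loop bounded by max_cycle_tries that pauses reorg_wait_seconds between attempts. This is a hypothetical standalone model: attempt stands in for one read-and-compare cycle and is assumed to raise ChainReorganisationDetected while the chain tip is still moving; both exception classes, including the made-up TooManyReorganisations, are defined locally for illustration.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class ChainReorganisationDetected(Exception):
    """Illustrative stand-in for the library's exception."""


class TooManyReorganisations(Exception):
    """Hypothetical: raised when the chain never settles."""


def update_with_retries(
    attempt: Callable[[], T],
    max_cycle_tries: int = 10,
    reorg_wait_seconds: float = 5,
) -> T:
    """Run attempt() until it completes without detecting a reorg, or give up."""
    for _ in range(max_cycle_tries):
        try:
            return attempt()
        except ChainReorganisationDetected:
            # Give the node time to settle on a single chain tip
            time.sleep(reorg_wait_seconds)
    raise TooManyReorganisations(f"Chain did not settle after {max_cycle_tries} tries")
```

Bounding the loop keeps a node that constantly feeds changing data from stalling the reader forever.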

to_pandas(partition_size=0)

Convert the data to Pandas DataFrame format for storing.

Parameters

partition_size (int) –

To partition the outgoing data.

Set to 0 to disable partitioning.

Return type

pandas.core.frame.DataFrame

load_pandas(df)

Load block header data from Pandas data frame.

Parameters

df (pandas.core.frame.DataFrame) – Pandas DataFrame exported with to_pandas().
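Persisting the buffer amounts to turning the block map into rows and back. The round trip below is a sketch using plain pandas with an in-memory CSV standing in for the disk cache; the column names block_number, block_hash and timestamp are assumptions, not a description of what to_pandas() actually writes, and partition_size is not modelled.

```python
import io
from typing import Dict, Tuple

import pandas as pd

# Block number -> (block hash, UNIX timestamp); illustrative data
block_map: Dict[int, Tuple[str, int]] = {
    100: ("0xaa", 1_700_000_000),
    101: ("0xbb", 1_700_000_002),
}

# Export: one row per block, ordered by block number
df = pd.DataFrame(
    [
        {"block_number": n, "block_hash": h, "timestamp": t}
        for n, (h, t) in sorted(block_map.items())
    ]
)

# Save to and load from an in-memory CSV file, standing in for a disk cache
buf = io.StringIO()
df.to_csv(buf, index=False)
loaded = pd.read_csv(io.StringIO(buf.getvalue()))

# Import: rebuild the dict from the loaded rows
restored = {
    int(row.block_number): (row.block_hash, int(row.timestamp))
    for row in loaded.itertuples(index=False)
}
```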

restore(block_map)

Restore the chain state from saved data.

Parameters

block_map (dict) – Block number -> Block header dictionary

abstract fetch_block_data(start_block, end_block)

Read the new block headers.

Parameters
  • start_block – The first block to read (inclusive)

  • end_block – The last block to read (inclusive)

Return type

Iterable[eth_defi.event_reader.block_header.BlockHeader]

abstract get_last_block_live()

Get the last block number from the live chain.

Return type

int
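A concrete monitor only has to answer two questions: what the live chain tip is, and what the headers in a given inclusive range look like. The sketch below shows that contract with a self-contained abstract base class and an in-memory backend, which could be handy in tests. MonitorBase, InMemoryMonitor and Header are hypothetical names; this does not subclass the real ReorganisationMonitor, which carries additional state.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Iterable, List


@dataclass
class Header:
    """Illustrative stand-in for a block header record."""
    block_number: int
    block_hash: str
    timestamp: int


class MonitorBase(ABC):
    """The two methods a concrete backend must provide."""

    @abstractmethod
    def fetch_block_data(self, start_block: int, end_block: int) -> Iterable[Header]:
        ...

    @abstractmethod
    def get_last_block_live(self) -> int:
        ...


class InMemoryMonitor(MonitorBase):
    """Serves headers from a list, e.g. a simulated chain in unit tests."""

    def __init__(self, chain: List[Header]):
        # Headers are assumed to be sorted by block number
        self.chain = chain

    def fetch_block_data(self, start_block: int, end_block: int) -> Iterable[Header]:
        # Both ends inclusive, matching the documented contract
        for header in self.chain:
            if start_block <= header.block_number <= end_block:
                yield header

    def get_last_block_live(self) -> int:
        return self.chain[-1].block_number
```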

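__init__(block_map=<factory>, last_block_read=0, check_depth=20)

Parameters
  • block_map (Dict[int, eth_defi.event_reader.block_header.BlockHeader]) –

  • last_block_read (int) –

  • check_depth (int) –

Return type

None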