ReorganisationMonitor
Documentation for eth_defi.event_reader.reorganisation_monitor.ReorganisationMonitor Python class.
- class ReorganisationMonitor[source]
Watch blockchain for reorgs.
Most EVM blockchains have several minor chain reorganisations per day, when your node switches from one chain tip to another due to block propagation issues. Any application reading blockchain event data must be able to detect such reorganisations and purge incorrect data from its data feeds.
Abstract base class for different ways to support chain reorganisations.
- Maintain the state where our blockchain read cursor is, using get_last_block_read()
- Ingest and maintain the state of the last read blocks using update_chain()
- Check block headers for chain reorganisations when reading events from the chain using check_block_reorg()
- Manage the service for block timestamp lookups, get_block_timestamp()
- Save and load block header state to disk cache, because APIs are slow, using load_pandas() and to_pandas()
Example:
    import os
    import time

    from web3 import HTTPProvider, Web3

    from eth_defi.abi import get_contract
    from eth_defi.chain import install_chain_middleware
    from eth_defi.event_reader.filter import Filter
    from eth_defi.event_reader.reader import read_events, LogResult
    from eth_defi.event_reader.reorganisation_monitor import JSONRPCReorganisationMonitor


    def main():
        json_rpc_url = os.environ.get("JSON_RPC_POLYGON", "https://polygon-rpc.com")
        web3 = Web3(HTTPProvider(json_rpc_url))
        web3.middleware_onion.clear()
        install_chain_middleware(web3)

        # Get contracts
        Pair = get_contract(web3, "sushi/UniswapV2Pair.json")

        filter = Filter.create_filter(
            address=None,  # Listen events from any smart contract
            event_types=[Pair.events.Swap]
        )

        reorg_mon = JSONRPCReorganisationMonitor(web3, check_depth=3)

        reorg_mon.load_initial_block_headers(block_count=5)

        processed_events = set()

        latest_block = None

        # Keep reading events as they land
        while True:

            chain_reorg_resolution = reorg_mon.update_chain()
            start, end = chain_reorg_resolution.get_read_range()

            if chain_reorg_resolution.reorg_detected:
                print("Chain reorg warning")

            evt: LogResult
            for evt in read_events(
                web3,
                start_block=start,
                end_block=end,
                filter=filter,
            ):
                # How to uniquely identify EVM logs
                key = evt["blockHash"] + evt["transactionHash"] + evt["logIndex"]

                # The reader may cause duplicate events as the chain tip reorganises
                if key not in processed_events:
                    print(f"Swap at block {evt['blockNumber']:,} tx: {evt['transactionHash']}")
                    processed_events.add(key)

            if end != latest_block:
                print(f"Latest block is {end:,}")
                latest_block = end

            time.sleep(0.5)


    if __name__ == "__main__":
        main()
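The deduplication step in the example above can be looked at in isolation. The following is a minimal sketch, assuming events are plain dictionaries that carry blockHash, transactionHash and logIndex; the helper names event_key and drop_duplicates are hypothetical and are not part of eth_defi.

```python
from typing import Iterable, List, Set, Tuple

EventKey = Tuple[str, str, str]


def event_key(evt: dict) -> EventKey:
    """Build a key that identifies one EVM log entry in one specific block."""
    return (str(evt["blockHash"]), str(evt["transactionHash"]), str(evt["logIndex"]))


def drop_duplicates(events: Iterable[dict], seen: Set[EventKey]) -> List[dict]:
    """Return only the events whose key has not been seen, and remember them."""
    fresh = []
    for evt in events:
        key = event_key(evt)
        if key not in seen:
            seen.add(key)
            fresh.append(evt)
    return fresh


seen: Set[EventKey] = set()
batch = [
    {"blockHash": "0xaa", "transactionHash": "0x01", "logIndex": 0, "blockNumber": 100},
    # Same log served again because the read range was replayed
    {"blockHash": "0xaa", "transactionHash": "0x01", "logIndex": 0, "blockNumber": 100},
    # Same transaction, but included in a different block after a reorganisation
    {"blockHash": "0xbb", "transactionHash": "0x01", "logIndex": 0, "blockNumber": 100},
]
unique = drop_duplicates(batch, seen)
print(len(unique))
```

A tuple key is used here instead of string concatenation so that the three fields cannot run into each other; because the block hash is part of the key, the same transaction landing in a reorganised block counts as a new event.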
Attributes summary
check_depth – How many blocks we replay from the blockchain to detect any chain reorganisations
last_block_read – Last block served by update_chain() in the duty cycle
max_cycle_tries – How many times we try to re-read data from the blockchain in the case of reorganisation.
reorg_wait_seconds – How long we allow our node to catch up in the case there has been a change in the chain tip.
block_map – Internal buffer of our block data
Methods summary
__init__([block_map, last_block_read, ...])
add_block(record) – Add new block to header tracking.
check_block_reorg(block_number, block_hash) – Check that newly read block matches our record.
fetch_block_data(start_block, end_block) – Read the new block headers.
figure_reorganisation_and_new_blocks([max_range]) – Compare the local block database against the live data from chain.
get_block_by_number(block_number) – Get block header data for a specific block number from our memory buffer.
get_block_timestamp(block_number) – Return UNIX UTC timestamp of a block.
get_block_timestamp_as_pandas(block_number) – Return UNIX UTC timestamp of a block.
get_last_block_live() – Get last block number.
get_last_block_read() – Get the number of the last block served by update_chain().
has_data() – Do we have any data available yet.
load_initial_block_headers([block_count, ...]) – Get the initial block buffer filled up.
load_pandas(df) – Load block header data from Pandas data frame.
restore(block_map) – Restore the chain state from saved data.
skip_to_block(block_number) – Skip scanning initial chain and directly start from a certain block.
to_pandas([partition_size]) – Convert the data to Pandas DataFrame format for storing.
truncate(latest_good_block) – Delete data after a block number because chain reorg happened.
update_chain() – Update the internal memory buffer of block headers from the blockchain node.
- block_map: Dict[int, eth_defi.event_reader.block_header.BlockHeader]
Internal buffer of our block data
Block number -> Block header data
- last_block_read: int = 0
Last block served by update_chain() in the duty cycle
- check_depth: int = 20
How many blocks we replay from the blockchain to detect any chain reorganisations.
Done by figure_reorganisation_and_new_blocks(). Adjust this for your EVM chain.
- max_cycle_tries = 10
How many times we try to re-read data from the blockchain in the case of reorganisation.
If our node constantly feeds us changing data, give up.
- reorg_wait_seconds = 5
How long we allow our node to catch up in the case there has been a change in the chain tip.
If our node constantly feeds us changing data, give up.
- get_last_block_read()[source]
Get the number of the last block served by update_chain().
- Return type
- get_block_by_number(block_number)[source]
Get block header data for a specific block number from our memory buffer.
- Parameters
block_number (int) –
- Return type
- skip_to_block(block_number)[source]
Skip scanning initial chain and directly start from a certain block.
- Parameters
block_number (int) –
- load_initial_block_headers(block_count=None, start_block=None, tqdm=None, save_callable=None)[source]
Get the initial block buffer filled up.
You can call this during application startup, or when you start reading the chain. This interface is designed to keep the application on hold until new blocks have been served.
- Parameters
block_count – How many of the latest blocks to load. Give start_block or block_count.
start_block – What is the first block to read. Give start_block or block_count.
tqdm (Optional[Type[tqdm.std.tqdm]]) – To display a progress bar
save_callable (Optional[Callable]) – Save after every block. Called after every block. TODO: Hack. Design a better interface.
- Returns
The initial block range to start to work with
- Return type
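The choice between block_count and start_block can be illustrated with a small range calculation. This is only a sketch under stated assumptions: initial_range is a hypothetical helper, the latest block number is passed in rather than fetched from a node, and whether the real method reads block_count or block_count + 1 blocks is not stated in this page.

```python
from typing import Optional, Tuple


def initial_range(
    latest_block: int,
    block_count: Optional[int] = None,
    start_block: Optional[int] = None,
) -> Tuple[int, int]:
    """Return the (first, last) block numbers to buffer at start-up."""
    # Exactly one of the two arguments must be supplied
    if (block_count is None) == (start_block is None):
        raise ValueError("Give exactly one of start_block or block_count")
    if block_count is not None:
        # Assumption: count backwards from the tip and never go below block 1
        start_block = max(latest_block - block_count, 1)
    return start_block, latest_block


print(initial_range(1000, block_count=5))
print(initial_range(1000, start_block=900))
```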
- add_block(record)[source]
Add new block to header tracking.
Blocks must be added in order.
- Parameters
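The in-order requirement can be sketched as follows. The Header dataclass and the add_in_order function are hypothetical stand-ins, not the eth_defi types, and the sketch assumes that "in order" means each block number directly follows the previous one.

```python
from dataclasses import dataclass
from typing import Dict


@dataclass(frozen=True)
class Header:
    """Hypothetical stand-in for a block header record."""
    block_number: int
    block_hash: str
    timestamp: int


def add_in_order(block_map: Dict[int, Header], last_block_read: int, record: Header) -> int:
    """Store the record and return the new read cursor position."""
    if record.block_number in block_map:
        raise ValueError(f"Block already added: {record.block_number}")
    # The first record may start anywhere; later records must be consecutive
    if block_map and record.block_number != last_block_read + 1:
        raise ValueError(f"Expected block {last_block_read + 1}, got {record.block_number}")
    block_map[record.block_number] = record
    return record.block_number


block_map: Dict[int, Header] = {}
cursor = 0
cursor = add_in_order(block_map, cursor, Header(100, "0xaa", 1_700_000_000))
cursor = add_in_order(block_map, cursor, Header(101, "0xbb", 1_700_000_002))

try:
    add_in_order(block_map, cursor, Header(103, "0xdd", 1_700_000_006))  # block 102 is missing
    gap_rejected = False
except ValueError:
    gap_rejected = True
```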
- check_block_reorg(block_number, block_hash)[source]
Check that newly read block matches our record.
- Called during the event reader
- Event reader gets the block number and hash with the event
- We have initial block_map in memory, previously buffered in
- We check whether any block in the block map has a different value from what our events produce; if so, we know there has been a chain reorganisation
If we do not have records, ignore.
- Raises
ChainReorganisationDetected – When any of the block data in our internal buffer does not match that provided by events.
- Parameters
- Return type
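The check described above amounts to comparing the hash delivered with an event against the hash buffered for the same block number. A minimal sketch, assuming the buffer is a plain dictionary of block number to block hash; ReorgDetected and check_block are hypothetical names standing in for ChainReorganisationDetected and check_block_reorg().

```python
from typing import Dict


class ReorgDetected(Exception):
    """Hypothetical stand-in for ChainReorganisationDetected."""


def check_block(block_hashes: Dict[int, str], block_number: int, block_hash: str) -> None:
    """Raise if the event's block hash disagrees with the buffered one."""
    known = block_hashes.get(block_number)
    if known is None:
        return  # No record for this block, so there is nothing to compare
    if known != block_hash:
        raise ReorgDetected(
            f"Block {block_number}: buffered hash {known}, event carries {block_hash}"
        )


block_hashes = {100: "0xaa", 101: "0xbb"}
check_block(block_hashes, 100, "0xaa")  # Matches the record
check_block(block_hashes, 999, "0xff")  # Unknown block, ignored

try:
    check_block(block_hashes, 101, "0xcc")  # Hash differs from the record
    detected = False
except ReorgDetected:
    detected = True
```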
- truncate(latest_good_block)[source]
Delete data after a block number because chain reorg happened.
- Parameters
latest_good_block (int) – Delete all data starting after this block (exclusive)
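As an illustration of the exclusive boundary, the sketch below drops every buffered record above latest_good_block and keeps the good block itself. It operates on a plain dictionary; truncate_after is a hypothetical helper and not the library implementation.

```python
from typing import Dict


def truncate_after(block_map: Dict[int, str], latest_good_block: int) -> int:
    """Delete records above latest_good_block and return how many were removed."""
    # Collect the keys first so the dictionary is not mutated while iterating
    stale = [number for number in block_map if number > latest_good_block]
    for number in stale:
        del block_map[number]
    return len(stale)


block_map = {100: "0xaa", 101: "0xbb", 102: "0xcc", 103: "0xdd"}
removed = truncate_after(block_map, 101)
print(removed, sorted(block_map))
```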
- figure_reorganisation_and_new_blocks(max_range=1000000)[source]
Compare the local block database against the live data from chain.
Spot the differences in (block number, block header) tuples and determine a chain reorg.
- Parameters
max_range – Abort if we need to scan more than this number of blocks. This is because giving too long a block range to scan is likely to take forever on non-GraphQL nodes. Set None to ignore.
- Raises
ChainReorganisationDetected – When any of the block data in our internal buffer does not match that provided by events.
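The comparison can be pictured as replaying the last check_depth block numbers and looking for the first hash mismatch. The sketch below is an assumption-laden illustration, not the library algorithm: compare_chains is a hypothetical function, both chains are plain dictionaries of block number to hash, and a mismatch is returned as a value instead of being raised.

```python
from typing import Dict, Optional, Tuple


def compare_chains(
    local: Dict[int, str],
    live: Dict[int, str],
    last_block_read: int,
    check_depth: int,
) -> Tuple[Optional[int], int, int]:
    """Return (latest good block or None, first block to read, last block to read)."""
    live_tip = max(live)
    first_checked = last_block_read - check_depth + 1
    for number in range(first_checked, last_block_read + 1):
        if number in local and number in live and local[number] != live[number]:
            # Everything from the first mismatch onwards has to be read again
            return number - 1, number, live_tip
    # No mismatch: continue from the block after the cursor
    return None, last_block_read + 1, live_tip


local = {100: "0xaa", 101: "0xbb", 102: "0xcc"}
live_forked = {100: "0xaa", 101: "0xbb", 102: "0xc2", 103: "0xdd"}
live_clean = {100: "0xaa", 101: "0xbb", 102: "0xcc", 103: "0xdd"}

forked = compare_chains(local, live_forked, last_block_read=102, check_depth=3)
clean = compare_chains(local, live_clean, last_block_read=102, check_depth=3)
print(forked, clean)
```

A reorganisation deeper than check_depth goes unnoticed in this sketch, which is one reason the depth may need adjusting per chain.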
- get_block_timestamp_as_pandas(block_number)[source]
Return UNIX UTC timestamp of a block.
- Parameters
block_number (int) –
- Return type
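Block timestamps are UNIX timestamps in UTC, so they can be turned into timezone-aware values before display or storage. A minimal sketch using only the standard library; unix_to_utc is a hypothetical helper, and the sample timestamp is made up for illustration.

```python
from datetime import datetime, timezone


def unix_to_utc(timestamp: int) -> datetime:
    """Convert a UNIX timestamp in seconds to a timezone-aware UTC datetime."""
    return datetime.fromtimestamp(timestamp, tz=timezone.utc)


ts = unix_to_utc(1_700_000_000)
print(ts.isoformat())  # 2023-11-14T22:13:20+00:00
```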
- update_chain()[source]
Update the internal memory buffer of block headers from the blockchain node.
Make several attempts to read data (as a fork can cause other forks, which can cause further forks).
Give up after some time if we detect the chain to be in a doom loop.
- Returns
What block range the consumer application should read.
What we think about the chain state.
- Return type
eth_defi.event_reader.reorganisation_monitor.ChainReorganisationResolution
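The retry behaviour governed by max_cycle_tries and reorg_wait_seconds can be sketched as a bounded loop. This is an illustration under assumptions rather than the library code: update_with_retries, ReorgDetected and GaveUp are hypothetical names, the read attempt is injected as a callable, and the sleep function is injectable so the sketch runs without waiting.

```python
import time
from typing import Callable, Tuple


class ReorgDetected(Exception):
    """Hypothetical signal that the chain tip changed during a read."""


class GaveUp(Exception):
    """Hypothetical signal that the node kept returning changing data."""


def update_with_retries(
    attempt: Callable[[], Tuple[int, int]],
    max_cycle_tries: int = 10,
    reorg_wait_seconds: float = 5,
    sleep: Callable[[float], None] = time.sleep,
) -> Tuple[int, int, bool]:
    """Return (start block, end block, whether a reorg was seen on the way)."""
    reorg_seen = False
    for _ in range(max_cycle_tries):
        try:
            start, end = attempt()
        except ReorgDetected:
            reorg_seen = True
            sleep(reorg_wait_seconds)  # Let the node settle on a chain tip
            continue
        return start, end, reorg_seen
    raise GaveUp(f"Chain still changing after {max_cycle_tries} tries")


# Simulated node: two reads hit a reorganisation, the third succeeds
outcomes = iter([ReorgDetected(), ReorgDetected(), (101, 105)])


def fake_attempt() -> Tuple[int, int]:
    outcome = next(outcomes)
    if isinstance(outcome, Exception):
        raise outcome
    return outcome


waits = []
result = update_with_retries(fake_attempt, max_cycle_tries=5, sleep=waits.append)
print(result, waits)
```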
- to_pandas(partition_size=0)[source]
Convert the data to Pandas DataFrame format for storing.
- Parameters
partition_size (int) – To partition the outgoing data. Set 0 to ignore.
- Return type
pandas.core.frame.DataFrame
- load_pandas(df)[source]
Load block header data from Pandas data frame.
- Parameters
df (pandas.core.frame.DataFrame) – Pandas DataFrame exported with to_pandas().
- restore(block_map)[source]
Restore the chain state from saved data.
- Parameters
block_map (dict) – Block number -> Block header dictionary