MultithreadEventReader
Documentation for eth_defi.event_reader.multithread.MultithreadEventReader Python class.
- class MultithreadEventReader
Multithreaded and parallel Solidity event reading.
A high-performance event reader for JSON-RPC APIs.
Uses parallel requests, but the consumer always receives the events in the order they happened on-chain.
This class wraps the lower-level event reading functions into a simpler-to-use, well-documented class.
It can be used standalone: you iterate over the events yourself.
It can be passed to functions that expect the Web3EventReader protocol.
See Solidity event high-speed multithread reading for full tutorial.
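The ordering guarantee above can be illustrated with a minimal sketch using only the Python standard library. This is not the reader's actual implementation: `fetch_chunk` is a hypothetical stand-in for one `eth_getLogs` request, and the block ranges are made up. It only shows that results fetched in parallel can still be handed to the consumer in submission order.

```python
import random
import time
from concurrent.futures import ThreadPoolExecutor


def fetch_chunk(block_range):
    """Hypothetical stand-in for one eth_getLogs call over a block range."""
    start, end = block_range
    time.sleep(random.uniform(0, 0.01))  # simulate variable network latency
    return [{"blockNumber": n} for n in range(start, end + 1)]


def read_in_order(block_ranges, max_threads=4):
    """Fetch ranges in parallel, yield events in the order the ranges were given."""
    with ThreadPoolExecutor(max_workers=max_threads) as executor:
        # Executor.map() returns results in input order,
        # regardless of which worker thread finishes first.
        for events in executor.map(fetch_chunk, block_ranges):
            yield from events


ranges = [(1, 3), (4, 6), (7, 9)]
blocks = [event["blockNumber"] for event in read_in_order(ranges)]
```

Here `blocks` comes out sorted even though the individual requests complete in a random order.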
Example of how to read events:

    import datetime

    from hexbytes import HexBytes

    from eth_defi.event_reader.multithread import MultithreadEventReader

    # Get one of the contracts prepackaged ABIs from eth_defi package
    value_interpreter_contract = get_contract(web3, "enzyme/ValueInterpreter.json")

    # Read events only for this contract
    # See https://docs.enzyme.finance/developers/contracts/polygon
    target_contract_address = "0x66De7e286Aae66f7f3Daf693c22d16EEa48a0f45"

    # Create eth_getLogs event filtering
    filter = Filter.create_filter(
        target_contract_address,
        [value_interpreter_contract.events.PrimitiveAdded],
    )

    # Set up multithreaded Polygon event reader.
    # Print progress to the console how many blocks there are left to read.
    reader = MultithreadEventReader(
        json_rpc_url,
        max_threads=16,
        notify=PrintProgressUpdate(),
        max_blocks_once=10_000)

    # Loop over the events as the multithreaded reader pool is feeding them to us.
    # Events will always arrive in the order they happened on chain.
    decoded_events = []
    start = datetime.datetime.utcnow()
    for event in reader(
        web3,
        start_block,
        end_block,
        filter=filter,
    ):
        # Decode the solidity event
        #
        # Indexed event parameters go to EVM topics, the second element is the first parameter
        # Non-indexed event parameters go to EVM arguments, first element is the first parameter
        arguments = decode_data(event["data"])
        topics = event["topics"]

        # event PrimitiveAdded(
        #     address indexed primitive,
        #     address aggregator,
        #     RateAsset rateAsset,
        #     uint256 unit
        # );
        primitive = convert_uint256_bytes_to_address(HexBytes(topics[1]))
        aggregator = convert_uint256_bytes_to_address(arguments[0])
        rate_asset = convert_int256_bytes_to_int(arguments[1])
        unit = convert_int256_bytes_to_int(arguments[2])

        # Primitive is a ERC-20 token, resolve its name and symbol while we are decoding the events
        token = fetch_erc20_details(web3, primitive)

        decoded = {
            "primitive": primitive,
            "aggregator": aggregator,
            "rate_asset": rate_asset,
            "unit": unit,
            "token": token,
        }
        decoded_events.append(decoded)
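To make the topic and data layout in the comments above concrete, here is a rough pure-Python sketch of decoding 32-byte ABI words. The helpers `split_words`, `word_to_address` and `word_to_int` are hypothetical names written for this illustration; they are not the eth_defi conversion functions, and the sample values are made up. Unlike a production helper, `word_to_address` applies no checksum casing.

```python
def split_words(data: bytes) -> list[bytes]:
    """Split non-indexed event data into 32-byte ABI words."""
    return [data[i:i + 32] for i in range(0, len(data), 32)]


def word_to_address(word: bytes) -> str:
    """Interpret a 32-byte word as an address: the low 20 bytes, lowercase hex."""
    if len(word) != 32:
        raise ValueError("expected a 32-byte word")
    return "0x" + word[-20:].hex()


def word_to_int(word: bytes) -> int:
    """Interpret a 32-byte word as a signed 256-bit big-endian integer."""
    if len(word) != 32:
        raise ValueError("expected a 32-byte word")
    return int.from_bytes(word, "big", signed=True)


# Made-up sample log. topics[0] is the event signature hash,
# topics[1] is the first indexed parameter (left-padded address).
topics = [bytes(32), bytes(12) + bytes.fromhex("ab" * 20)]
# Non-indexed parameters are packed one after another into the data field.
data = (
    bytes(12) + bytes.fromhex("cd" * 20)  # address aggregator
    + (1).to_bytes(32, "big")             # RateAsset rateAsset
    + (10**18).to_bytes(32, "big")        # uint256 unit
)

arguments = split_words(data)
primitive = word_to_address(topics[1])
aggregator = word_to_address(arguments[0])
rate_asset = word_to_int(arguments[1])
unit = word_to_int(arguments[2])
```

Addresses occupy the low 20 bytes of a word, which is why the first indexed parameter is read from the second topic and sliced rather than used whole.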
Example of how to pass the multithread reader to a function consuming events:

    from typing import cast

    from web3 import HTTPProvider

    provider = cast(HTTPProvider, web3.provider)
    json_rpc_url = provider.endpoint_uri
    reader = MultithreadEventReader(json_rpc_url, max_threads=16)

    start_block = 1
    end_block = web3.eth.block_number

    # Iterate over all price feed added events
    # over the whole blockchain range
    feed_iter = fetch_price_feeds(
        deployment,
        start_block,
        end_block,
        reader,
    )
    feeds = list(feed_iter)
    reader.close()
    assert len(feeds) == 2
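The pattern of handing a reader to a consuming function can be sketched with a structural type. The example below is an assumption-laden illustration: `BlockRangeReader` is a hypothetical, simplified protocol invented here and does not reproduce the real Web3EventReader signature, and `InMemoryReader` is a toy that serves pre-recorded events instead of calling a node.

```python
from typing import Iterable, Iterator, Protocol


class BlockRangeReader(Protocol):
    """Hypothetical protocol: any callable that yields events for a block range."""

    def __call__(self, start_block: int, end_block: int) -> Iterable[dict]: ...


class InMemoryReader:
    """Toy reader serving pre-recorded events, with close() like a pooled reader."""

    def __init__(self, events: list[dict]):
        self.events = events
        self.closed = False

    def __call__(self, start_block: int, end_block: int) -> Iterator[dict]:
        for event in self.events:
            if start_block <= event["blockNumber"] <= end_block:
                yield event

    def close(self) -> None:
        self.closed = True


def collect_block_numbers(
    reader: BlockRangeReader, start_block: int, end_block: int
) -> list[int]:
    """Consumer that only relies on the reader being callable."""
    return [event["blockNumber"] for event in reader(start_block, end_block)]


reader = InMemoryReader([{"blockNumber": 5}, {"blockNumber": 12}, {"blockNumber": 40}])
try:
    found = collect_block_numbers(reader, 1, 20)
finally:
    # Release resources even if the consumer raises.
    reader.close()
```

Wrapping the consumer in `try`/`finally` is one way to make sure `close()` runs even when reading fails midway.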
Methods summary
__init__(json_rpc_url[, max_threads, ...]) – Creates a multithreaded JSON-RPC reader pool.
close() – Release the allocated resources.
get_total_api_call_counts() – Sum API call counts across all threads.
- __init__(json_rpc_url, max_threads=10, reader_context=None, api_counter=True, max_blocks_once=50000, reorg_mon=None, notify=None, auto_close_notify=True)
Creates a multithreaded JSON-RPC reader pool.
Can be passed to any function that expects the Web3EventReader protocol.
Sets up a requests session pool for max_threads threads.
Sets up futureproof thread pool executors for max_threads threads.
- Parameters
json_rpc_url (str) – Your node URL
max_threads – How many threads to allocate
reader_context (Optional[Any]) – Passed to the reader callback
api_counter – Enable cross-thread API counter
notify (Optional[eth_defi.event_reader.reader.ProgressUpdate]) – Notify interface for progress reports.
max_blocks_once – How many blocks your node’s eth_getLogs call can serve in one request. Some node providers set this limit very low, around 1000 blocks, which slows down the reading.
reorg_mon (Optional[eth_defi.event_reader.reorganisation_monitor.ReorganisationMonitor]) – Chain reorganisation monitor. The policy class for dealing with chain tip changes during the read or between event reads. If you do not want block hashes and timestamps for the events, or you do not want to check for potential reorganisations, you can set this to None.
auto_close_notify – Close the notifier after the operation is done. Assumes the notifier object has a close() method.
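To see why a low max_blocks_once slows reading down, consider how a block range might be cut into per-request chunks. The helper below, `split_block_range`, is a hypothetical illustration and not part of eth_defi; treating both ends of the range as inclusive is an assumption made for this sketch.

```python
def split_block_range(start_block: int, end_block: int, max_blocks_once: int):
    """Yield inclusive (first, last) pairs, each covering at most max_blocks_once blocks."""
    if max_blocks_once < 1:
        raise ValueError("max_blocks_once must be at least 1")
    first = start_block
    while first <= end_block:
        last = min(first + max_blocks_once - 1, end_block)
        yield first, last
        first = last + 1


# The same 25,000 block range needs 3 requests at 10,000 blocks per call,
# but 25 requests when the provider only serves 1,000 blocks per call.
large_chunks = list(split_block_range(1, 25_000, 10_000))
small_chunks = list(split_block_range(1, 25_000, 1_000))
```

Each chunk corresponds to one request, so the number of round trips grows in inverse proportion to the limit.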
- get_total_api_call_counts()
Sum API call counts across all threads.
See eth_defi.chain.install_api_call_counter_middleware() for details.
- Return type