MultithreadEventReader

Documentation for eth_defi.event_reader.multithread.MultithreadEventReader Python class.

class MultithreadEventReader

Multithreaded and parallel Solidity event reading.

  • A high-performance event reader for JSON-RPC APIs

  • Uses parallel requests, but the consumer always gets the events in the order they happened on-chain

  • This class wraps lower-level event reading functions in a simpler, well-documented interface.

  • It is designed to be used standalone (you read the events)

  • It is designed to be passed to functions that expect the Web3EventReader protocol
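
The ordering guarantee above can be illustrated with a minimal sketch that uses only the Python standard library. It is not the reader's actual implementation: fetch_logs() is a hypothetical stand-in for one eth_getLogs request, and the block ranges are made up. The point is that requests run concurrently while results are consumed in submission order.

```python
from concurrent.futures import ThreadPoolExecutor


def fetch_logs(start_block: int, end_block: int) -> list:
    """Hypothetical stand-in for one eth_getLogs request over a block range."""
    # Pretend every 1000th block emitted exactly one event
    return [{"blockNumber": n} for n in range(start_block, end_block + 1) if n % 1000 == 0]


def read_in_order(ranges, max_threads: int = 4):
    """Fetch all ranges in parallel, but yield events in range order."""
    with ThreadPoolExecutor(max_workers=max_threads) as executor:
        # Submit everything up front so the requests run concurrently
        futures = [executor.submit(fetch_logs, start, end) for start, end in ranges]
        # Consume futures in submission order, not in completion order
        for future in futures:
            yield from future.result()


ranges = [(1, 2500), (2501, 5000), (5001, 7500)]
block_numbers = [event["blockNumber"] for event in read_in_order(ranges)]
```

Because the futures are read back in the order they were submitted, a slow early range delays later events but never reorders them.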

See Solidity event high-speed multithread reading for a full tutorial.

Example of how to read events:

# Get one of the contract ABIs prepackaged with the eth_defi package
value_interpreter_contract = get_contract(web3, "enzyme/ValueInterpreter.json")

# Read events only for this contract
# See https://docs.enzyme.finance/developers/contracts/polygon
target_contract_address = "0x66De7e286Aae66f7f3Daf693c22d16EEa48a0f45"

# Create an eth_getLogs event filter
filter = Filter.create_filter(
    target_contract_address,
    [value_interpreter_contract.events.PrimitiveAdded],
)

# Set up a multithreaded Polygon event reader.
# Print progress to the console, showing how many blocks are left to read.
reader = MultithreadEventReader(
    json_rpc_url,
    max_threads=16,
    notify=PrintProgressUpdate(),
    max_blocks_once=10_000)

# Loop over the events as the multithreaded reader pool feeds them to us.
# Events will always arrive in the order they happened on chain.
decoded_events = []
start = datetime.datetime.utcnow()
for event in reader(
    web3,
    start_block,
    end_block,
    filter=filter,
):
    # Decode the Solidity event
    #
    # Indexed event parameters go to EVM topics. topics[0] is the event signature hash,
    # so topics[1] is the first indexed parameter.
    # Non-indexed event parameters go to the EVM data field; its first element is the first such parameter.
    arguments = decode_data(event["data"])
    topics = event["topics"]

    # event PrimitiveAdded(
    #     address indexed primitive,
    #     address aggregator,
    #     RateAsset rateAsset,
    #     uint256 unit
    # );
    primitive = convert_uint256_bytes_to_address(HexBytes(topics[1]))
    aggregator = convert_uint256_bytes_to_address(arguments[0])
    rate_asset = convert_int256_bytes_to_int(arguments[1])
    unit = convert_int256_bytes_to_int(arguments[2])

    # Primitive is an ERC-20 token; resolve its name and symbol while we are decoding the events
    token = fetch_erc20_details(web3, primitive)

    decoded = {
        "primitive": primitive,
        "aggregator": aggregator,
        "rate_asset": rate_asset,
        "unit": unit,
        "token": token,
    }
    decoded_events.append(decoded)
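
To make the decoding step in the example above more concrete, here is a standalone sketch of how the data field of an event with only static types (address, enum, uint256) can be split into 32-byte words. The helper names and the sample payload are hypothetical, and this is not how eth_defi's decode_data() is necessarily implemented. The address is returned in lower case, without an EIP-55 checksum.

```python
def split_words(data_hex: str) -> list:
    """Split ABI-encoded event data into 32-byte words."""
    raw = bytes.fromhex(data_hex[2:] if data_hex.startswith("0x") else data_hex)
    return [raw[i:i + 32] for i in range(0, len(raw), 32)]


def word_to_address(word: bytes) -> str:
    """An address occupies the last 20 bytes of a left-padded 32-byte word."""
    return "0x" + word[-20:].hex()


def word_to_uint(word: bytes) -> int:
    """Interpret a 32-byte word as a big-endian unsigned integer."""
    return int.from_bytes(word, "big")


# Made-up payload: an address, then the integers 1 and 10**18
data = (
    "0x"
    + "00" * 12 + "ab" * 20
    + (1).to_bytes(32, "big").hex()
    + (10**18).to_bytes(32, "big").hex()
)

words = split_words(data)
aggregator = word_to_address(words[0])
rate_asset = word_to_uint(words[1])
unit = word_to_uint(words[2])
```

Dynamic types such as strings or arrays are encoded with offsets and would need a full ABI decoder rather than this fixed-width split.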

Example of how to pass the multithread reader to a function that consumes events:

provider = cast(HTTPProvider, web3.provider)
json_rpc_url = provider.endpoint_uri
reader = MultithreadEventReader(json_rpc_url, max_threads=16)

start_block = 1
end_block = web3.eth.block_number

# Iterate over all price feed added events
# over the whole blockchain range
feed_iter = fetch_price_feeds(
    deployment,
    start_block,
    end_block,
    reader,
)
feeds = list(feed_iter)
reader.close()
assert len(feeds) == 2

Because Ethereum does not have a native JSON-RPC API for getting block timestamps and headers easily, there are many workarounds for getting event timestamps. Here is an example of how to fetch timestamps “lazily”, only for blocks that contain events:

from eth_defi.event_reader.lazy_timestamp_reader import extract_timestamps_json_rpc_lazy

provider = cast(HTTPProvider, web3.provider)
json_rpc_url = provider.endpoint_uri
reader = MultithreadEventReader(json_rpc_url, max_threads=16)

start_block = 1
end_block = web3.eth.block_number

for log_result in reader(
    web3,
    start_block,
    end_block,
    filter=filter,
    extract_timestamps=extract_timestamps_json_rpc_lazy,
):
    pass

See eth_defi.event_reader.lazy_timestamp_reader.extract_timestamps_json_rpc_lazy() and eth_defi.uniswap_v3.events.fetch_events_to_csv() for more details.
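
The idea of lazy timestamp fetching can be sketched without the library. The LazyTimestamps class and fake_fetch() below are hypothetical names for illustration, not part of eth_defi; fake_fetch() stands in for a per-block request such as eth_getBlockByNumber. Each block's timestamp is requested at most once, and only when an event in that block needs it.

```python
class LazyTimestamps:
    """Hypothetical cache that fetches a block timestamp only on first use."""

    def __init__(self, fetch_timestamp):
        self.fetch_timestamp = fetch_timestamp  # callable: block number -> UNIX time
        self.cache = {}
        self.api_calls = 0

    def __getitem__(self, block_number: int) -> int:
        if block_number not in self.cache:
            # Only blocks that actually contain events end up here
            self.api_calls += 1
            self.cache[block_number] = self.fetch_timestamp(block_number)
        return self.cache[block_number]


def fake_fetch(block_number: int) -> int:
    """Stand-in for a per-block JSON-RPC request; assumes 12 seconds per block."""
    return 1_600_000_000 + block_number * 12


timestamps = LazyTimestamps(fake_fetch)
event_blocks = [100, 100, 250, 100]
event_times = [timestamps[b] for b in event_blocks]
```

Four events in two distinct blocks trigger only two fetches here; blocks without events are never requested.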

Methods summary

__init__(json_rpc_url[, max_threads, ...])

Creates a multithreaded JSON-RPC reader pool.

close()

Release the allocated resources.

get_max_threads()

get_total_api_call_counts()

Sum API call counts across all threads.

__init__(json_rpc_url, max_threads=10, reader_context=None, api_counter=True, max_blocks_once=50000, reorg_mon=None, notify=None, auto_close_notify=True)

Creates a multithreaded JSON-RPC reader pool.

Can be passed to any function that expects the Web3EventReader protocol.

  • Sets up a requests session pool for max_threads threads

  • Sets up futureproof thread pool executors for max_threads threads

Parameters
  • json_rpc_url (str) – Your node URL

  • max_threads – How many threads to allocate

  • reader_context (Optional[Any]) – Passed to the reader callback

  • api_counter – Enable cross-thread API counter

  • notify (Optional[eth_defi.event_reader.reader.ProgressUpdate]) – Notify interface for progress reports.

  • max_blocks_once

    How many blocks your node’s eth_getLogs call can serve.

    Low-quality node providers cap this value very low, around 1000, which slows down the reading.

  • reorg_mon (Optional[eth_defi.event_reader.reorganisation_monitor.ReorganisationMonitor]) –

    Chain reorganisation monitor.

    The policy class for dealing with chain tip changes during the read or between event reads.

    If you do not want block hashes and timestamps for the events, or you do not want to check for potential reorganisations, you can set this to None.

  • auto_close_notify

    Close the notifier after the operation is done.

    Assumes the notifier object has a close() method.
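
To see why a low max_blocks_once slows reading down, consider how a block range splits into eth_getLogs-sized chunks. The split_block_range() helper below is a hypothetical sketch, and treating ranges as inclusive on both ends is an assumption, not a statement about the library's internals.

```python
def split_block_range(start_block: int, end_block: int, max_blocks_once: int):
    """Yield inclusive (start, end) ranges of at most max_blocks_once blocks."""
    current = start_block
    while current <= end_block:
        chunk_end = min(current + max_blocks_once - 1, end_block)
        yield current, chunk_end
        current = chunk_end + 1


# The same 100,000 blocks with the default limit and with a restrictive one
generous = list(split_block_range(1, 100_000, 50_000))
restrictive = list(split_block_range(1, 100_000, 1_000))
```

With a limit of 50,000 the range needs 2 requests; with a limit of 1,000 it needs 100.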

close()

Release the allocated resources.
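
One way to make sure close() is always called, even if reading raises an exception, is the standard library's contextlib.closing(), which works with any object that exposes a close() method. The FakeReader class below is a stand-in for illustration only and does not model the real reader's call signature.

```python
from contextlib import closing


class FakeReader:
    """Stand-in for any object exposing close(); not the real reader."""

    def __init__(self):
        self.closed = False

    def __call__(self, start_block: int, end_block: int):
        # Pretend each block number is one event
        yield from range(start_block, end_block + 1)

    def close(self):
        self.closed = True


reader = FakeReader()
with closing(reader) as r:
    events = list(r(1, 3))
```

After the with block exits, reader.closed is True regardless of whether the loop body succeeded.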

get_total_api_call_counts()

Sum API call counts across all threads.

See eth_defi.chain.install_api_call_counter_middleware() for details.

Return type

Counter
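
As a rough illustration of what summing call counts across threads means, the sketch below adds up several collections.Counter objects. The per-thread data is made up, and keying the counters by JSON-RPC method name is an assumption about the counter's contents.

```python
from collections import Counter

# Hypothetical per-thread API call counts
per_thread_counts = [
    Counter({"eth_getLogs": 4, "eth_getBlockByNumber": 2}),
    Counter({"eth_getLogs": 3}),
    Counter({"eth_getLogs": 5, "eth_getBlockByNumber": 1}),
]

# Counter supports +, so sum() works when given an empty Counter as the start value
total = sum(per_thread_counts, Counter())
```

The resulting Counter can be inspected with most_common() to see which calls dominate a read.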