read_events_concurrent

Documentation for the eth_defi.event_reader.reader.read_events_concurrent function.

read_events_concurrent(executor, start_block, end_block, events=None, notify=None, chunk_size=100, context=None, extract_timestamps=<function extract_timestamps_json_rpc>, filter=None, reorg_mon=None)[source]

Reads multiple events from the blockchain in parallel, using a thread pool for I/O.

Optimized to read multiple events fast:

  • Uses a thread worker pool for concurrency

  • Even though data is received from the JSON-RPC API in random order, the iterable always yields results in the correct order (and they are processed in a single thread)

  • Returns each event as a plain dict for performance

  • Can resume scan

  • Supports interactive progress bar

  • Reads all events matching the given signatures; any further filtering must be done by the caller (see the sketch after this list)
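
Because the scanner returns every matching event, any narrowing happens in the loop that consumes the results. Below is a minimal sketch of such client-side filtering, reusing the executor and events set up as in the example further down; the factory address constant and the process() handler are hypothetical placeholders:

# Keep only events emitted by the Uniswap v2 factory contract
UNISWAP_V2_FACTORY = "0x5c69bee701ef814a2b6a3edd4b1652cb9cc5aa6f"

for log_result in read_events_concurrent(executor, start_block, end_block, events):
    # "address" identifies the contract that emitted the log
    if log_result["address"].lower() != UNISWAP_V2_FACTORY:
        continue
    process(log_result)  # hypothetical handler for the events you care about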

See scripts/read-uniswap-v2-pairs-and-swaps-concurrent.py for a full example.

Example:

import os

import requests

from eth_defi.abi import get_contract
from eth_defi.event_reader.reader import read_events_concurrent
from eth_defi.event_reader.web3factory import TunedWeb3Factory

# TokenCache, decode_pair_created and create_thread_pool_executor come from
# the full example script referenced above

json_rpc_url = os.environ["JSON_RPC_URL"]
token_cache = TokenCache()

# Set up one web3 connection per worker thread, sharing an HTTP connection pool
threads = 16
http_adapter = requests.adapters.HTTPAdapter(pool_connections=threads, pool_maxsize=threads)
web3_factory = TunedWeb3Factory(json_rpc_url, http_adapter)
web3 = web3_factory(token_cache)
executor = create_thread_pool_executor(web3_factory, context=token_cache, max_workers=threads)

# Get contracts
Factory = get_contract(web3, "UniswapV2Factory.json")

events = [
    Factory.events.PairCreated,
]

start_block = 10_000_835  # Uni deployed
end_block = 10_009_000  # The first pair created before this block

# Read through the block range and collect the events
out = []
for log_result in read_events_concurrent(
    executor,
    start_block,
    end_block,
    events,
    notify=None,  # no progress callback
    chunk_size=100,
    context=token_cache,
    extract_timestamps=None,  # skip block timestamp look-ups for speed
):
    out.append(decode_pair_created(log_result))
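
The decode_pair_created() helper used above is defined in the full example script. For orientation, here is a minimal sketch of what such a decoder can look like, assuming LogResult carries the raw hex-encoded eth_getLogs fields; this is an illustrative reconstruction, not the script's exact code:

def decode_pair_created(log_result) -> dict:
    # topics[0] is the PairCreated event signature hash;
    # topics[1] and topics[2] hold the indexed token0/token1
    # addresses, left-padded to 32 bytes
    topics = log_result["topics"]
    token0 = "0x" + topics[1][-40:]
    token1 = "0x" + topics[2][-40:]
    # the non-indexed pair address sits in the first 32-byte slot of data
    data = log_result["data"][2:]  # strip the 0x prefix
    pair = "0x" + data[24:64]
    return {"token0": token0, "token1": token1, "pair": pair}
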
Parameters

  • executor – Thread pool executor whose workers read chunks over their own web3 connections

  • start_block – First block to scan (inclusive)

  • end_block – Last block to scan (inclusive)

  • events – List of web3.py contract event classes to scan for

  • notify – Optional callback invoked as chunks complete, e.g. to drive a progress bar

  • chunk_size – Number of blocks fetched in a single eth_getLogs call

  • context – Application context object passed through to event decoding

  • extract_timestamps – Function used to resolve block timestamps; pass None to skip timestamp look-ups for speed

  • filter – Prebuilt event filter to use instead of constructing one from events

  • reorg_mon – Chain reorganisation monitor used to detect and handle minor chain reorganisations during the scan
Returns

An iterable of LogResult instances, one for each event matched by the filter.

Return type

Iterable[eth_defi.event_reader.logresult.LogResult]
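
Each LogResult is a dict mirroring the raw JSON-RPC log entry, plus extra fields such as the resolved timestamp when timestamp extraction is in use. A minimal consumption sketch, assuming hex-encoded JSON-RPC values and the setup from the example above:

rows = []
for log_result in read_events_concurrent(executor, start_block, end_block, events):
    # blockNumber and other JSON-RPC values are assumed hex-encoded here
    block_number = int(log_result["blockNumber"], 16)
    tx_hash = log_result["transactionHash"]
    # timestamp is populated when extract_timestamps is enabled
    timestamp = log_result.get("timestamp")
    rows.append((block_number, timestamp, tx_hash))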