read_events_concurrent

Documentation for the eth_defi.event_reader.reader.read_events_concurrent function.

read_events_concurrent(executor, start_block, end_block, events, notify, chunk_size=100, context=None, extract_timestamps=<function extract_timestamps_json_rpc>, filter=None)[source]

Reads multiple events from the blockchain in parallel, using a thread pool for IO.

Optimized to read multiple events fast.

  • Uses a thread worker pool for concurrency

  • Even though we receive data from the JSON-RPC API in random order, the iterable results are always in the correct order (and are processed in a single thread)

  • Returns events as a dict for optimal performance

  • Can resume scan

  • Supports interactive progress bar

  • Reads all the events matching signature - any filtering must be done by the reader
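
The ordering guarantee in the list above can be illustrated with a minimal sketch. This is not the library's implementation: it uses the standard-library concurrent.futures pool instead of futureproof, and fetch_chunk and read_in_order are hypothetical stand-ins for a single eth_getLogs call and the reader loop. It only shows how work can complete out of order while the consumer still iterates results in chunk order, in a single thread.

```python
import random
import time
from concurrent.futures import ThreadPoolExecutor


def fetch_chunk(chunk_start: int) -> list[tuple[int, str]]:
    """Hypothetical stand-in for one eth_getLogs call; sleeps to simulate network jitter."""
    time.sleep(random.uniform(0.0, 0.02))
    return [(chunk_start, f"event-in-chunk-{chunk_start}")]


def read_in_order(chunk_starts: list[int], max_workers: int = 4):
    """Fetch chunks concurrently, but yield their events in submission order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Submit everything up front; the futures list preserves chunk order.
        futures = [pool.submit(fetch_chunk, start) for start in chunk_starts]
        for future in futures:
            # Blocks until this particular chunk is done, even if later chunks finished first.
            yield from future.result()


ordered = list(read_in_order([0, 100, 200, 300, 400]))
print([block for block, _ in ordered])
```

Because the generator body runs in the consuming thread, any decoding done inside the for loop needs no locking.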

See scripts/read-uniswap-v2-pairs-and-swaps-concurrent.py for a full example.

Example:

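import os

import requests

from eth_defi.abi import get_contract
from eth_defi.event_reader.reader import read_events_concurrent
from eth_defi.event_reader.web3factory import TunedWeb3Factory
from eth_defi.event_reader.web3worker import create_thread_pool_executor

# TokenCache and decode_pair_created are assumed to come from the example script referenced above

json_rpc_url = os.environ["JSON_RPC_URL"]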
token_cache = TokenCache()
threads = 16
http_adapter = requests.adapters.HTTPAdapter(pool_connections=threads, pool_maxsize=threads)
web3_factory = TunedWeb3Factory(json_rpc_url, http_adapter)
web3 = web3_factory(token_cache)
executor = create_thread_pool_executor(web3_factory, context=token_cache, max_workers=threads)

# Get contracts
Factory = get_contract(web3, "UniswapV2Factory.json")

events = [
    Factory.events.PairCreated,
]

start_block = 10_000_835  # Uni deployed
end_block = 10_009_000  # The first pair created before this block

# Read through the block range
out = []
for log_result in read_events_concurrent(
    executor,
    start_block,
    end_block,
    events,
    None,
    chunk_size=100,
    context=token_cache,
    extract_timestamps=None,
):
    out.append(decode_pair_created(log_result))

Parameters

  • executor (futureproof.executors.ThreadPoolExecutor) – Thread pool executor created with eth_defi.event_reader.web3worker.create_thread_pool_executor()

  • events (List[web3.contract.ContractEvent]) – List of Web3.py contract event classes to scan for

  • notify (Optional[eth_defi.event_reader.reader.ProgressUpdate]) – Optional callback to be called before starting to scan each chunk

  • start_block (int) – First block to process (inclusive)

  • end_block (int) – Last block to process (inclusive)

  • extract_timestamps (Optional[Callable]) – Override for different block timestamp extraction methods

  • chunk_size (int) – How many blocks to scan in one eth_getLogs call

  • context (Optional[eth_defi.event_reader.logresult.LogContext]) – Passed to all generated logs

  • filter (Optional[eth_defi.event_reader.filter.Filter]) – Pass a custom event filter for the readers

Return type

Iterable[eth_defi.event_reader.logresult.LogResult]
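
To make the interaction between start_block, end_block and chunk_size concrete, here is a minimal sketch of splitting an inclusive block range into per-call chunks. The helper split_into_chunks is hypothetical and the exact chunk boundaries used by the library are an assumption; the sketch only relies on the documented facts that both ends of the range are inclusive and that chunk_size is the number of blocks per eth_getLogs call.

```python
def split_into_chunks(start_block: int, end_block: int, chunk_size: int) -> list[tuple[int, int]]:
    """Split an inclusive block range into inclusive (first, last) chunks.

    Hypothetical helper for illustration; not part of eth_defi.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    chunks = []
    first = start_block
    while first <= end_block:
        # The final chunk is clamped so it never runs past end_block.
        last = min(first + chunk_size - 1, end_block)
        chunks.append((first, last))
        first = last + 1
    return chunks


# Same range and chunk size as the example above
chunks = split_into_chunks(10_000_835, 10_009_000, 100)
print(len(chunks))   # 82
print(chunks[0])     # (10000835, 10000934)
print(chunks[-1])    # (10008935, 10009000)
```

With these numbers the range covers 8,166 blocks, so a chunk_size of 100 would mean 82 requests under this scheme, the last one covering only 66 blocks. A larger chunk_size means fewer requests, at the cost of bigger responses per call.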