read_events_concurrent
Documentation for eth_defi.event_reader.reader.read_events_concurrent function.
- read_events_concurrent(executor, start_block, end_block, events=None, notify=None, chunk_size=100, context=None, extract_timestamps=<function extract_timestamps_json_rpc>, filter=None, reorg_mon=None)[source]
Reads multiple events from the blockchain parallel using a thread pool for IO.
Optimized to read multiple events fast.
Uses a thread worker pool for concurrency
Even though we receive data from JSON-RPC API in random order, the iterable results are always in the correct order (and processes in a single thread)
Returns events as a dict for optimal performance
Can resume scan
Supports interactive progress bar
Reads all the events matching signature - any filtering must be done by the reader
See scripts/read-uniswap-v2-pairs-and-swaps-concurrent.py for a full example.
Example:
json_rpc_url = os.environ["JSON_RPC_URL"] token_cache = TokenCache() threads = 16 http_adapter = requests.adapters.HTTPAdapter(pool_connections=threads, pool_maxsize=threads) web3_factory = TunedWeb3Factory(json_rpc_url, http_adapter) web3 = web3_factory(token_cache) executor = create_thread_pool_executor(web3_factory, context=token_cache, max_workers=threads) # Get contracts Factory = get_contract(web3, "UniswapV2Factory.json") events = [ Factory.events.PairCreated, ] start_block = 10_000_835 # Uni deployed end_block = 10_009_000 # The first pair created before this block # Read through the blog ran out = [] for log_result in read_events_concurrent( executor, start_block, end_block, events, None, chunk_size=100, context=token_cache, extract_timestamps=None, ): out.append(decode_pair_created(log_result))
- Parameters
executor (futureproof.executors.ThreadPoolExecutor) – Thread pool executor created with
eth_defi.event_reader.web3worker.create_thread_pool_executor()
events (Optional[List[web3.contract.contract.ContractEvent]]) – List of Web3.py contract event classes to scan for
notify (Optional[eth_defi.event_reader.reader.ProgressUpdate]) – Optional callback to be called before starting to scan each chunk
start_block (int) – First block to process (inclusive)
end_block (int) – Last block to process (inclusive)
extract_timestamps (Optional[Callable]) – Override for different block timestamp extraction methods
chunk_size (int) – How many blocks to scan in one eth_getLogs call
context (Optional[eth_defi.event_reader.logresult.LogContext]) – Passed to the all generated logs
filter (Optional[eth_defi.event_reader.filter.Filter]) – Pass a custom event filter for the readers
reorg_mon (Optional[eth_defi.event_reader.reorganisation_monitor.ReorganisationMonitor]) – If passed, use this instance to monitor and raise chain reorganisation exceptions.
- Returns
Iterate over
LogResult
instances for each event matched in the filter.- Return type