event_reader.multicall_batcher
Documentation for eth_defi.event_reader.multicall_batcher Python module.
Multicall3 contract helpers.
For a tutorial, read Multicall3: Doing fast chain data reading with Web3.py
Perform several smart contract calls in one RPC request using Multicall contract
Increase smart contract call throughput using Multicall smart contract
Further increase call throughput using multiprocessing with joblib.Parallel
Do fast historical reads over several blocks with read_multicall_historical()
For usage, see the read_multicall_chunked() and read_multicall_historical_stateful() functions.
Warning
See Multicall private key leak hack warning.
Functions
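call_multicall | Call a multicall contract. |
call_multicall_batched_single_thread | Call Multicall contract with a payload. |
call_multicall_debug_single_thread | Skip Multicall contract and try eth_call directly. |
call_multicall_encoded | Call a multicall contract. |
get_multicall_block_number | When the multicall contract was deployed for a chain. |
get_multicall_contract | Return a multicall smart contract instance. |
pin_fallback_provider_by_host | Pin a FallbackProvider to the provider whose host matches a substring. |
read_multicall_chunked | Read current data using multiple processes in parallel for speedup. |
read_multicall_historical | Read historical data using multiple threads in parallel for speedup. |
read_multicall_historical_stateful | Read historical data using multicall with reading state and adaptive frequency filtering. |
resolve_hyperevm_consensus_failover | Decide whether a failed HyperEVM multicall should fail over to a single node. |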
Classes
BatchCallState | Allow multicall calls to maintain state over multiple invocations. |
CombinedEncodedCallResult | Historical read result of multiple multicalls. |
EncodedCall | Multicall payload, minified implementation. |
EncodedCallResult | Result of one multicall. |
MulticallHistoricalTask | Pickled task sent between the multicall reader loop and subprocesses. |
MulticallWrapper | Wrap a call going through the Multicall contract. |
MultiprocessMulticallReader | An instance created in a subprocess to do calls. |
Exceptions
MulticallNonRetryable | Need to take a manual look at these errors. |
MulticallRetryable | Out of gas. |
MulticallStateProblem | TODO |
- class BatchCallState
Bases: abc.ABC
Allow multicall calls to maintain state over multiple invocations.
Mostly useful for historical multicall reads and frequency management
- abstract save()
Persist state across multiple runs.
- Returns
Pickleable Python object
- Return type
- abstract should_invoke(call, block_identifier, timestamp)
Check the condition if this multicall is good to go.
- Parameters
call (eth_defi.event_reader.multicall_batcher.EncodedCall) –
block_identifier (Union[Literal['latest', 'earliest', 'pending', 'safe', 'finalized'], eth_typing.evm.BlockNumber, eth_typing.evm.Hash32, eth_typing.encoding.HexStr, int]) –
timestamp (datetime.datetime) –
- Return type
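A minimal sketch of what a state implementation could look like, assuming should_invoke() returns a boolean (the return type is not shown above). BatchCallStateStandIn is a local stand-in that declares only the two abstract methods documented here, so the snippet runs without eth_defi installed; real code would subclass BatchCallState instead. MinIntervalState and mark_invoked() are hypothetical names used for illustration only.

```python
import datetime
import pickle
from abc import ABC, abstractmethod
from typing import Optional


class BatchCallStateStandIn(ABC):
    """Stand-in for BatchCallState: only the two methods documented above."""

    @abstractmethod
    def should_invoke(self, call, block_identifier, timestamp: datetime.datetime) -> bool:
        """Assumed to return True when the call should run at this block."""

    @abstractmethod
    def save(self) -> dict:
        """Return a pickleable object describing the state."""


class MinIntervalState(BatchCallStateStandIn):
    """Hypothetical policy: run a call at most once per ``min_interval``."""

    def __init__(self, min_interval: datetime.timedelta):
        self.min_interval = min_interval
        self.last_invoked_at: Optional[datetime.datetime] = None

    def should_invoke(self, call, block_identifier, timestamp):
        # The first read always goes through; later reads are rate limited
        if self.last_invoked_at is None:
            return True
        return timestamp - self.last_invoked_at >= self.min_interval

    def mark_invoked(self, timestamp: datetime.datetime) -> None:
        # Hypothetical hook: remember when the call last produced a result
        self.last_invoked_at = timestamp

    def save(self):
        # Plain dict of builtin types, so it survives pickling
        return {
            "min_interval_seconds": self.min_interval.total_seconds(),
            "last_invoked_at": self.last_invoked_at,
        }


state = MinIntervalState(datetime.timedelta(hours=1))
state.mark_invoked(datetime.datetime(2024, 1, 1))
restored = pickle.loads(pickle.dumps(state.save()))
```

The point of the design is that the decision to read depends only on the block timestamp and what the state has already seen, which is what lets a historical reader skip blocks.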
- class CombinedEncodedCallResult
Bases: object
Historical read result of multiple multicalls.
Return the whole block worth of calls when iterating over the chain block by block.
- __init__(block_number, timestamp, results)
- Parameters
block_number (int) –
timestamp (datetime.datetime) –
results (list[eth_defi.event_reader.multicall_batcher.EncodedCallResult]) –
- Return type
None
- class EncodedCall
Bases: object
Multicall payload, minified implementation.
Designed for multiprocessing and historical reads
Only carries encoded data, not ABI or other metadata
Contains extra_data, which allows routing the results of several calls to one handler class
Example:
    import eth_abi
    from web3 import Web3

    from eth_defi.event_reader.multicall_batcher import EncodedCall

    # share_probe_amount and address are defined by the caller
    convert_to_shares_payload = eth_abi.encode(["uint256"], [share_probe_amount])

    share_price_call = EncodedCall.from_keccak_signature(
        address=address,
        signature=Web3.keccak(text="convertToShares(uint256)")[0:4],
        function="convertToShares",
        data=convert_to_shares_payload,
        extra_data=None,
    )
- __init__(func_name, address, data, extra_data, first_block_number=None, call_id=<factory>, _hash=None)
- call(web3, block_identifier, from_='0x0000000000000000000000000000000000000000', gas=None, ignore_error=False, silent_error=False, attempts=3, retry_sleep=30.0)
Return raw results of the call.
Example of how to read:
    erc_7575_call = EncodedCall.from_keccak_signature(
        address=self.vault_address,
        signature=Web3.keccak(text="share()")[0:4],
        function="share",
        data=b"",
        extra_data=None,
    )

    result = erc_7575_call.call(self.web3, block_identifier="latest")
    share_token_address = convert_uint256_bytes_to_address(result)
- Parameters
ignore_error – Set to True to inform middleware that it is normal for this call to fail, so that it is neither logged as a failed call nor retried.
attempts (int) –
Use the built-in retry mechanism for flaky RPC.
This works regardless of the middleware installed. Set to zero to ignore.
Cannot be used with ignore_error.
gas (int) –
Gas limit.
If not given, a 15M limit is used, except on Mantle, where 99M is used.
web3 (web3.main.Web3) –
block_identifier (Union[Literal['latest', 'earliest', 'pending', 'safe', 'finalized'], eth_typing.evm.BlockNumber, eth_typing.evm.Hash32, eth_typing.encoding.HexStr, int]) –
- Returns
Raw call results as bytes
- Raises
ValueError – If the call reverts
- Return type
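The attempts and retry_sleep parameters describe a retry loop around a single RPC call. The following is a minimal sketch of such a loop, not the library's implementation: call_with_retries() is a hypothetical helper, and it assumes that attempts counts retries after the first try, so that zero means one plain call. The sleep function is injectable so the sketch can run without waiting.

```python
import time
from typing import Callable


def call_with_retries(
    do_call: Callable[[], bytes],
    attempts: int = 3,
    retry_sleep: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> bytes:
    """Run ``do_call``, retrying up to ``attempts`` times after a failure."""
    last_error = None
    for attempt in range(attempts + 1):
        try:
            return do_call()
        except Exception as e:
            last_error = e
            # Only sleep when another try is still coming
            if attempt < attempts:
                sleep(retry_sleep)
    # All tries failed: surface the last error to the caller
    raise last_error


# Simulated flaky RPC: fails twice, then answers
state = {"tries": 0}


def flaky_call() -> bytes:
    state["tries"] += 1
    if state["tries"] < 3:
        raise ConnectionError("flaky RPC")
    return b"\x2a"


slept = []
result = call_with_retries(flaky_call, attempts=3, retry_sleep=0.5, sleep=slept.append)
```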
- call_as_result(web3, block_identifier, from_='0x0000000000000000000000000000000000000000', gas=15000000, ignore_error=False)
Perform an RPC call and return the result wrapped in an EncodedCallResult.
See call() for info.
- Parameters
- Return type
- static from_contract_call(call, extra_data=None, first_block_number=None)
Create poller call from Web3.py Contract proxy object
- Parameters
- Return type
- static from_keccak_signature(address, function, signature, data, extra_data, first_block_number=None, ignore_errors=False, state=None)
Create poller call directly from a raw function signature
- Parameters
- Return type
- get_curl_info(block_number)
Get human-readable details for debugging.
Punch into Tenderly simulator
Data contains both function signature and data payload
- get_debug_info()
Get human-readable details for debugging.
Punch into Tenderly simulator
Data contains both function signature and data payload
- Return type
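As noted above, the data sent on the wire holds both the function signature and the data payload. A small standard-library sketch of that layout, using the well-known balanceOf(address) selector 0x70a08231: build_calldata(), split_calldata() and encode_address_word() are hypothetical helper names, not part of this module.

```python
# First 4 bytes of keccak256("balanceOf(address)")
BALANCE_OF_SELECTOR = bytes.fromhex("70a08231")


def encode_address_word(address: str) -> bytes:
    """Left-pad a 20-byte hex address to one 32-byte ABI word."""
    raw = bytes.fromhex(address[2:] if address.startswith("0x") else address)
    if len(raw) != 20:
        raise ValueError(f"Expected a 20-byte address, got {len(raw)} bytes")
    return raw.rjust(32, b"\x00")


def build_calldata(selector: bytes, payload: bytes) -> bytes:
    """Concatenate the 4-byte function selector and the encoded arguments."""
    if len(selector) != 4:
        raise ValueError("A function selector is exactly 4 bytes")
    return selector + payload


def split_calldata(calldata: bytes):
    """Inverse of build_calldata(): return (selector, payload)."""
    return calldata[:4], calldata[4:]


holder = "0x" + "00" * 19 + "01"
calldata = build_calldata(BALANCE_OF_SELECTOR, encode_address_word(holder))
print(calldata.hex())
```

Splitting the first four bytes off again is handy when pasting the data into a transaction simulator for debugging.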
- transact(from_, gas_limit)
Build a transaction payload for this call.
Example:
    gas_limit = 15_000_000

    # function settleDeposit(uint256 _newTotalAssets) public virtual;
    call = EncodedCall.from_keccak_signature(
        address=vault.address,
        function="settleDeposit()",
        signature=Web3.keccak(text="settleDeposit(uint256)")[0:4],
        data=convert_uin256_to_bytes(raw_nav),
        extra_data=None,
    )
    tx_data = call.transact(
        from_=asset_manager,
        gas_limit=gas_limit,
    )
    tx_hash = web3.eth.send_transaction(tx_data)
    assert_transaction_success_with_explanation(web3, tx_hash)
- Parameters
from_ (eth_typing.evm.HexAddress) –
gas_limit (int) –
- Return type
- class EncodedCallResult
Bases: object
Result of one multicall.
Example:
    # File 21 of 47 : PlasmaVaultStorageLib.sol
    #     /// @custom:storage-location erc7201:io.ipor.PlasmaVaultPerformanceFeeData
    #     struct PerformanceFeeData {
    #         address feeManager;
    #         uint16 feeInPercentage;
    #     }
    data = call_by_name["getPerformanceFeeData"].result
    performance_fee = int.from_bytes(data[32:64], byteorder="big") / 10_000
- __init__(call, success, result, block_identifier, timestamp=None, revert_exception=None, state=None)
- Parameters
call (eth_defi.event_reader.multicall_batcher.EncodedCall) –
success (bool) –
result (bytes) –
block_identifier (Union[Literal['latest', 'earliest', 'pending', 'safe', 'finalized'], eth_typing.evm.BlockNumber, eth_typing.evm.Hash32, eth_typing.encoding.HexStr, int]) –
timestamp (Optional[datetime.datetime]) –
state (Optional[eth_defi.event_reader.multicall_batcher.BatchCallState]) –
- Return type
None
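The example above slices the raw result bytes by hand. The sketch below shows the same decoding on synthetic data, assuming the two struct members are returned as two consecutive 32-byte ABI words (address first, then the fee). decode_performance_fee_data() is a hypothetical helper, and the input bytes are made up for illustration.

```python
from typing import Tuple


def decode_performance_fee_data(data: bytes) -> Tuple[str, float]:
    """Decode (address feeManager, uint16 feeInPercentage) from two ABI words."""
    if len(data) < 64:
        raise ValueError(f"Expected at least 64 bytes, got {len(data)}")
    # An address occupies the last 20 bytes of its 32-byte word
    fee_manager = "0x" + data[12:32].hex()
    # The integer is right-aligned in the second word
    fee_raw = int.from_bytes(data[32:64], byteorder="big")
    # Same scaling as in the example above
    return fee_manager, fee_raw / 10_000


# Synthetic result: fee manager 0x1111...11 and a raw fee value of 250
synthetic = b"\x00" * 12 + bytes.fromhex("11" * 20) + (250).to_bytes(32, byteorder="big")
fee_manager, performance_fee = decode_performance_fee_data(synthetic)
```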
- class MulticallHistoricalTask
Bases: object
Pickled task sent between the multicall reader loop and subprocesses.
Send a batch of calls to a specific block.
- __init__(chain_id, web3factory, block_number, calls, require_multicall_result=False, timestamp=None, task_id=<factory>)
- Parameters
chain_id (int) –
web3factory (eth_defi.event_reader.web3factory.Web3Factory) –
block_number (Union[Literal['latest', 'earliest', 'pending', 'safe', 'finalized'], eth_typing.evm.BlockNumber, eth_typing.evm.Hash32, eth_typing.encoding.HexStr, int]) –
calls (list[eth_defi.event_reader.multicall_batcher.EncodedCall]) –
require_multicall_result (bool) –
timestamp (datetime.datetime) –
task_id (int) –
- Return type
None
- exception MulticallNonRetryable
Bases: Exception
Need to take a manual look at these errors.
- __init__(*args, **kwargs)
- __new__(**kwargs)
- add_note(note, /)
Add a note to the exception
- with_traceback(tb, /)
Set self.__traceback__ to tb and return self.
- exception MulticallRetryable
Bases: Exception
Out of gas.
Broken contract in a gas loop
Try to decrease batch size.
- __init__(message, status_code=None, headers=None)
- __new__(**kwargs)
- add_note(note, /)
Add a note to the exception
- with_traceback(tb, /)
Set self.__traceback__ to tb and return self.
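The advice to decrease the batch size can be sketched as a loop that halves the batch whenever a retryable error is raised. This is an illustration under stated assumptions, not the reader's actual fallback policy: RetryableBatchError is a local stand-in for MulticallRetryable, and run_with_shrinking_batches() and send_batch are hypothetical names.

```python
from typing import Callable, List, Sequence


class RetryableBatchError(Exception):
    """Local stand-in for MulticallRetryable."""


def run_with_shrinking_batches(
    calls: Sequence,
    send_batch: Callable[[Sequence], List],
    batch_size: int = 40,
    min_batch_size: int = 1,
) -> List:
    """Send calls in batches, halving the batch size on retryable failures."""
    results: List = []
    position = 0
    while position < len(calls):
        batch = calls[position:position + batch_size]
        try:
            results.extend(send_batch(batch))
        except RetryableBatchError:
            if batch_size <= min_batch_size:
                # Cannot shrink further: give up and let the caller decide
                raise
            batch_size = max(min_batch_size, batch_size // 2)
            continue  # Retry the same position with a smaller batch
        position += len(batch)
    return results


def fake_send(batch: Sequence) -> List:
    # Simulated node that runs out of gas above three calls per batch
    if len(batch) > 3:
        raise RetryableBatchError("out of gas")
    return [item * 2 for item in batch]


doubled = run_with_shrinking_batches(list(range(10)), fake_send, batch_size=8)
```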
- exception MulticallStateProblem
Bases: Exception
TODO
- __init__(*args, **kwargs)
- __new__(**kwargs)
- add_note(note, /)
Add a note to the exception
- with_traceback(tb, /)
Set self.__traceback__ to tb and return self.
- class MulticallWrapper
Bases: abc.ABC
Wrap a call going through the Multicall contract.
Each call in the batch is represented by one instance of MulticallWrapper
This class must be subclassed, and the subclass needs to implement get_key(), handle() and __repr__()
- __init__(call, debug)
- Parameters
call (web3.contract.contract.ContractFunction) –
debug (bool) –
- Return type
None
- abstract get_key()
Get key that will identify this call in the result dictionary
- Return type
- abstract handle(succeed, raw_return_value)
Parse the call result.
- multicall_callback(succeed, raw_return_value)
Convert the raw Solidity function call result to a denominated token amount.
Multicall library callback
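A sketch of the subclassing pattern described above, assuming get_key() returns any hashable value and handle() returns the parsed result. MulticallWrapperStandIn is a local stand-in with the documented abstract methods so the snippet runs without web3 installed (the real base class takes a ContractFunction and a debug flag). TokenBalanceCall and its fields are hypothetical.

```python
from abc import ABC, abstractmethod
from decimal import Decimal
from typing import Hashable, Optional


class MulticallWrapperStandIn(ABC):
    """Stand-in for MulticallWrapper: only the documented abstract methods."""

    @abstractmethod
    def get_key(self) -> Hashable:
        """Key identifying this call in the result dictionary."""

    @abstractmethod
    def handle(self, succeed: bool, raw_return_value: bytes):
        """Parse the call result."""


class TokenBalanceCall(MulticallWrapperStandIn):
    """Hypothetical wrapper turning a raw uint256 into a decimal token amount."""

    def __init__(self, token_address: str, holder: str, decimals: int):
        self.token_address = token_address
        self.holder = holder
        self.decimals = decimals

    def get_key(self) -> Hashable:
        return (self.token_address, self.holder)

    def handle(self, succeed: bool, raw_return_value: bytes) -> Optional[Decimal]:
        if not succeed:
            return None
        raw = int.from_bytes(raw_return_value[:32], byteorder="big")
        return Decimal(raw) / Decimal(10 ** self.decimals)

    def __repr__(self) -> str:
        return f"<TokenBalanceCall token={self.token_address} holder={self.holder}>"


wrapper = TokenBalanceCall("0xtoken", "0xholder", decimals=6)
amount = wrapper.handle(True, (1_500_000).to_bytes(32, byteorder="big"))
```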
- class MultiprocessMulticallReader
Bases: object
An instance created in a subprocess to do calls.
Specific to a chain (connection is married with a chain, otherwise stateless)
Initialises the web3 connection at the start of the process
If you try to read using multicall when the contract is not yet deployed (see get_multicall_block_number()) then you get no results
- __init__(web3factory, batch_size=40, backswitch_threshold=100, too_many_requets_sleep=61.0)
Create subprocess worker instance.
- Parameters
web3factory (Union[eth_defi.event_reader.web3factory.Web3Factory, web3.main.Web3]) – Initialise connection within the subprocess
batch_size –
How many calls we pack into the multicall.
Tune this number manually if your RPC nodes start to fail as they hit their internal time limits.
- call_multicall_with_batch_size(multicall_contract, block_identifier, batch_size, encoded_calls, require_multicall_result)
Communicate with Multicall3 contract.
Fail safes for ugly situations
- Parameters
multicall_contract (web3.contract.contract.Contract) –
block_identifier (Union[Literal['latest', 'earliest', 'pending', 'safe', 'finalized'], eth_typing.evm.BlockNumber, eth_typing.evm.Hash32, eth_typing.encoding.HexStr, int]) –
batch_size (int) –
encoded_calls (list[tuple[eth_typing.evm.HexAddress, bytes]]) –
require_multicall_result (bool) –
- Return type
- get_batch_size(web3, chain_id, block_identifier)
Fix non-standard out of gas issues.
TODO: Move these rules to their own module.
- get_gas_hint(chain_id, batch_calls)
Fix non-standard out of gas issues
- process_calls(block_identifier, calls, require_multicall_result=False, timestamp=None, min_fallback_retries=5)
Work a chunk of calls in the subprocess.
Divide an unbounded number of calls into batches we think Multicall3 and the RPC node can handle
If a single batch fail
- Parameters
require_multicall_result – Headache debug flag.
block_identifier (Union[Literal['latest', 'earliest', 'pending', 'safe', 'finalized'], eth_typing.evm.BlockNumber, eth_typing.evm.Hash32, eth_typing.encoding.HexStr, int]) – Block number
timestamp (Optional[datetime.datetime]) – Block timestamp
min_fallback_retries – Bang all RPCs at least this many times when attempting to make progress.
calls (list[eth_defi.event_reader.multicall_batcher.EncodedCall]) –
- Return type
Iterable[eth_defi.event_reader.multicall_batcher.EncodedCallResult]
- call_multicall(multicall_contract, calls, block_identifier)
Call a multicall contract.
- Parameters
multicall_contract (web3.contract.contract.Contract) –
calls (list[eth_defi.event_reader.multicall_batcher.MulticallWrapper]) –
block_identifier (Union[Literal['latest', 'earliest', 'pending', 'safe', 'finalized'], eth_typing.evm.BlockNumber, eth_typing.evm.Hash32, eth_typing.encoding.HexStr, int]) –
- Return type
- call_multicall_batched_single_thread(multicall_contract, calls, block_identifier, batch_size=15)
Call Multicall contract with a payload.
Single threaded
- Parameters
web3_factory –
Each thread will get its own web3 instance
batch_size – Do not make more than this many calls per RPC request.
multicall_contract (web3.contract.contract.Contract) –
calls (list[eth_defi.event_reader.multicall_batcher.MulticallWrapper]) –
block_identifier (Union[Literal['latest', 'earliest', 'pending', 'safe', 'finalized'], eth_typing.evm.BlockNumber, eth_typing.evm.Hash32, eth_typing.encoding.HexStr, int]) –
- Return type
- call_multicall_debug_single_thread(multicall_contract, calls, block_identifier)
Skip Multicall contract and try eth_call directly.
For debugging problems
Perform normal eth_call
Log output what calls are going out to diagnose issues
- Parameters
multicall_contract (web3.contract.contract.Contract) –
calls (list[eth_defi.event_reader.multicall_batcher.MulticallWrapper]) –
block_identifier (Union[Literal['latest', 'earliest', 'pending', 'safe', 'finalized'], eth_typing.evm.BlockNumber, eth_typing.evm.Hash32, eth_typing.encoding.HexStr, int]) –
- call_multicall_encoded(multicall_contract, calls, block_identifier)
Call a multicall contract.
- Parameters
multicall_contract (web3.contract.contract.Contract) –
calls (list[eth_defi.event_reader.multicall_batcher.MulticallWrapper]) –
block_identifier (Union[Literal['latest', 'earliest', 'pending', 'safe', 'finalized'], eth_typing.evm.BlockNumber, eth_typing.evm.Hash32, eth_typing.encoding.HexStr, int]) –
- Return type
- get_multicall_block_number(chain_id)
When the multicall contract was deployed for a chain.
- get_multicall_contract(web3, address=None, block_identifier=None)
Return a multicall smart contract instance.
Get IMulticall3 compiled with Forge
Use multicall3 ABI.
- Parameters
web3 (web3.main.Web3) –
address (Optional[Union[eth_typing.evm.HexAddress, str]]) –
block_identifier (eth_typing.evm.BlockNumber) –
- Return type
web3.contract.contract.Contract
- pin_fallback_provider_by_host(fallback_provider, host_substring)
Pin a FallbackProvider to the provider whose host matches a substring.
Unlike FallbackProvider.switch_provider() (which cycles or randomises), this deterministically selects a specific upstream. It is used to force HyperEVM multicall retries onto the Alchemy single node, bypassing goldsky’s eRPC consensus endpoint. See docs/README-hyperevm-goldsky-failure.md.
The switch goes through FallbackProvider.switch_to_provider_index(), so the pinned provider is chain-id verified and rolled back if it is misconfigured or routing to the wrong chain: we never silently read from a bad endpoint.
- Parameters
fallback_provider (eth_defi.provider.fallback.FallbackProvider) – The fallback provider to repoint.
host_substring (str) – Lower-case substring matched against get_provider_name() output.
- Returns
True if a matching provider was found and successfully selected, False if no provider matched or the match failed chain-id verification (in which case the caller should resume normal provider switching).
- Return type
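A simplified sketch of how host pinning and the failure detection documented under resolve_hyperevm_consensus_failover() could fit together. It works on plain provider name strings rather than a real FallbackProvider. Everything here is hypothetical: resolve_failover_host(), pin_by_host(), the switch_to_index callback (standing in for FallbackProvider.switch_to_provider_index(), whose exact signature is not shown here), the example host names, and the CONSENSUS_CLUE text, since the actual value of ERPC_CONSENSUS_DISAGREEMENT_CLUE is not given.

```python
from typing import Callable, Optional, Sequence

HYPEREVM_CHAIN_ID = 999  # chain id stated in this documentation
# Hypothetical marker text; the real clue string is not shown in this page
CONSENSUS_CLUE = "consensus disagreement"


def resolve_failover_host(chain_id: int, provider_names: Sequence[str], exception: BaseException) -> Optional[str]:
    """Return "alchemy" only for the HyperEVM goldsky consensus failure mode."""
    if chain_id != HYPEREVM_CHAIN_ID:
        return None
    # Walk the exception and its explicit causes looking for the marker
    found = False
    current: Optional[BaseException] = exception
    while current is not None and not found:
        found = CONSENSUS_CLUE in str(current).lower()
        current = current.__cause__
    if not found:
        return None
    names = [name.lower() for name in provider_names]
    # Failing over only makes sense when both endpoints are configured
    if any("goldsky" in n for n in names) and any("alchemy" in n for n in names):
        return "alchemy"
    return None


def pin_by_host(provider_names: Sequence[str], host_substring: str, switch_to_index: Callable[[int], bool]) -> bool:
    """Select the first provider whose name contains ``host_substring``."""
    for index, name in enumerate(provider_names):
        if host_substring in name.lower():
            # The callback is assumed to verify the chain id and report success
            return switch_to_index(index)
    return False


names = ["edge.goldsky.example", "hyperliquid-mainnet.alchemy.example"]
error = RuntimeError("multicall failed")
error.__cause__ = ValueError("eRPC: Consensus Disagreement among upstreams")

host = resolve_failover_host(HYPEREVM_CHAIN_ID, names, error)
pinned = pin_by_host(names, host, lambda index: True) if host else False
```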
- read_multicall_chunked(chain_id, web3factory, calls, block_identifier, max_workers=8, timeout=1800, chunk_size=40, progress_bar_desc=None, timestamped_results=True, backend='loky')
Read current data using multiple processes in parallel for speedup.
All calls hit the same block number
Show a progress bar using tqdm
Example:
    # Generated packed multicall for each token contract we want to query
    balance_of_signature = Web3.keccak(text="balanceOf(address)")[0:4]

    def _gen_calls(addresses: Iterable[str]) -> Iterable[EncodedCall]:
        for _token_address in addresses:
            yield EncodedCall.from_keccak_signature(
                address=_token_address.lower(),
                signature=balance_of_signature,
                data=convert_address_to_bytes32(out_address),
                extra_data={},
                ignore_errors=True,
                function="balanceOf",
            )

    web3factory = MultiProviderWeb3Factory(web3.provider.endpoint_uri, hint="fetch_erc20_balances_multicall")

    # Execute calls for all token balance reads at a specific block.
    # read_multicall_chunked() will automatically split calls to multiple chunks
    # if we are querying too many.
    results = read_multicall_chunked(
        chain_id=chain_id,
        web3factory=web3factory,
        calls=list(_gen_calls(tokens)),
        block_identifier=block_identifier,
        max_workers=max_workers,
        timestamped_results=False,
    )

    results = list(results)

    addr_to_balance = LowercaseDict()

    for result in results:
        token_address = result.call.address

        if not result.result:
            if raise_on_error:
                raise BalanceFetchFailed(f"Could not read token balance for ERC-20: {token_address} for address {out_address}")
            value = None
        else:
            raw_value = convert_int256_bytes_to_int(result.result)
            if decimalise:
                token = fetch_erc20_details(web3, token_address, cache=token_cache, chain_id=chain_id)
                value = token.convert_to_decimals(raw_value)
            else:
                value = raw_value

        addr_to_balance[token_address] = value
- Parameters
chain_id (int) – Which EVM chain we are targeting with calls.
web3factory (eth_defi.event_reader.web3factory.Web3Factory) – The connection factory for subprocesses
calls (list[eth_defi.event_reader.multicall_batcher.EncodedCall]) – List of calls to perform against Multicall3.
chunk_size (int) – Max calls per one chunk sent to Multicall contract, to stay below JSON-RPC read gas limit.
max_workers – How many parallel processes to use.
timeout – Joblib timeout to wait for a result from an individual task.
block_identifier (Union[Literal['latest', 'earliest', 'pending', 'safe', 'finalized'], eth_typing.evm.BlockNumber, eth_typing.evm.Hash32, eth_typing.encoding.HexStr, int]) –
Block number to read.
Can be a block number or “latest” or “earliest”
progress_bar_desc (Optional[str]) – If set, display a TQDM progress bar for the process.
timestamped_results –
Need timestamp of the block number in each result.
Causes very slow eth_getBlock call, use only if needed.
backend –
Joblib backend to use.
Either “loky” or “threading”.
- Returns
Iterable of results.
One entry per each call.
Results may arrive in a different order than the calls were given.
- Return type
Iterable[eth_defi.event_reader.multicall_batcher.EncodedCallResult]
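Two ideas from the description above lend themselves to a short sketch: splitting the call list into chunks of at most chunk_size, and restoring the original order when results come back shuffled. This uses only the standard library. chunked() and reorder_results() are hypothetical helpers, and the string call ids are an assumption made for the illustration.

```python
from typing import Dict, Iterable, Iterator, List, Sequence, Tuple, TypeVar

T = TypeVar("T")


def chunked(items: Sequence[T], chunk_size: int) -> Iterator[Sequence[T]]:
    """Yield consecutive slices of at most ``chunk_size`` items."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    for start in range(0, len(items), chunk_size):
        yield items[start:start + chunk_size]


def reorder_results(call_ids: Sequence[str], results: Iterable[Tuple[str, bytes]]) -> List[bytes]:
    """Put (call_id, value) pairs back into the order of ``call_ids``."""
    by_id: Dict[str, bytes] = dict(results)
    return [by_id[call_id] for call_id in call_ids]


# 100 calls with the default chunk_size of 40 give chunks of 40, 40 and 20
chunk_lengths = [len(chunk) for chunk in chunked(list(range(100)), 40)]

# Results arriving out of order are matched back by id
ordered = reorder_results(["a", "b", "c"], [("c", b"\x03"), ("a", b"\x01"), ("b", b"\x02")])
```

In the real reader each result carries its originating call (result.call in the example above), which plays the role of the id used here.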
- read_multicall_historical(chain_id, web3factory, calls, start_block, end_block, step, max_workers=8, timeout=1800, display_progress=True, progress_suffix=None, require_multicall_result=False, hypersync_client=None, timestamp_cache_file=PosixPath('/home/runner/.tradingstrategy/block-timestamp'))
Read historical data using multiple threads in parallel for speedup.
Run over period of time (blocks)
Use multicall to harvest data from a single block at a time
Show a progress bar using tqdm
- Parameters
chain_id (int) – Which chain we are targeting with calls.
web3factory (eth_defi.event_reader.web3factory.Web3Factory) – The connection factory for subprocesses
start_block (int) – Block range to scoop
end_block (int) – Block range to scoop
step (int) – How many blocks we iterate at once
timeout – Joblib timeout to wait for a result from an individual task
progress_suffix (Optional[Callable]) – Allow caller to decorate the progress bar
require_multicall_result – Debug parameter to crash the reader if we start to get invalid replies from Multicall3 contract.
display_progress (Union[bool, str]) –
Whether to display progress bar or not.
Set to string to have a progress bar label.
hypersync_client (HypersyncClient | None) – Not used in this reader
calls (Iterable[eth_defi.event_reader.multicall_batcher.EncodedCall]) –
timestamp_cache_file (pathlib.Path) –
- Return type
Iterable[eth_defi.event_reader.multicall_batcher.CombinedEncodedCallResult]
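To get a feel for the start_block, end_block and step parameters, the sketch below lists the blocks a stepped historical read would visit and estimates a step from a sampling interval. Both helpers are hypothetical. Treating end_block as inclusive is an assumption, and so is the 12-second block time used in the example.

```python
def iter_block_numbers(start_block: int, end_block: int, step: int) -> range:
    """Blocks visited by a stepped read, assuming end_block is inclusive."""
    if step <= 0:
        raise ValueError("step must be positive")
    return range(start_block, end_block + 1, step)


def blocks_per_interval(interval_seconds: float, block_time_seconds: float) -> int:
    """Approximate step size so that one step covers ``interval_seconds``."""
    return max(1, round(interval_seconds / block_time_seconds))


# One read per hour on a chain assumed to produce a block every 12 seconds
hourly_step = blocks_per_interval(3600, 12)
visited = list(iter_block_numbers(100, 1000, hourly_step))
```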
- read_multicall_historical_stateful(chain_id, web3factory, calls, start_block, end_block, step, max_workers=8, timeout=1800, display_progress=True, progress_suffix=None, require_multicall_result=False, chunk_size=48, hypersync_client=None, timestamp_cache_file=PosixPath('/home/runner/.tradingstrategy/block-timestamp'))
Read historical data using multicall with reading state and adaptive frequency filtering.
Allow adaptive frequency with read state
Slower loop than the dumb read_multicall_historical() as it has to maintain state
Because of state, we need to do block by block reading, as we need to evaluate state to see which calls are needed for which block, and the state depends on the result of the previous blocks
- Parameters
chunk_size –
We guarantee to update the reader state at least once every this many steps.
24 = 24 hours per day, assuming we update the state once for every day of data read.
Between chunks we blindly push data to subprocesses for speedup and do not attempt to hear back from the subprocesses to update the state.
chain_id (int) –
web3factory (eth_defi.event_reader.web3factory.Web3Factory) –
calls (dict[eth_defi.event_reader.multicall_batcher.EncodedCall, eth_defi.event_reader.multicall_batcher.BatchCallState]) –
start_block (int) –
end_block (int) –
step (int) –
hypersync_client (HypersyncClient | None) –
timestamp_cache_file (pathlib.Path) –
- Return type
Iterable[eth_defi.event_reader.multicall_batcher.CombinedEncodedCallResult]
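The trade-off described for chunk_size can be simulated without any RPC: state is consulted when a chunk is planned, the whole chunk is read blindly, and the state is only updated once the chunk has finished. The loop below is a sketch under those assumptions, not the library's implementation; read_in_chunks() and its three callbacks are hypothetical.

```python
from typing import Callable, Iterator, List, Sequence, Tuple


def read_in_chunks(
    block_numbers: Sequence[int],
    chunk_size: int,
    should_invoke: Callable[[int], bool],
    read_block: Callable[[int], str],
    update_state: Callable[[List[Tuple[int, str]]], None],
) -> Iterator[Tuple[int, str]]:
    """Read blocks chunk by chunk, updating state only between chunks."""
    for start in range(0, len(block_numbers), chunk_size):
        chunk = block_numbers[start:start + chunk_size]
        # State is evaluated before the chunk is dispatched
        wanted = [block for block in chunk if should_invoke(block)]
        # In a real reader this part would run in parallel subprocesses
        results = [(block, read_block(block)) for block in wanted]
        # Only now do the results feed back into the state
        update_state(results)
        yield from results


# Simulated vault that dies at block 130; reads should stop soon after
state = {"active": True}


def update(results: List[Tuple[int, str]]) -> None:
    if any(value == "dead" for _, value in results):
        state["active"] = False


read = list(
    read_in_chunks(
        block_numbers=list(range(100, 200, 10)),
        chunk_size=3,
        should_invoke=lambda block: state["active"],
        read_block=lambda block: "dead" if block >= 130 else "alive",
        update_state=update,
    )
)
read_blocks = [block for block, _ in read]
```

With a chunk size of 3 the reader still fetches blocks 140 and 150 after the vault has died at 130, because the state cannot change in the middle of a chunk. A smaller chunk size reacts sooner at the cost of less parallelism.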
- resolve_hyperevm_consensus_failover(chain_id, provider, exception)
Decide whether a failed HyperEVM multicall should fail over to a single node.
On HyperEVM (chain 999) the scan’s primary provider is goldsky’s eRPC endpoint running in consensus mode: it fans each eth_call to several upstream nodes and only returns a result when enough of them agree byte-for-byte. For some vaults the upstreams intermittently disagree and eRPC returns ERPC_CONSENSUS_DISAGREEMENT_CLUE. Retrying or randomly cycling back onto the same consensus endpoint is futile; a single (non-consensus) node such as Alchemy returns a usable answer immediately.
This helper detects that exact situation (HyperEVM chain id, a consensus disagreement error, and a provider mix that contains both a goldsky and an Alchemy endpoint) and returns the provider host substring to pin retries to.
See docs/README-hyperevm-goldsky-failure.md for the full failure analysis, the nodes involved, and the on-chain evidence.
- Parameters
chain_id (int) – Chain id of the multicall being retried.
provider (Any) – The active web3 provider. Only FallbackProvider mixes are eligible (we need an alternative single node to fail over to).
exception (Exception) – The MulticallRetryable (or its cause) raised by the failed call.
- Returns
Lower-case provider host substring ("alchemy") to pin retries to, or None if this is not the HyperEVM goldsky consensus failure mode.
- Return type