event_reader.multicall_batcher

Documentation for eth_defi.event_reader.multicall_batcher Python module.

Multicall3 contract helpers.

For usage see read_multicall_chunked() and read_multicall_historical_stateful functions.

Warning

See Multicall private key leak hack warning.

Functions

call_multicall(multicall_contract, calls, ...)

Call a multicall contract.

call_multicall_batched_single_thread(...[, ...])

Call Multicall contract with a payload.

call_multicall_debug_single_thread(...)

Skip Multicall contract and try eth_call directly.

call_multicall_encoded(multicall_contract, ...)

Call a multicall contract.

get_multicall_block_number(chain_id)

When the multicall contract was deployed for a chain.

get_multicall_contract(web3[, address, ...])

Return a multicall smart contract instance.

pin_fallback_provider_by_host(...)

Pin a FallbackProvider to the provider whose host matches a substring.

read_multicall_chunked(chain_id, ...[, ...])

Read current data using multiple processes in parallel for speedup.

read_multicall_historical(chain_id, ...[, ...])

Read historical data using multiple threads in parallel for speedup.

read_multicall_historical_stateful(chain_id, ...)

Read historical data using multicall with reading state and adaptive frequency filtering.

resolve_hyperevm_consensus_failover(...)

Decide whether a failed HyperEVM multicall should fail over to a single node.

Classes

BatchCallState

Allow mutlicall calls to maintain state over the multiple invocations.

CombinedEncodedCallResult

Historical read result of multiple multicalls.

EncodedCall

Multicall payload, minified implementation.

EncodedCallResult

Result of an one multicall.

MulticallHistoricalTask

Pickled task send between multicall reader loop and subprocesses.

MulticallWrapper

Wrap a call going through the Multicall contract.

MultiprocessMulticallReader

An instance created in a subprocess to do calls.

Exceptions

MulticallNonRetryable

Need to take a manual look these errors.

MulticallRetryable

Out of gas.

MulticallStateProblem

TODO

class BatchCallState

Bases: abc.ABC

Allow mutlicall calls to maintain state over the multiple invocations.

  • Mostly useful for historical mutlticall read and frequency management

abstract load(data)

Persist state across multiple runs

Parameters

data (dict) –

abstract save()

Persist state across multiple runs.

Returns

Pickleable Python object

Return type

dict

abstract should_invoke(call, block_identifier, timestamp)

Check the condition if this multicall is good to go.

Parameters
Return type

bool

class CombinedEncodedCallResult

Bases: object

Historical read result of multiple multicalls.

Return the whole block worth of calls when iterating over chain block by block.

__init__(block_number, timestamp, results)
Parameters
Return type

None

class EncodedCall

Bases: object

Multicall payload, minified implementation.

  • Designed for multiprocessing and historical reads

  • Only carry encoded data, not ABI etc. metadata

  • Contain extra_data which allows route to call results from several calls to one handler class

Example:

convert_to_shares_payload = eth_abi.encode(["uint256"], [share_probe_amount])

share_price_call = EncodedCall.from_keccak_signature(
    address=address,
    signature=Web3.keccak(text="convertToShares(uint256)")[0:4],
    function="convertToShares",
    data=convert_to_shares_payload,
    extra_data=None,
)
__init__(func_name, address, data, extra_data, first_block_number=None, call_id=<factory>, _hash=None)
Parameters
Return type

None

call(web3, block_identifier, from_='0x0000000000000000000000000000000000000000', gas=None, ignore_error=False, silent_error=False, attempts=3, retry_sleep=30.0)

Return raw results of the call.

Example how to read:

erc_7575_call = EncodedCall.from_keccak_signature(
    address=self.vault_address,
    signature=Web3.keccak(text="share()")[0:4],
    function="share",
    data=b"",
    extra_data=None,
)

result = erc_7575_call.call(self.web3, block_identifier="latest")
share_token_address = convert_uint256_bytes_to_address(result)
Parameters
  • ignore_error – Set to True to inform middleware that it is normal for this call to fail and do not log it as a failed call, or retry it.

  • attempts (int) –

    Use built-in retry mechanism for flaky RPC.

    This works regardless of middleware installed. Set to zero to ignore.

    Cannot be used with ignore_errors.

  • gas (int) –

    Gas limit.

    If not given, use 15M limit except for Mantle use 99M.

  • web3 (web3.main.Web3) –

  • block_identifier (Union[Literal['latest', 'earliest', 'pending', 'safe', 'finalized'], eth_typing.evm.BlockNumber, eth_typing.evm.Hash32, eth_typing.encoding.HexStr, int]) –

Returns

Raw call results as bytes

Raises

ValueError – If the call reverts

Return type

bytes

call_as_result(web3, block_identifier, from_='0x0000000000000000000000000000000000000000', gas=15000000, ignore_error=False)

Perform RPC call and return the result as an EncodedCallResult.

See call() for info.

Parameters
  • gas_limit

    eth_call RPC gas limit.

    Set to 15M by default, assume to be safe on every chain.

  • web3 (web3.main.Web3) –

  • block_identifier (Union[Literal['latest', 'earliest', 'pending', 'safe', 'finalized'], eth_typing.evm.BlockNumber, eth_typing.evm.Hash32, eth_typing.encoding.HexStr, int]) –

Return type

eth_defi.event_reader.multicall_batcher.EncodedCallResult

static from_contract_call(call, extra_data=None, first_block_number=None)

Create poller call from Web3.py Contract proxy object

Parameters
  • call (web3.contract.contract.ContractFunction) –

  • extra_data (Optional[dict]) –

  • first_block_number (Optional[int]) –

Return type

eth_defi.event_reader.multicall_batcher.EncodedCall

static from_keccak_signature(address, function, signature, data, extra_data, first_block_number=None, ignore_errors=False, state=None)

Create poller call directly from a raw function signature

Parameters
Return type

eth_defi.event_reader.multicall_batcher.EncodedCall

get_curl_info(block_number)

Get human-readable details for debugging.

  • Punch into Tenderly simulator

  • Data contains both function signature and data payload

Parameters

block_number (int) –

Return type

str

get_debug_info()

Get human-readable details for debugging.

  • Punch into Tenderly simulator

  • Data contains both function signature and data payload

Return type

str

transact(from_, gas_limit)

Build a transaction payload for this call.

Example:

gas_limit = 15_000_000

# function settleDeposit(uint256 _newTotalAssets) public virtual;
call = EncodedCall.from_keccak_signature(
    address=vault.address,
    function="settleDeposit()",
    signature=Web3.keccak(text="settleDeposit(uint256)")[0:4],
    data=convert_uin256_to_bytes(raw_nav),
    extra_data=None,
)
tx_data = call.transact(
    from_=asset_manager,
    gas_limit=gas_limit,
)
tx_hash = web3.eth.send_transaction(tx_data)
assert_transaction_success_with_explanation(web3, tx_hash)
Parameters
Return type

dict

class EncodedCallResult

Bases: object

Result of an one multicall.

Example:

# File 21 of 47 : PlasmaVaultStorageLib.sol
#     /// @custom:storage-location erc7201:io.ipor.PlasmaVaultPerformanceFeeData
#     struct PerformanceFeeData {
#         address feeManager;
#         uint16 feeInPercentage;
#     }
data = call_by_name["getPerformanceFeeData"].result
performance_fee = int.from_bytes(data[32:64], byteorder="big") / 10_000
__init__(call, success, result, block_identifier, timestamp=None, revert_exception=None, state=None)
Parameters
Return type

None

class MulticallHistoricalTask

Bases: object

Pickled task send between multicall reader loop and subprocesses.

Send a batch of calls to a specific block.

__init__(chain_id, web3factory, block_number, calls, require_multicall_result=False, timestamp=None, task_id=<factory>)
Parameters
Return type

None

exception MulticallNonRetryable

Bases: Exception

Need to take a manual look these errors.

__init__(*args, **kwargs)
__new__(**kwargs)
add_note(note, /)

Add a note to the exception

with_traceback(tb, /)

Set self.__traceback__ to tb and return self.

exception MulticallRetryable

Bases: Exception

Out of gas.

  • Broken contract in a gas loop

Try to decrease batch size.

__init__(message, status_code=None, headers=None)
Parameters
__new__(**kwargs)
add_note(note, /)

Add a note to the exception

with_traceback(tb, /)

Set self.__traceback__ to tb and return self.

exception MulticallStateProblem

Bases: Exception

TODO

__init__(*args, **kwargs)
__new__(**kwargs)
add_note(note, /)

Add a note to the exception

with_traceback(tb, /)

Set self.__traceback__ to tb and return self.

class MulticallWrapper

Bases: abc.ABC

Wrap a call going through the Multicall contract.

__init__(call, debug)
Parameters
  • call (web3.contract.contract.ContractFunction) –

  • debug (bool) –

Return type

None

get_human_args()

Get Solidity args as human readable string for debugging.

Return type

str

abstract get_key()

Get key that will identify this call in the result dictionary

Return type

Hashable

abstract handle(succeed, raw_return_value)

Parse the call result.

Parameters
  • succeed (bool) – Did we revert or not

  • raw_return_value (bytes) – Undecoded bytes from the Solidity function call

Returns

The value placed in the return dict

Return type

Any

multicall_callback(succeed, raw_return_value)

Convert the raw Solidity function call result to a denominated token amount.

  • Multicall library callback

Returns

The token amount in the reserve currency we get on the market sell.

None if this path was not supported (Solidity reverted).

Parameters
  • succeed (bool) –

  • raw_return_value (Any) –

Return type

Any

class MultiprocessMulticallReader

Bases: object

An instance created in a subprocess to do calls.

  • Specific to a chain (connection is married with a chain, otherwise stateless)

  • Initialises the web3 connection at the start of the process

  • If you try to read using multicall when the contract is not yet deployed (see get_multicall_block_number()) then you get no results

Create subprocess worker instance.

Parameters
  • web3factory – Initialise connection within the subprocess

  • batch_size

    How many calls we pack into the multicall.

    Manually tuned number if your RPC nodes start to crap out, as they hit their internal time limits.

__init__(web3factory, batch_size=40, backswitch_threshold=100, too_many_requets_sleep=61.0)

Create subprocess worker instance.

Parameters
  • web3factory (Union[eth_defi.event_reader.web3factory.Web3Factory, web3.main.Web3]) – Initialise connection within the subprocess

  • batch_size

    How many calls we pack into the multicall.

    Manually tuned number if your RPC nodes start to crap out, as they hit their internal time limits.

call_multicall_with_batch_size(multicall_contract, block_identifier, batch_size, encoded_calls, require_multicall_result)

Communicate with Multicall3 contract.

  • Fail safes for ugly situations

Parameters
  • multicall_contract (web3.contract.contract.Contract) –

  • block_identifier (Union[Literal['latest', 'earliest', 'pending', 'safe', 'finalized'], eth_typing.evm.BlockNumber, eth_typing.evm.Hash32, eth_typing.encoding.HexStr, int]) –

  • batch_size (int) –

  • encoded_calls (list[tuple[eth_typing.evm.HexAddress, bytes]]) –

  • require_multicall_result (bool) –

Return type

list[tuple[bool, bytes]]

get_batch_size(web3, chain_id, block_identifier)

Fix non-standard out of gas issues.

TODO: Move these rules to their own module.

Parameters
  • web3 (web3.main.Web3) –

  • chain_id (int) –

  • block_identifier (Union[Literal['latest', 'earliest', 'pending', 'safe', 'finalized'], eth_typing.evm.BlockNumber, eth_typing.evm.Hash32, eth_typing.encoding.HexStr, int]) –

Return type

Optional[int]

get_gas_hint(chain_id, batch_calls)

Fix non-standard out of gas issues

Parameters
Return type

Optional[int]

process_calls(block_identifier, calls, require_multicall_result=False, timestamp=None, min_fallback_retries=5)

Work a chunk of calls in the subprocess.

  • Divide unlimited number of calls to something we think Multicall3 and RPC node can handle

  • If a single batch fail

Parameters
  • require_multicall_result – Headache debug flag.

  • block_identifier (Union[Literal['latest', 'earliest', 'pending', 'safe', 'finalized'], eth_typing.evm.BlockNumber, eth_typing.evm.Hash32, eth_typing.encoding.HexStr, int]) – Block number

  • timestamp (Optional[datetime.datetime]) – Block timestamp

  • min_fallback_retries – Bang all RPCs at least this many times when attempting to make progress.

  • calls (list[eth_defi.event_reader.multicall_batcher.EncodedCall]) –

Return type

Iterable[eth_defi.event_reader.multicall_batcher.EncodedCallResult]

call_multicall(multicall_contract, calls, block_identifier)

Call a multicall contract.

Parameters
Return type

dict[Hashable, Any]

call_multicall_batched_single_thread(multicall_contract, calls, block_identifier, batch_size=15)

Call Multicall contract with a payload.

  • Single threaded

Parameters
  • web3_factory

    • Each thread will get its own web3 instance

  • batch_size – Don’t do more than this calls per one RPC.

  • multicall_contract (web3.contract.contract.Contract) –

  • calls (list[eth_defi.event_reader.multicall_batcher.MulticallWrapper]) –

  • block_identifier (Union[Literal['latest', 'earliest', 'pending', 'safe', 'finalized'], eth_typing.evm.BlockNumber, eth_typing.evm.Hash32, eth_typing.encoding.HexStr, int]) –

Return type

dict[Hashable, Any]

call_multicall_debug_single_thread(multicall_contract, calls, block_identifier)

Skip Multicall contract and try eth_call directly.

  • For debugging problems

  • Perform normal eth_call

  • Log output what calls are going out to diagnose issues

Parameters
call_multicall_encoded(multicall_contract, calls, block_identifier)

Call a multicall contract.

Parameters
Return type

dict[Hashable, Any]

get_multicall_block_number(chain_id)

When the multicall contract was deployed for a chain.

Parameters

chain_id (int) –

Return type

Optional[int]

get_multicall_contract(web3, address=None, block_identifier=None)

Return a multicall smart contract instance.

  • Get IMulticall3 compiled with Forge

  • Use multicall3 ABI.

Parameters
Return type

web3.contract.contract.Contract

pin_fallback_provider_by_host(fallback_provider, host_substring)

Pin a FallbackProvider to the provider whose host matches a substring.

Unlike FallbackProvider.switch_provider() (which cycles or randomises), this deterministically selects a specific upstream — used to force HyperEVM multicall retries onto the Alchemy single node, bypassing goldsky’s eRPC consensus endpoint. See docs/README-hyperevm-goldsky-failure.md.

The switch goes through FallbackProvider.switch_to_provider_index(), so the pinned provider is chain-id verified and rolled back if it is misconfigured or routing to the wrong chain — we never silently read from a bad endpoint.

Parameters
Returns

True if a matching provider was found and successfully selected, False if no provider matched or the match failed chain-id verification (in which case the caller should resume normal provider switching).

Return type

bool

read_multicall_chunked(chain_id, web3factory, calls, block_identifier, max_workers=8, timeout=1800, chunk_size=40, progress_bar_desc=None, timestamped_results=True, backend='loky')

Read current data using multiple processes in parallel for speedup.

  • All calls hit the same block number

  • Show a progress bar using tqdm

Example:

# Generated packed multicall for each token contract we want to query
balance_of_signature = Web3.keccak(text="balanceOf(address)")[0:4]


def _gen_calls(addresses: Iterable[str]) -> Iterable[EncodedCall]:
    for _token_address in addresses:
        yield EncodedCall.from_keccak_signature(
            address=_token_address.lower(),
            signature=balance_of_signature,
            data=convert_address_to_bytes32(out_address),
            extra_data={},
            ignore_errors=True,
            function="balanceOf",
        )


web3factory = MultiProviderWeb3Factory(web3.provider.endpoint_uri, hint="fetch_erc20_balances_multicall")

# Execute calls for all token balance reads at a specific block.
# read_multicall_chunked() will automatically split calls to multiple chunks
# if we are querying too many.
results = read_multicall_chunked(
    chain_id=chain_id,
    web3factory=web3factory,
    calls=list(_gen_calls(tokens)),
    block_identifier=block_identifier,
    max_workers=max_workers,
    timestamped_results=False,
)

results = list(results)

addr_to_balance = LowercaseDict()

for result in results:
    token_address = result.call.address

    if not result.result:
        if raise_on_error:
            raise BalanceFetchFailed(f"Could not read token balance for ERC-20: {token_address} for address {out_address}")
        value = None
    else:
        raw_value = convert_int256_bytes_to_int(result.result)
        if decimalise:
            token = fetch_erc20_details(web3, token_address, cache=token_cache, chain_id=chain_id)
            value = token.convert_to_decimals(raw_value)
        else:
            value = raw_value

    addr_to_balance[token_address] = value
Parameters
  • chain_id (int) – Which EVM chain we are targeting with calls.

  • web3factory (eth_defi.event_reader.web3factory.Web3Factory) – The connection factory for subprocesses

  • calls (list[eth_defi.event_reader.multicall_batcher.EncodedCall]) – List of calls to perform against Multicall3.

  • chunk_size (int) – Max calls per one chunk sent to Multicall contract, to stay below JSON-RPC read gas limit.

  • max_workers – How many parallel processes to use.

  • timeout – Joblib timeout to wait for a result from an individual task.

  • block_identifier (Union[Literal['latest', 'earliest', 'pending', 'safe', 'finalized'], eth_typing.evm.BlockNumber, eth_typing.evm.Hash32, eth_typing.encoding.HexStr, int]) –

    Block number to read.

    • Can be a block number or “latest” or “earliest”

  • progress_bar_desc (Optional[str]) – If set, display a TQDM progress bar for the process.

  • timestamped_results

    Need timestamp of the block number in each result.

    Causes very slow eth_getBlock call, use only if needed.

  • backend

    Joblib backend to use.

    Either “loky” or “threading”.

Returns

Iterable of results.

One entry per each call.

Calls may be different order than originally given.

Return type

Iterable[eth_defi.event_reader.multicall_batcher.EncodedCallResult]

read_multicall_historical(chain_id, web3factory, calls, start_block, end_block, step, max_workers=8, timeout=1800, display_progress=True, progress_suffix=None, require_multicall_result=False, hypersync_client=None, timestamp_cache_file=PosixPath('/home/runner/.tradingstrategy/block-timestamp'))

Read historical data using multiple threads in parallel for speedup.

  • Run over period of time (blocks)

  • Use multicall to harvest data from a single block at a time

  • Show a progress bar using tqdm

Parameters
  • chain_id (int) – Which chain we are targeting with calls.

  • web3factory (eth_defi.event_reader.web3factory.Web3Factory) – The connection factory for subprocesses

  • start_block (int) – Block range to scoop

  • end_block (int) – Block range to scoop

  • step (int) – How many blocks we iterate at once

  • timeout – Joblib timeout to wait for a result from an individual task

  • progress_suffix (Optional[Callable]) – Allow caller to decorate the progress bar

  • require_multicall_result – Debug parameter to crash the reader if we start to get invalid replies from Multicall3 contract.

  • display_progress (Union[bool, str]) –

    Whether to display progress bar or not.

    Set to string to have a progress bar label.

  • hypersync_client (HypersyncClient | None) – Not used in this reader

  • calls (Iterable[eth_defi.event_reader.multicall_batcher.EncodedCall]) –

  • timestamp_cache_file (pathlib.Path) –

Return type

Iterable[eth_defi.event_reader.multicall_batcher.CombinedEncodedCallResult]

read_multicall_historical_stateful(chain_id, web3factory, calls, start_block, end_block, step, max_workers=8, timeout=1800, display_progress=True, progress_suffix=None, require_multicall_result=False, chunk_size=48, hypersync_client=None, timestamp_cache_file=PosixPath('/home/runner/.tradingstrategy/block-timestamp'))

Read historical data using multicall with reading state and adaptive frequency filtering.

  • Allow adaptive frequency with read state

  • Slower loop than the dumb read_multicall_historical() as it has to maintain state

  • Because of state, we need to do block by block reading, as we need to evaluate state to see which calls are needed for which block, and the state depends on the result of the previous blocks

Parameters
Return type

Iterable[eth_defi.event_reader.multicall_batcher.CombinedEncodedCallResult]

resolve_hyperevm_consensus_failover(chain_id, provider, exception)

Decide whether a failed HyperEVM multicall should fail over to a single node.

On HyperEVM (chain 999) the scan’s primary provider is goldsky’s eRPC endpoint running in consensus mode: it fans each eth_call to several upstream nodes and only returns a result when enough of them agree byte-for-byte. For some vaults the upstreams intermittently disagree and eRPC returns ERPC_CONSENSUS_DISAGREEMENT_CLUE. Retrying or randomly cycling back onto the same consensus endpoint is futile; a single (non-consensus) node such as Alchemy returns a usable answer immediately.

This helper detects that exact situation — HyperEVM chain id, a consensus disagreement error, and a provider mix that contains both a goldsky and an Alchemy endpoint — and returns the provider host substring to pin retries to.

See docs/README-hyperevm-goldsky-failure.md for the full failure analysis, the nodes involved, and the on-chain evidence.

Parameters
  • chain_id (int) – Chain id of the multicall being retried.

  • provider (Any) – The active web3 provider. Only FallbackProvider mixes are eligible (we need an alternative single node to fail over to).

  • exception (Exception) – The MulticallRetryable (or its cause) raised by the failed call.

Returns

Lower-case provider host substring ("alchemy") to pin retries to, or None if this is not the HyperEVM goldsky consensus failure mode.

Return type

Optional[str]