vault.historical

Documentation for eth_defi.vault.historical Python module.

Read historical state of vaults.

  • Use multicall to get data points for multiple vaults at once

  • Data points include:
    • Share price

    • TVL

    • Fees

See VaultHistoricalReadMulticaller for usage.

Functions

pformat_scan_result(self)

Format the result as a string.

scan_historical_prices_to_parquet(...[, ...])

Scan all historical share prices of the given vaults and save them to a Parquet file.

Classes

ParquetScanResult

Result of generating a historical prices Parquet file.

VaultHistoricalReadMulticaller

Read historical data from multiple vaults using multicall and archive node polling.

Exceptions

VaultReadNotSupported

Vault cannot be read due to misconfiguration somewhere.

class ParquetScanResult

Bases: TypedDict

Result of generating a historical prices Parquet file.
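
Because ParquetScanResult is a TypedDict, an instance is an ordinary dict at runtime, so the inherited methods documented for this class behave exactly as they do on dict. The sketch below illustrates this with a hypothetical stand-in type; the keys rows_written and output_fname are assumptions made for illustration and are not the real fields of ParquetScanResult.

```python
from typing import TypedDict


class ExampleScanResult(TypedDict):
    """Hypothetical stand-in; the real ParquetScanResult keys may differ."""

    rows_written: int
    output_fname: str


# A TypedDict is constructed like a dict and is a plain dict at runtime.
result = ExampleScanResult(rows_written=1024, output_fname="prices.parquet")

is_plain_dict = type(result) is dict
rows = result.get("rows_written", 0)
missing = result.get("no_such_key", "n/a")
```

The type annotations only help static checkers; no validation happens when the dictionary is created or read.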

__init__(*args, **kwargs)
__new__(**kwargs)
clear()

Remove all items from the dict.

copy()

Return a shallow copy of the dict.

fromkeys(value=None, /)

Create a new dictionary with keys from iterable and values set to value.

get(key, default=None, /)

Return the value for key if key is in the dictionary, else default.

items()

Return a set-like object providing a view on the dict’s items.

keys()

Return a set-like object providing a view on the dict’s keys.

pop(k[, d])

Remove the specified key and return the corresponding value.

If the key is not found, return the default if given; otherwise, raise a KeyError.

popitem()

Remove and return a (key, value) pair as a 2-tuple.

Pairs are returned in LIFO (last-in, first-out) order. Raises KeyError if the dict is empty.

setdefault(key, default=None, /)

Insert key with a value of default if key is not in the dictionary.

Return the value for key if key is in the dictionary, else default.

update([E, ]**F)

Update D (the dict) from mapping or iterable E and keyword arguments F. Returns None.

  • If E is present and has a .keys() method: for k in E.keys(): D[k] = E[k]

  • If E is present and lacks a .keys() method: for k, v in E: D[k] = v

  • In either case, this is followed by: for k in F: D[k] = F[k]
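
The three cases of update() can be seen on a plain dict. A minimal sketch with illustrative keys that are unrelated to the actual scan result fields:

```python
d = {"a": 1}

# E is a mapping (it has .keys()): values are copied key by key.
d.update({"b": 2})

# E is an iterable of (key, value) pairs (it has no .keys()).
d.update([("c", 3)])

# Keyword arguments F are applied last, so they override values from E.
d.update({"a": 10}, a=100)
```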

values()

Return an object providing a view on the dict’s values.

class VaultHistoricalReadMulticaller

Bases: object

Read historical data from multiple vaults using multicall and archive node polling.

Parameters

supported_quote_tokens – Allows us to validate vaults against a list of supported tokens

__init__(web3factory, supported_quote_tokens=set[eth_defi.token.TokenDetails] | None, max_workers=8, token_cache=None, require_multicall_result=False, hypersync_client=None, timestamp_cache_file=PosixPath('/home/runner/.tradingstrategy/block-timestamp'))
generate_vault_historical_calls(readers, display_progress=True)

Generate multicalls for each vault to read its state at any block.

Return type

Iterable[tuple[eth_defi.event_reader.multicall_batcher.EncodedCall, eth_defi.event_reader.multicall_batcher.BatchCallState]]

prepare_readers(vaults, stateful=False, saved_states=None)

Create readers for vaults.

Return type

dict[eth_typing.evm.HexAddress, eth_defi.vault.base.VaultHistoricalReader]

read_historical(vaults, start_block, end_block, step, reader_func=<function read_multicall_historical>, saved_states=None)

Create an iterable that extracts vault records over RPC.

Returns

Unordered results

Return type

Iterable[eth_defi.vault.base.VaultHistoricalRead]
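
Because the results arrive unordered, a consumer that needs a per-vault time series has to group and sort them itself. The sketch below assumes each record exposes a vault identifier and a block number; ExampleRead and its fields are hypothetical stand-ins and not the actual VaultHistoricalRead attributes.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Iterable


@dataclass(frozen=True)
class ExampleRead:
    """Hypothetical stand-in for a historical read record."""

    vault_address: str
    block_number: int
    share_price: float


def group_by_vault(reads: Iterable[ExampleRead]) -> dict[str, list[ExampleRead]]:
    """Group unordered reads per vault and sort each group by block number."""
    grouped: dict[str, list[ExampleRead]] = defaultdict(list)
    for read in reads:
        grouped[read.vault_address].append(read)
    for series in grouped.values():
        series.sort(key=lambda r: r.block_number)
    return dict(grouped)


# Records in the arbitrary order a parallel reader might yield them.
unordered = [
    ExampleRead("0xaaa", 300, 1.03),
    ExampleRead("0xbbb", 100, 2.00),
    ExampleRead("0xaaa", 100, 1.00),
    ExampleRead("0xaaa", 200, 1.01),
]
series = group_by_vault(unordered)
```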

save_reader_state()

Save the state of all readers.

Returns

Dictionary keyed by the vault spec

Return type

dict[eth_defi.vault.base.VaultSpec, dict]
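
The return type matches the reader_states parameter of scan_historical_prices_to_parquet(), which suggests a save-and-resume pattern between scans. The sketch below covers only the persistence step. It uses JSON and a hypothetical ExampleSpec key in place of VaultSpec, and the state contents are made up for illustration.

```python
import json
import tempfile
from pathlib import Path
from typing import NamedTuple


class ExampleSpec(NamedTuple):
    """Hypothetical stand-in for a vault spec used as a dictionary key."""

    chain_id: int
    vault_address: str


def save_states(path: Path, states: dict[ExampleSpec, dict]) -> None:
    """Write reader states to disk; tuple keys are flattened into JSON fields."""
    payload = [
        {"chain_id": spec.chain_id, "vault_address": spec.vault_address, "state": state}
        for spec, state in states.items()
    ]
    path.write_text(json.dumps(payload))


def load_states(path: Path) -> dict[ExampleSpec, dict]:
    """Read reader states back, or return an empty dict on the first run."""
    if not path.exists():
        return {}
    payload = json.loads(path.read_text())
    return {ExampleSpec(e["chain_id"], e["vault_address"]): e["state"] for e in payload}


states = {ExampleSpec(1, "0xaaa"): {"last_block": 19_000_000}}

with tempfile.TemporaryDirectory() as tmp:
    state_file = Path(tmp) / "reader-states.json"
    first_run = load_states(state_file)
    save_states(state_file, states)
    restored = load_states(state_file)
```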

validate_vaults(vaults)

Check that we can read these vaults.

  • Validate that we know how to read vaults

Raises

VaultReadNotSupported – If some of the vaults cannot be read

Parameters

vaults (list[eth_defi.vault.base.VaultBase]) –
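
Validating up front lets a misconfigured vault fail fast, before a long scan starts. The sketch below mimics that pattern with local stand-ins: ExampleVault, ExampleReadNotSupported and the quote-token check are assumptions made for illustration and do not reproduce the library's actual validation logic.

```python
from dataclasses import dataclass


class ExampleReadNotSupported(Exception):
    """Local stand-in for VaultReadNotSupported."""


@dataclass(frozen=True)
class ExampleVault:
    """Hypothetical vault description."""

    address: str
    quote_token: str


def validate_examples(vaults: list[ExampleVault], supported: set[str]) -> None:
    """Raise once, listing every vault whose quote token is not supported."""
    bad = [v.address for v in vaults if v.quote_token not in supported]
    if bad:
        raise ExampleReadNotSupported(f"Cannot read vaults: {', '.join(bad)}")


vaults = [ExampleVault("0xaaa", "USDC"), ExampleVault("0xbbb", "XYZ")]

try:
    validate_examples(vaults, supported={"USDC", "USDT"})
    error_message = None
except ExampleReadNotSupported as e:
    # Report the problem instead of starting the scan.
    error_message = str(e)
```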

exception VaultReadNotSupported

Bases: Exception

Vault cannot be read due to misconfiguration somewhere.

__init__(*args, **kwargs)
__new__(**kwargs)
add_note(note, /)

Add a note to the exception

with_traceback(tb, /)

Set self.__traceback__ to tb and return self.

pformat_scan_result(self)

Format the result as a string.

Return type

str

scan_historical_prices_to_parquet(output_fname, web3, web3factory, vaults, token_cache, start_block=None, end_block=None, step=None, chunk_size=1024, compression='zstd', max_workers=8, require_multicall_result=False, frequency='1d', reader_states=None, hypersync_client=None, timestamp_cache_file=PosixPath('/home/runner/.tradingstrategy/block-timestamp'), vault_addresses=None)

Scan all historical share prices of the given vaults and save them to a Parquet file.

  • Write historical prices to a Parquet file

  • Multiprocess-boosted

  • The same Parquet file can contain data from multiple chains

Parameters
  • output_fname (pathlib.Path) –

    Path to a destination Parquet file.

    If the file exists and vault_addresses is set, only entries for those vaults are deleted and rewritten. Otherwise all entries for the current chain are deleted and rewritten.

  • web3 (web3.main.Web3) – Web3 connection

  • web3factory (eth_defi.event_reader.web3factory.Web3Factory) – Creates connections in subprocesses

  • vaults (list[eth_defi.vault.base.VaultBase]) –

    Vaults whose historical prices we scan.

    All vaults must have their first_seen_at_block attribute set to increase scan performance.

  • start_block

    First block to scan.

    Leave empty to autodetect.

  • end_block

    Last block to scan.

    Leave empty to autodetect.

  • step_duration

    Historical step size (1 day).

    An attempt is made to map it automatically to a block time.

  • step – Step size in number of blocks.

  • chunk_size – How many rows to write to the Parquet file in one buffer.

  • max_workers – Number of subprocesses to use for multicall

  • hypersync_client – Speed up the discovery of timestamps

  • vault_addresses (Optional[set[str]]) –

    If set, only delete and rewrite parquet rows for these vault addresses.

    Addresses must be lowercase. When None, all rows for the chain are deleted and rewritten (default behaviour).

  • token_cache (eth_defi.token.TokenDiskCache) –

  • frequency (Literal['1d', '1h']) –

  • reader_states (Optional[dict[eth_defi.vault.base.VaultSpec, dict]]) –
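
The delete-and-rewrite rule described for output_fname and vault_addresses can be pictured with plain Python rows. This is a sketch of the documented behaviour only, not the library's implementation. The chain and address column names are assumptions, as is the choice to match on both chain and address in the partial case.

```python
from typing import Optional


def rows_to_keep(
    existing: list[dict],
    chain_id: int,
    vault_addresses: Optional[set[str]] = None,
) -> list[dict]:
    """Return the existing rows that survive before fresh rows are appended."""
    if vault_addresses is None:
        # Default: every row of the scanned chain is dropped and rewritten.
        return [r for r in existing if r["chain"] != chain_id]
    # Addresses must be lowercase, so normalise the caller's input first.
    targets = {a.lower() for a in vault_addresses}
    return [
        r for r in existing
        if not (r["chain"] == chain_id and r["address"] in targets)
    ]


existing = [
    {"chain": 1, "address": "0xaaa"},
    {"chain": 1, "address": "0xbbb"},
    {"chain": 8453, "address": "0xccc"},
]

kept_default = rows_to_keep(existing, chain_id=1)
kept_partial = rows_to_keep(existing, chain_id=1, vault_addresses={"0xAAA"})
```

In both cases rows belonging to other chains are left untouched, which is what allows one Parquet file to hold data from multiple chains.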

Returns

Scan report.

Return type

eth_defi.vault.historical.ParquetScanResult
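
The parameter descriptions suggest that a time-based sampling interval is mapped to a step measured in blocks. A back-of-the-envelope sketch of such a mapping follows, assuming a constant average block time and an inclusive end_block. The helper names and the 12-second block time are illustrative assumptions, not values taken from the library.

```python
# Sampling intervals in seconds for the two documented frequency values.
FREQUENCY_SECONDS = {"1d": 24 * 60 * 60, "1h": 60 * 60}


def estimate_step(frequency: str, block_time_seconds: float) -> int:
    """Estimate how many blocks correspond to one sampling interval."""
    if block_time_seconds <= 0:
        raise ValueError("block_time_seconds must be positive")
    return max(1, round(FREQUENCY_SECONDS[frequency] / block_time_seconds))


def sample_blocks(start_block: int, end_block: int, step: int) -> list[int]:
    """List the block numbers visited from start_block to end_block inclusive."""
    return list(range(start_block, end_block + 1, step))


daily_step = estimate_step("1d", 12.0)   # 86400 / 12
hourly_step = estimate_step("1h", 12.0)  # 3600 / 12
blocks = sample_blocks(1_000_000, 1_000_000 + 3 * daily_step, daily_step)
```

Real block times drift, so a fixed step only approximates calendar intervals.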