ParquetDatasetBlockDataStore

Documentation for eth_defi.event_reader.parquet_block_data_store.ParquetDatasetBlockDataStore Python class.

class ParquetDatasetBlockDataStore[source]

Store block data as a Parquet dataset.

  • Partitions are keyed by block number.

  • Partitioning allows fast incremental updates by overwriting only the last two partitions.

Methods summary

__init__(path[, partition_size])

floor_block_number_to_partition_start(n)

is_virgin()

Does this store contain any stored data?

load([since_block_number])

Load data from parquet.

peak_last_block()

Return the last block number stored on the disk.

save(df[, since_block_number, ...])

Save all data to Parquet.

save_incremental(df)

Write any partitions that are missing from the data on disk.

__init__(path, partition_size=100000)[source]
Parameters
  • path (pathlib.Path) – Directory holding the dataset and its metadata file

  • partition_size – Number of blocks per partition
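
Because partitions are keyed by block number, a block number can be mapped to the start of its partition with integer flooring. The library exposes this as floor_block_number_to_partition_start(n); the sketch below is a hypothetical stand-alone version assuming simple integer division, not the library's actual implementation.

```python
# Hypothetical sketch of mapping a block number to its partition start,
# assuming partitions cover contiguous, fixed-size block number ranges.
# floor_to_partition_start and PARTITION_SIZE are illustrative names.

PARTITION_SIZE = 100_000  # the default partition_size

def floor_to_partition_start(n: int, partition_size: int = PARTITION_SIZE) -> int:
    """Round a block number down to the first block of its partition."""
    return (n // partition_size) * partition_size

# Blocks 0..99_999 land in partition 0, 100_000..199_999 in partition 100_000, etc.
assert floor_to_partition_start(0) == 0
assert floor_to_partition_start(99_999) == 0
assert floor_to_partition_start(100_000) == 100_000
assert floor_to_partition_start(12_345_678) == 12_300_000
```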

is_virgin()[source]

Does this store contain any stored data?

Returns

Whether there is data to load.

Return type

bool

load(since_block_number=0)[source]

Load data from parquet.

Parameters

since_block_number (int) – May return rows earlier than this block if it falls in the middle of a partition

Return type

pandas.core.frame.DataFrame
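
The caveat above follows from reads starting at partition boundaries: the requested block number is floored to its partition start, so the whole containing partition is returned. A minimal stand-alone sketch, assuming an illustrative in-memory partition layout (the names load_since and partitions are not the library's):

```python
# Minimal sketch of why load() may return rows earlier than
# since_block_number: reads start at the containing partition's
# first block. The partition layout here is a toy assumption.

partition_size = 100  # small size for the example

# Pretend each partition holds one row per block number in its range
partitions = {
    0: list(range(0, 100)),
    100: list(range(100, 200)),
    200: list(range(200, 300)),
}

def load_since(since_block_number: int) -> list[int]:
    """Return all rows from the partition containing since_block_number onward."""
    start = (since_block_number // partition_size) * partition_size
    rows = []
    for partition_start in sorted(partitions):
        if partition_start >= start:
            rows.extend(partitions[partition_start])
    return rows

# The request asks for block 250, but the whole partition from 200 comes back
assert load_since(250)[0] == 200
```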

save(df, since_block_number=0, check_contains_all_blocks=True)[source]

Save all data to Parquet.

If block headers have already been written, existing data is overwritten on a per-partition basis.

Parameters
  • since_block_number (int) – Write only the latest data after this block number (inclusive)

  • check_contains_all_blocks – Check that we have at least one data record for every block. Note that trades might not happen on every block.

  • df (pandas.core.frame.DataFrame) –
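
The check_contains_all_blocks option verifies that every block in the covered range has at least one row. A hedged sketch of that kind of check, written over plain block number lists rather than a DataFrame; the function name and return convention (listing the missing blocks instead of raising) are assumptions for illustration:

```python
# Hypothetical sketch of an "every block has at least one record" check.
# Returns the missing block numbers instead of raising, for clarity.

def missing_blocks(block_numbers):
    """Return block numbers in the covered range that have no rows at all."""
    blocks = set(block_numbers)
    expected = set(range(min(blocks), max(blocks) + 1))
    return sorted(expected - blocks)

# Several rows per block are fine; block 3 having no rows at all is the problem
assert missing_blocks([1, 2, 2, 4]) == [3]
assert missing_blocks([5, 6, 7]) == []
```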

save_incremental(df)[source]

Write any partitions that are missing from the data on disk.

  • A minimum of two partitions is always written

  • There might be gaps in the incoming data

  • There might be gaps in the data already written on disk

  • Heuristics decide what to write

Parameters

df (pandas.core.frame.DataFrame) –

Return type

Tuple[int, int]
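
One plausible reading of those heuristics: write every partition the disk is missing, and always rewrite the last partitions on disk because the most recent one may be partial. The sketch below is an assumption-laden illustration, not the library's code; partitions_to_write and both argument names are invented for the example.

```python
# Hedged sketch of incremental-write partition selection: rewrite the
# last two partitions on disk plus any partitions not yet written.

def partitions_to_write(data_partitions, disk_partitions):
    """Return sorted partition starts to (re)write.

    data_partitions: partition starts covered by the new DataFrame.
    disk_partitions: partition starts already on disk.
    """
    to_write = set(data_partitions) - set(disk_partitions)  # fill gaps
    # Always rewrite the last two partitions, since the most recent
    # partition may contain only a partial block range
    to_write.update(sorted(disk_partitions)[-2:])
    return sorted(p for p in to_write if p in data_partitions)

disk = [0, 100, 200]
new_data = [100, 200, 300, 400]
assert partitions_to_write(new_data, disk) == [100, 200, 300, 400]
```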

peak_last_block()[source]

Return the last block number stored on the disk.

Return type

Optional[int]
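
Peeking the last block plausibly involves inspecting the newest partition on disk, returning None for a virgin store. The sketch below only recovers the newest partition start from directory names; the "partition=<start>" Hive-style naming and the function name are assumptions, and the real method would additionally read the newest partition's contents to find the actual last block.

```python
# Hypothetical sketch: find the newest partition on disk by scanning
# Hive-style "partition=<start>" directory names; None if the store
# is empty (virgin).

import pathlib
import tempfile

def peek_last_partition_start(path: pathlib.Path):
    """Return the highest partition start on disk, or None if empty."""
    starts = []
    for child in path.iterdir():
        if child.is_dir() and child.name.startswith("partition="):
            starts.append(int(child.name.split("=", 1)[1]))
    return max(starts) if starts else None

with tempfile.TemporaryDirectory() as tmp:
    root = pathlib.Path(tmp)
    assert peek_last_partition_start(root) is None  # virgin store
    (root / "partition=0").mkdir()
    (root / "partition=100000").mkdir()
    assert peek_last_partition_start(root) == 100000
```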