# Source code for eth_defi.event_reader.block_header

"""Block header data.

Structures and helpers to maintain block header data.
"""

import random
from dataclasses import dataclass
from typing import Dict, Optional, TypeAlias

#: 32 bit uint UNIX UTC timestamp of the block
#:
#: TODO: We might need float because high throughput chains like
#: Solana have subsecond timestamps
from eth_typing import HexStr

Timestamp: TypeAlias = int


[docs]@dataclass(slots=True, frozen=True) class BlockHeader: """Describe block headers for a single block. - This data is used to check chain reorganisations - This data is used to store block number -> block timestamp map and resolve trades to their UTC time """ #: 32-bit uint of he block number block_number: int #: Block hash as 0x prefixed string #: #: Note that this might be converted to binary data later. block_hash: str #: 32 bit uint UNIX UTC timestamp of the block #: #: TODO: We might need float because high throughput chains like #: Solana have subsecond timestamps timestamp: Timestamp def __post_init__(self): assert type(self.block_number) == int assert type(self.block_hash) == str, f"Got {type(self.block_hash)}" assert type(self.timestamp) == int assert self.block_hash.startswith("0x")
[docs] @staticmethod def generate_headers(count: int, start_block: int = 1, start_time: float = 0, blocks_per_second: float = 12) -> Dict[str, list]: """Generate random block header data. Used for testing. :return: DataFrame indexed by block number """ # Columnar data block_number = [] block_hash = [] timestamp = [] clock = start_time for i in range(start_block, start_block + count): block_number.append(i) block_hash.append(hex(random.randint(2**31, 2**32))) timestamp.append(int(clock)) clock += blocks_per_second return { "block_number": block_number, "block_hash": block_hash, "timestamp": timestamp, }
[docs] @staticmethod def to_pandas(headers: Dict[str, list], partition_size: Optional[int] = None): """Convert columnar header data to Pandas. .. note :: Depends on Pandas, but because we have optional dependency do a lazy import. :param headers: Raw block data to write. :param partition_size: Create a key "partition" which each contains partitioning_size blocks. E.g. 100_000. :return: """ # Optional dependency import pandas as pd # https://stackoverflow.com/a/64537577/315168 df = pd.DataFrame.from_dict(headers, orient="columns") df.set_index(df["block_number"], inplace=True, drop=False) if partition_size: assert partition_size > 0 # First partition starts at 1, not 0 df["partition"] = df["block_number"].apply(lambda x: max((x // partition_size) * partition_size, 1)) return df
[docs] @staticmethod def from_pandas(df) -> Dict[int, "BlockHeader"]: """Decode saved Pandas input.""" # Optional dependency import pandas as pd assert isinstance(df, pd.DataFrame) map = {} for idx, row in df.iterrows(): record = BlockHeader(block_number=row.block_number, block_hash=row.block_hash, timestamp=row.timestamp) map[record.block_number] = record return map