Source code for eth_defi.event_reader.progress_update

"""Different reporters for event reading progress.

- Stdout printing

- TQDM progress bars

- Python logging based notifications

"""
import datetime

from tqdm_loggable.auto import tqdm

from eth_defi.event_reader.reader import ProgressUpdate


[docs]class PrintProgressUpdate(ProgressUpdate): """Print to stdout the read progress.""" def __call__( self, current_block: int, start_block: int, end_block: int, chunk_size: int, total_events: int, last_timestamp: int, context, ): done = (current_block - start_block) / (end_block - start_block) print(f"Scanning blocks {current_block:,} - {current_block + chunk_size:,}, done {done * 100:.1f}%")
[docs]class TQDMProgressUpdate(ProgressUpdate): """Use TQDM progress bars to display the progress. - Works in console - Works in Jupyter Notebook with HTML progress bars - Can be set to loggable output for headless process You need to have `tqdm-loggable module installed <https://github.com/tradingstrategy-ai/tqdm-loggable>`__. `See more info <https://github.com/tradingstrategy-ai/tqdm-loggable>`__. Example: .. code-block:: python reader = MultithreadEventReader( provider.endpoint_uri, max_threads=16, notify=TQDMProgressUpdate("Scanning Enzyme Asset List"), max_blocks_once=10_000, reorg_mon=None, ) logger.info(f"Scanning for Enzyme price feed events {start_block:,} - {end_block:,}") feeds = fetch_updated_price_feed( deployment, start_block=start_block, end_block=end_block, read_events=reader, ) reader.close() """
[docs] def __init__(self, name: str, colour="green"): """ :param name: Progress bar label :param colour: Used in Jupyter notebooks """ self.name = name self.colour = colour self.progress_bar = None
def create_progress_bar(self, start_block, end_block): blocks = end_block - start_block progress_bar = tqdm(total=blocks, colour=self.colour) return progress_bar def __call__( self, current_block: int, start_block: int, end_block: int, chunk_size: int, total_events: int, last_timestamp: int, context, ): if self.progress_bar is None: self.progress_bar = self.create_progress_bar(start_block, end_block) if last_timestamp: friendly_time = datetime.datetime.utcfromtimestamp(last_timestamp).strftime("%Y-%m-%d %H:%M") else: friendly_time = "NA" self.progress_bar.set_description(f"{self.name}, {start_block:,} - {end_block:,}, {chunk_size:,} block per request, last block time {friendly_time}") self.progress_bar.update(chunk_size) self.progress_bar.set_postfix( { "Events found": total_events, } ) def close(self): self.progress_bar.close()