VaultPostDatabase
Documentation for eth_defi.feed.database.VaultPostDatabase Python class.
- class VaultPostDatabase
Bases: object
DuckDB database for tracked sources and collected posts.
Methods summary
__init__(path)
close(): Close the database connection.
fetch_recent_posts_by_feeder(feeder_ids[, ...]): Fetch the most recent posts for each feeder across all source types.
get_known_post_ids([source_id]): Return all known external_post_id values, optionally filtered by source.
get_posts_df(): Return stored posts for diagnostics.
get_source_last_post_timestamps(source_ids): Return the stored last_post_published_at for the given source IDs.
get_sync_state(key): Read a value from the feed_sync_state table.
get_tracked_sources_df(): Return tracked source rows for diagnostics.
insert_posts(source_id, posts): Insert posts for a source and return the number of new rows.
mark_source_failure(source_id, error, *[, ...]): Update sync state for a failed or skipped source fetch.
mark_source_success(source_id, *[, ...]): Update sync state for a successful source fetch.
prune_posts(max_post_age_days): Delete posts older than the configured retention period.
save(): Force a checkpoint.
set_sync_state(key, value): Write a value to the feed_sync_state table.
upsert_tracked_source(source): Insert or update one tracked source and return its source ID.
upsert_tracked_sources(sources): Insert or update tracked sources and return source IDs by logical key.
- __init__(path)
- Parameters
path (pathlib.Path)
- close()
Close the database connection.
- Return type
None
- fetch_recent_posts_by_feeder(feeder_ids, max_per_feeder=10)
Fetch the most recent posts for each feeder across all source types.
Joins tracked_sources and posts on source_id, ranks posts per feeder by COALESCE(published_at, fetched_at) DESC, and returns the max_per_feeder newest posts per feeder.
- Parameters
feeder_ids – Feeder identifiers to fetch posts for.
max_per_feeder (int) – Maximum number of posts to return per feeder. Defaults to 10.
- Returns
Dict mapping feeder_id to a list of post dicts with keys title, short_description, full_text, post_url, source_type and published_at (always set, falling back to fetched_at via COALESCE). Lists are ordered newest-first.
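The per-feeder ranking described above is a standard top-N-per-group query. The following is a minimal sketch of that technique using Python's built-in sqlite3 module as a stand-in for DuckDB (both support ROW_NUMBER() window functions). The schema, the feeder_id column on tracked_sources, the reduced set of returned keys, and storing timestamps as ISO-8601 text are all assumptions for illustration, not the library's actual tables.

```python
import sqlite3
from typing import Iterable

# Assumed schema: each tracked source belongs to one feeder; timestamps are
# ISO-8601 UTC strings so lexical order matches chronological order.
SCHEMA = """
CREATE TABLE tracked_sources (
    source_id INTEGER PRIMARY KEY,
    feeder_id TEXT NOT NULL,
    source_type TEXT NOT NULL
);
CREATE TABLE posts (
    source_id INTEGER NOT NULL,
    title TEXT,
    published_at TEXT,
    fetched_at TEXT NOT NULL
);
"""

# Rank each feeder's posts newest-first, then keep the top max_per_feeder.
QUERY = """
SELECT feeder_id, title, source_type, ts
FROM (
    SELECT
        s.feeder_id,
        p.title,
        s.source_type,
        COALESCE(p.published_at, p.fetched_at) AS ts,
        ROW_NUMBER() OVER (
            PARTITION BY s.feeder_id
            ORDER BY COALESCE(p.published_at, p.fetched_at) DESC
        ) AS rn
    FROM tracked_sources AS s
    JOIN posts AS p ON p.source_id = s.source_id
    WHERE s.feeder_id IN ({placeholders})
)
WHERE rn <= ?
ORDER BY feeder_id, ts DESC
"""


def fetch_recent_posts_by_feeder(
    conn: sqlite3.Connection, feeder_ids: Iterable[str], max_per_feeder: int = 10
) -> dict[str, list[dict]]:
    ids = list(feeder_ids)
    if not ids:
        return {}
    # One "?" placeholder per feeder ID, e.g. "?, ?" for two IDs.
    sql = QUERY.format(placeholders=", ".join("?" * len(ids)))
    result: dict[str, list[dict]] = {}
    for feeder_id, title, source_type, ts in conn.execute(sql, [*ids, max_per_feeder]):
        # published_at is always populated thanks to the COALESCE fallback.
        result.setdefault(feeder_id, []).append(
            {"title": title, "source_type": source_type, "published_at": ts}
        )
    return result
```

Because the ORDER BY inside the window uses the same COALESCE expression as the returned timestamp, a post with no published_at is ranked by its fetched_at, which matches the "always set" guarantee in the Returns description.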
- get_known_post_ids(source_id=None)
Return all known external_post_id values, optionally filtered by source.
- get_posts_df()
Return stored posts for diagnostics.
- Return type
- get_source_last_post_timestamps(source_ids)
Return the stored last_post_published_at for the given source IDs.
Used to gate backfill fallbacks: a source whose stored timestamp is not None has already been seen and does not need a fallback read of its individual timeline.
- Parameters
source_ids (Iterable[int]) – Iterable of numeric source IDs to look up.
- Returns
Mapping of source_id → last_post_published_at (None when the column has never been set for that row).
- Return type
dict[int, datetime.datetime | None]
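A caller can turn this mapping into the list of sources that still need a fallback timeline read, for example by passing it the result of get_source_last_post_timestamps(source_ids). The helper below is hypothetical (it is not part of eth_defi), and treating an ID that is missing from the mapping like one whose timestamp was never set is an assumption of this sketch.

```python
from datetime import datetime, timezone
from typing import Iterable, Mapping, Optional


def sources_needing_fallback(
    source_ids: Iterable[int],
    last_post_timestamps: Mapping[int, Optional[datetime]],
) -> list[int]:
    """Return the IDs that have never recorded a last_post_published_at."""
    # A stored timestamp means the source was seen before, so it is skipped.
    # Missing IDs are treated the same as None (an assumption of this sketch).
    return [sid for sid in source_ids if last_post_timestamps.get(sid) is None]


timestamps = {1: datetime(2024, 1, 1, tzinfo=timezone.utc), 2: None}
print(sources_needing_fallback([1, 2, 3], timestamps))  # [2, 3]
```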
- get_sync_state(key)
Read a value from the feed_sync_state table.
- get_tracked_sources_df()
Return tracked source rows for diagnostics.
- Return type
- insert_posts(source_id, posts)
Insert posts for a source and return the number of new rows.
- Parameters
source_id (int)
posts (Iterable[eth_defi.feed.database.CollectedPost])
- Return type
int
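One common way to make an insert idempotent while still reporting how many rows were new is to rely on a uniqueness constraint and count only the rows actually written. The sketch below uses sqlite3 as a stand-in for DuckDB and assumes a UNIQUE (source_id, external_post_id) constraint; the Post dataclass is a hypothetical two-field stand-in for CollectedPost, not its real definition.

```python
import sqlite3
from dataclasses import dataclass
from typing import Iterable


@dataclass
class Post:
    # Hypothetical stand-in for CollectedPost; only the fields this sketch needs.
    external_post_id: str
    title: str


def insert_posts(conn: sqlite3.Connection, source_id: int, posts: Iterable[Post]) -> int:
    """Insert posts, skipping duplicates, and return how many rows were new."""
    before = conn.total_changes
    # INSERT OR IGNORE silently drops rows that violate the assumed
    # UNIQUE (source_id, external_post_id) constraint.
    conn.executemany(
        "INSERT OR IGNORE INTO posts (source_id, external_post_id, title) VALUES (?, ?, ?)",
        [(source_id, p.external_post_id, p.title) for p in posts],
    )
    conn.commit()
    # total_changes counts only rows actually inserted, so the delta is the new-row count.
    return conn.total_changes - before
```

Re-inserting an already stored post is then a no-op that contributes 0 to the returned count.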
- mark_source_failure(source_id, error, *, checked_at=None)
Update sync state for a failed or skipped source fetch.
- Parameters
source_id (int)
error (str)
checked_at (Optional[datetime.datetime])
- Return type
None
- mark_source_success(source_id, *, checked_at=None, last_post_published_at=None)
Update sync state for a successful source fetch.
- Parameters
source_id (int) –
checked_at (Optional[datetime.datetime])
last_post_published_at (Optional[datetime.datetime])
- Return type
None
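The two mark_source_* methods record the outcome of each fetch attempt. The following is a hypothetical sketch of what such updates might look like, using sqlite3 and assumed last_checked_at and last_error columns alongside the documented last_post_published_at. Defaulting checked_at to the current UTC time, and keeping the previous last_post_published_at when none is passed, are assumptions of this sketch rather than documented behaviour.

```python
import sqlite3
from datetime import datetime, timezone
from typing import Optional


def _iso(ts: Optional[datetime]) -> Optional[str]:
    # Store timestamps as ISO-8601 text; None stays NULL.
    return ts.isoformat() if ts is not None else None


def mark_source_success(
    conn: sqlite3.Connection,
    source_id: int,
    *,
    checked_at: Optional[datetime] = None,
    last_post_published_at: Optional[datetime] = None,
) -> None:
    checked_at = checked_at or datetime.now(timezone.utc)  # assumed default
    conn.execute(
        """
        UPDATE tracked_sources
        SET last_checked_at = ?,
            last_error = NULL,
            -- Keep the previous value when no newer timestamp is supplied.
            last_post_published_at = COALESCE(?, last_post_published_at)
        WHERE source_id = ?
        """,
        (_iso(checked_at), _iso(last_post_published_at), source_id),
    )
    conn.commit()


def mark_source_failure(
    conn: sqlite3.Connection,
    source_id: int,
    error: str,
    *,
    checked_at: Optional[datetime] = None,
) -> None:
    checked_at = checked_at or datetime.now(timezone.utc)  # assumed default
    conn.execute(
        "UPDATE tracked_sources SET last_checked_at = ?, last_error = ? WHERE source_id = ?",
        (_iso(checked_at), error, source_id),
    )
    conn.commit()
```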
- prune_posts(max_post_age_days)
Delete posts older than the configured retention period.
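Retention pruning amounts to computing a cutoff timestamp and deleting everything older than it. The sketch below uses sqlite3 as a stand-in for DuckDB and assumes posts are aged by COALESCE(published_at, fetched_at), mirroring the ranking in fetch_recent_posts_by_feeder, with timestamps stored as UTC ISO-8601 strings. The real method may age posts by a different column; the now parameter is hypothetical (added for testability), and returning the deleted-row count is a choice of this sketch.

```python
import sqlite3
from datetime import datetime, timedelta, timezone
from typing import Optional


def prune_posts(
    conn: sqlite3.Connection,
    max_post_age_days: int,
    now: Optional[datetime] = None,  # hypothetical parameter, handy for testing
) -> int:
    now = now or datetime.now(timezone.utc)
    cutoff = (now - timedelta(days=max_post_age_days)).isoformat()
    # Age each post by its publish time, falling back to when it was fetched.
    cur = conn.execute(
        "DELETE FROM posts WHERE COALESCE(published_at, fetched_at) < ?",
        (cutoff,),
    )
    conn.commit()
    return cur.rowcount  # number of rows deleted
```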
- save()
Force a checkpoint, so that changes pending in DuckDB's write-ahead log are written to the database file.
- Return type
None
- set_sync_state(key, value)
Write a value to the feed_sync_state table.
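Together, get_sync_state and set_sync_state behave like a small key/value store. The following is a minimal sketch of that pattern using sqlite3 as a stand-in for DuckDB; the key and value column names, storing values as text, and returning None for an unknown key are assumptions, not the documented schema of feed_sync_state. The ensure_sync_table helper is hypothetical.

```python
import sqlite3
from typing import Optional


def ensure_sync_table(conn: sqlite3.Connection) -> None:
    conn.execute(
        "CREATE TABLE IF NOT EXISTS feed_sync_state (key TEXT PRIMARY KEY, value TEXT)"
    )


def set_sync_state(conn: sqlite3.Connection, key: str, value: str) -> None:
    # Upsert: insert the key, or overwrite its value if it already exists.
    conn.execute(
        "INSERT INTO feed_sync_state (key, value) VALUES (?, ?) "
        "ON CONFLICT (key) DO UPDATE SET value = excluded.value",
        (key, value),
    )
    conn.commit()


def get_sync_state(conn: sqlite3.Connection, key: str) -> Optional[str]:
    row = conn.execute(
        "SELECT value FROM feed_sync_state WHERE key = ?", (key,)
    ).fetchone()
    return row[0] if row is not None else None  # None for unknown keys
```

Using an upsert keeps exactly one row per key, so repeated writes of a sync cursor overwrite the old value instead of accumulating history.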
- upsert_tracked_source(source)
Insert or update one tracked source and return its source ID.
- Parameters
source (eth_defi.feed.sources.TrackedPostSource)
- Return type