Core3Database

Documentation for eth_defi.core3.database.Core3Database Python class.

class Core3Database

Bases: object

DuckDB database for storing Core3 risk intelligence data.

Stores project snapshots, PoL time-series, category breakdowns, and section details. Supports incremental sync using watermarks in the sync_state table.

Thread safety: all database operations are protected by _db_lock. Multiple threads can call insert methods concurrently — the API calls run in parallel while database writes are serialised.

Example:

from pathlib import Path
from eth_defi.core3.database import Core3Database

db = Core3Database(Path("/tmp/core3-risk.duckdb"))
db.save()
db.close()

Initialise the database connection.

Parameters

path – Path to the DuckDB file. Parent directories will be created if needed.

Methods summary

__init__(path)

Initialise the database connection.

close()

Close the database connection.

get_latest_pol_category(slug)

Get the latest per-category PoL breakdown for a project.

get_latest_project_snapshot_raw(slug)

Get the raw JSON payload and fetch timestamp for the latest snapshot of a project.

get_latest_project_snapshots()

Get the most recent snapshot for each project.

get_pol_category_daily(slug)

Get daily category PoL breakdown for a project.

get_pol_daily(slug)

Get daily PoL time-series for a project.

get_pol_daily_count()

Get total number of PoL daily records.

get_project_count()

Get number of unique projects in the database.

get_project_snapshot_history(slug)

Get all snapshots for a specific project over time.

get_snapshot_count()

Get total number of project snapshot records.

get_sync_state(slug, data_type)

Get sync state for a specific slug and data type.

insert_pol_category_daily_points(slug, ...)

Insert category PoL daily breakdown points.

insert_pol_daily_points(slug, points, fetched_at)

Insert PoL daily history points, deduplicating on (slug, ts).

insert_project_snapshot(slug, fetched_at, ...)

Insert a project snapshot, extracting key columns from the raw JSON.

insert_section_snapshot(slug, section, ...)

Insert a section snapshot.

save()

Force a checkpoint to ensure data is persisted to disk.

update_sync_state(slug, data_type, last_ts)

Update or insert sync state watermark.

__init__(path)

Initialise the database connection.

Parameters

path (pathlib.Path) – Path to the DuckDB file. Parent directories will be created if needed.

close()

Close the database connection.

DuckDB performs an implicit checkpoint on close, flushing the WAL to the main database file.

get_latest_pol_category(slug)

Get the latest per-category PoL breakdown for a project.

Returns the most recent row from pol_category_daily, which holds the API-native sub-scores for the five Core3 risk categories (security, financial, operational, reputational, regulatory). Used to embed the category breakdown in the vault metrics JSON export.

Parameters

slug (str) – Core3 project slug.

Returns

Dict with ts (naive UTC datetime) and security, financial, operational, reputational, regulatory float sub-scores (each may be None), or None if the slug has no category rows.

Return type

Optional[dict]

get_latest_project_snapshot_raw(slug)

Get the raw JSON payload and fetch timestamp for the latest snapshot of a project.

Parameters

slug (str) – Core3 project slug.

Returns

Tuple of (payload_json_string, fetched_at) or None if the slug has no snapshots.

Return type

Optional[tuple[str, datetime.datetime]]

get_latest_project_snapshots()

Get the most recent snapshot for each project.

Returns

DataFrame with the latest snapshot per slug, ordered by rank.

Return type

pandas.DataFrame

get_pol_category_daily(slug)

Get daily category PoL breakdown for a project.

Parameters

slug (str) – Project slug.

Returns

DataFrame with ts and category score columns.

Return type

pandas.DataFrame

get_pol_daily(slug)

Get daily PoL time-series for a project.

Parameters

slug (str) – Project slug (or INDEX_SLUG for the aggregate).

Returns

DataFrame with ts and pol_score columns, ordered by ts.

Return type

pandas.DataFrame

get_pol_daily_count()

Get total number of PoL daily records.

Returns

Total count of rows in pol_daily.

Return type

int

get_project_count()

Get number of unique projects in the database.

Returns

Count of unique slugs in project_snapshots.

Return type

int

get_project_snapshot_history(slug)

Get all snapshots for a specific project over time.

Parameters

slug (str) – Project slug.

Returns

DataFrame ordered by fetched_at.

Return type

pandas.DataFrame

get_snapshot_count()

Get total number of project snapshot records.

Returns

Total count of rows in project_snapshots.

Return type

int

get_sync_state(slug, data_type)

Get sync state for a specific slug and data type.

Parameters
  • slug (str) – Project slug.

  • data_type (str) – Data type key (e.g. 'pol_daily', 'pol_category_daily').

Returns

Dict with last_ts, backfill_done, and last_synced, or None if no state exists.

Return type

Optional[dict]

insert_pol_category_daily_points(slug, points, fetched_at)

Insert category PoL daily breakdown points.

Each point contains a timestamp and per-category PoL scores (security.score, financial.score, etc.).

Parameters
  • slug (str) – Project slug.

  • points (list[dict]) – List of point dicts from the category history API.

  • fetched_at (datetime.datetime) – When this data was fetched.

Returns

Number of new rows inserted.

Return type

int

insert_pol_daily_points(slug, points, fetched_at)

Insert PoL daily history points, deduplicating on (slug, ts).

Converts unix timestamps from the API to naive UTC datetimes. Deduplication uses a temp table with DELETE + INSERT instead of ON CONFLICT to avoid DuckDB 1.5.0 ART index crashes.

Parameters
  • slug (str) – Project slug (or INDEX_SLUG for the aggregate index).

  • points (list[dict]) – List of {score, timestamp} dicts from the API.

  • fetched_at (datetime.datetime) – When this data was fetched.

Returns

Number of new rows inserted.

Return type

int

insert_project_snapshot(slug, fetched_at, raw_json)

Insert a project snapshot, extracting key columns from the raw JSON.

Extracts name, rank, pol.score, pol.rating, and market_cap.in_usd from the raw JSON payload. Stores the full JSON as a VARCHAR for future re-extraction if the schema changes.

Parameters
  • slug (str) – Project slug.

  • fetched_at (datetime.datetime) – Timestamp of when the data was fetched.

  • raw_json (dict) – Full JSON response from /v1/{slug}.

Return type

None

insert_section_snapshot(slug, section, fetched_at, raw_json)

Insert a section snapshot.

Extracts pol.score from the section response as the section-level PoL sub-score.

Parameters
  • slug (str) – Project slug.

  • section (str) – Section name (security, financial, etc.).

  • fetched_at (datetime.datetime) – Fetch timestamp.

  • raw_json (dict) – Full section JSON response.

Return type

None

save()

Force a checkpoint to ensure data is persisted to disk.

update_sync_state(slug, data_type, last_ts, backfill_done=True)

Update or insert sync state watermark.

Always updates last_synced to now, even when last_ts is unchanged (zero new points). Sets backfill_done=TRUE after the initial chart backfill.

When backfill_done is TRUE but last_ts is NULL, subsequent runs use incremental with from=0 (epoch), which is effectively a full range but avoids re-calling the chart endpoint.

Parameters
  • slug (str) – Project slug.

  • data_type (str) – Data type key.

  • last_ts (Optional[int]) – Latest unix timestamp in the synced data, or None if empty.

  • backfill_done (bool) – Whether the initial backfill has been attempted.

Return type

None