"""Async database layer for the toxicity analyzer.

Handles fetching unscored statuses and storing classification results.
"""

from __future__ import annotations

import logging
from datetime import datetime, timezone
from pathlib import Path

import asyncpg

logger = logging.getLogger("analyzer.db")

MIGRATION_FILE = Path(__file__).parent.parent.parent / "scripts" / "02-toxicity.sql"

# Keys read from the ``scores`` mapping in ``store_status_score``.
# The order must match the column list (after ``status_id``) of the INSERT
# statement there, because values are bound positionally as $2..$14.
_SCORE_COLUMNS = (
    "overall",
    "toxic",
    "threat",
    "hate_speech",
    "racism",
    "antisemitism",
    "islamophobia",
    "sexism",
    "homophobia",
    "insult",
    "dehumanization",
    "extremism",
    "ableism",
)


class AnalyzerDB:
    """Async PostgreSQL operations for the analyzer.

    Call ``connect()`` before any query method and ``close()`` when done.
    """

    def __init__(self, dsn: str):
        """Store the connection string; no connection is opened here."""
        self._dsn = dsn
        self._pool: asyncpg.Pool | None = None

    def _require_pool(self) -> asyncpg.Pool:
        """Return the connection pool.

        Raises:
            RuntimeError: If ``connect()`` has not been called (or the
                database has been closed since).
        """
        if self._pool is None:
            raise RuntimeError("AnalyzerDB is not connected; call connect() first")
        return self._pool

    async def connect(self) -> None:
        """Create the connection pool (2 to 10 connections).

        Calling this while a pool already exists is a no-op, so an existing
        pool is never overwritten without being closed.
        """
        if self._pool is not None:
            return
        self._pool = await asyncpg.create_pool(self._dsn, min_size=2, max_size=10)
        logger.info("Database connected")

    async def close(self) -> None:
        """Close the connection pool, if one is open."""
        # Drop our reference first so the object is left in the
        # "not connected" state even if closing the pool raises.
        pool, self._pool = self._pool, None
        if pool is not None:
            await pool.close()

    async def apply_migration(self) -> None:
        """Apply the toxicity schema migration if tables don't exist.

        The presence of a ``toxicity_scores`` table is used as the marker
        that the migration in ``MIGRATION_FILE`` has already been applied.

        Raises:
            RuntimeError: If the database is not connected.
        """
        async with self._require_pool().acquire() as conn:
            # Check if toxicity_scores table exists.
            # NOTE(review): the lookup is not restricted to a schema, so a
            # same-named table in any schema counts as "exists" - confirm
            # this is intended.
            exists = await conn.fetchval("""
                SELECT EXISTS (
                    SELECT FROM information_schema.tables
                    WHERE table_name = 'toxicity_scores'
                )
            """)
            if exists:
                logger.debug("Toxicity tables already exist")
                return

            logger.info("Applying toxicity schema migration...")
            # Read with an explicit encoding so the result does not depend
            # on the host locale.
            sql = MIGRATION_FILE.read_text(encoding="utf-8")
            await conn.execute(sql)
            logger.info("Migration applied successfully")

    # ── Fetch unscored items ─────────────────────────────────────────────

    async def get_unscored_statuses(self, limit: int = 0) -> list[dict]:
        """Get statuses that haven't been scored yet.

        Skips boosts (reblogs) and statuses with empty content. Results are
        ordered newest first by ``created_at``.

        Args:
            limit: Maximum number of rows to return. Zero or a negative
                value means no limit.

        Returns:
            One dict per status with the keys ``id``, ``content`` and
            ``account_db_id``.

        Raises:
            RuntimeError: If the database is not connected.
        """
        query = """
            SELECT s.id, s.content, s.account_db_id
            FROM statuses s
            LEFT JOIN toxicity_scores ts ON ts.status_id = s.id
            WHERE ts.status_id IS NULL
              AND s.status_type != 'reblog'
              AND s.content IS NOT NULL
              AND s.content != ''
            ORDER BY s.created_at DESC
        """
        args: tuple[int, ...] = ()
        if limit > 0:
            # Bind the limit as a query parameter instead of formatting it
            # into the SQL text.
            query += " LIMIT $1"
            args = (limit,)

        async with self._require_pool().acquire() as conn:
            rows = await conn.fetch(query, *args)
        return [dict(row) for row in rows]

    # ── Store scores ─────────────────────────────────────────────────────

    async def store_status_score(
        self,
        status_id: int,
        scores: dict,
        flagged: bool,
        model: str,
    ) -> None:
        """Insert a toxicity score for a status.

        If a row for ``status_id`` already exists the insert is skipped
        (``ON CONFLICT ... DO NOTHING``); the existing row is not updated.

        Args:
            status_id: ID of the status being scored.
            scores: Mapping that must contain every key in
                ``_SCORE_COLUMNS``.
            flagged: Value stored in the ``flagged`` column.
            model: Value stored in the ``model`` column.

        Raises:
            KeyError: If ``scores`` is missing one of the expected keys.
            RuntimeError: If the database is not connected.
        """
        # Resolve the values before acquiring a connection so a malformed
        # ``scores`` mapping fails without touching the pool.
        score_values = [scores[column] for column in _SCORE_COLUMNS]

        async with self._require_pool().acquire() as conn:
            await conn.execute(
                """
                INSERT INTO toxicity_scores
                    (status_id, overall, toxic, threat, hate_speech, racism,
                     antisemitism, islamophobia, sexism, homophobia, insult,
                     dehumanization, extremism, ableism, flagged, model)
                VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12,
                        $13, $14, $15, $16)
                ON CONFLICT (status_id) DO NOTHING
                """,
                status_id,
                *score_values,
                flagged,
                model,
            )

    # ── Analysis run tracking ────────────────────────────────────────────

    async def start_analysis_run(self, model: str) -> int:
        """Create a new analysis run record.

        Args:
            model: Value stored in the ``model`` column of the new row.

        Returns:
            The ``id`` of the inserted ``analysis_runs`` row.

        Raises:
            RuntimeError: If the database is not connected.
        """
        async with self._require_pool().acquire() as conn:
            return await conn.fetchval(
                """
                INSERT INTO analysis_runs (model)
                VALUES ($1)
                RETURNING id
                """,
                model,
            )

    async def finish_analysis_run(
        self,
        run_id: int,
        status: str,
        statuses_scored: int,
        errors: int,
        cost_usd: float,
    ) -> None:
        """Finalize an analysis run with results.

        Sets ``finished_at`` to the current database time, stores the given
        counters, and computes ``duration_secs`` as the number of seconds
        between the row's ``started_at`` and now.

        Args:
            run_id: ID of the ``analysis_runs`` row to update.
            status: Value stored in the ``status`` column.
            statuses_scored: Value stored in ``statuses_scored``.
            errors: Value stored in ``errors``.
            cost_usd: Value stored in ``cost_usd``.

        Raises:
            RuntimeError: If the database is not connected.
        """
        async with self._require_pool().acquire() as conn:
            await conn.execute(
                """
                UPDATE analysis_runs
                SET finished_at = now(),
                    status = $2,
                    statuses_scored = $3,
                    errors = $4,
                    cost_usd = $5,
                    duration_secs = EXTRACT(EPOCH FROM (now() - started_at))
                WHERE id = $1
                """,
                run_id,
                status,
                statuses_scored,
                errors,
                cost_usd,
            )