mastodon-collector/app/analyzer/db.py
Pieter 27582c7b77 Add toxicity analysis system for Mastodon statuses
Implements comprehensive toxicity analysis following the Bluesky collector architecture:

- Analyzer module with async batch processing using GPT-4o-mini
- Database schema for toxicity scores and analysis run tracking
- 12 toxicity categories (toxic, threat, hate_speech, racism, antisemitism, islamophobia, sexism, homophobia, insult, dehumanization, extremism, ableism)
- Web interface routes for analysis dashboard and flagged content review
- Manual review API endpoint for human validation
- Analysis helper functions for database queries
- Dutch language support with coded political term recognition

Usage:
  docker exec mastodon-collector-collector-1 python -m app.analyzer

See TOXICITY_ANALYSIS.md for full documentation.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2026-03-30 14:43:35 +02:00

145 lines
5.2 KiB
Python

"""Async database layer for the toxicity analyzer.
Handles fetching unscored statuses and storing classification results.
"""
from __future__ import annotations
import logging
from datetime import datetime, timezone  # NOTE(review): appears unused in this module — confirm before removing
from pathlib import Path
import asyncpg
# Module-level logger; child logger under the analyzer's logging namespace.
logger = logging.getLogger("analyzer.db")
# scripts/02-toxicity.sql relative to the repository root (three levels up
# from this file); read and executed by AnalyzerDB.apply_migration().
MIGRATION_FILE = Path(__file__).parent.parent.parent / "scripts" / "02-toxicity.sql"
class AnalyzerDB:
    """Async PostgreSQL operations for the analyzer.

    Wraps an ``asyncpg`` connection pool and exposes the queries the
    toxicity analyzer needs: applying the schema migration, fetching
    statuses that have not been scored yet, storing per-status scores,
    and analysis-run bookkeeping.

    Usage: construct with a DSN, ``await connect()``, then call the
    query methods; ``await close()`` when done.
    """

    # Score categories in the exact column order of the INSERT in
    # store_status_score(); keeps the dict lookups and the SQL in sync.
    _SCORE_KEYS = (
        "overall", "toxic", "threat", "hate_speech", "racism",
        "antisemitism", "islamophobia", "sexism", "homophobia",
        "insult", "dehumanization", "extremism", "ableism",
    )

    def __init__(self, dsn: str):
        """Remember the DSN; no connection is opened until connect()."""
        self._dsn = dsn
        self._pool: asyncpg.Pool | None = None

    def _require_pool(self) -> asyncpg.Pool:
        """Return the live pool, or raise a clear error if not connected.

        Raises:
            RuntimeError: if connect() was never called (or close() was
                already called) — clearer than the AttributeError that
                dereferencing ``None`` would otherwise produce.
        """
        if self._pool is None:
            raise RuntimeError("AnalyzerDB is not connected; call connect() first")
        return self._pool

    async def connect(self) -> None:
        """Open the connection pool (2–10 connections)."""
        self._pool = await asyncpg.create_pool(self._dsn, min_size=2, max_size=10)
        logger.info("Database connected")

    async def close(self) -> None:
        """Close the pool if open. Safe to call more than once."""
        if self._pool:
            await self._pool.close()
            # Drop the reference so a second close() is a no-op and any
            # later query fails fast via _require_pool().
            self._pool = None

    async def apply_migration(self) -> None:
        """Apply the toxicity schema migration if tables don't exist.

        Probes ``information_schema`` for the ``toxicity_scores`` table
        and, when absent, executes the SQL file at MIGRATION_FILE.
        """
        async with self._require_pool().acquire() as conn:
            # Check if toxicity_scores table exists
            exists = await conn.fetchval("""
                SELECT EXISTS (
                    SELECT FROM information_schema.tables
                    WHERE table_name = 'toxicity_scores'
                )
            """)
            if not exists:
                logger.info("Applying toxicity schema migration...")
                sql = MIGRATION_FILE.read_text()
                await conn.execute(sql)
                logger.info("Migration applied successfully")
            else:
                logger.debug("Toxicity tables already exist")

    # ── Fetch unscored items ─────────────────────────────────────────────
    async def get_unscored_statuses(self, limit: int = 0) -> list[dict]:
        """Get statuses that haven't been scored yet.

        Skips boosts (reblogs) and statuses with empty content; newest
        statuses come first.

        Args:
            limit: maximum number of rows to return; 0 (the default)
                means no limit.

        Returns:
            A list of dicts with keys ``id``, ``content``, ``account_id``.
        """
        query = """
            SELECT s.id, s.content, s.account_id
            FROM statuses s
            LEFT JOIN toxicity_scores ts ON ts.status_id = s.id
            WHERE ts.status_id IS NULL
              AND s.reblog_of_id IS NULL
              AND s.content IS NOT NULL
              AND s.content != ''
            ORDER BY s.created_at DESC
        """
        async with self._require_pool().acquire() as conn:
            if limit > 0:
                # Parameterized LIMIT instead of f-string interpolation:
                # avoids any injection vector should a caller ever pass a
                # non-int, and keeps the statement text stable for the
                # prepared-statement cache.
                rows = await conn.fetch(query + " LIMIT $1", limit)
            else:
                rows = await conn.fetch(query)
        return [dict(r) for r in rows]

    # ── Store scores ─────────────────────────────────────────────────────
    async def store_status_score(
        self,
        status_id: int,
        scores: dict,
        flagged: bool,
        model: str,
    ) -> None:
        """Insert a toxicity score for a status.

        Args:
            status_id: primary key of the scored status.
            scores: mapping that must contain every key in _SCORE_KEYS.
            flagged: whether the status crossed the flagging threshold.
            model: identifier of the model that produced the scores.

        Raises:
            KeyError: if ``scores`` is missing any expected category —
                failing loudly beats silently storing partial rows.

        Duplicate inserts are ignored (ON CONFLICT DO NOTHING), so
        re-running an analysis over already-scored statuses is safe.
        """
        async with self._require_pool().acquire() as conn:
            await conn.execute(
                """
                INSERT INTO toxicity_scores
                    (status_id, overall, toxic, threat, hate_speech, racism,
                     antisemitism, islamophobia,
                     sexism, homophobia, insult, dehumanization,
                     extremism, ableism, flagged, model)
                VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)
                ON CONFLICT (status_id) DO NOTHING
                """,
                status_id,
                # Unpack the 13 category scores in column order.
                *(scores[key] for key in self._SCORE_KEYS),
                flagged,
                model,
            )

    # ── Analysis run tracking ────────────────────────────────────────────
    async def start_analysis_run(self, model: str) -> int:
        """Create a new analysis run record. Returns run ID."""
        async with self._require_pool().acquire() as conn:
            return await conn.fetchval("""
                INSERT INTO analysis_runs (model) VALUES ($1)
                RETURNING id
            """, model)

    async def finish_analysis_run(
        self,
        run_id: int,
        status: str,
        statuses_scored: int,
        errors: int,
        cost_usd: float,
    ) -> None:
        """Finalize an analysis run with results.

        Args:
            run_id: ID returned by start_analysis_run().
            status: terminal status string (e.g. success/failure label —
                exact vocabulary defined by the callers).
            statuses_scored: number of statuses scored during the run.
            errors: number of classification errors encountered.
            cost_usd: accumulated API cost in US dollars.

        The run duration is computed server-side from ``started_at``.
        """
        async with self._require_pool().acquire() as conn:
            await conn.execute(
                """
                UPDATE analysis_runs
                SET finished_at = now(),
                    status = $2,
                    statuses_scored = $3,
                    errors = $4,
                    cost_usd = $5,
                    duration_secs = EXTRACT(EPOCH FROM (now() - started_at))
                WHERE id = $1
                """,
                run_id, status, statuses_scored, errors, cost_usd,
            )