Implements comprehensive toxicity analysis following the Bluesky collector architecture: - Analyzer module with async batch processing using GPT-4o-mini - Database schema for toxicity scores and analysis run tracking - 12 toxicity categories (toxic, threat, hate_speech, racism, antisemitism, islamophobia, sexism, homophobia, insult, dehumanization, extremism, ableism) - Web interface routes for analysis dashboard and flagged content review - Manual review API endpoint for human validation - Analysis helper functions for database queries - Dutch language support with coded political term recognition Usage: docker exec mastodon-collector-collector-1 python -m app.analyzer See TOXICITY_ANALYSIS.md for full documentation. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
194 lines
6.1 KiB
Python
"""Main toxicity analysis orchestrator.
|
|
|
|
Runs as a one-shot batch process: fetches unscored statuses,
|
|
classifies them in batches with GPT-4o-mini, and stores scores in PostgreSQL.
|
|
|
|
Usage:
|
|
python -m app.analyzer
|
|
ANALYZER_LIMIT=100 python -m app.analyzer # test on 100 statuses
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import logging
|
|
import os
|
|
import sys
|
|
import time
|
|
|
|
from .classifier import ToxicityClassifier
|
|
from .config import AnalyzerConfig
|
|
from .db import AnalyzerDB
|
|
|
|
logger = logging.getLogger("analyzer")
|
|
|
|
|
|
def make_batches(items: list, batch_size: int) -> list[list]:
    """Chunk *items* into consecutive sublists of at most *batch_size* elements.

    The final sublist may be shorter; an empty input yields an empty list.
    """
    batches: list[list] = []
    for start in range(0, len(items), batch_size):
        batches.append(items[start : start + batch_size])
    return batches
|
|
|
|
|
|
async def classify_statuses(
    classifier: ToxicityClassifier,
    db: AnalyzerDB,
    statuses: list[dict],
    config: AnalyzerConfig,
) -> tuple[int, int, float]:
    """Classify statuses in batches, with concurrency control.

    Args:
        classifier: Client that scores a batch of texts via the LLM API.
        db: Database handle used to persist one score row per status.
        statuses: Status rows; each dict must provide "id" and "content".
        config: Supplies batch_size, concurrency, model, flag_threshold and
            per-million-token pricing used for the cost estimate.

    Returns:
        (scored_count, error_count, cost_usd) — cost is estimated from the
        token counts reported back by the classifier.
    """
    semaphore = asyncio.Semaphore(config.concurrency)
    scored = 0
    errors = 0
    total_input_tokens = 0
    total_output_tokens = 0

    batches = make_batches(statuses, config.batch_size)
    logger.info(" Split %d statuses into %d batches of ≤%d",
                len(statuses), len(batches), config.batch_size)

    async def process_batch(batch: list[dict]) -> None:
        # Counters are safe to mutate without locks: asyncio tasks run on a
        # single thread and each += happens between awaits.
        nonlocal scored, errors, total_input_tokens, total_output_tokens
        async with semaphore:
            texts = [s["content"] for s in batch]
            try:
                results = await classifier.classify_batch(texts)
            except Exception:
                # Whole batch failed (API error after retries) — count all as errors
                errors += len(batch)
                logger.exception(
                    "Failed to classify batch of %d statuses", len(batch)
                )
                return

            # FIX: zip() silently truncates on a short (or long) response,
            # which used to drop statuses from both the scored and error
            # totals. Count unmatched statuses as errors instead.
            if len(results) != len(batch):
                errors += max(len(batch) - len(results), 0)
                logger.warning(
                    "Result count mismatch: %d statuses but %d results",
                    len(batch), len(results),
                )

            for status, scores in zip(batch, results):
                try:
                    score_dict = scores.to_dict()
                    flagged = scores.is_flagged(config.flag_threshold)

                    await db.store_status_score(
                        status_id=status["id"],
                        scores=score_dict,
                        flagged=flagged,
                        model=config.model,
                    )

                    total_input_tokens += scores.input_tokens
                    total_output_tokens += scores.output_tokens
                    scored += 1

                except Exception:
                    errors += 1
                    logger.exception(
                        "Failed to store score for status %d", status["id"]
                    )

            # Fires roughly once per 100 scored statuses (scored advances by
            # at most batch_size per batch, so this detects crossing a
            # multiple of 100).
            if scored % 100 < config.batch_size:
                logger.info(" Statuses scored: %d / %d", scored, len(statuses))

    tasks = [process_batch(b) for b in batches]
    await asyncio.gather(*tasks)

    cost = (
        total_input_tokens * config.input_cost_per_m / 1_000_000
        + total_output_tokens * config.output_cost_per_m / 1_000_000
    )
    return scored, errors, cost
|
|
|
|
|
|
def _setup_logging(config: AnalyzerConfig) -> None:
    """Configure stdout logging; also log to a file when /app/logs exists."""
    logging.basicConfig(
        level=getattr(logging, config.log_level.upper(), logging.INFO),
        format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
        handlers=[logging.StreamHandler(sys.stdout)],
    )
    # Also log to file
    log_dir = "/app/logs"
    if os.path.isdir(log_dir):
        fh = logging.FileHandler(os.path.join(log_dir, "analyzer.log"))
        fh.setFormatter(
            logging.Formatter("%(asctime)s [%(levelname)s] %(name)s: %(message)s")
        )
        logging.getLogger().addHandler(fh)


async def run() -> None:
    """Execute one full analysis run end to end.

    Connects to PostgreSQL, applies the migration, records an analysis-run
    row, scores every unscored status, and finalizes the run row with totals.

    FIX: on a crash after the run row was created, the row is now marked
    "failed" (best effort) instead of being left in its initial state forever.
    """
    config = AnalyzerConfig.from_env()
    _setup_logging(config)

    logger.info("=" * 60)
    logger.info("Toxicity Analyzer starting (model: %s, concurrency: %d, batch_size: %d)",
                config.model, config.concurrency, config.batch_size)

    db = AnalyzerDB(config.database_url)
    classifier = ToxicityClassifier(
        api_key=config.openai_api_key,
        model=config.model,
    )

    run_id = None  # set once the run row exists; cleared after finalization
    try:
        await db.connect()
        await db.apply_migration()

        # Start analysis run
        run_id = await db.start_analysis_run(model=config.model)
        start_time = time.time()

        # Fetch unscored items (limit 0 means "no limit" to the DB layer)
        limit = config.limit if config.limit > 0 else 0
        statuses = await db.get_unscored_statuses(limit=limit)

        logger.info("Found %d unscored statuses", len(statuses))

        if not statuses:
            logger.info("Nothing to score — exiting.")
            await db.finish_analysis_run(
                run_id, status="completed",
                statuses_scored=0, errors=0, cost_usd=0.0,
            )
            run_id = None
            return

        total_cost = 0.0
        total_errors = 0

        # Classify statuses
        logger.info("Classifying %d statuses in batches of %d...",
                    len(statuses), config.batch_size)
        s_scored, s_errors, s_cost = await classify_statuses(
            classifier, db, statuses, config,
        )
        logger.info(" Statuses done: %d scored, %d errors, $%.4f",
                    s_scored, s_errors, s_cost)
        total_cost += s_cost
        total_errors += s_errors

        # Finalize run
        duration = time.time() - start_time
        status = "completed" if total_errors == 0 else "partial"
        await db.finish_analysis_run(
            run_id,
            status=status,
            statuses_scored=s_scored,
            errors=total_errors,
            cost_usd=total_cost,
        )
        run_id = None

        logger.info("=" * 60)
        logger.info("Analysis complete — status: %s", status)
        logger.info(" Statuses scored: %d, Errors: %d",
                    s_scored, total_errors)
        logger.info(" Estimated cost: $%.4f", total_cost)
        logger.info(" Duration: %.1f seconds", duration)

    except Exception:
        logger.exception("Analyzer crashed")
        if run_id is not None:
            # Best effort: don't leave the run row stuck as unfinished.
            try:
                await db.finish_analysis_run(
                    run_id, status="failed",
                    statuses_scored=0, errors=0, cost_usd=0.0,
                )
            except Exception:
                logger.exception("Could not mark run %s as failed", run_id)
        raise
    finally:
        await classifier.close()
        await db.close()
|
|
|
|
|
|
def main() -> None:
    """Console entry point: drive the async analyzer to completion."""
    asyncio.run(run())
|
|
|
|
|
|
# Script entry point: `python -m app.analyzer` (see module docstring for env vars).
if __name__ == "__main__":
    main()
|