# Source: mastodon-collector/app/analyzer/analyzer.py

"""Main toxicity analysis orchestrator.
Runs as a one-shot batch process: fetches unscored statuses,
classifies them in batches with LLM API, and stores scores in PostgreSQL.
Usage:
python -m app.analyzer
ANALYZER_LIMIT=100 python -m app.analyzer # test on 100 statuses
"""
from __future__ import annotations
import asyncio
import logging
import os
import sys
import time
from .classifier import ToxicityClassifier
from .config import AnalyzerConfig
from .db import AnalyzerDB
logger = logging.getLogger("analyzer")
def make_batches(items: list, batch_size: int) -> list[list]:
    """Split a flat list into consecutive sublists of at most *batch_size* items.

    The final sublist may be shorter than *batch_size*; an empty input
    yields an empty list of batches.
    """
    batches = []
    for start in range(0, len(items), batch_size):
        batches.append(items[start : start + batch_size])
    return batches
async def classify_statuses(
    classifier: ToxicityClassifier,
    db: AnalyzerDB,
    statuses: list[dict],
    config: AnalyzerConfig,
) -> tuple[int, int, float]:
    """Classify statuses in batches, with concurrency control.

    Splits *statuses* into batches of ``config.batch_size`` and classifies
    each batch via the LLM classifier, persisting one score row per status.
    At most ``config.concurrency`` batches are in flight at once, enforced
    by an asyncio semaphore.

    Returns:
        (scored_count, error_count, cost_usd) — cost is estimated from the
        token counts reported by the classifier and the per-million-token
        prices in *config*.
    """
    semaphore = asyncio.Semaphore(config.concurrency)
    scored = 0
    errors = 0
    total_input_tokens = 0
    total_output_tokens = 0
    batches = make_batches(statuses, config.batch_size)
    logger.info(" Split %d statuses into %d batches of ≤%d",
                len(statuses), len(batches), config.batch_size)

    async def process_batch(batch: list[dict]) -> None:
        # Counter updates need no locks: asyncio tasks only interleave at
        # await points, and these mutations are synchronous.
        nonlocal scored, errors, total_input_tokens, total_output_tokens
        async with semaphore:
            texts = [s["content"] for s in batch]
            try:
                results = await classifier.classify_batch(texts)
            except Exception:
                # Whole batch failed (API error after retries) — count all as errors
                errors += len(batch)
                logger.exception(
                    "Failed to classify batch of %d statuses", len(batch)
                )
                return
            # Bug fix: zip() silently drops the tail when the classifier
            # returns fewer results than inputs — those statuses were
            # previously neither scored nor counted as errors.
            if len(results) != len(batch):
                errors += abs(len(batch) - len(results))
                logger.error(
                    "Classifier returned %d results for batch of %d statuses",
                    len(results), len(batch),
                )
            for status, scores in zip(batch, results):
                try:
                    score_dict = scores.to_dict()
                    flagged = scores.is_flagged(config.flag_threshold)
                    await db.store_status_score(
                        status_id=status["id"],
                        scores=score_dict,
                        flagged=flagged,
                        model=config.model,
                    )
                    total_input_tokens += scores.input_tokens
                    total_output_tokens += scores.output_tokens
                    scored += 1
                except Exception:
                    errors += 1
                    # Bug fix: %s, not %d — status ids may be strings
                    # (Mastodon exposes ids as strings), and %d would make
                    # the logging call itself error out.
                    logger.exception(
                        "Failed to store score for status %s", status["id"]
                    )
                # Coarse progress log: fires once per batch that crosses a
                # multiple of 100 scored statuses.
                if scored % 100 < config.batch_size:
                    logger.info(" Statuses scored: %d / %d", scored, len(statuses))

    tasks = [process_batch(b) for b in batches]
    await asyncio.gather(*tasks)
    # Token prices in config are quoted per million tokens.
    cost = (
        total_input_tokens * config.input_cost_per_m / 1_000_000
        + total_output_tokens * config.output_cost_per_m / 1_000_000
    )
    return scored, errors, cost
def _attach_file_handler(log_dir: str = "/app/logs") -> None:
    """Mirror log output to analyzer.log when the log directory exists."""
    if not os.path.isdir(log_dir):
        return
    handler = logging.FileHandler(os.path.join(log_dir, "analyzer.log"))
    handler.setFormatter(
        logging.Formatter("%(asctime)s [%(levelname)s] %(name)s: %(message)s")
    )
    logging.getLogger().addHandler(handler)


async def run() -> None:
    """One-shot batch run: configure logging, score all unscored statuses,
    and record the outcome of the run in the database."""
    config = AnalyzerConfig.from_env()
    logging.basicConfig(
        level=getattr(logging, config.log_level.upper(), logging.INFO),
        format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
        handlers=[logging.StreamHandler(sys.stdout)],
    )
    # Also log to file
    _attach_file_handler()
    logger.info("=" * 60)
    logger.info("Toxicity Analyzer starting (model: %s, concurrency: %d, batch_size: %d)",
                config.model, config.concurrency, config.batch_size)
    db = AnalyzerDB(config.database_url)
    classifier = ToxicityClassifier(
        api_key=config.llm_api_key,
        model=config.model,
    )
    try:
        await db.connect()
        await db.apply_migration()
        # Record this run before doing any work so failures are traceable.
        run_id = await db.start_analysis_run(model=config.model)
        start_time = time.time()
        # A limit of 0 means "no limit" to the DB layer.
        statuses = await db.get_unscored_statuses(
            limit=config.limit if config.limit > 0 else 0
        )
        logger.info("Found %d unscored statuses", len(statuses))
        if not statuses:
            logger.info("Nothing to score — exiting.")
            await db.finish_analysis_run(
                run_id, status="completed",
                statuses_scored=0, errors=0, cost_usd=0.0,
            )
            return
        logger.info("Classifying %d statuses in batches of %d...",
                    len(statuses), config.batch_size)
        s_scored, s_errors, s_cost = await classify_statuses(
            classifier, db, statuses, config,
        )
        logger.info(" Statuses done: %d scored, %d errors, $%.4f",
                    s_scored, s_errors, s_cost)
        total_cost = s_cost
        total_errors = s_errors
        # Finalize run
        duration = time.time() - start_time
        status = "partial" if total_errors else "completed"
        await db.finish_analysis_run(
            run_id,
            status=status,
            statuses_scored=s_scored,
            errors=total_errors,
            cost_usd=total_cost,
        )
        logger.info("=" * 60)
        logger.info("Analysis complete — status: %s", status)
        logger.info(" Statuses scored: %d, Errors: %d",
                    s_scored, total_errors)
        logger.info(" Estimated cost: $%.4f", total_cost)
        logger.info(" Duration: %.1f seconds", duration)
    except Exception:
        logger.exception("Analyzer crashed")
        raise
    finally:
        await classifier.close()
        await db.close()
def main() -> None:
    """Synchronous entry point: drive the async run() to completion."""
    asyncio.run(run())
if __name__ == "__main__":
    main()