"""Main toxicity analysis orchestrator.

Runs as a one-shot batch process: fetches unscored statuses, classifies them
in batches with GPT-4o-mini, and stores scores in PostgreSQL.

Usage:
    python -m app.analyzer
    ANALYZER_LIMIT=100 python -m app.analyzer   # test on 100 statuses
"""

from __future__ import annotations

import asyncio
import logging
import os
import sys
import time

from .classifier import ToxicityClassifier
from .config import AnalyzerConfig
from .db import AnalyzerDB

logger = logging.getLogger("analyzer")

# Shared by the stdout and file handlers so both outputs look identical.
_LOG_FORMAT = "%(asctime)s [%(levelname)s] %(name)s: %(message)s"

# Optional file-log directory; only used if it already exists.
_LOG_DIR = "/app/logs"

# Emit a progress line each time the scored count crosses a multiple of this.
_PROGRESS_INTERVAL = 100


def make_batches(items: list, batch_size: int) -> list[list]:
    """Split a flat list into consecutive sublists of at most *batch_size*.

    Raises:
        ValueError: if batch_size is not positive. (A negative step would
            otherwise silently produce no batches and drop every item.)
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be positive, got {batch_size}")
    return [items[i : i + batch_size] for i in range(0, len(items), batch_size)]


async def classify_statuses(
    classifier: ToxicityClassifier,
    db: AnalyzerDB,
    statuses: list[dict],
    config: AnalyzerConfig,
) -> tuple[int, int, float]:
    """Classify statuses in batches, with concurrency control.

    Each batch's ``"content"`` texts go to ``classifier.classify_batch``; at
    most ``config.concurrency`` batches are in flight at once. Each returned
    result is stored with ``db.store_status_score``.

    Args:
        classifier: Client used to score a batch of texts.
        db: Database the per-status scores are written to.
        statuses: Rows with at least ``"id"`` and ``"content"`` keys.
        config: Supplies batch_size, concurrency, flag_threshold, model and
            the per-million-token prices used for the cost estimate.

    Returns:
        (scored_count, error_count, cost_usd). A status counts as an error
        when its batch failed to classify, when the classifier returned no
        result for it, or when storing its score failed.

    Raises:
        ValueError: if config.concurrency or config.batch_size is not positive.
    """
    if config.concurrency < 1:
        # asyncio.Semaphore(0) would make every batch wait forever.
        raise ValueError(f"concurrency must be positive, got {config.concurrency}")

    semaphore = asyncio.Semaphore(config.concurrency)
    scored = 0
    errors = 0
    total_input_tokens = 0
    total_output_tokens = 0
    next_progress = _PROGRESS_INTERVAL

    batches = make_batches(statuses, config.batch_size)
    logger.info(" Split %d statuses into %d batches of ≤%d",
                len(statuses), len(batches), config.batch_size)

    async def process_batch(batch: list[dict]) -> None:
        nonlocal scored, errors, total_input_tokens, total_output_tokens, next_progress
        async with semaphore:
            texts = [s["content"] for s in batch]
            try:
                # list(): we need len() below even if an iterable is returned.
                results = list(await classifier.classify_batch(texts))
            except Exception:
                # Whole batch failed (API error after retries) — count all as errors
                errors += len(batch)
                logger.exception(
                    "Failed to classify batch of %d statuses", len(batch)
                )
                return

            missing = len(batch) - len(results)
            if missing > 0:
                # zip() below stops at the shorter list; without this the
                # unmatched statuses would be neither scored nor counted.
                errors += missing
                logger.error(
                    "Classifier returned %d results for a batch of %d statuses; "
                    "counting %d as errors",
                    len(results), len(batch), missing,
                )

            for status, scores in zip(batch, results):
                try:
                    # Tokens were spent on the API call whether or not the
                    # store below succeeds, so count them first.
                    total_input_tokens += scores.input_tokens
                    total_output_tokens += scores.output_tokens
                    await db.store_status_score(
                        status_id=status["id"],
                        scores=scores.to_dict(),
                        flagged=scores.is_flagged(config.flag_threshold),
                        model=config.model,
                    )
                except Exception:
                    errors += 1
                    # %s, not %d: works whether ids are ints or strings.
                    logger.exception(
                        "Failed to store score for status %s", status["id"]
                    )
                else:
                    scored += 1

            # Batches finish concurrently, so log whenever the running count
            # has crossed the next progress threshold rather than testing a
            # modulus that may be skipped over.
            if scored >= next_progress:
                logger.info(" Statuses scored: %d / %d", scored, len(statuses))
                next_progress = (scored // _PROGRESS_INTERVAL + 1) * _PROGRESS_INTERVAL

    await asyncio.gather(*(process_batch(b) for b in batches))

    cost = (
        total_input_tokens * config.input_cost_per_m / 1_000_000
        + total_output_tokens * config.output_cost_per_m / 1_000_000
    )
    return scored, errors, cost


def _configure_logging(config: AnalyzerConfig) -> None:
    """Send root-logger output to stdout, and also to a file if _LOG_DIR exists."""
    logging.basicConfig(
        level=getattr(logging, config.log_level.upper(), logging.INFO),
        format=_LOG_FORMAT,
        handlers=[logging.StreamHandler(sys.stdout)],
    )
    # Also log to file
    if os.path.isdir(_LOG_DIR):
        # Explicit UTF-8: messages contain non-ASCII ("≤", "—") that the
        # locale's default encoding may not be able to represent.
        fh = logging.FileHandler(
            os.path.join(_LOG_DIR, "analyzer.log"), encoding="utf-8"
        )
        fh.setFormatter(logging.Formatter(_LOG_FORMAT))
        logging.getLogger().addHandler(fh)


async def _mark_run_failed(
    db: AnalyzerDB, run_id: object, scored: int, errors: int, cost_usd: float
) -> None:
    """Best-effort: finalize a crashed run so it is not left open in the DB.

    Never raises; a failure here is logged, and the caller re-raises the
    original error.
    NOTE(review): assumes finish_analysis_run accepts status="failed" —
    confirm against the analysis-run schema.
    """
    try:
        await db.finish_analysis_run(
            run_id,
            status="failed",
            statuses_scored=scored,
            errors=errors,
            cost_usd=cost_usd,
        )
    except Exception:
        logger.exception("Could not mark analysis run %s as failed", run_id)


async def run() -> None:
    """Run one full analysis pass: fetch, classify, store, and record the run."""
    config = AnalyzerConfig.from_env()
    _configure_logging(config)

    logger.info("=" * 60)
    logger.info("Toxicity Analyzer starting (model: %s, concurrency: %d, batch_size: %d)",
                config.model, config.concurrency, config.batch_size)

    db = AnalyzerDB(config.database_url)
    classifier = ToxicityClassifier(
        api_key=config.openai_api_key,
        model=config.model,
    )

    run_started = False
    run_id = None
    s_scored, s_errors, s_cost = 0, 0, 0.0

    try:
        await db.connect()
        await db.apply_migration()

        # Start analysis run
        run_id = await db.start_analysis_run(model=config.model)
        run_started = True
        # monotonic(): immune to wall-clock adjustments during the run.
        start_time = time.monotonic()

        # Fetch unscored items (a non-positive limit is passed on as 0)
        limit = max(config.limit, 0)
        statuses = await db.get_unscored_statuses(limit=limit)
        logger.info("Found %d unscored statuses", len(statuses))

        if not statuses:
            logger.info("Nothing to score — exiting.")
            await db.finish_analysis_run(
                run_id, status="completed",
                statuses_scored=0, errors=0, cost_usd=0.0,
            )
            return

        # Classify statuses
        logger.info("Classifying %d statuses in batches of %d...",
                    len(statuses), config.batch_size)
        s_scored, s_errors, s_cost = await classify_statuses(
            classifier, db, statuses, config,
        )
        logger.info(" Statuses done: %d scored, %d errors, $%.4f",
                    s_scored, s_errors, s_cost)

        # Finalize run
        duration = time.monotonic() - start_time
        status = "completed" if s_errors == 0 else "partial"
        await db.finish_analysis_run(
            run_id,
            status=status,
            statuses_scored=s_scored,
            errors=s_errors,
            cost_usd=s_cost,
        )

        logger.info("=" * 60)
        logger.info("Analysis complete — status: %s", status)
        logger.info(" Statuses scored: %d, Errors: %d", s_scored, s_errors)
        logger.info(" Estimated cost: $%.4f", s_cost)
        logger.info(" Duration: %.1f seconds", duration)

    except Exception:
        logger.exception("Analyzer crashed")
        if run_started:
            await _mark_run_failed(db, run_id, s_scored, s_errors, s_cost)
        raise
    finally:
        await classifier.close()
        await db.close()


def main() -> None:
    """Console entry point: run the async analyzer to completion."""
    asyncio.run(run())


if __name__ == "__main__":
    main()