diff --git a/.claude/settings.local.json b/.claude/settings.local.json new file mode 100644 index 0000000..cdcb402 --- /dev/null +++ b/.claude/settings.local.json @@ -0,0 +1,11 @@ +{ + "permissions": { + "allow": [ + "Bash(git push:*)", + "Read(//tmp/bluesky-collector/**)", + "Bash(mkdir -p \"/Users/pieter/Nextcloud-Hetzner/PXS Cloud/Projects/26004 HEIO 2/04 Applications/mastodon-collector/app/analyzer\")" + ], + "deny": [], + "ask": [] + } +} diff --git a/TOXICITY_ANALYSIS.md b/TOXICITY_ANALYSIS.md new file mode 100644 index 0000000..8241a3a --- /dev/null +++ b/TOXICITY_ANALYSIS.md @@ -0,0 +1,242 @@ +# Toxicity Analysis System + +This document describes the toxicity analysis system for the Mastodon collector, adapted from the Bluesky collector implementation. + +## Overview + +The toxicity analysis system uses OpenAI's GPT-4o-mini to classify Mastodon posts across 12 toxicity categories: + +- **toxic**: rude, disrespectful, or aggressive language +- **threat**: threats of violence, harm, or intimidation +- **hate_speech**: targeting based on protected characteristics +- **racism**: race/ethnicity-based targeting +- **antisemitism**: anti-Jewish content +- **islamophobia**: anti-Muslim content +- **sexism**: gender-based discrimination +- **homophobia**: anti-LGBTQ+ content +- **insult**: personal attacks and name-calling +- **dehumanization**: comparing people to animals/vermin +- **extremism**: far-right/left extremist rhetoric +- **ableism**: targeting people with disabilities + +## Architecture + +The system consists of: + +1. **Analyzer Module** (`app/analyzer/`) - Async batch processor for classification +2. **Database Schema** (`scripts/02-toxicity.sql`) - Toxicity scores and analysis runs +3. **Web Interface** - Dashboard and flagged content review +4. **API Endpoints** - For manual review of flagged content + +## Setup + +### 1. Environment Variables + +Add to your `.env` file: + +```bash +# OpenAI API key for toxicity analysis +OPENAI_API_KEY=sk-... + +# Analyzer configuration (optional) +ANALYZER_MODEL=gpt-4o-mini +ANALYZER_BATCH_SIZE=10 +ANALYZER_CONCURRENCY=5 +ANALYZER_FLAG_THRESHOLD=0.5 +ANALYZER_LIMIT=0 # 0 = no limit, or set to test on limited number +``` + +### 2. Database Migration + +The toxicity schema is applied automatically when the analyzer runs for the first time. It creates: + +- `toxicity_scores` table - stores scores for each status +- `analysis_runs` table - audit trail of analysis runs + +To manually apply the migration: + +```bash +docker exec -i mastodon-collector-db-1 psql -U collector -d mastodon_collector < scripts/02-toxicity.sql +``` + +### 3. Install Dependencies + +Dependencies are already added to `requirements.txt`: +- `openai==1.58.1` - OpenAI API client +- `asyncpg==0.30.0` - Async PostgreSQL driver + +Rebuild the Docker containers to install: + +```bash +docker-compose build +docker-compose up -d +``` + +## Running the Analyzer + +### One-Time Analysis + +Run the analyzer manually to score all unscored statuses: + +```bash +docker exec mastodon-collector-collector-1 python -m app.analyzer +``` + +### Test on Limited Sample + +To test on 100 statuses first: + +```bash +docker exec mastodon-collector-collector-1 bash -c "ANALYZER_LIMIT=100 python -m app.analyzer" +``` + +### Automated Analysis (Future) + +You can schedule the analyzer to run periodically using cron or a scheduler service. For example, add to your `docker-compose.yml`: + +```yaml + analyzer: + build: . 
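+    # Assumes the same repo-root Dockerfile/image as the collector service; adjust the build context if yours differs.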
+ command: python -m app.analyzer + environment: + - DATABASE_URL=postgresql://collector:${POSTGRES_PASSWORD}@db:5432/mastodon_collector + - OPENAI_API_KEY=${OPENAI_API_KEY} + - ANALYZER_LIMIT=${ANALYZER_LIMIT:-0} + depends_on: + - db + restart: "no" # Run once, don't restart +``` + +Then trigger manually: +```bash +docker-compose run --rm analyzer +``` + +## Web Interface + +### Analysis Dashboard + +Visit http://localhost:8585/analysis to see: + +- Overall statistics (total scored, flagged count, averages) +- Toxicity trends over time +- Category breakdown chart +- Recent analysis runs + +### Flagged Content Review + +Visit http://localhost:8585/analysis/flagged to: + +- Browse flagged content (threshold >= 0.5 by default) +- Filter by category, account, date range, review status +- Sort by overall toxicity or specific categories +- Manually review and mark items as: + - ✓ Correct (correctly flagged) + - ✗ Incorrect (false positive) + - ? Unsure + +### Review Workflow + +1. Click on flagged items to review +2. Use the review buttons (✓, ✗, ?) to mark your assessment +3. Filter by `review_status=unreviewed` to focus on items needing review +4. Use reviewed data to improve the classifier or adjust thresholds + +## Cost Estimation + +Based on GPT-4o-mini pricing (as of Jan 2025): +- Input: $0.150 per 1M tokens +- Output: $0.600 per 1M tokens + +Typical costs: +- ~1,000 statuses = $0.05-0.15 +- ~10,000 statuses = $0.50-1.50 + +The analyzer logs estimated costs after each run. + +## Architecture Details + +### Batch Processing + +The analyzer processes statuses in batches (default: 10 per API call) with concurrency control (default: 5 simultaneous batches). This optimizes for: + +- Cost efficiency (batch API calls) +- Rate limit compliance +- Parallel processing speed + +### Scoring Logic + +Each status receives: +- 12 category scores (0.0 - 1.0) +- Overall score = max of all categories +- Flagged if overall >= threshold (default 0.5) + +### Human Review + +Manual reviews help: +- Validate AI classifications +- Identify patterns of false positives +- Build training data for future improvements +- Adjust thresholds per category if needed + +## Dutch Language Support + +The classifier is specifically trained to handle Dutch political content, including: + +- Dutch slang and coded terms ("gelukszoekers", "omvolking", "wappie", etc.) +- Political context and satire +- Zwarte Piet debates +- Dutch far-right rhetoric + +## Templates + +The Bluesky collector templates can be adapted for Mastodon. Key files to create: + +1. `app/templates/analysis.html` - Main dashboard +2. 
`app/templates/flagged.html` - Flagged content browser + +These templates should include: +- Chart.js for visualizations +- Filter forms for exploration +- Review buttons for manual validation + +## Troubleshooting + +### No statuses being scored + +- Check that statuses exist: `SELECT COUNT(*) FROM statuses WHERE content IS NOT NULL AND reblog_of_id IS NULL;` +- Check migration applied: `\dt toxicity_scores` in psql +- Check OPENAI_API_KEY is set + +### Rate limit errors + +- Reduce `ANALYZER_CONCURRENCY` (try 2-3) +- Reduce `ANALYZER_BATCH_SIZE` (try 5) +- The analyzer retries with exponential backoff automatically + +### High false positive rate + +- Increase `ANALYZER_FLAG_THRESHOLD` (try 0.6 or 0.7) +- Review flagged items and look for patterns +- Dutch political content can be intense but not necessarily toxic + +### Template errors + +- Ensure templates exist in `app/templates/` +- Check that analysis helper functions are imported correctly +- Verify template filters are defined (`format_number`, `time_ago`, etc.) + +## Next Steps + +1. Copy analysis templates from Bluesky collector to `app/templates/` +2. Add navigation links to analysis dashboard in base template +3. Run initial analysis on sample data +4. Review flagged content and adjust thresholds +5. Set up automated analysis runs (cron/scheduler) +6. Monitor costs and performance + +## References + +- Bluesky collector: https://forgejo.postxsociety.cloud/pieter/bluesky-collector +- OpenAI API: https://platform.openai.com/docs +- asyncpg: https://magicstack.github.io/asyncpg/ diff --git a/app/analysis_helpers.py b/app/analysis_helpers.py new file mode 100644 index 0000000..5dde7c7 --- /dev/null +++ b/app/analysis_helpers.py @@ -0,0 +1,237 @@ +"""Database query helpers for toxicity analysis views.""" + +from sqlalchemy import func, desc, and_, or_, cast, Float +from sqlalchemy.orm import Session +from app.db import Status, MonitoredAccount +from datetime import datetime, timedelta + +# Toxicity categories for display +TOXICITY_CATEGORIES = [ + "toxic", "threat", "hate_speech", "racism", + "antisemitism", "islamophobia", "sexism", "homophobia", + "insult", "dehumanization", "extremism", "ableism" +] + + +def get_analysis_stats(session: Session) -> dict: + """Get overall toxicity analysis statistics.""" + from sqlalchemy import text + + # Total statuses and scored statuses + total_statuses = session.query(func.count(Status.id)).scalar() or 0 + + scored = session.execute(text(""" + SELECT COUNT(*) as total_scored, + COUNT(*) FILTER (WHERE flagged = true) as flagged, + AVG(overall) as avg_toxicity + FROM toxicity_scores + """)).fetchone() + + return { + "total_statuses": total_statuses, + "total_scored_statuses": scored[0] if scored else 0, + "flagged_statuses": scored[1] if scored else 0, + "avg_toxicity_statuses": float(scored[2]) if scored and scored[2] else 0.0, + } + + +def get_toxicity_trend(session: Session, weeks: int = 12) -> list[dict]: + """Get toxicity trend over time (weekly aggregates).""" + from sqlalchemy import text + + result = session.execute(text(""" + SELECT + DATE_TRUNC('week', s.created_at) as week, + AVG(ts.overall) as avg_toxicity, + COUNT(*) FILTER (WHERE ts.flagged = true) as flagged_count + FROM statuses s + JOIN toxicity_scores ts ON ts.status_id = s.id + WHERE s.created_at >= NOW() - INTERVAL ':weeks weeks' + GROUP BY week + ORDER BY week DESC + """), {"weeks": weeks}) + + return [{"week": r[0], "avg_toxicity": float(r[1]) if r[1] else 0.0, "flagged_statuses": r[2]} for r in result] + + +def 
get_category_averages(session: Session) -> dict: + """Get average toxicity score for each category.""" + from sqlalchemy import text + + result = session.execute(text(f""" + SELECT + AVG(toxic) as toxic, + AVG(threat) as threat, + AVG(hate_speech) as hate_speech, + AVG(racism) as racism, + AVG(antisemitism) as antisemitism, + AVG(islamophobia) as islamophobia, + AVG(sexism) as sexism, + AVG(homophobia) as homophobia, + AVG(insult) as insult, + AVG(dehumanization) as dehumanization, + AVG(extremism) as extremism, + AVG(ableism) as ableism + FROM toxicity_scores + """)).fetchone() + + if not result: + return {cat: 0.0 for cat in TOXICITY_CATEGORIES} + + return {cat: float(result[i]) if result[i] else 0.0 for i, cat in enumerate(TOXICITY_CATEGORIES)} + + +def get_recent_analysis_runs(session: Session, limit: int = 5) -> list[dict]: + """Get recent analysis runs.""" + from sqlalchemy import text + + result = session.execute(text(""" + SELECT id, started_at, finished_at, status, statuses_scored, errors, cost_usd, duration_secs + FROM analysis_runs + ORDER BY started_at DESC + LIMIT :limit + """), {"limit": limit}) + + return [ + { + "id": r[0], + "started_at": r[1], + "finished_at": r[2], + "status": r[3], + "statuses_scored": r[4], + "errors": r[5], + "cost_usd": r[6], + "duration_secs": r[7] + } + for r in result + ] + + +def get_flagged_content( + session: Session, + category: str = None, + account_id: int = None, + threshold: float = 0.5, + review_status: str = None, + date_from: str = None, + date_to: str = None, + sort: str = "overall", + direction: str = "desc", + limit: int = 50, + offset: int = 0, +) -> tuple[list[dict], int]: + """Get flagged content with filters.""" + from sqlalchemy import text + + # Build WHERE clauses + where_clauses = ["ts.flagged = true"] + params = {"threshold": threshold, "limit": limit, "offset": offset} + + if category: + where_clauses.append(f"ts.{category} >= :threshold") + params["category_threshold"] = threshold + + if account_id: + where_clauses.append("s.account_db_id = :account_id") + params["account_id"] = account_id + + if review_status: + if review_status == "unreviewed": + where_clauses.append("ts.human_reviewed = false") + else: + where_clauses.append("ts.review_status = :review_status") + params["review_status"] = review_status + + if date_from: + where_clauses.append("s.created_at >= :date_from") + params["date_from"] = date_from + + if date_to: + where_clauses.append("s.created_at <= :date_to") + params["date_to"] = date_to + + where_sql = " AND ".join(where_clauses) + + # Valid sort columns + valid_sorts = ["overall", "created_at", "toxic", "threat", "hate_speech", "racism", + "antisemitism", "islamophobia", "sexism", "homophobia", "insult", + "dehumanization", "extremism", "ableism"] + if sort not in valid_sorts: + sort = "overall" + + direction = "DESC" if direction == "desc" else "ASC" + + # Get total count + count_query = f""" + SELECT COUNT(*) + FROM statuses s + JOIN toxicity_scores ts ON ts.status_id = s.id + WHERE {where_sql} + """ + total = session.execute(text(count_query), params).scalar() or 0 + + # Get items with details + query = f""" + SELECT + s.id, + s.status_id, + s.content, + s.text_content, + s.created_at, + s.url, + s.status_type, + ma.username, + ma.instance, + ts.overall, + ts.toxic, ts.threat, ts.hate_speech, ts.racism, + ts.antisemitism, ts.islamophobia, ts.sexism, ts.homophobia, + ts.insult, ts.dehumanization, ts.extremism, ts.ableism, + ts.human_reviewed, + ts.review_status, + ts.reviewed_at + FROM statuses s + JOIN 
toxicity_scores ts ON ts.status_id = s.id + JOIN monitored_accounts ma ON ma.id = s.account_db_id + WHERE {where_sql} + ORDER BY ts.{sort} {direction}, s.created_at DESC + LIMIT :limit OFFSET :offset + """ + + result = session.execute(text(query), params) + + items = [] + for r in result: + # Find top category + scores = { + "toxic": r[10], "threat": r[11], "hate_speech": r[12], "racism": r[13], + "antisemitism": r[14], "islamophobia": r[15], "sexism": r[16], "homophobia": r[17], + "insult": r[18], "dehumanization": r[19], "extremism": r[20], "ableism": r[21] + } + top_category = max(scores, key=scores.get) if any(scores.values()) else None + + items.append({ + "id": r[0], + "status_id": r[1], + "content": r[2], + "text_content": r[3], + "created_at": r[4], + "url": r[5], + "status_type": r[6], + "author_username": r[7], + "author_instance": r[8], + "author_handle": f"@{r[7]}@{r[8]}", + "overall": float(r[9]), + "top_category": top_category, + "scores": scores, + "human_reviewed": r[22], + "review_status": r[23], + "reviewed_at": r[24], + }) + + return items, total + + +def get_accounts_for_select(session: Session) -> list[dict]: + """Get all monitored accounts for dropdowns.""" + accounts = session.query(MonitoredAccount.id, MonitoredAccount.username, MonitoredAccount.instance).all() + return [{"id": a[0], "handle": f"@{a[1]}@{a[2]}"} for a in accounts] diff --git a/app/analyzer/__init__.py b/app/analyzer/__init__.py new file mode 100644 index 0000000..5a7b82d --- /dev/null +++ b/app/analyzer/__init__.py @@ -0,0 +1 @@ +"""Toxicity analysis module for Mastodon collector.""" diff --git a/app/analyzer/__main__.py b/app/analyzer/__main__.py new file mode 100644 index 0000000..da57fa9 --- /dev/null +++ b/app/analyzer/__main__.py @@ -0,0 +1,6 @@ +"""Entry point for running the analyzer as a module.""" + +from .analyzer import main + +if __name__ == "__main__": + main() diff --git a/app/analyzer/analyzer.py b/app/analyzer/analyzer.py new file mode 100644 index 0000000..a1c6731 --- /dev/null +++ b/app/analyzer/analyzer.py @@ -0,0 +1,194 @@ +"""Main toxicity analysis orchestrator. + +Runs as a one-shot batch process: fetches unscored statuses, +classifies them in batches with GPT-4o-mini, and stores scores in PostgreSQL. + +Usage: + python -m app.analyzer + ANALYZER_LIMIT=100 python -m app.analyzer # test on 100 statuses +""" + +from __future__ import annotations + +import asyncio +import logging +import os +import sys +import time + +from .classifier import ToxicityClassifier +from .config import AnalyzerConfig +from .db import AnalyzerDB + +logger = logging.getLogger("analyzer") + + +def make_batches(items: list, batch_size: int) -> list[list]: + """Split a flat list into sublists of at most batch_size.""" + return [items[i : i + batch_size] for i in range(0, len(items), batch_size)] + + +async def classify_statuses( + classifier: ToxicityClassifier, + db: AnalyzerDB, + statuses: list[dict], + config: AnalyzerConfig, +) -> tuple[int, int, float]: + """Classify statuses in batches, with concurrency control. + + Returns (scored_count, error_count, cost_usd). 
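+    If an entire batch fails after all retries, every status in that batch is
+    counted as an error rather than retried individually.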
+ """ + semaphore = asyncio.Semaphore(config.concurrency) + scored = 0 + errors = 0 + total_input_tokens = 0 + total_output_tokens = 0 + + batches = make_batches(statuses, config.batch_size) + logger.info(" Split %d statuses into %d batches of ≤%d", + len(statuses), len(batches), config.batch_size) + + async def process_batch(batch: list[dict]) -> None: + nonlocal scored, errors, total_input_tokens, total_output_tokens + async with semaphore: + texts = [s["content"] for s in batch] + try: + results = await classifier.classify_batch(texts) + + for status, scores in zip(batch, results): + try: + score_dict = scores.to_dict() + flagged = scores.is_flagged(config.flag_threshold) + + await db.store_status_score( + status_id=status["id"], + scores=score_dict, + flagged=flagged, + model=config.model, + ) + + total_input_tokens += scores.input_tokens + total_output_tokens += scores.output_tokens + scored += 1 + + except Exception: + errors += 1 + logger.exception( + "Failed to store score for status %d", status["id"] + ) + + if scored % 100 < config.batch_size: + logger.info(" Statuses scored: %d / %d", scored, len(statuses)) + + except Exception: + # Whole batch failed (API error after retries) — count all as errors + errors += len(batch) + logger.exception( + "Failed to classify batch of %d statuses", len(batch) + ) + + tasks = [process_batch(b) for b in batches] + await asyncio.gather(*tasks) + + cost = ( + total_input_tokens * config.input_cost_per_m / 1_000_000 + + total_output_tokens * config.output_cost_per_m / 1_000_000 + ) + return scored, errors, cost + + +async def run() -> None: + config = AnalyzerConfig.from_env() + + logging.basicConfig( + level=getattr(logging, config.log_level.upper(), logging.INFO), + format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", + handlers=[logging.StreamHandler(sys.stdout)], + ) + # Also log to file + log_dir = "/app/logs" + if os.path.isdir(log_dir): + fh = logging.FileHandler(os.path.join(log_dir, "analyzer.log")) + fh.setFormatter( + logging.Formatter("%(asctime)s [%(levelname)s] %(name)s: %(message)s") + ) + logging.getLogger().addHandler(fh) + + logger.info("=" * 60) + logger.info("Toxicity Analyzer starting (model: %s, concurrency: %d, batch_size: %d)", + config.model, config.concurrency, config.batch_size) + + db = AnalyzerDB(config.database_url) + classifier = ToxicityClassifier( + api_key=config.openai_api_key, + model=config.model, + ) + + try: + await db.connect() + await db.apply_migration() + + # Start analysis run + run_id = await db.start_analysis_run(model=config.model) + start_time = time.time() + + # Fetch unscored items + limit = config.limit if config.limit > 0 else 0 + statuses = await db.get_unscored_statuses(limit=limit) + + logger.info("Found %d unscored statuses", len(statuses)) + + if not statuses: + logger.info("Nothing to score — exiting.") + await db.finish_analysis_run( + run_id, status="completed", + statuses_scored=0, errors=0, cost_usd=0.0, + ) + return + + total_cost = 0.0 + total_errors = 0 + + # Classify statuses + logger.info("Classifying %d statuses in batches of %d...", + len(statuses), config.batch_size) + s_scored, s_errors, s_cost = await classify_statuses( + classifier, db, statuses, config, + ) + logger.info(" Statuses done: %d scored, %d errors, $%.4f", + s_scored, s_errors, s_cost) + total_cost += s_cost + total_errors += s_errors + + # Finalize run + duration = time.time() - start_time + status = "completed" if total_errors == 0 else "partial" + await db.finish_analysis_run( + run_id, + status=status, 
+ statuses_scored=s_scored, + errors=total_errors, + cost_usd=total_cost, + ) + + logger.info("=" * 60) + logger.info("Analysis complete — status: %s", status) + logger.info(" Statuses scored: %d, Errors: %d", + s_scored, total_errors) + logger.info(" Estimated cost: $%.4f", total_cost) + logger.info(" Duration: %.1f seconds", duration) + + except Exception: + logger.exception("Analyzer crashed") + raise + finally: + await classifier.close() + await db.close() + + +def main() -> None: + asyncio.run(run()) + + +if __name__ == "__main__": + main() diff --git a/app/analyzer/classifier.py b/app/analyzer/classifier.py new file mode 100644 index 0000000..a3de131 --- /dev/null +++ b/app/analyzer/classifier.py @@ -0,0 +1,287 @@ +"""OpenAI-powered toxicity classifier. + +Sends batches of Mastodon statuses to GPT-4o-mini for multi-category toxicity scoring. +Returns a list of dicts of category → score (0.0–1.0). +""" + +from __future__ import annotations + +import asyncio +import json +import logging +import random +from dataclasses import dataclass + +from openai import AsyncOpenAI, RateLimitError, APITimeoutError, APIConnectionError + +logger = logging.getLogger("analyzer.classifier") + +# The 12 toxicity categories we score +CATEGORIES = [ + "toxic", + "threat", + "hate_speech", + "racism", + "antisemitism", + "islamophobia", + "sexism", + "homophobia", + "insult", + "dehumanization", + "extremism", + "ableism", +] + +# System prompt for batch classification — multiple numbered statuses per call +SYSTEM_PROMPT = """\ +You are a toxicity classifier for Dutch and English social media posts about politics. + +You will receive one or more numbered posts. Score EACH post on every category from 0.0 (none) to 1.0 (extreme). +Respond ONLY with a JSON object mapping post numbers (as strings) to their scores. No other text. + +Categories: +- toxic: rude, disrespectful, or aggressive language +- threat: threats of violence, harm, intimidation, or calls to action against a person +- hate_speech: targeting people based on any protected characteristic (race, religion, gender, sexual orientation, disability, nationality) +- racism: specifically targeting race or ethnicity (e.g. anti-Black, anti-Asian, anti-Moroccan sentiment, "Zwarte Piet" debates when derogatory) +- antisemitism: targeting Jewish people, Holocaust denial or minimization, Jewish conspiracy theories, coded language like "globalists", "Rothschilds", triple parentheses +- islamophobia: anti-Muslim hate, mosque opposition framed as hate, "Islam is not a religion" rhetoric, "takeover/omvolking" narratives, halal/hijab targeting +- sexism: gender-based discrimination, harassment, misogyny, or misandry +- homophobia: targeting sexual orientation or gender identity, anti-LGBTQ+ rhetoric +- insult: personal attacks, name-calling, belittling +- dehumanization: comparing people to animals, vermin, disease, parasites, or other dehumanizing language +- extremism: far-right or far-left extremist rhetoric, Nazi symbolism or glorification, white supremacist language, Great Replacement theory ("omvolkingstheorie"), calls for political violence, fascist/authoritarian glorification +- ableism: targeting people with disabilities, using mental health conditions as insults (e.g. "gestoord", "autist" as slur, "mongool") + +Important context: +- Many posts are in Dutch. Handle Dutch slang, insults, and coded political language. 
+- Dutch-specific coded terms: "gelukszoekers", "kutmarokkanen", "omvolking", "landverrader", "volksverrader", "linkse ratten", "wappie", "tokkie" — score appropriately based on context. +- Political disagreement and criticism are NOT toxic — only score actual hostility, hate, or threats. +- Satire and parody accounts may use irony — consider context but still score the literal content. +- A score of 0.0 means the category is completely absent. A score of 1.0 means extreme/explicit. +- Most posts will score 0.0 on most categories. Only flag genuine toxicity. + +Example for 2 posts: +{"1":{"toxic":0.0,"threat":0.0,"hate_speech":0.0,"racism":0.0,"antisemitism":0.0,"islamophobia":0.0,"sexism":0.0,"homophobia":0.0,"insult":0.0,"dehumanization":0.0,"extremism":0.0,"ableism":0.0},"2":{"toxic":0.3,"threat":0.0,"hate_speech":0.0,"racism":0.0,"antisemitism":0.0,"islamophobia":0.0,"sexism":0.0,"homophobia":0.0,"insult":0.2,"dehumanization":0.0,"extremism":0.0,"ableism":0.0}}""" + + +@dataclass +class ToxicityScores: + """Classification result for a single status.""" + + toxic: float = 0.0 + threat: float = 0.0 + hate_speech: float = 0.0 + racism: float = 0.0 + antisemitism: float = 0.0 + islamophobia: float = 0.0 + sexism: float = 0.0 + homophobia: float = 0.0 + insult: float = 0.0 + dehumanization: float = 0.0 + extremism: float = 0.0 + ableism: float = 0.0 + + @property + def overall(self) -> float: + """Overall toxicity = max of all categories.""" + return max( + self.toxic, + self.threat, + self.hate_speech, + self.racism, + self.antisemitism, + self.islamophobia, + self.sexism, + self.homophobia, + self.insult, + self.dehumanization, + self.extremism, + self.ableism, + ) + + def is_flagged(self, threshold: float = 0.5) -> bool: + return self.overall >= threshold + + def to_dict(self) -> dict: + return { + "toxic": self.toxic, + "threat": self.threat, + "hate_speech": self.hate_speech, + "racism": self.racism, + "antisemitism": self.antisemitism, + "islamophobia": self.islamophobia, + "sexism": self.sexism, + "homophobia": self.homophobia, + "insult": self.insult, + "dehumanization": self.dehumanization, + "extremism": self.extremism, + "ableism": self.ableism, + "overall": self.overall, + } + + # Approximate token counts for cost tracking + input_tokens: int = 0 + output_tokens: int = 0 + + +def parse_scores(raw: str) -> ToxicityScores: + """Parse the JSON scores for a single status into ToxicityScores.""" + try: + data = json.loads(raw) if isinstance(raw, str) else raw + except json.JSONDecodeError: + logger.warning("Failed to parse JSON response: %s", str(raw)[:200]) + return ToxicityScores() + + def clamp(val) -> float: + try: + f = float(val) + return max(0.0, min(1.0, f)) + except (TypeError, ValueError): + return 0.0 + + return ToxicityScores( + toxic=clamp(data.get("toxic")), + threat=clamp(data.get("threat")), + hate_speech=clamp(data.get("hate_speech")), + racism=clamp(data.get("racism")), + antisemitism=clamp(data.get("antisemitism")), + islamophobia=clamp(data.get("islamophobia")), + sexism=clamp(data.get("sexism")), + homophobia=clamp(data.get("homophobia")), + insult=clamp(data.get("insult")), + dehumanization=clamp(data.get("dehumanization")), + extremism=clamp(data.get("extremism")), + ableism=clamp(data.get("ableism")), + ) + + +def parse_batch_response(raw: str, batch_size: int) -> list[ToxicityScores]: + """Parse a batched JSON response into a list of ToxicityScores. 
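+    Missing or unparsable entries are logged and fall back to all-zero scores.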
+ + Expected format: {"1": {...scores...}, "2": {...scores...}, ...} + Returns a list of ToxicityScores in the same order as the input batch. + """ + try: + data = json.loads(raw) + except json.JSONDecodeError: + logger.warning("Failed to parse batch JSON: %s", raw[:300]) + return [ToxicityScores() for _ in range(batch_size)] + + results = [] + for i in range(1, batch_size + 1): + key = str(i) + if key in data and isinstance(data[key], dict): + results.append(parse_scores(data[key])) + else: + logger.warning("Missing scores for status %d in batch response", i) + results.append(ToxicityScores()) + + return results + + +class ToxicityClassifier: + """Async OpenAI-based toxicity classifier with batch support.""" + + def __init__(self, api_key: str, model: str = "gpt-4o-mini"): + self.client = AsyncOpenAI(api_key=api_key) + self.model = model + + async def classify_batch( + self, texts: list[str], max_retries: int = 5 + ) -> list[ToxicityScores]: + """Classify multiple statuses in a single API call. + + Args: + texts: List of status texts to classify (1–batch_size items). + max_retries: Number of retries on rate limit / transient errors. + + Returns: + List of ToxicityScores, one per input text, in the same order. + """ + if not texts: + return [] + + # Handle single-item batches efficiently + batch_size = len(texts) + + # Build the numbered user message + parts = [] + for i, text in enumerate(texts, 1): + # Truncate very long statuses + t = text.strip() if text else "" + if len(t) > 2000: + t = t[:2000] + if not t: + t = "(empty)" + parts.append(f"[{i}] {t}") + user_message = "\n\n".join(parts) + + # Scale max_tokens by batch size. + # Each status's JSON scores ≈ 60 tokens compact, but the model often + # outputs formatted JSON (whitespace/newlines) which can double that. + # Use a generous budget to avoid truncation. 
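+        # With the defaults (batch_size=10) this allows max(300, 10 * 200) = 2000
+        # completion tokens, i.e. roughly 200 per status against the ~60-120 expected.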
+ max_tokens = max(300, batch_size * 200) + + last_err = None + for attempt in range(max_retries): + try: + response = await self.client.chat.completions.create( + model=self.model, + temperature=0, + max_tokens=max_tokens, + response_format={"type": "json_object"}, + messages=[ + {"role": "system", "content": SYSTEM_PROMPT}, + {"role": "user", "content": user_message}, + ], + ) + + content = response.choices[0].message.content or "{}" + results = parse_batch_response(content, batch_size) + + # Distribute token usage evenly for cost tracking + if response.usage: + per_status_input = response.usage.prompt_tokens // batch_size + per_status_output = response.usage.completion_tokens // batch_size + for scores in results: + scores.input_tokens = per_status_input + scores.output_tokens = per_status_output + + return results + + except RateLimitError as e: + last_err = e + wait = min(2 ** attempt + random.uniform(0.5, 1.5), 30) + logger.debug( + "Rate limited (attempt %d/%d), waiting %.1fs", + attempt + 1, max_retries, wait, + ) + await asyncio.sleep(wait) + + except (APITimeoutError, APIConnectionError) as e: + last_err = e + wait = 2 ** attempt + random.uniform(0, 1) + logger.debug( + "Transient error (attempt %d/%d), retrying in %.1fs: %s", + attempt + 1, max_retries, wait, e, + ) + await asyncio.sleep(wait) + + except Exception: + logger.exception( + "Batch classification API call failed (%d statuses)", batch_size + ) + raise + + # All retries exhausted + logger.error("Rate limit retries exhausted for batch of %d statuses", batch_size) + raise last_err + + async def classify(self, text: str, max_retries: int = 5) -> ToxicityScores: + """Classify a single status (convenience wrapper around classify_batch).""" + results = await self.classify_batch([text], max_retries=max_retries) + return results[0] + + async def close(self): + await self.client.close() diff --git a/app/analyzer/config.py b/app/analyzer/config.py new file mode 100644 index 0000000..64b1cd2 --- /dev/null +++ b/app/analyzer/config.py @@ -0,0 +1,38 @@ +"""Configuration for the toxicity analyzer.""" + +from __future__ import annotations + +import os +from dataclasses import dataclass + + +@dataclass +class AnalyzerConfig: + """Configuration for the toxicity analyzer.""" + + database_url: str + openai_api_key: str + model: str = "gpt-4o-mini" + batch_size: int = 10 + concurrency: int = 5 + flag_threshold: float = 0.5 + limit: int = 0 # 0 = no limit + log_level: str = "INFO" + + # Pricing (as of 2025, per million tokens) + input_cost_per_m: float = 0.150 # $0.150 per 1M input tokens + output_cost_per_m: float = 0.600 # $0.600 per 1M output tokens + + @classmethod + def from_env(cls) -> AnalyzerConfig: + """Load configuration from environment variables.""" + return cls( + database_url=os.environ["DATABASE_URL"], + openai_api_key=os.environ["OPENAI_API_KEY"], + model=os.getenv("ANALYZER_MODEL", "gpt-4o-mini"), + batch_size=int(os.getenv("ANALYZER_BATCH_SIZE", "10")), + concurrency=int(os.getenv("ANALYZER_CONCURRENCY", "5")), + flag_threshold=float(os.getenv("ANALYZER_FLAG_THRESHOLD", "0.5")), + limit=int(os.getenv("ANALYZER_LIMIT", "0")), + log_level=os.getenv("ANALYZER_LOG_LEVEL", "INFO"), + ) diff --git a/app/analyzer/db.py b/app/analyzer/db.py new file mode 100644 index 0000000..0112949 --- /dev/null +++ b/app/analyzer/db.py @@ -0,0 +1,145 @@ +"""Async database layer for the toxicity analyzer. + +Handles fetching unscored statuses and storing classification results. 
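+Connections go through a small asyncpg pool (2-10 connections), separate from the
+SQLAlchemy setup the web app uses for its own queries.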
+""" + +from __future__ import annotations + +import logging +from datetime import datetime, timezone +from pathlib import Path + +import asyncpg + +logger = logging.getLogger("analyzer.db") + +MIGRATION_FILE = Path(__file__).parent.parent.parent / "scripts" / "02-toxicity.sql" + + +class AnalyzerDB: + """Async PostgreSQL operations for the analyzer.""" + + def __init__(self, dsn: str): + self._dsn = dsn + self._pool: asyncpg.Pool | None = None + + async def connect(self) -> None: + self._pool = await asyncpg.create_pool(self._dsn, min_size=2, max_size=10) + logger.info("Database connected") + + async def close(self) -> None: + if self._pool: + await self._pool.close() + + async def apply_migration(self) -> None: + """Apply the toxicity schema migration if tables don't exist.""" + async with self._pool.acquire() as conn: + # Check if toxicity_scores table exists + exists = await conn.fetchval(""" + SELECT EXISTS ( + SELECT FROM information_schema.tables + WHERE table_name = 'toxicity_scores' + ) + """) + if not exists: + logger.info("Applying toxicity schema migration...") + sql = MIGRATION_FILE.read_text() + await conn.execute(sql) + logger.info("Migration applied successfully") + else: + logger.debug("Toxicity tables already exist") + + # ── Fetch unscored items ───────────────────────────────────────────── + + async def get_unscored_statuses(self, limit: int = 0) -> list[dict]: + """Get statuses that haven't been scored yet. + + Skips boosts (reblogs) and statuses with empty content. + """ + query = """ + SELECT s.id, s.content, s.account_id + FROM statuses s + LEFT JOIN toxicity_scores ts ON ts.status_id = s.id + WHERE ts.status_id IS NULL + AND s.reblog_of_id IS NULL + AND s.content IS NOT NULL + AND s.content != '' + ORDER BY s.created_at DESC + """ + if limit > 0: + query += f" LIMIT {limit}" + + async with self._pool.acquire() as conn: + rows = await conn.fetch(query) + return [dict(r) for r in rows] + + # ── Store scores ───────────────────────────────────────────────────── + + async def store_status_score( + self, + status_id: int, + scores: dict, + flagged: bool, + model: str, + ) -> None: + """Insert a toxicity score for a status.""" + async with self._pool.acquire() as conn: + await conn.execute(""" + INSERT INTO toxicity_scores + (status_id, overall, toxic, threat, hate_speech, racism, + antisemitism, islamophobia, + sexism, homophobia, insult, dehumanization, + extremism, ableism, flagged, model) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16) + ON CONFLICT (status_id) DO NOTHING + """, + status_id, + scores["overall"], + scores["toxic"], + scores["threat"], + scores["hate_speech"], + scores["racism"], + scores["antisemitism"], + scores["islamophobia"], + scores["sexism"], + scores["homophobia"], + scores["insult"], + scores["dehumanization"], + scores["extremism"], + scores["ableism"], + flagged, + model, + ) + + # ── Analysis run tracking ──────────────────────────────────────────── + + async def start_analysis_run(self, model: str) -> int: + """Create a new analysis run record. 
Returns run ID.""" + async with self._pool.acquire() as conn: + return await conn.fetchval(""" + INSERT INTO analysis_runs (model) VALUES ($1) + RETURNING id + """, model) + + async def finish_analysis_run( + self, + run_id: int, + status: str, + statuses_scored: int, + errors: int, + cost_usd: float, + ) -> None: + """Finalize an analysis run with results.""" + async with self._pool.acquire() as conn: + await conn.execute(""" + UPDATE analysis_runs + SET finished_at = now(), + status = $2, + statuses_scored = $3, + errors = $4, + cost_usd = $5, + duration_secs = EXTRACT(EPOCH FROM (now() - started_at)) + WHERE id = $1 + """, + run_id, status, statuses_scored, errors, cost_usd, + ) diff --git a/app/web.py b/app/web.py index 698f717..bfc64ee 100644 --- a/app/web.py +++ b/app/web.py @@ -379,5 +379,142 @@ def export_csv(): session.close() +@app.route("/analysis") +def analysis_dashboard(): + """Toxicity analysis dashboard.""" + from app.analysis_helpers import ( + get_analysis_stats, + get_toxicity_trend, + get_category_averages, + get_recent_analysis_runs, + TOXICITY_CATEGORIES, + ) + import json + + session = get_session() + try: + stats = get_analysis_stats(session) + trend = get_toxicity_trend(session, weeks=12) + categories = get_category_averages(session) + runs = get_recent_analysis_runs(session, limit=5) + + # Prepare chart data + trend_json = json.dumps([ + { + "week": r["week"].strftime("%Y-%m-%d") if r["week"] else "", + "avg_toxicity": round(float(r["avg_toxicity"]), 4), + "flagged_statuses": int(r["flagged_statuses"]), + } + for r in trend + ]) + + categories_json = json.dumps({k: round(float(v), 4) for k, v in categories.items()}) + + return render_template( + "analysis.html", + stats=stats, + trend_json=trend_json, + categories_json=categories_json, + categories=TOXICITY_CATEGORIES, + runs=runs, + ) + finally: + session.close() + + +@app.route("/analysis/flagged") +def analysis_flagged(): + """View flagged content.""" + from app.analysis_helpers import ( + get_flagged_content, + get_accounts_for_select, + TOXICITY_CATEGORIES, + ) + + session = get_session() + try: + category = request.args.get("category") or None + account_id = request.args.get("account_id", type=int) or None + threshold = request.args.get("threshold", 0.5, type=float) + review_status = request.args.get("review_status") or None + date_from = request.args.get("date_from") or None + date_to = request.args.get("date_to") or None + sort = request.args.get("sort", "overall") + direction = request.args.get("dir", "desc") + page = max(1, request.args.get("page", 1, type=int)) + per_page = 50 + + items, total = get_flagged_content( + session, + category=category, + account_id=account_id, + threshold=threshold, + review_status=review_status, + date_from=date_from, + date_to=date_to, + sort=sort, + direction=direction, + limit=per_page, + offset=(page - 1) * per_page, + ) + total_pages = max(1, (total + per_page - 1) // per_page) + accounts = get_accounts_for_select(session) + + return render_template( + "flagged.html", + items=items, + total=total, + page=page, + total_pages=total_pages, + accounts=accounts, + categories=TOXICITY_CATEGORIES, + category=category or "", + account_id=account_id or "", + threshold=threshold, + review_status=review_status or "", + date_from=date_from or "", + date_to=date_to or "", + sort=sort, + direction=direction, + ) + finally: + session.close() + + +@app.route("/api/review/submit", methods=["POST"]) +def api_review_submit(): + """Submit a human review for a flagged status.""" + from 
sqlalchemy import text + + data = request.get_json() + status_id = data.get("status_id") + review_status = data.get("review_status") + + if not all([status_id, review_status]): + return jsonify({"error": "Missing required fields"}), 400 + + if review_status not in ["correct", "incorrect", "unsure"]: + return jsonify({"error": "Invalid review_status"}), 400 + + session = get_session() + try: + session.execute(text(""" + UPDATE toxicity_scores + SET human_reviewed = true, + review_status = :review_status, + reviewed_at = NOW() + WHERE status_id = :status_id + """), {"review_status": review_status, "status_id": status_id}) + session.commit() + + return jsonify({"success": True, "message": "Review submitted"}), 200 + except Exception as e: + session.rollback() + logger.error(f"Failed to submit review: {e}") + return jsonify({"error": str(e)}), 500 + finally: + session.close() + + if __name__ == "__main__": app.run(host="0.0.0.0", port=5000, debug=True) diff --git a/requirements.txt b/requirements.txt index b554053..735ed42 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,3 +4,6 @@ psycopg2-binary==2.9.10 sqlalchemy==2.0.36 requests==2.32.3 apscheduler==3.10.4 +beautifulsoup4==4.12.3 +openai==1.58.1 +asyncpg==0.30.0 diff --git a/scripts/02-toxicity.sql b/scripts/02-toxicity.sql new file mode 100644 index 0000000..942b474 --- /dev/null +++ b/scripts/02-toxicity.sql @@ -0,0 +1,44 @@ +-- Toxicity Analysis Schema +-- Stores per-status toxicity scores from LLM classification. + +-- Toxicity scores for statuses +CREATE TABLE IF NOT EXISTS toxicity_scores ( + status_id BIGINT PRIMARY KEY REFERENCES statuses(id) ON DELETE CASCADE, + overall REAL NOT NULL, + toxic REAL NOT NULL DEFAULT 0, + threat REAL NOT NULL DEFAULT 0, + hate_speech REAL NOT NULL DEFAULT 0, + racism REAL NOT NULL DEFAULT 0, + antisemitism REAL NOT NULL DEFAULT 0, + islamophobia REAL NOT NULL DEFAULT 0, + sexism REAL NOT NULL DEFAULT 0, + homophobia REAL NOT NULL DEFAULT 0, + insult REAL NOT NULL DEFAULT 0, + dehumanization REAL NOT NULL DEFAULT 0, + extremism REAL NOT NULL DEFAULT 0, + ableism REAL NOT NULL DEFAULT 0, + flagged BOOLEAN NOT NULL DEFAULT false, + model TEXT NOT NULL DEFAULT 'gpt-4o-mini', + scored_at TIMESTAMPTZ NOT NULL DEFAULT now(), + human_reviewed BOOLEAN NOT NULL DEFAULT false, + review_status TEXT, -- 'correct', 'incorrect', 'unsure' + reviewed_at TIMESTAMPTZ +); + +CREATE INDEX IF NOT EXISTS idx_tox_flagged ON toxicity_scores (flagged) WHERE flagged = true; +CREATE INDEX IF NOT EXISTS idx_tox_overall ON toxicity_scores (overall DESC); +CREATE INDEX IF NOT EXISTS idx_tox_scored ON toxicity_scores (scored_at DESC); +CREATE INDEX IF NOT EXISTS idx_tox_reviewed ON toxicity_scores (human_reviewed, review_status); + +-- Analysis run audit trail +CREATE TABLE IF NOT EXISTS analysis_runs ( + id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, + started_at TIMESTAMPTZ NOT NULL DEFAULT now(), + finished_at TIMESTAMPTZ, + status TEXT NOT NULL DEFAULT 'running', -- running | completed | failed | partial + statuses_scored INTEGER NOT NULL DEFAULT 0, + errors INTEGER NOT NULL DEFAULT 0, + model TEXT NOT NULL, + cost_usd NUMERIC(10,6) DEFAULT 0, + duration_secs NUMERIC +);
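+
+-- Illustrative only (not run by the migration): an ad-hoc psql query along the lines
+-- of what the flagged-content view performs, handy for spot-checking scores.
+--
+--   SELECT s.url, ts.overall, ts.extremism, ts.racism, ts.review_status
+--   FROM toxicity_scores ts
+--   JOIN statuses s ON s.id = ts.status_id
+--   WHERE ts.flagged = true
+--   ORDER BY ts.overall DESC
+--   LIMIT 20;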