Add toxicity analysis system for Mastodon statuses

Implements comprehensive toxicity analysis following the Bluesky collector architecture:

- Analyzer module with async batch processing using GPT-4o-mini
- Database schema for toxicity scores and analysis run tracking
- 12 toxicity categories (toxic, threat, hate_speech, racism, antisemitism, islamophobia, sexism, homophobia, insult, dehumanization, extremism, ableism)
- Web interface routes for analysis dashboard and flagged content review
- Manual review API endpoint for human validation
- Analysis helper functions for database queries
- Dutch language support with coded political term recognition

Usage:
  docker exec mastodon-collector-collector-1 python -m app.analyzer

See TOXICITY_ANALYSIS.md for full documentation.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
Pieter 2026-03-30 14:43:35 +02:00
parent 72dbf0d2b6
commit 0aa4a16fab
12 changed files with 1345 additions and 0 deletions


@@ -0,0 +1,11 @@
{
"permissions": {
"allow": [
"Bash(git push:*)",
"Read(//tmp/bluesky-collector/**)",
"Bash(mkdir -p \"/Users/pieter/Nextcloud-Hetzner/PXS Cloud/Projects/26004 HEIO 2/04 Applications/mastodon-collector/app/analyzer\")"
],
"deny": [],
"ask": []
}
}

TOXICITY_ANALYSIS.md Normal file

@@ -0,0 +1,242 @@
# Toxicity Analysis System
This document describes the toxicity analysis system for the Mastodon collector, adapted from the Bluesky collector implementation.
## Overview
The toxicity analysis system uses OpenAI's GPT-4o-mini to classify Mastodon posts across 12 toxicity categories:
- **toxic**: rude, disrespectful, or aggressive language
- **threat**: threats of violence, harm, or intimidation
- **hate_speech**: targeting based on protected characteristics
- **racism**: race/ethnicity-based targeting
- **antisemitism**: anti-Jewish content
- **islamophobia**: anti-Muslim content
- **sexism**: gender-based discrimination
- **homophobia**: anti-LGBTQ+ content
- **insult**: personal attacks and name-calling
- **dehumanization**: comparing people to animals/vermin
- **extremism**: far-right/left extremist rhetoric
- **ableism**: targeting people with disabilities
## Architecture
The system consists of:
1. **Analyzer Module** (`app/analyzer/`) - Async batch processor for classification
2. **Database Schema** (`scripts/02-toxicity.sql`) - Toxicity scores and analysis runs
3. **Web Interface** - Dashboard and flagged content review
4. **API Endpoints** - For manual review of flagged content
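For orientation, the new files this commit adds (matching the diff below):

```
app/
├── analysis_helpers.py      # SQLAlchemy query helpers for the web views
└── analyzer/
    ├── __init__.py          # package marker
    ├── __main__.py          # `python -m app.analyzer` entry point
    ├── analyzer.py          # batch orchestrator
    ├── classifier.py        # OpenAI classifier and score parsing
    ├── config.py            # environment-based configuration
    └── db.py                # asyncpg database layer
scripts/
└── 02-toxicity.sql          # toxicity_scores + analysis_runs schema
```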
## Setup
### 1. Environment Variables
Add to your `.env` file:
```bash
# OpenAI API key for toxicity analysis
OPENAI_API_KEY=sk-...
# Analyzer configuration (optional)
ANALYZER_MODEL=gpt-4o-mini
ANALYZER_BATCH_SIZE=10
ANALYZER_CONCURRENCY=5
ANALYZER_FLAG_THRESHOLD=0.5
ANALYZER_LIMIT=0 # 0 = no limit; set a positive number to test on a sample first
```
### 2. Database Migration
The toxicity schema is applied automatically when the analyzer runs for the first time. It creates:
- `toxicity_scores` table - stores scores for each status
- `analysis_runs` table - audit trail of analysis runs
To manually apply the migration:
```bash
docker exec -i mastodon-collector-db-1 psql -U collector -d mastodon_collector < scripts/02-toxicity.sql
```
### 3. Install Dependencies
Dependencies are already added to `requirements.txt`:
- `openai==1.58.1` - OpenAI API client
- `asyncpg==0.30.0` - Async PostgreSQL driver
Rebuild the Docker containers to install:
```bash
docker-compose build
docker-compose up -d
```
## Running the Analyzer
### One-Time Analysis
Run the analyzer manually to score all unscored statuses:
```bash
docker exec mastodon-collector-collector-1 python -m app.analyzer
```
### Test on Limited Sample
To test on 100 statuses first:
```bash
docker exec mastodon-collector-collector-1 bash -c "ANALYZER_LIMIT=100 python -m app.analyzer"
```
### Automated Analysis (Future)
You can schedule the analyzer to run periodically using cron or a scheduler service. For example, add to your `docker-compose.yml`:
```yaml
analyzer:
build: .
command: python -m app.analyzer
environment:
- DATABASE_URL=postgresql://collector:${POSTGRES_PASSWORD}@db:5432/mastodon_collector
- OPENAI_API_KEY=${OPENAI_API_KEY}
- ANALYZER_LIMIT=${ANALYZER_LIMIT:-0}
depends_on:
- db
restart: "no" # Run once, don't restart
```
Then trigger manually:
```bash
docker-compose run --rm analyzer
```
## Web Interface
### Analysis Dashboard
Visit http://localhost:8585/analysis to see:
- Overall statistics (total scored, flagged count, averages)
- Toxicity trends over time
- Category breakdown chart
- Recent analysis runs
### Flagged Content Review
Visit http://localhost:8585/analysis/flagged to:
- Browse flagged content (threshold >= 0.5 by default)
- Filter by category, account, date range, review status
- Sort by overall toxicity or specific categories
- Manually review and mark items as:
- ✓ Correct (correctly flagged)
- ✗ Incorrect (false positive)
- ? Unsure
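All filters are plain query parameters, so a review queue can be bookmarked, e.g. `/analysis/flagged?category=racism&review_status=unreviewed&sort=overall&dir=desc`.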
### Review Workflow
1. Click on flagged items to review
2. Use the review buttons (✓, ✗, ?) to mark your assessment
3. Filter by `review_status=unreviewed` to focus on items needing review
4. Use reviewed data to improve the classifier or adjust thresholds
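The review buttons POST to `/api/review/submit`. The same call can be made from a script; a minimal sketch using `requests` (already in `requirements.txt`), where the `status_id` value is a made-up example and refers to the internal `statuses.id` primary key, not the Mastodon status ID:

```python
import requests

# Mark one flagged status as a false positive.
# review_status must be "correct", "incorrect", or "unsure".
resp = requests.post(
    "http://localhost:8585/api/review/submit",
    json={"status_id": 12345, "review_status": "incorrect"},
    timeout=10,
)
resp.raise_for_status()
print(resp.json())  # {"success": True, "message": "Review submitted"}
```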
## Cost Estimation
Based on GPT-4o-mini pricing (as of Jan 2025):
- Input: $0.150 per 1M tokens
- Output: $0.600 per 1M tokens
Typical costs:
- ~1,000 statuses = $0.05-0.15
- ~10,000 statuses = $0.50-1.50
The analyzer logs estimated costs after each run.
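As a back-of-the-envelope check (the per-batch token counts below are illustrative assumptions, not measured values):

```python
# GPT-4o-mini pricing (as of Jan 2025), USD per 1M tokens
INPUT_PER_M, OUTPUT_PER_M = 0.150, 0.600

# Assumed: a batch of 10 statuses uses ~2,300 input tokens
# (system prompt + numbered posts) and ~650 output tokens (scores JSON).
batches = 1_000 // 10
cost = (batches * 2_300 * INPUT_PER_M + batches * 650 * OUTPUT_PER_M) / 1_000_000
print(f"~${cost:.2f} per 1,000 statuses")  # ~$0.07, inside the range above
```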
## Architecture Details
### Batch Processing
The analyzer processes statuses in batches (default: 10 per API call) with concurrency control (default: 5 simultaneous batches). This optimizes for:
- Cost efficiency (batch API calls)
- Rate limit compliance
- Parallel processing speed
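The pattern, reduced to its core (a sketch of what `analyzer.py` does, not the module itself):

```python
import asyncio

async def run_batches(batches, classify, concurrency=5):
    # At most `concurrency` API calls are in flight at any moment;
    # each batch waits for a semaphore slot before calling the API.
    semaphore = asyncio.Semaphore(concurrency)

    async def process(batch):
        async with semaphore:
            return await classify(batch)

    return await asyncio.gather(*(process(b) for b in batches))
```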
### Scoring Logic
Each status receives:
- 12 category scores (0.0 - 1.0)
- Overall score = max of all categories
- Flagged if overall >= threshold (default 0.5)
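In code, this mirrors `ToxicityScores.overall` and `is_flagged()` from `app/analyzer/classifier.py`:

```python
def overall_and_flagged(scores: dict[str, float], threshold: float = 0.5) -> tuple[float, bool]:
    """scores maps the 12 category names to values in [0.0, 1.0]."""
    overall = max(scores.values())
    return overall, overall >= threshold
```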
### Human Review
Manual reviews help:
- Validate AI classifications
- Identify patterns of false positives
- Build training data for future improvements
- Adjust thresholds per category if needed
## Dutch Language Support
The classifier is specifically trained to handle Dutch political content, including:
- Dutch slang and coded terms ("gelukszoekers", "omvolking", "wappie", etc.)
- Political context and satire
- Zwarte Piet debates
- Dutch far-right rhetoric
## Templates
The Bluesky collector templates can be adapted for Mastodon. Key files to create:
1. `app/templates/analysis.html` - Main dashboard
2. `app/templates/flagged.html` - Flagged content browser
These templates should include:
- Chart.js for visualizations
- Filter forms for exploration
- Review buttons for manual validation
## Troubleshooting
### No statuses being scored
- Check that statuses exist: `SELECT COUNT(*) FROM statuses WHERE content IS NOT NULL AND reblog_of_id IS NULL;`
- Check migration applied: `\dt toxicity_scores` in psql
- Check OPENAI_API_KEY is set
### Rate limit errors
- Reduce `ANALYZER_CONCURRENCY` (try 2-3)
- Reduce `ANALYZER_BATCH_SIZE` (try 5)
- The analyzer retries with exponential backoff automatically
### High false positive rate
- Increase `ANALYZER_FLAG_THRESHOLD` (try 0.6 or 0.7)
- Review flagged items and look for patterns
- Dutch political content can be intense but not necessarily toxic
### Template errors
- Ensure templates exist in `app/templates/`
- Check that analysis helper functions are imported correctly
- Verify template filters are defined (`format_number`, `time_ago`, etc.)
## Next Steps
1. Copy analysis templates from Bluesky collector to `app/templates/`
2. Add navigation links to analysis dashboard in base template
3. Run initial analysis on sample data
4. Review flagged content and adjust thresholds
5. Set up automated analysis runs (cron/scheduler)
6. Monitor costs and performance
## References
- Bluesky collector: https://forgejo.postxsociety.cloud/pieter/bluesky-collector
- OpenAI API: https://platform.openai.com/docs
- asyncpg: https://magicstack.github.io/asyncpg/

app/analysis_helpers.py Normal file

@@ -0,0 +1,237 @@
"""Database query helpers for toxicity analysis views."""
from sqlalchemy import func
from sqlalchemy.orm import Session
from app.db import Status, MonitoredAccount
# Toxicity categories for display
TOXICITY_CATEGORIES = [
"toxic", "threat", "hate_speech", "racism",
"antisemitism", "islamophobia", "sexism", "homophobia",
"insult", "dehumanization", "extremism", "ableism"
]
def get_analysis_stats(session: Session) -> dict:
"""Get overall toxicity analysis statistics."""
from sqlalchemy import text
# Total statuses and scored statuses
total_statuses = session.query(func.count(Status.id)).scalar() or 0
scored = session.execute(text("""
SELECT COUNT(*) as total_scored,
COUNT(*) FILTER (WHERE flagged = true) as flagged,
AVG(overall) as avg_toxicity
FROM toxicity_scores
""")).fetchone()
return {
"total_statuses": total_statuses,
"total_scored_statuses": scored[0] if scored else 0,
"flagged_statuses": scored[1] if scored else 0,
"avg_toxicity_statuses": float(scored[2]) if scored and scored[2] else 0.0,
}
def get_toxicity_trend(session: Session, weeks: int = 12) -> list[dict]:
"""Get toxicity trend over time (weekly aggregates)."""
from sqlalchemy import text
result = session.execute(text("""
SELECT
DATE_TRUNC('week', s.created_at) as week,
AVG(ts.overall) as avg_toxicity,
COUNT(*) FILTER (WHERE ts.flagged = true) as flagged_count
FROM statuses s
JOIN toxicity_scores ts ON ts.status_id = s.id
WHERE s.created_at >= NOW() - (:weeks * INTERVAL '1 week')
GROUP BY week
ORDER BY week DESC
"""), {"weeks": weeks})
return [{"week": r[0], "avg_toxicity": float(r[1]) if r[1] else 0.0, "flagged_statuses": r[2]} for r in result]
def get_category_averages(session: Session) -> dict:
"""Get average toxicity score for each category."""
from sqlalchemy import text
result = session.execute(text(f"""
SELECT
AVG(toxic) as toxic,
AVG(threat) as threat,
AVG(hate_speech) as hate_speech,
AVG(racism) as racism,
AVG(antisemitism) as antisemitism,
AVG(islamophobia) as islamophobia,
AVG(sexism) as sexism,
AVG(homophobia) as homophobia,
AVG(insult) as insult,
AVG(dehumanization) as dehumanization,
AVG(extremism) as extremism,
AVG(ableism) as ableism
FROM toxicity_scores
""")).fetchone()
if not result:
return {cat: 0.0 for cat in TOXICITY_CATEGORIES}
return {cat: float(result[i]) if result[i] else 0.0 for i, cat in enumerate(TOXICITY_CATEGORIES)}
def get_recent_analysis_runs(session: Session, limit: int = 5) -> list[dict]:
"""Get recent analysis runs."""
from sqlalchemy import text
result = session.execute(text("""
SELECT id, started_at, finished_at, status, statuses_scored, errors, cost_usd, duration_secs
FROM analysis_runs
ORDER BY started_at DESC
LIMIT :limit
"""), {"limit": limit})
return [
{
"id": r[0],
"started_at": r[1],
"finished_at": r[2],
"status": r[3],
"statuses_scored": r[4],
"errors": r[5],
"cost_usd": r[6],
"duration_secs": r[7]
}
for r in result
]
def get_flagged_content(
session: Session,
category: str = None,
account_id: int = None,
threshold: float = 0.5,
review_status: str = None,
date_from: str = None,
date_to: str = None,
sort: str = "overall",
direction: str = "desc",
limit: int = 50,
offset: int = 0,
) -> tuple[list[dict], int]:
"""Get flagged content with filters."""
from sqlalchemy import text
# Build WHERE clauses
where_clauses = ["ts.flagged = true"]
params = {"threshold": threshold, "limit": limit, "offset": offset}
if category in TOXICITY_CATEGORIES:
    # Whitelist the column name before interpolating it into SQL.
    where_clauses.append(f"ts.{category} >= :threshold")
if account_id:
where_clauses.append("s.account_db_id = :account_id")
params["account_id"] = account_id
if review_status:
if review_status == "unreviewed":
where_clauses.append("ts.human_reviewed = false")
else:
where_clauses.append("ts.review_status = :review_status")
params["review_status"] = review_status
if date_from:
where_clauses.append("s.created_at >= :date_from")
params["date_from"] = date_from
if date_to:
where_clauses.append("s.created_at <= :date_to")
params["date_to"] = date_to
where_sql = " AND ".join(where_clauses)
# Valid sort columns
valid_sorts = ["overall", "created_at", "toxic", "threat", "hate_speech", "racism",
"antisemitism", "islamophobia", "sexism", "homophobia", "insult",
"dehumanization", "extremism", "ableism"]
if sort not in valid_sorts:
sort = "overall"
direction = "DESC" if direction == "desc" else "ASC"
# Get total count
count_query = f"""
SELECT COUNT(*)
FROM statuses s
JOIN toxicity_scores ts ON ts.status_id = s.id
WHERE {where_sql}
"""
total = session.execute(text(count_query), params).scalar() or 0
# Get items with details
query = f"""
SELECT
s.id,
s.status_id,
s.content,
s.text_content,
s.created_at,
s.url,
s.status_type,
ma.username,
ma.instance,
ts.overall,
ts.toxic, ts.threat, ts.hate_speech, ts.racism,
ts.antisemitism, ts.islamophobia, ts.sexism, ts.homophobia,
ts.insult, ts.dehumanization, ts.extremism, ts.ableism,
ts.human_reviewed,
ts.review_status,
ts.reviewed_at
FROM statuses s
JOIN toxicity_scores ts ON ts.status_id = s.id
JOIN monitored_accounts ma ON ma.id = s.account_db_id
WHERE {where_sql}
ORDER BY ts.{sort} {direction}, s.created_at DESC
LIMIT :limit OFFSET :offset
"""
result = session.execute(text(query), params)
items = []
for r in result:
# Find top category
scores = {
"toxic": r[10], "threat": r[11], "hate_speech": r[12], "racism": r[13],
"antisemitism": r[14], "islamophobia": r[15], "sexism": r[16], "homophobia": r[17],
"insult": r[18], "dehumanization": r[19], "extremism": r[20], "ableism": r[21]
}
top_category = max(scores, key=scores.get) if any(scores.values()) else None
items.append({
"id": r[0],
"status_id": r[1],
"content": r[2],
"text_content": r[3],
"created_at": r[4],
"url": r[5],
"status_type": r[6],
"author_username": r[7],
"author_instance": r[8],
"author_handle": f"@{r[7]}@{r[8]}",
"overall": float(r[9]),
"top_category": top_category,
"scores": scores,
"human_reviewed": r[22],
"review_status": r[23],
"reviewed_at": r[24],
})
return items, total
def get_accounts_for_select(session: Session) -> list[dict]:
"""Get all monitored accounts for dropdowns."""
accounts = session.query(MonitoredAccount.id, MonitoredAccount.username, MonitoredAccount.instance).all()
return [{"id": a[0], "handle": f"@{a[1]}@{a[2]}"} for a in accounts]

app/analyzer/__init__.py Normal file

@@ -0,0 +1 @@
"""Toxicity analysis module for Mastodon collector."""

app/analyzer/__main__.py Normal file

@@ -0,0 +1,6 @@
"""Entry point for running the analyzer as a module."""
from .analyzer import main
if __name__ == "__main__":
main()

app/analyzer/analyzer.py Normal file

@@ -0,0 +1,194 @@
"""Main toxicity analysis orchestrator.
Runs as a one-shot batch process: fetches unscored statuses,
classifies them in batches with GPT-4o-mini, and stores scores in PostgreSQL.
Usage:
python -m app.analyzer
ANALYZER_LIMIT=100 python -m app.analyzer # test on 100 statuses
"""
from __future__ import annotations
import asyncio
import logging
import os
import sys
import time
from .classifier import ToxicityClassifier
from .config import AnalyzerConfig
from .db import AnalyzerDB
logger = logging.getLogger("analyzer")
def make_batches(items: list, batch_size: int) -> list[list]:
"""Split a flat list into sublists of at most batch_size."""
return [items[i : i + batch_size] for i in range(0, len(items), batch_size)]
async def classify_statuses(
classifier: ToxicityClassifier,
db: AnalyzerDB,
statuses: list[dict],
config: AnalyzerConfig,
) -> tuple[int, int, float]:
"""Classify statuses in batches, with concurrency control.
Returns (scored_count, error_count, cost_usd).
"""
semaphore = asyncio.Semaphore(config.concurrency)
scored = 0
errors = 0
total_input_tokens = 0
total_output_tokens = 0
batches = make_batches(statuses, config.batch_size)
logger.info(" Split %d statuses into %d batches of ≤%d",
len(statuses), len(batches), config.batch_size)
async def process_batch(batch: list[dict]) -> None:
nonlocal scored, errors, total_input_tokens, total_output_tokens
async with semaphore:
texts = [s["content"] for s in batch]
try:
results = await classifier.classify_batch(texts)
for status, scores in zip(batch, results):
try:
score_dict = scores.to_dict()
flagged = scores.is_flagged(config.flag_threshold)
await db.store_status_score(
status_id=status["id"],
scores=score_dict,
flagged=flagged,
model=config.model,
)
total_input_tokens += scores.input_tokens
total_output_tokens += scores.output_tokens
scored += 1
except Exception:
errors += 1
logger.exception(
"Failed to store score for status %d", status["id"]
)
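# Log progress roughly once per 100 scored statuses (checked per stored batch).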
if scored % 100 < config.batch_size:
logger.info(" Statuses scored: %d / %d", scored, len(statuses))
except Exception:
# Whole batch failed (API error after retries) — count all as errors
errors += len(batch)
logger.exception(
"Failed to classify batch of %d statuses", len(batch)
)
tasks = [process_batch(b) for b in batches]
await asyncio.gather(*tasks)
cost = (
total_input_tokens * config.input_cost_per_m / 1_000_000
+ total_output_tokens * config.output_cost_per_m / 1_000_000
)
return scored, errors, cost
async def run() -> None:
config = AnalyzerConfig.from_env()
logging.basicConfig(
level=getattr(logging, config.log_level.upper(), logging.INFO),
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
handlers=[logging.StreamHandler(sys.stdout)],
)
# Also log to file
log_dir = "/app/logs"
if os.path.isdir(log_dir):
fh = logging.FileHandler(os.path.join(log_dir, "analyzer.log"))
fh.setFormatter(
logging.Formatter("%(asctime)s [%(levelname)s] %(name)s: %(message)s")
)
logging.getLogger().addHandler(fh)
logger.info("=" * 60)
logger.info("Toxicity Analyzer starting (model: %s, concurrency: %d, batch_size: %d)",
config.model, config.concurrency, config.batch_size)
db = AnalyzerDB(config.database_url)
classifier = ToxicityClassifier(
api_key=config.openai_api_key,
model=config.model,
)
try:
await db.connect()
await db.apply_migration()
# Start analysis run
run_id = await db.start_analysis_run(model=config.model)
start_time = time.time()
# Fetch unscored items
limit = max(0, config.limit)  # 0 = no limit
statuses = await db.get_unscored_statuses(limit=limit)
logger.info("Found %d unscored statuses", len(statuses))
if not statuses:
logger.info("Nothing to score — exiting.")
await db.finish_analysis_run(
run_id, status="completed",
statuses_scored=0, errors=0, cost_usd=0.0,
)
return
total_cost = 0.0
total_errors = 0
# Classify statuses
logger.info("Classifying %d statuses in batches of %d...",
len(statuses), config.batch_size)
s_scored, s_errors, s_cost = await classify_statuses(
classifier, db, statuses, config,
)
logger.info(" Statuses done: %d scored, %d errors, $%.4f",
s_scored, s_errors, s_cost)
total_cost += s_cost
total_errors += s_errors
# Finalize run
duration = time.time() - start_time
status = "completed" if total_errors == 0 else "partial"
await db.finish_analysis_run(
run_id,
status=status,
statuses_scored=s_scored,
errors=total_errors,
cost_usd=total_cost,
)
logger.info("=" * 60)
logger.info("Analysis complete — status: %s", status)
logger.info(" Statuses scored: %d, Errors: %d",
s_scored, total_errors)
logger.info(" Estimated cost: $%.4f", total_cost)
logger.info(" Duration: %.1f seconds", duration)
except Exception:
logger.exception("Analyzer crashed")
raise
finally:
await classifier.close()
await db.close()
def main() -> None:
asyncio.run(run())
if __name__ == "__main__":
main()

app/analyzer/classifier.py Normal file

@@ -0,0 +1,287 @@
"""OpenAI-powered toxicity classifier.
Sends batches of Mastodon statuses to GPT-4o-mini for multi-category toxicity scoring.
Returns one ToxicityScores per status, with each category scored 0.0 - 1.0.
"""
from __future__ import annotations
import asyncio
import json
import logging
import random
from dataclasses import dataclass
from openai import AsyncOpenAI, RateLimitError, APITimeoutError, APIConnectionError
logger = logging.getLogger("analyzer.classifier")
# The 12 toxicity categories we score
CATEGORIES = [
"toxic",
"threat",
"hate_speech",
"racism",
"antisemitism",
"islamophobia",
"sexism",
"homophobia",
"insult",
"dehumanization",
"extremism",
"ableism",
]
# System prompt for batch classification — multiple numbered statuses per call
SYSTEM_PROMPT = """\
You are a toxicity classifier for Dutch and English social media posts about politics.
You will receive one or more numbered posts. Score EACH post on every category from 0.0 (none) to 1.0 (extreme).
Respond ONLY with a JSON object mapping post numbers (as strings) to their scores. No other text.
Categories:
- toxic: rude, disrespectful, or aggressive language
- threat: threats of violence, harm, intimidation, or calls to action against a person
- hate_speech: targeting people based on any protected characteristic (race, religion, gender, sexual orientation, disability, nationality)
- racism: specifically targeting race or ethnicity (e.g. anti-Black, anti-Asian, anti-Moroccan sentiment, "Zwarte Piet" debates when derogatory)
- antisemitism: targeting Jewish people, Holocaust denial or minimization, Jewish conspiracy theories, coded language like "globalists", "Rothschilds", triple parentheses
- islamophobia: anti-Muslim hate, mosque opposition framed as hate, "Islam is not a religion" rhetoric, "takeover/omvolking" narratives, halal/hijab targeting
- sexism: gender-based discrimination, harassment, misogyny, or misandry
- homophobia: targeting sexual orientation or gender identity, anti-LGBTQ+ rhetoric
- insult: personal attacks, name-calling, belittling
- dehumanization: comparing people to animals, vermin, disease, parasites, or other dehumanizing language
- extremism: far-right or far-left extremist rhetoric, Nazi symbolism or glorification, white supremacist language, Great Replacement theory ("omvolkingstheorie"), calls for political violence, fascist/authoritarian glorification
- ableism: targeting people with disabilities, using mental health conditions as insults (e.g. "gestoord", "autist" as slur, "mongool")
Important context:
- Many posts are in Dutch. Handle Dutch slang, insults, and coded political language.
- Dutch-specific coded terms: "gelukszoekers", "kutmarokkanen", "omvolking", "landverrader", "volksverrader", "linkse ratten", "wappie", "tokkie"; score these appropriately based on context.
- Political disagreement and criticism are NOT toxic; only score actual hostility, hate, or threats.
- Satire and parody accounts may use irony; consider context but still score the literal content.
- A score of 0.0 means the category is completely absent. A score of 1.0 means extreme/explicit.
- Most posts will score 0.0 on most categories. Only flag genuine toxicity.
Example for 2 posts:
{"1":{"toxic":0.0,"threat":0.0,"hate_speech":0.0,"racism":0.0,"antisemitism":0.0,"islamophobia":0.0,"sexism":0.0,"homophobia":0.0,"insult":0.0,"dehumanization":0.0,"extremism":0.0,"ableism":0.0},"2":{"toxic":0.3,"threat":0.0,"hate_speech":0.0,"racism":0.0,"antisemitism":0.0,"islamophobia":0.0,"sexism":0.0,"homophobia":0.0,"insult":0.2,"dehumanization":0.0,"extremism":0.0,"ableism":0.0}}"""
@dataclass
class ToxicityScores:
"""Classification result for a single status."""
toxic: float = 0.0
threat: float = 0.0
hate_speech: float = 0.0
racism: float = 0.0
antisemitism: float = 0.0
islamophobia: float = 0.0
sexism: float = 0.0
homophobia: float = 0.0
insult: float = 0.0
dehumanization: float = 0.0
extremism: float = 0.0
ableism: float = 0.0
@property
def overall(self) -> float:
"""Overall toxicity = max of all categories."""
return max(
self.toxic,
self.threat,
self.hate_speech,
self.racism,
self.antisemitism,
self.islamophobia,
self.sexism,
self.homophobia,
self.insult,
self.dehumanization,
self.extremism,
self.ableism,
)
def is_flagged(self, threshold: float = 0.5) -> bool:
return self.overall >= threshold
def to_dict(self) -> dict:
return {
"toxic": self.toxic,
"threat": self.threat,
"hate_speech": self.hate_speech,
"racism": self.racism,
"antisemitism": self.antisemitism,
"islamophobia": self.islamophobia,
"sexism": self.sexism,
"homophobia": self.homophobia,
"insult": self.insult,
"dehumanization": self.dehumanization,
"extremism": self.extremism,
"ableism": self.ableism,
"overall": self.overall,
}
# Approximate token counts for cost tracking
input_tokens: int = 0
output_tokens: int = 0
def parse_scores(raw: str) -> ToxicityScores:
"""Parse the JSON scores for a single status into ToxicityScores."""
try:
data = json.loads(raw) if isinstance(raw, str) else raw
except json.JSONDecodeError:
logger.warning("Failed to parse JSON response: %s", str(raw)[:200])
return ToxicityScores()
def clamp(val) -> float:
try:
f = float(val)
return max(0.0, min(1.0, f))
except (TypeError, ValueError):
return 0.0
return ToxicityScores(
toxic=clamp(data.get("toxic")),
threat=clamp(data.get("threat")),
hate_speech=clamp(data.get("hate_speech")),
racism=clamp(data.get("racism")),
antisemitism=clamp(data.get("antisemitism")),
islamophobia=clamp(data.get("islamophobia")),
sexism=clamp(data.get("sexism")),
homophobia=clamp(data.get("homophobia")),
insult=clamp(data.get("insult")),
dehumanization=clamp(data.get("dehumanization")),
extremism=clamp(data.get("extremism")),
ableism=clamp(data.get("ableism")),
)
def parse_batch_response(raw: str, batch_size: int) -> list[ToxicityScores]:
"""Parse a batched JSON response into a list of ToxicityScores.
Expected format: {"1": {...scores...}, "2": {...scores...}, ...}
Returns a list of ToxicityScores in the same order as the input batch.
"""
try:
data = json.loads(raw)
except json.JSONDecodeError:
logger.warning("Failed to parse batch JSON: %s", raw[:300])
return [ToxicityScores() for _ in range(batch_size)]
results = []
for i in range(1, batch_size + 1):
key = str(i)
if key in data and isinstance(data[key], dict):
results.append(parse_scores(data[key]))
else:
logger.warning("Missing scores for status %d in batch response", i)
results.append(ToxicityScores())
return results
class ToxicityClassifier:
"""Async OpenAI-based toxicity classifier with batch support."""
def __init__(self, api_key: str, model: str = "gpt-4o-mini"):
self.client = AsyncOpenAI(api_key=api_key)
self.model = model
async def classify_batch(
self, texts: list[str], max_retries: int = 5
) -> list[ToxicityScores]:
"""Classify multiple statuses in a single API call.
Args:
texts: List of status texts to classify (1 to batch_size items).
max_retries: Number of retries on rate limit / transient errors.
Returns:
List of ToxicityScores, one per input text, in the same order.
"""
if not texts:
return []
batch_size = len(texts)  # number of statuses in this API call
# Build the numbered user message
parts = []
for i, text in enumerate(texts, 1):
# Truncate very long statuses
t = text.strip() if text else ""
if len(t) > 2000:
t = t[:2000]
if not t:
t = "(empty)"
parts.append(f"[{i}] {t}")
user_message = "\n\n".join(parts)
# Scale max_tokens by batch size.
# Each status's JSON scores ≈ 60 tokens compact, but the model often
# outputs formatted JSON (whitespace/newlines) which can double that.
# Use a generous budget to avoid truncation.
max_tokens = max(300, batch_size * 200)
last_err = None
for attempt in range(max_retries):
try:
response = await self.client.chat.completions.create(
model=self.model,
temperature=0,
max_tokens=max_tokens,
response_format={"type": "json_object"},
messages=[
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": user_message},
],
)
content = response.choices[0].message.content or "{}"
results = parse_batch_response(content, batch_size)
# Distribute token usage evenly for cost tracking
if response.usage:
per_status_input = response.usage.prompt_tokens // batch_size
per_status_output = response.usage.completion_tokens // batch_size
for scores in results:
scores.input_tokens = per_status_input
scores.output_tokens = per_status_output
return results
except RateLimitError as e:
last_err = e
wait = min(2 ** attempt + random.uniform(0.5, 1.5), 30)
logger.debug(
"Rate limited (attempt %d/%d), waiting %.1fs",
attempt + 1, max_retries, wait,
)
await asyncio.sleep(wait)
except (APITimeoutError, APIConnectionError) as e:
last_err = e
wait = 2 ** attempt + random.uniform(0, 1)
logger.debug(
"Transient error (attempt %d/%d), retrying in %.1fs: %s",
attempt + 1, max_retries, wait, e,
)
await asyncio.sleep(wait)
except Exception:
logger.exception(
"Batch classification API call failed (%d statuses)", batch_size
)
raise
# All retries exhausted
logger.error("Rate limit retries exhausted for batch of %d statuses", batch_size)
raise last_err
async def classify(self, text: str, max_retries: int = 5) -> ToxicityScores:
"""Classify a single status (convenience wrapper around classify_batch)."""
results = await self.classify_batch([text], max_retries=max_retries)
return results[0]
async def close(self):
await self.client.close()

app/analyzer/config.py Normal file

@@ -0,0 +1,38 @@
"""Configuration for the toxicity analyzer."""
from __future__ import annotations
import os
from dataclasses import dataclass
@dataclass
class AnalyzerConfig:
"""Configuration for the toxicity analyzer."""
database_url: str
openai_api_key: str
model: str = "gpt-4o-mini"
batch_size: int = 10
concurrency: int = 5
flag_threshold: float = 0.5
limit: int = 0 # 0 = no limit
log_level: str = "INFO"
# Pricing (as of 2025, per million tokens)
input_cost_per_m: float = 0.150 # $0.150 per 1M input tokens
output_cost_per_m: float = 0.600 # $0.600 per 1M output tokens
@classmethod
def from_env(cls) -> AnalyzerConfig:
"""Load configuration from environment variables."""
return cls(
database_url=os.environ["DATABASE_URL"],
openai_api_key=os.environ["OPENAI_API_KEY"],
model=os.getenv("ANALYZER_MODEL", "gpt-4o-mini"),
batch_size=int(os.getenv("ANALYZER_BATCH_SIZE", "10")),
concurrency=int(os.getenv("ANALYZER_CONCURRENCY", "5")),
flag_threshold=float(os.getenv("ANALYZER_FLAG_THRESHOLD", "0.5")),
limit=int(os.getenv("ANALYZER_LIMIT", "0")),
log_level=os.getenv("ANALYZER_LOG_LEVEL", "INFO"),
)

app/analyzer/db.py Normal file

@@ -0,0 +1,145 @@
"""Async database layer for the toxicity analyzer.
Handles fetching unscored statuses and storing classification results.
"""
from __future__ import annotations
import logging
from datetime import datetime, timezone
from pathlib import Path
import asyncpg
logger = logging.getLogger("analyzer.db")
MIGRATION_FILE = Path(__file__).parent.parent.parent / "scripts" / "02-toxicity.sql"
class AnalyzerDB:
"""Async PostgreSQL operations for the analyzer."""
def __init__(self, dsn: str):
self._dsn = dsn
self._pool: asyncpg.Pool | None = None
async def connect(self) -> None:
self._pool = await asyncpg.create_pool(self._dsn, min_size=2, max_size=10)
logger.info("Database connected")
async def close(self) -> None:
if self._pool:
await self._pool.close()
async def apply_migration(self) -> None:
"""Apply the toxicity schema migration if tables don't exist."""
async with self._pool.acquire() as conn:
# Check if toxicity_scores table exists
exists = await conn.fetchval("""
SELECT EXISTS (
SELECT FROM information_schema.tables
WHERE table_name = 'toxicity_scores'
)
""")
if not exists:
logger.info("Applying toxicity schema migration...")
sql = MIGRATION_FILE.read_text()
await conn.execute(sql)
logger.info("Migration applied successfully")
else:
logger.debug("Toxicity tables already exist")
# ── Fetch unscored items ─────────────────────────────────────────────
async def get_unscored_statuses(self, limit: int = 0) -> list[dict]:
"""Get statuses that haven't been scored yet.
Skips boosts (reblogs) and statuses with empty content.
"""
query = """
SELECT s.id, s.content, s.account_id
FROM statuses s
LEFT JOIN toxicity_scores ts ON ts.status_id = s.id
WHERE ts.status_id IS NULL
AND s.reblog_of_id IS NULL
AND s.content IS NOT NULL
AND s.content != ''
ORDER BY s.created_at DESC
"""
if limit > 0:
query += f" LIMIT {limit}"
async with self._pool.acquire() as conn:
rows = await conn.fetch(query)
return [dict(r) for r in rows]
# ── Store scores ─────────────────────────────────────────────────────
async def store_status_score(
self,
status_id: int,
scores: dict,
flagged: bool,
model: str,
) -> None:
"""Insert a toxicity score for a status."""
async with self._pool.acquire() as conn:
await conn.execute("""
INSERT INTO toxicity_scores
(status_id, overall, toxic, threat, hate_speech, racism,
antisemitism, islamophobia,
sexism, homophobia, insult, dehumanization,
extremism, ableism, flagged, model)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)
ON CONFLICT (status_id) DO NOTHING
""",
status_id,
scores["overall"],
scores["toxic"],
scores["threat"],
scores["hate_speech"],
scores["racism"],
scores["antisemitism"],
scores["islamophobia"],
scores["sexism"],
scores["homophobia"],
scores["insult"],
scores["dehumanization"],
scores["extremism"],
scores["ableism"],
flagged,
model,
)
# ── Analysis run tracking ────────────────────────────────────────────
async def start_analysis_run(self, model: str) -> int:
"""Create a new analysis run record. Returns run ID."""
async with self._pool.acquire() as conn:
return await conn.fetchval("""
INSERT INTO analysis_runs (model) VALUES ($1)
RETURNING id
""", model)
async def finish_analysis_run(
self,
run_id: int,
status: str,
statuses_scored: int,
errors: int,
cost_usd: float,
) -> None:
"""Finalize an analysis run with results."""
async with self._pool.acquire() as conn:
await conn.execute("""
UPDATE analysis_runs
SET finished_at = now(),
status = $2,
statuses_scored = $3,
errors = $4,
cost_usd = $5,
duration_secs = EXTRACT(EPOCH FROM (now() - started_at))
WHERE id = $1
""",
run_id, status, statuses_scored, errors, cost_usd,
)


@@ -379,5 +379,142 @@ def export_csv():
session.close()
@app.route("/analysis")
def analysis_dashboard():
"""Toxicity analysis dashboard."""
from app.analysis_helpers import (
get_analysis_stats,
get_toxicity_trend,
get_category_averages,
get_recent_analysis_runs,
TOXICITY_CATEGORIES,
)
import json
session = get_session()
try:
stats = get_analysis_stats(session)
trend = get_toxicity_trend(session, weeks=12)
categories = get_category_averages(session)
runs = get_recent_analysis_runs(session, limit=5)
# Prepare chart data
trend_json = json.dumps([
{
"week": r["week"].strftime("%Y-%m-%d") if r["week"] else "",
"avg_toxicity": round(float(r["avg_toxicity"]), 4),
"flagged_statuses": int(r["flagged_statuses"]),
}
for r in trend
])
categories_json = json.dumps({k: round(float(v), 4) for k, v in categories.items()})
return render_template(
"analysis.html",
stats=stats,
trend_json=trend_json,
categories_json=categories_json,
categories=TOXICITY_CATEGORIES,
runs=runs,
)
finally:
session.close()
@app.route("/analysis/flagged")
def analysis_flagged():
"""View flagged content."""
from app.analysis_helpers import (
get_flagged_content,
get_accounts_for_select,
TOXICITY_CATEGORIES,
)
session = get_session()
try:
category = request.args.get("category") or None
account_id = request.args.get("account_id", type=int) or None
threshold = request.args.get("threshold", 0.5, type=float)
review_status = request.args.get("review_status") or None
date_from = request.args.get("date_from") or None
date_to = request.args.get("date_to") or None
sort = request.args.get("sort", "overall")
direction = request.args.get("dir", "desc")
page = max(1, request.args.get("page", 1, type=int))
per_page = 50
items, total = get_flagged_content(
session,
category=category,
account_id=account_id,
threshold=threshold,
review_status=review_status,
date_from=date_from,
date_to=date_to,
sort=sort,
direction=direction,
limit=per_page,
offset=(page - 1) * per_page,
)
total_pages = max(1, (total + per_page - 1) // per_page)
accounts = get_accounts_for_select(session)
return render_template(
"flagged.html",
items=items,
total=total,
page=page,
total_pages=total_pages,
accounts=accounts,
categories=TOXICITY_CATEGORIES,
category=category or "",
account_id=account_id or "",
threshold=threshold,
review_status=review_status or "",
date_from=date_from or "",
date_to=date_to or "",
sort=sort,
direction=direction,
)
finally:
session.close()
@app.route("/api/review/submit", methods=["POST"])
def api_review_submit():
"""Submit a human review for a flagged status."""
from sqlalchemy import text
data = request.get_json(silent=True) or {}
status_id = data.get("status_id")
review_status = data.get("review_status")
if not all([status_id, review_status]):
return jsonify({"error": "Missing required fields"}), 400
if review_status not in ["correct", "incorrect", "unsure"]:
return jsonify({"error": "Invalid review_status"}), 400
session = get_session()
try:
session.execute(text("""
UPDATE toxicity_scores
SET human_reviewed = true,
review_status = :review_status,
reviewed_at = NOW()
WHERE status_id = :status_id
"""), {"review_status": review_status, "status_id": status_id})
session.commit()
return jsonify({"success": True, "message": "Review submitted"}), 200
except Exception as e:
session.rollback()
logger.error(f"Failed to submit review: {e}")
return jsonify({"error": str(e)}), 500
finally:
session.close()
if __name__ == "__main__":
app.run(host="0.0.0.0", port=5000, debug=True)


@@ -4,3 +4,6 @@ psycopg2-binary==2.9.10
sqlalchemy==2.0.36
requests==2.32.3
apscheduler==3.10.4
beautifulsoup4==4.12.3
openai==1.58.1
asyncpg==0.30.0

scripts/02-toxicity.sql Normal file

@@ -0,0 +1,44 @@
-- Toxicity Analysis Schema
-- Stores per-status toxicity scores from LLM classification.
-- Toxicity scores for statuses
CREATE TABLE IF NOT EXISTS toxicity_scores (
status_id BIGINT PRIMARY KEY REFERENCES statuses(id) ON DELETE CASCADE,
overall REAL NOT NULL,
toxic REAL NOT NULL DEFAULT 0,
threat REAL NOT NULL DEFAULT 0,
hate_speech REAL NOT NULL DEFAULT 0,
racism REAL NOT NULL DEFAULT 0,
antisemitism REAL NOT NULL DEFAULT 0,
islamophobia REAL NOT NULL DEFAULT 0,
sexism REAL NOT NULL DEFAULT 0,
homophobia REAL NOT NULL DEFAULT 0,
insult REAL NOT NULL DEFAULT 0,
dehumanization REAL NOT NULL DEFAULT 0,
extremism REAL NOT NULL DEFAULT 0,
ableism REAL NOT NULL DEFAULT 0,
flagged BOOLEAN NOT NULL DEFAULT false,
model TEXT NOT NULL DEFAULT 'gpt-4o-mini',
scored_at TIMESTAMPTZ NOT NULL DEFAULT now(),
human_reviewed BOOLEAN NOT NULL DEFAULT false,
review_status TEXT, -- 'correct', 'incorrect', 'unsure'
reviewed_at TIMESTAMPTZ
);
CREATE INDEX IF NOT EXISTS idx_tox_flagged ON toxicity_scores (flagged) WHERE flagged = true;
CREATE INDEX IF NOT EXISTS idx_tox_overall ON toxicity_scores (overall DESC);
CREATE INDEX IF NOT EXISTS idx_tox_scored ON toxicity_scores (scored_at DESC);
CREATE INDEX IF NOT EXISTS idx_tox_reviewed ON toxicity_scores (human_reviewed, review_status);
-- Analysis run audit trail
CREATE TABLE IF NOT EXISTS analysis_runs (
id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
started_at TIMESTAMPTZ NOT NULL DEFAULT now(),
finished_at TIMESTAMPTZ,
status TEXT NOT NULL DEFAULT 'running', -- running | completed | failed | partial
statuses_scored INTEGER NOT NULL DEFAULT 0,
errors INTEGER NOT NULL DEFAULT 0,
model TEXT NOT NULL,
cost_usd NUMERIC(10,6) DEFAULT 0,
duration_secs NUMERIC
);