Implements comprehensive toxicity analysis following the Bluesky collector architecture:

- Analyzer module with async batch processing using GPT-4o-mini
- Database schema for toxicity scores and analysis run tracking
- 12 toxicity categories (toxic, threat, hate_speech, racism, antisemitism, islamophobia, sexism, homophobia, insult, dehumanization, extremism, ableism)
- Web interface routes for analysis dashboard and flagged content review
- Manual review API endpoint for human validation
- Analysis helper functions for database queries
- Dutch language support with coded political term recognition

Usage: docker exec mastodon-collector-collector-1 python -m app.analyzer

See TOXICITY_ANALYSIS.md for full documentation.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
287 lines
11 KiB
Python
"""OpenAI-powered toxicity classifier.
|
||
|
||
Sends batches of Mastodon statuses to GPT-4o-mini for multi-category toxicity scoring.
|
||
Returns a list of dicts of category → score (0.0–1.0).
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import asyncio
|
||
import json
|
||
import logging
|
||
import random
|
||
from dataclasses import dataclass
|
||
|
||
from openai import AsyncOpenAI, RateLimitError, APITimeoutError, APIConnectionError
|
||
|
||
logger = logging.getLogger("analyzer.classifier")
|
||
|
||
# The 12 toxicity categories we score
CATEGORIES = [
    "toxic",
    "threat",
    "hate_speech",
    "racism",
    "antisemitism",
    "islamophobia",
    "sexism",
    "homophobia",
    "insult",
    "dehumanization",
    "extremism",
    "ableism",
]

# System prompt for batch classification — multiple numbered statuses per call
SYSTEM_PROMPT = """\
You are a toxicity classifier for Dutch and English social media posts about politics.

You will receive one or more numbered posts. Score EACH post on every category from 0.0 (none) to 1.0 (extreme).
Respond ONLY with a JSON object mapping post numbers (as strings) to their scores. No other text.

Categories:
- toxic: rude, disrespectful, or aggressive language
- threat: threats of violence, harm, intimidation, or calls to action against a person
- hate_speech: targeting people based on any protected characteristic (race, religion, gender, sexual orientation, disability, nationality)
- racism: specifically targeting race or ethnicity (e.g. anti-Black, anti-Asian, anti-Moroccan sentiment, "Zwarte Piet" debates when derogatory)
- antisemitism: targeting Jewish people, Holocaust denial or minimization, Jewish conspiracy theories, coded language like "globalists", "Rothschilds", triple parentheses
- islamophobia: anti-Muslim hate, mosque opposition framed as hate, "Islam is not a religion" rhetoric, "takeover/omvolking" narratives, halal/hijab targeting
- sexism: gender-based discrimination, harassment, misogyny, or misandry
- homophobia: targeting sexual orientation or gender identity, anti-LGBTQ+ rhetoric
- insult: personal attacks, name-calling, belittling
- dehumanization: comparing people to animals, vermin, disease, parasites, or other dehumanizing language
- extremism: far-right or far-left extremist rhetoric, Nazi symbolism or glorification, white supremacist language, Great Replacement theory ("omvolkingstheorie"), calls for political violence, fascist/authoritarian glorification
- ableism: targeting people with disabilities, using mental health conditions as insults (e.g. "gestoord", "autist" as slur, "mongool")

Important context:
- Many posts are in Dutch. Handle Dutch slang, insults, and coded political language.
- Dutch-specific coded terms: "gelukszoekers", "kutmarokkanen", "omvolking", "landverrader", "volksverrader", "linkse ratten", "wappie", "tokkie" — score appropriately based on context.
- Political disagreement and criticism are NOT toxic — only score actual hostility, hate, or threats.
- Satire and parody accounts may use irony — consider context but still score the literal content.
- A score of 0.0 means the category is completely absent. A score of 1.0 means extreme/explicit.
- Most posts will score 0.0 on most categories. Only flag genuine toxicity.

Example for 2 posts:
{"1":{"toxic":0.0,"threat":0.0,"hate_speech":0.0,"racism":0.0,"antisemitism":0.0,"islamophobia":0.0,"sexism":0.0,"homophobia":0.0,"insult":0.0,"dehumanization":0.0,"extremism":0.0,"ableism":0.0},"2":{"toxic":0.3,"threat":0.0,"hate_speech":0.0,"racism":0.0,"antisemitism":0.0,"islamophobia":0.0,"sexism":0.0,"homophobia":0.0,"insult":0.2,"dehumanization":0.0,"extremism":0.0,"ableism":0.0}}"""

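# For reference (derived from classify_batch below, not an additional contract): the user
# message paired with this system prompt is simply the numbered posts separated by blank
# lines, e.g.
#
#   [1] first post text
#
#   [2] second post text
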
@dataclass
class ToxicityScores:
    """Classification result for a single status."""

    toxic: float = 0.0
    threat: float = 0.0
    hate_speech: float = 0.0
    racism: float = 0.0
    antisemitism: float = 0.0
    islamophobia: float = 0.0
    sexism: float = 0.0
    homophobia: float = 0.0
    insult: float = 0.0
    dehumanization: float = 0.0
    extremism: float = 0.0
    ableism: float = 0.0

    @property
    def overall(self) -> float:
        """Overall toxicity = max of all categories."""
        return max(
            self.toxic,
            self.threat,
            self.hate_speech,
            self.racism,
            self.antisemitism,
            self.islamophobia,
            self.sexism,
            self.homophobia,
            self.insult,
            self.dehumanization,
            self.extremism,
            self.ableism,
        )

    def is_flagged(self, threshold: float = 0.5) -> bool:
        return self.overall >= threshold

    def to_dict(self) -> dict:
        return {
            "toxic": self.toxic,
            "threat": self.threat,
            "hate_speech": self.hate_speech,
            "racism": self.racism,
            "antisemitism": self.antisemitism,
            "islamophobia": self.islamophobia,
            "sexism": self.sexism,
            "homophobia": self.homophobia,
            "insult": self.insult,
            "dehumanization": self.dehumanization,
            "extremism": self.extremism,
            "ableism": self.ableism,
            "overall": self.overall,
        }

    # Approximate token counts for cost tracking
    input_tokens: int = 0
    output_tokens: int = 0

def parse_scores(raw: str | dict) -> ToxicityScores:
    """Parse the JSON scores for a single status into ToxicityScores.

    Accepts either a JSON string or an already-decoded dict of category → score.
    """
    try:
        data = json.loads(raw) if isinstance(raw, str) else raw
    except json.JSONDecodeError:
        logger.warning("Failed to parse JSON response: %s", str(raw)[:200])
        return ToxicityScores()

    def clamp(val) -> float:
        try:
            f = float(val)
            return max(0.0, min(1.0, f))
        except (TypeError, ValueError):
            return 0.0

    return ToxicityScores(
        toxic=clamp(data.get("toxic")),
        threat=clamp(data.get("threat")),
        hate_speech=clamp(data.get("hate_speech")),
        racism=clamp(data.get("racism")),
        antisemitism=clamp(data.get("antisemitism")),
        islamophobia=clamp(data.get("islamophobia")),
        sexism=clamp(data.get("sexism")),
        homophobia=clamp(data.get("homophobia")),
        insult=clamp(data.get("insult")),
        dehumanization=clamp(data.get("dehumanization")),
        extremism=clamp(data.get("extremism")),
        ableism=clamp(data.get("ableism")),
    )

def parse_batch_response(raw: str, batch_size: int) -> list[ToxicityScores]:
    """Parse a batched JSON response into a list of ToxicityScores.

    Expected format: {"1": {...scores...}, "2": {...scores...}, ...}
    Returns a list of ToxicityScores in the same order as the input batch.
    """
    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        logger.warning("Failed to parse batch JSON: %s", raw[:300])
        return [ToxicityScores() for _ in range(batch_size)]

    results = []
    for i in range(1, batch_size + 1):
        key = str(i)
        if key in data and isinstance(data[key], dict):
            results.append(parse_scores(data[key]))
        else:
            logger.warning("Missing scores for status %d in batch response", i)
            results.append(ToxicityScores())

    return results

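# Illustrative sketch of the fallback behaviour above (the literal values are hypothetical
# example inputs, not recorded model output): a well-formed entry is parsed, a malformed
# entry is logged and replaced with an all-zero ToxicityScores.
#
#   raw = '{"1": {"toxic": 0.7, "insult": 0.4}, "2": "oops"}'
#   scores = parse_batch_response(raw, batch_size=2)
#   scores[0].toxic    -> 0.7   (categories missing from the JSON default to 0.0)
#   scores[1].overall  -> 0.0   (entry "2" is not a dict, so it falls back to all zeros)
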
class ToxicityClassifier:
    """Async OpenAI-based toxicity classifier with batch support."""

    def __init__(self, api_key: str, model: str = "gpt-4o-mini"):
        self.client = AsyncOpenAI(api_key=api_key)
        self.model = model

    async def classify_batch(
        self, texts: list[str], max_retries: int = 5
    ) -> list[ToxicityScores]:
        """Classify multiple statuses in a single API call.

        Args:
            texts: List of status texts to classify (1–batch_size items).
            max_retries: Number of retries on rate limit / transient errors.

        Returns:
            List of ToxicityScores, one per input text, in the same order.
        """
        if not texts:
            return []

        # A single-item list is handled the same way as a larger batch
        batch_size = len(texts)

        # Build the numbered user message
        parts = []
        for i, text in enumerate(texts, 1):
            # Truncate very long statuses
            t = text.strip() if text else ""
            if len(t) > 2000:
                t = t[:2000]
            if not t:
                t = "(empty)"
            parts.append(f"[{i}] {t}")
        user_message = "\n\n".join(parts)

        # Scale max_tokens by batch size.
        # Each status's JSON scores ≈ 60 tokens compact, but the model often
        # outputs formatted JSON (whitespace/newlines) which can double that.
        # Use a generous budget to avoid truncation.
        max_tokens = max(300, batch_size * 200)

        last_err = None
        for attempt in range(max_retries):
            try:
                response = await self.client.chat.completions.create(
                    model=self.model,
                    temperature=0,
                    max_tokens=max_tokens,
                    response_format={"type": "json_object"},
                    messages=[
                        {"role": "system", "content": SYSTEM_PROMPT},
                        {"role": "user", "content": user_message},
                    ],
                )

                content = response.choices[0].message.content or "{}"
                results = parse_batch_response(content, batch_size)

                # Distribute token usage evenly for cost tracking
                if response.usage:
                    per_status_input = response.usage.prompt_tokens // batch_size
                    per_status_output = response.usage.completion_tokens // batch_size
                    for scores in results:
                        scores.input_tokens = per_status_input
                        scores.output_tokens = per_status_output

                return results

            except RateLimitError as e:
                last_err = e
                wait = min(2 ** attempt + random.uniform(0.5, 1.5), 30)
                logger.debug(
                    "Rate limited (attempt %d/%d), waiting %.1fs",
                    attempt + 1, max_retries, wait,
                )
                await asyncio.sleep(wait)

            except (APITimeoutError, APIConnectionError) as e:
                last_err = e
                wait = 2 ** attempt + random.uniform(0, 1)
                logger.debug(
                    "Transient error (attempt %d/%d), retrying in %.1fs: %s",
                    attempt + 1, max_retries, wait, e,
                )
                await asyncio.sleep(wait)

            except Exception:
                logger.exception(
                    "Batch classification API call failed (%d statuses)", batch_size
                )
                raise

        # All retries exhausted (rate limit or transient errors)
        logger.error("Retries exhausted for batch of %d statuses", batch_size)
        raise last_err

    async def classify(self, text: str, max_retries: int = 5) -> ToxicityScores:
        """Classify a single status (convenience wrapper around classify_batch)."""
        results = await self.classify_batch([text], max_retries=max_retries)
        return results[0]

    async def close(self):
        await self.client.close()
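
# Minimal usage sketch (illustrative, not part of the original module): run a couple of
# sample posts through the classifier from the command line. It assumes the OPENAI_API_KEY
# environment variable is set; in normal operation the `app.analyzer` entrypoint mentioned
# in the commit message drives this class instead. The example posts are hypothetical.
if __name__ == "__main__":
    import os

    async def _demo() -> None:
        classifier = ToxicityClassifier(api_key=os.environ["OPENAI_API_KEY"])
        try:
            # One harmless Dutch post ("nice debate about the housing market tonight")
            # and one mildly insulting post ("what an unbelievable idiot that man is").
            scores = await classifier.classify_batch(
                [
                    "Mooi debat vanavond over de woningmarkt.",
                    "Wat een ongelooflijke idioot is die man.",
                ]
            )
            for i, s in enumerate(scores, 1):
                print(i, s.to_dict())
        finally:
            await classifier.close()

    asyncio.run(_demo())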