mastodon-collector/app/analyzer/classifier.py

288 lines
11 KiB
Python
Raw Normal View History

"""OpenAI-powered toxicity classifier.
Sends batches of Mastodon statuses to GPT-4o-mini for multi-category toxicity scoring.
Returns a list of dicts of category score (0.01.0).
"""
from __future__ import annotations
import asyncio
import json
import logging
import random
from dataclasses import dataclass
from openai import AsyncOpenAI, RateLimitError, APITimeoutError, APIConnectionError
logger = logging.getLogger("analyzer.classifier")
# The 12 toxicity categories we score
CATEGORIES = [
"toxic",
"threat",
"hate_speech",
"racism",
"antisemitism",
"islamophobia",
"sexism",
"homophobia",
"insult",
"dehumanization",
"extremism",
"ableism",
]
# System prompt for batch classification — multiple numbered statuses per call
SYSTEM_PROMPT = """\
You are a toxicity classifier for Dutch and English social media posts about politics.
You will receive one or more numbered posts. Score EACH post on every category from 0.0 (none) to 1.0 (extreme).
Respond ONLY with a JSON object mapping post numbers (as strings) to their scores. No other text.
Categories:
- toxic: rude, disrespectful, or aggressive language
- threat: threats of violence, harm, intimidation, or calls to action against a person
- hate_speech: targeting people based on any protected characteristic (race, religion, gender, sexual orientation, disability, nationality)
- racism: specifically targeting race or ethnicity (e.g. anti-Black, anti-Asian, anti-Moroccan sentiment, "Zwarte Piet" debates when derogatory)
- antisemitism: targeting Jewish people, Holocaust denial or minimization, Jewish conspiracy theories, coded language like "globalists", "Rothschilds", triple parentheses
- islamophobia: anti-Muslim hate, mosque opposition framed as hate, "Islam is not a religion" rhetoric, "takeover/omvolking" narratives, halal/hijab targeting
- sexism: gender-based discrimination, harassment, misogyny, or misandry
- homophobia: targeting sexual orientation or gender identity, anti-LGBTQ+ rhetoric
- insult: personal attacks, name-calling, belittling
- dehumanization: comparing people to animals, vermin, disease, parasites, or other dehumanizing language
- extremism: far-right or far-left extremist rhetoric, Nazi symbolism or glorification, white supremacist language, Great Replacement theory ("omvolkingstheorie"), calls for political violence, fascist/authoritarian glorification
- ableism: targeting people with disabilities, using mental health conditions as insults (e.g. "gestoord", "autist" as slur, "mongool")
Important context:
- Many posts are in Dutch. Handle Dutch slang, insults, and coded political language.
- Dutch-specific coded terms: "gelukszoekers", "kutmarokkanen", "omvolking", "landverrader", "volksverrader", "linkse ratten", "wappie", "tokkie" score appropriately based on context.
- Political disagreement and criticism are NOT toxic only score actual hostility, hate, or threats.
- Satire and parody accounts may use irony consider context but still score the literal content.
- A score of 0.0 means the category is completely absent. A score of 1.0 means extreme/explicit.
- Most posts will score 0.0 on most categories. Only flag genuine toxicity.
Example for 2 posts:
{"1":{"toxic":0.0,"threat":0.0,"hate_speech":0.0,"racism":0.0,"antisemitism":0.0,"islamophobia":0.0,"sexism":0.0,"homophobia":0.0,"insult":0.0,"dehumanization":0.0,"extremism":0.0,"ableism":0.0},"2":{"toxic":0.3,"threat":0.0,"hate_speech":0.0,"racism":0.0,"antisemitism":0.0,"islamophobia":0.0,"sexism":0.0,"homophobia":0.0,"insult":0.2,"dehumanization":0.0,"extremism":0.0,"ableism":0.0}}"""
@dataclass
class ToxicityScores:
"""Classification result for a single status."""
toxic: float = 0.0
threat: float = 0.0
hate_speech: float = 0.0
racism: float = 0.0
antisemitism: float = 0.0
islamophobia: float = 0.0
sexism: float = 0.0
homophobia: float = 0.0
insult: float = 0.0
dehumanization: float = 0.0
extremism: float = 0.0
ableism: float = 0.0
@property
def overall(self) -> float:
"""Overall toxicity = max of all categories."""
return max(
self.toxic,
self.threat,
self.hate_speech,
self.racism,
self.antisemitism,
self.islamophobia,
self.sexism,
self.homophobia,
self.insult,
self.dehumanization,
self.extremism,
self.ableism,
)
def is_flagged(self, threshold: float = 0.5) -> bool:
return self.overall >= threshold
def to_dict(self) -> dict:
return {
"toxic": self.toxic,
"threat": self.threat,
"hate_speech": self.hate_speech,
"racism": self.racism,
"antisemitism": self.antisemitism,
"islamophobia": self.islamophobia,
"sexism": self.sexism,
"homophobia": self.homophobia,
"insult": self.insult,
"dehumanization": self.dehumanization,
"extremism": self.extremism,
"ableism": self.ableism,
"overall": self.overall,
}
# Approximate token counts for cost tracking
input_tokens: int = 0
output_tokens: int = 0
def parse_scores(raw: str) -> ToxicityScores:
"""Parse the JSON scores for a single status into ToxicityScores."""
try:
data = json.loads(raw) if isinstance(raw, str) else raw
except json.JSONDecodeError:
logger.warning("Failed to parse JSON response: %s", str(raw)[:200])
return ToxicityScores()
def clamp(val) -> float:
try:
f = float(val)
return max(0.0, min(1.0, f))
except (TypeError, ValueError):
return 0.0
return ToxicityScores(
toxic=clamp(data.get("toxic")),
threat=clamp(data.get("threat")),
hate_speech=clamp(data.get("hate_speech")),
racism=clamp(data.get("racism")),
antisemitism=clamp(data.get("antisemitism")),
islamophobia=clamp(data.get("islamophobia")),
sexism=clamp(data.get("sexism")),
homophobia=clamp(data.get("homophobia")),
insult=clamp(data.get("insult")),
dehumanization=clamp(data.get("dehumanization")),
extremism=clamp(data.get("extremism")),
ableism=clamp(data.get("ableism")),
)
def parse_batch_response(raw: str, batch_size: int) -> list[ToxicityScores]:
"""Parse a batched JSON response into a list of ToxicityScores.
Expected format: {"1": {...scores...}, "2": {...scores...}, ...}
Returns a list of ToxicityScores in the same order as the input batch.
"""
try:
data = json.loads(raw)
except json.JSONDecodeError:
logger.warning("Failed to parse batch JSON: %s", raw[:300])
return [ToxicityScores() for _ in range(batch_size)]
results = []
for i in range(1, batch_size + 1):
key = str(i)
if key in data and isinstance(data[key], dict):
results.append(parse_scores(data[key]))
else:
logger.warning("Missing scores for status %d in batch response", i)
results.append(ToxicityScores())
return results
class ToxicityClassifier:
"""Async OpenAI-based toxicity classifier with batch support."""
def __init__(self, api_key: str, model: str = "gpt-4o-mini"):
self.client = AsyncOpenAI(api_key=api_key)
self.model = model
async def classify_batch(
self, texts: list[str], max_retries: int = 5
) -> list[ToxicityScores]:
"""Classify multiple statuses in a single API call.
Args:
texts: List of status texts to classify (1batch_size items).
max_retries: Number of retries on rate limit / transient errors.
Returns:
List of ToxicityScores, one per input text, in the same order.
"""
if not texts:
return []
# Handle single-item batches efficiently
batch_size = len(texts)
# Build the numbered user message
parts = []
for i, text in enumerate(texts, 1):
# Truncate very long statuses
t = text.strip() if text else ""
if len(t) > 2000:
t = t[:2000]
if not t:
t = "(empty)"
parts.append(f"[{i}] {t}")
user_message = "\n\n".join(parts)
# Scale max_tokens by batch size.
# Each status's JSON scores ≈ 60 tokens compact, but the model often
# outputs formatted JSON (whitespace/newlines) which can double that.
# Use a generous budget to avoid truncation.
max_tokens = max(300, batch_size * 200)
last_err = None
for attempt in range(max_retries):
try:
response = await self.client.chat.completions.create(
model=self.model,
temperature=0,
max_tokens=max_tokens,
response_format={"type": "json_object"},
messages=[
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": user_message},
],
)
content = response.choices[0].message.content or "{}"
results = parse_batch_response(content, batch_size)
# Distribute token usage evenly for cost tracking
if response.usage:
per_status_input = response.usage.prompt_tokens // batch_size
per_status_output = response.usage.completion_tokens // batch_size
for scores in results:
scores.input_tokens = per_status_input
scores.output_tokens = per_status_output
return results
except RateLimitError as e:
last_err = e
wait = min(2 ** attempt + random.uniform(0.5, 1.5), 30)
logger.debug(
"Rate limited (attempt %d/%d), waiting %.1fs",
attempt + 1, max_retries, wait,
)
await asyncio.sleep(wait)
except (APITimeoutError, APIConnectionError) as e:
last_err = e
wait = 2 ** attempt + random.uniform(0, 1)
logger.debug(
"Transient error (attempt %d/%d), retrying in %.1fs: %s",
attempt + 1, max_retries, wait, e,
)
await asyncio.sleep(wait)
except Exception:
logger.exception(
"Batch classification API call failed (%d statuses)", batch_size
)
raise
# All retries exhausted
logger.error("Rate limit retries exhausted for batch of %d statuses", batch_size)
raise last_err
async def classify(self, text: str, max_retries: int = 5) -> ToxicityScores:
"""Classify a single status (convenience wrapper around classify_batch)."""
results = await self.classify_batch([text], max_retries=max_retries)
return results[0]
async def close(self):
await self.client.close()