mastodon-collector/app/analysis_helpers.py
Pieter 2faf6c660b Complete toxicity analysis implementation with manual review
- Fixed review submission bug (item_id now uses internal database ID)
- Added comprehensive logging to review API endpoint
- Updated analysis report for Jan 1 - Mar 30, 2026 period
- Report includes all 44 manually reviewed posts
- 4 confirmed toxic, 40 false positives (90.9% FP rate)
- Improved table layout: reduced column widths, smaller text
- Fixed horizontal scrolling with max-width override
- All flagged posts now successfully reviewed and stored

Key findings:
- 7,506 posts collected, 3,938 analyzed
- Only 0.10% confirmed toxic (4 of 3,938)
- The high false-positive rate shows the challenge of automated detection
- Most FPs were legitimate political discourse about extremism

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2026-03-31 17:50:23 +02:00

261 lines
9.6 KiB
Python

"""Database query helpers for toxicity analysis views."""
from sqlalchemy import func, desc, and_, or_, cast, Float
from sqlalchemy.orm import Session
from app.db import Status, MonitoredAccount
from datetime import datetime, timedelta
# Per-category toxicity score names, in display order. The same names are
# used as column names of the toxicity_scores table by the queries below.
TOXICITY_CATEGORIES = [
    "toxic",
    "threat",
    "hate_speech",
    "racism",
    "antisemitism",
    "islamophobia",
    "sexism",
    "homophobia",
    "insult",
    "dehumanization",
    "extremism",
    "ableism",
]
def get_analysis_stats(session: Session) -> dict:
    """Summarise collected, scored and flagged statuses per status type.

    The ``total_*`` values count every stored status of that type. The
    ``total_scored_*``, ``flagged_*`` and ``avg_toxicity_*`` values only
    cover statuses that have a row in ``toxicity_scores``.

    Args:
        session: Open SQLAlchemy session.

    Returns:
        A flat dict with one entry per metric and status type (posts,
        replies, mentions). Averages fall back to ``0.0`` when there is
        nothing to average.
    """
    from sqlalchemy import text

    def total_of(kind):
        """Count all stored statuses of one type, scored or not."""
        counter = session.query(func.count(Status.id))
        return counter.filter(Status.status_type == kind).scalar() or 0

    total_posts = total_of('post')
    total_replies = total_of('reply')
    total_mentions = total_of('mention')

    # One pass over the scored statuses yields all per-type aggregates.
    scored_sql = text("""
        SELECT
            COUNT(*) FILTER (WHERE s.status_type = 'post') as scored_posts,
            COUNT(*) FILTER (WHERE s.status_type = 'reply') as scored_replies,
            COUNT(*) FILTER (WHERE s.status_type = 'mention') as scored_mentions,
            COUNT(*) FILTER (WHERE ts.flagged = true AND s.status_type = 'post') as flagged_posts,
            COUNT(*) FILTER (WHERE ts.flagged = true AND s.status_type = 'reply') as flagged_replies,
            COUNT(*) FILTER (WHERE ts.flagged = true AND s.status_type = 'mention') as flagged_mentions,
            AVG(ts.overall) FILTER (WHERE s.status_type = 'post') as avg_toxicity_posts,
            AVG(ts.overall) FILTER (WHERE s.status_type = 'reply') as avg_toxicity_replies,
            AVG(ts.overall) FILTER (WHERE s.status_type = 'mention') as avg_toxicity_mentions
        FROM toxicity_scores ts
        JOIN statuses s ON s.id = ts.status_id
    """)
    row = session.execute(scored_sql).fetchone()

    if row:
        # Columns 0-5 are counts, 6-8 are averages (NULL when no rows match).
        counts = [row[i] for i in range(6)]
        averages = [float(row[i]) if row[i] else 0.0 for i in range(6, 9)]
    else:
        counts = [0] * 6
        averages = [0.0] * 3

    (scored_posts, scored_replies, scored_mentions,
     flagged_posts, flagged_replies, flagged_mentions) = counts
    avg_posts, avg_replies, avg_mentions = averages

    stats = {}
    stats["total_posts"] = total_posts
    stats["total_replies"] = total_replies
    stats["total_mentions"] = total_mentions
    stats["total_scored_posts"] = scored_posts
    stats["total_scored_replies"] = scored_replies
    stats["total_scored_mentions"] = scored_mentions
    stats["flagged_posts"] = flagged_posts
    stats["flagged_replies"] = flagged_replies
    stats["flagged_mentions"] = flagged_mentions
    stats["avg_toxicity_posts"] = avg_posts
    stats["avg_toxicity_replies"] = avg_replies
    stats["avg_toxicity_mentions"] = avg_mentions
    return stats
def get_toxicity_trend(session: Session, weeks: int = 12) -> list[dict]:
    """Return weekly toxicity aggregates for the most recent ``weeks`` weeks.

    Args:
        session: Open SQLAlchemy session.
        weeks: Size of the look-back window in weeks, counted back from the
            database's ``NOW()``.

    Returns:
        One dict per week that has scored statuses, oldest first, with keys
        ``week`` (start of the week as returned by ``DATE_TRUNC``),
        ``avg_toxicity`` (``0.0`` when the average is NULL),
        ``flagged_posts`` (flagged statuses of every type in that week) and
        ``flagged_mentions`` (always ``0``; kept for the template).

    Raises:
        TypeError, ValueError: If ``weeks`` cannot be converted to ``int``.
    """
    from sqlalchemy import text

    # The window size is sent as a bound parameter rather than formatted
    # into the statement, so a caller-supplied value can never alter the SQL.
    # The explicit CAST keeps the parameter type unambiguous for the driver.
    statement = text("""
        SELECT
            DATE_TRUNC('week', s.created_at) as week,
            AVG(ts.overall) as avg_toxicity,
            COUNT(*) FILTER (WHERE ts.flagged = true) as flagged_count
        FROM statuses s
        JOIN toxicity_scores ts ON ts.status_id = s.id
        WHERE s.created_at >= NOW() - (CAST(:weeks AS INTEGER) * INTERVAL '1 week')
        GROUP BY week
        ORDER BY week ASC
    """)
    result = session.execute(statement, {"weeks": int(weeks)})

    return [
        {
            "week": week,
            "avg_toxicity": float(avg_toxicity) if avg_toxicity else 0.0,
            "flagged_posts": flagged_count,
            "flagged_mentions": 0,
        }
        for week, avg_toxicity, flagged_count in result
    ]
def get_category_averages(session: Session) -> dict:
    """Return the average score of every toxicity category.

    Args:
        session: Open SQLAlchemy session.

    Returns:
        A dict mapping each name in ``TOXICITY_CATEGORIES`` to the average of
        the same-named ``toxicity_scores`` column, or ``0.0`` when that
        average is NULL (for example when the table is empty).
    """
    from sqlalchemy import text

    # Derive the select list from TOXICITY_CATEGORIES so the column order and
    # the result mapping cannot drift apart. The names come from a module
    # constant, never from user input, so formatting them into SQL is safe.
    select_list = ",\n            ".join(
        f"AVG({category}) as {category}" for category in TOXICITY_CATEGORIES
    )
    row = session.execute(text(f"""
        SELECT
            {select_list}
        FROM toxicity_scores
    """)).fetchone()

    if not row:
        return {category: 0.0 for category in TOXICITY_CATEGORIES}
    return {
        category: float(average) if average else 0.0
        for category, average in zip(TOXICITY_CATEGORIES, row)
    }
def get_recent_analysis_runs(session: Session, limit: int = 5) -> list[dict]:
    """Return the most recently started analysis runs, newest first.

    Args:
        session: Open SQLAlchemy session.
        limit: Maximum number of runs to return.

    Returns:
        One dict per row of ``analysis_runs``, shaped for the template.
    """
    from sqlalchemy import text

    statement = text("""
        SELECT id, started_at, finished_at, status, statuses_scored, errors, cost_usd, duration_secs
        FROM analysis_runs
        ORDER BY started_at DESC
        LIMIT :limit
    """)

    runs = []
    for run in session.execute(statement, {"limit": limit}):
        (run_id, started_at, finished_at, status,
         statuses_scored, errors, cost_usd, duration_secs) = run
        entry = {
            "id": run_id,
            "started_at": started_at,
            "finished_at": finished_at,
            "status": status,
            # The template reads "posts_scored", so statuses_scored is
            # exposed under that key.
            "posts_scored": statuses_scored,
            # Mentions are not tracked separately for Mastodon.
            "mentions_scored": 0,
            "errors": errors,
            "cost_usd": cost_usd,
            "duration_secs": duration_secs,
        }
        runs.append(entry)
    return runs
def get_flagged_content(
    session: Session,
    category: str = None,
    account_id: int = None,
    threshold: float = 0.5,
    review_status: str = None,
    date_from: str = None,
    date_to: str = None,
    sort: str = "overall",
    direction: str = "desc",
    limit: int = 50,
    offset: int = 0,
) -> tuple[list[dict], int]:
    """Return one page of flagged statuses plus the total number of matches.

    Args:
        session: Open SQLAlchemy session.
        category: Optional score column to filter on (``"overall"`` or one of
            ``TOXICITY_CATEGORIES``); only rows whose score in that column is
            at least ``threshold`` are kept. Unknown names are ignored.
        account_id: Optional monitored-account database id to restrict to.
        threshold: Minimum score used together with ``category``.
        review_status: ``"unreviewed"`` keeps rows whose ``human_reviewed`` is
            false; any other non-empty value must equal ``review_status``.
        date_from: Optional inclusive lower bound on the status creation time.
        date_to: Optional inclusive upper bound on the status creation time.
            NOTE(review): a date-only value presumably compares as midnight
            and would exclude the rest of that day; confirm with callers.
        sort: Column to order by: ``"overall"``, ``"created_at"`` or a
            category name. Unknown values fall back to ``"overall"``.
        direction: ``"desc"`` (case-insensitive) for descending order;
            anything else sorts ascending.
        limit: Page size.
        offset: Number of matching rows to skip.

    Returns:
        ``(items, total)``: ``items`` holds one dict per status on the page
        and ``total`` is the number of rows matching the filters.
    """
    from sqlalchemy import text

    where_clauses = ["ts.flagged = true"]
    filter_params = {}

    # Column names cannot be sent as bound parameters, so a name is only
    # formatted into the SQL after it has matched this whitelist.
    filterable_columns = ["overall", *TOXICITY_CATEGORIES]
    if category in filterable_columns:
        where_clauses.append(f"ts.{category} >= :threshold")
        filter_params["threshold"] = threshold
    if account_id:
        where_clauses.append("s.account_db_id = :account_id")
        filter_params["account_id"] = account_id
    if review_status:
        if review_status == "unreviewed":
            where_clauses.append("ts.human_reviewed = false")
        else:
            where_clauses.append("ts.review_status = :review_status")
            filter_params["review_status"] = review_status
    if date_from:
        where_clauses.append("s.created_at >= :date_from")
        filter_params["date_from"] = date_from
    if date_to:
        where_clauses.append("s.created_at <= :date_to")
        filter_params["date_to"] = date_to
    where_sql = " AND ".join(where_clauses)

    # Whitelisted sort keys mapped to fully qualified columns.
    # NOTE(review): "created_at" used to be rendered as ts.created_at; the
    # timestamp selected and returned below is the status's own, so it now
    # sorts on s.created_at. Confirm against the toxicity_scores schema.
    sort_columns = {"overall": "ts.overall", "created_at": "s.created_at"}
    sort_columns.update({cat: f"ts.{cat}" for cat in TOXICITY_CATEGORIES})
    order_column = sort_columns.get(sort, sort_columns["overall"])
    order_direction = "DESC" if str(direction).lower() == "desc" else "ASC"

    # Total number of matches, independent of paging.
    count_query = f"""
        SELECT COUNT(*)
        FROM statuses s
        JOIN toxicity_scores ts ON ts.status_id = s.id
        WHERE {where_sql}
    """
    total = session.execute(text(count_query), filter_params).scalar() or 0

    # The score columns are generated from TOXICITY_CATEGORIES so their
    # positions in each row always line up with the names zipped below.
    score_columns = ", ".join(f"ts.{cat}" for cat in TOXICITY_CATEGORIES)
    first_score = 10
    last_score = first_score + len(TOXICITY_CATEGORIES)
    query = f"""
        SELECT
            s.id,
            s.status_id,
            s.content,
            s.text_content,
            s.created_at,
            s.url,
            s.status_type,
            ma.username,
            ma.instance,
            ts.overall,
            {score_columns},
            ts.human_reviewed,
            ts.review_status,
            ts.reviewed_at
        FROM statuses s
        JOIN toxicity_scores ts ON ts.status_id = s.id
        JOIN monitored_accounts ma ON ma.id = s.account_db_id
        WHERE {where_sql}
        ORDER BY {order_column} {order_direction}, s.created_at DESC
        LIMIT :limit OFFSET :offset
    """
    page_params = {**filter_params, "limit": limit, "offset": offset}
    result = session.execute(text(query), page_params)

    items = []
    for r in result:
        row = tuple(r)
        (db_id, status_id, content, text_content, created_at, url,
         status_type, username, instance, overall) = row[:first_score]
        scores = dict(zip(TOXICITY_CATEGORIES, row[first_score:last_score]))
        human_reviewed, row_review_status, reviewed_at = row[last_score:last_score + 3]

        # Highest-scoring category; a NULL score counts as 0 so that a
        # partially scored row cannot break the comparison.
        top_category = None
        if any(scores.values()):
            top_category = max(scores, key=lambda cat: scores[cat] or 0.0)

        items.append({
            "id": db_id,
            "status_id": status_id,
            "item_id": db_id,  # Internal database ID for toxicity_scores FK
            "content": content,
            "text_content": text_content,
            "text": text_content or content,  # Template compatibility
            "created_at": created_at,
            "url": url,
            "status_type": status_type,
            "item_type": status_type,  # Template compatibility: post, reply, mention
            "source_type": "status",  # Template compatibility
            "author_username": username,
            "author_instance": instance,
            "author_handle": f"@{username}@{instance}",
            "overall": float(overall) if overall is not None else 0.0,
            "top_category": top_category,
            "scores": scores,
            "human_reviewed": human_reviewed,
            "review_status": row_review_status,
            "reviewed_at": reviewed_at,
        })
    return items, total
def get_accounts_for_select(session: Session) -> list[dict]:
    """List every monitored account as an ``{"id", "handle"}`` dropdown option.

    The handle is rendered as ``@username@instance``.
    """
    rows = session.query(
        MonitoredAccount.id,
        MonitoredAccount.username,
        MonitoredAccount.instance,
    ).all()

    options = []
    for account_id, username, instance in rows:
        options.append({"id": account_id, "handle": f"@{username}@{instance}"})
    return options