"""Database query helpers for toxicity analysis views.""" from sqlalchemy import func, desc, and_, or_, cast, Float from sqlalchemy.orm import Session from app.db import Status, MonitoredAccount from datetime import datetime, timedelta # Toxicity categories for display TOXICITY_CATEGORIES = [ "toxic", "threat", "hate_speech", "racism", "antisemitism", "islamophobia", "sexism", "homophobia", "insult", "dehumanization", "extremism", "ableism" ] def get_analysis_stats(session: Session) -> dict: """Get overall toxicity analysis statistics.""" from sqlalchemy import text # Total statuses by type total_posts = session.query(func.count(Status.id)).filter(Status.status_type == 'post').scalar() or 0 total_replies = session.query(func.count(Status.id)).filter(Status.status_type == 'reply').scalar() or 0 total_mentions = session.query(func.count(Status.id)).filter(Status.status_type == 'mention').scalar() or 0 # Scored statuses by type scored_stats = session.execute(text(""" SELECT COUNT(*) FILTER (WHERE s.status_type = 'post') as scored_posts, COUNT(*) FILTER (WHERE s.status_type = 'reply') as scored_replies, COUNT(*) FILTER (WHERE s.status_type = 'mention') as scored_mentions, COUNT(*) FILTER (WHERE ts.flagged = true AND s.status_type = 'post') as flagged_posts, COUNT(*) FILTER (WHERE ts.flagged = true AND s.status_type = 'reply') as flagged_replies, COUNT(*) FILTER (WHERE ts.flagged = true AND s.status_type = 'mention') as flagged_mentions, AVG(ts.overall) FILTER (WHERE s.status_type = 'post') as avg_toxicity_posts, AVG(ts.overall) FILTER (WHERE s.status_type = 'reply') as avg_toxicity_replies, AVG(ts.overall) FILTER (WHERE s.status_type = 'mention') as avg_toxicity_mentions FROM toxicity_scores ts JOIN statuses s ON s.id = ts.status_id """)).fetchone() return { "total_posts": total_posts, "total_replies": total_replies, "total_mentions": total_mentions, "total_scored_posts": scored_stats[0] if scored_stats else 0, "total_scored_replies": scored_stats[1] if scored_stats else 0, "total_scored_mentions": scored_stats[2] if scored_stats else 0, "flagged_posts": scored_stats[3] if scored_stats else 0, "flagged_replies": scored_stats[4] if scored_stats else 0, "flagged_mentions": scored_stats[5] if scored_stats else 0, "avg_toxicity_posts": float(scored_stats[6]) if scored_stats and scored_stats[6] else 0.0, "avg_toxicity_replies": float(scored_stats[7]) if scored_stats and scored_stats[7] else 0.0, "avg_toxicity_mentions": float(scored_stats[8]) if scored_stats and scored_stats[8] else 0.0, } def get_toxicity_trend(session: Session, weeks: int = 12) -> list[dict]: """Get toxicity trend over time (weekly aggregates).""" from sqlalchemy import text result = session.execute(text(f""" SELECT DATE_TRUNC('week', s.created_at) as week, AVG(ts.overall) as avg_toxicity, COUNT(*) FILTER (WHERE ts.flagged = true) as flagged_count FROM statuses s JOIN toxicity_scores ts ON ts.status_id = s.id WHERE s.created_at >= NOW() - INTERVAL '{weeks} weeks' GROUP BY week ORDER BY week ASC """)) return [{"week": r[0], "avg_toxicity": float(r[1]) if r[1] else 0.0, "flagged_posts": r[2], "flagged_mentions": 0} for r in result] def get_category_averages(session: Session) -> dict: """Get average toxicity score for each category.""" from sqlalchemy import text result = session.execute(text(f""" SELECT AVG(toxic) as toxic, AVG(threat) as threat, AVG(hate_speech) as hate_speech, AVG(racism) as racism, AVG(antisemitism) as antisemitism, AVG(islamophobia) as islamophobia, AVG(sexism) as sexism, AVG(homophobia) as homophobia, AVG(insult) as insult, AVG(dehumanization) as dehumanization, AVG(extremism) as extremism, AVG(ableism) as ableism FROM toxicity_scores """)).fetchone() if not result: return {cat: 0.0 for cat in TOXICITY_CATEGORIES} return {cat: float(result[i]) if result[i] else 0.0 for i, cat in enumerate(TOXICITY_CATEGORIES)} def get_recent_analysis_runs(session: Session, limit: int = 5) -> list[dict]: """Get recent analysis runs.""" from sqlalchemy import text result = session.execute(text(""" SELECT id, started_at, finished_at, status, statuses_scored, errors, cost_usd, duration_secs FROM analysis_runs ORDER BY started_at DESC LIMIT :limit """), {"limit": limit}) return [ { "id": r[0], "started_at": r[1], "finished_at": r[2], "status": r[3], "posts_scored": r[4], # Template expects posts_scored "mentions_scored": 0, # Not tracked separately in Mastodon "errors": r[5], "cost_usd": r[6], "duration_secs": r[7] } for r in result ] def get_flagged_content( session: Session, category: str = None, account_id: int = None, threshold: float = 0.5, review_status: str = None, date_from: str = None, date_to: str = None, sort: str = "overall", direction: str = "desc", limit: int = 50, offset: int = 0, ) -> tuple[list[dict], int]: """Get flagged content with filters.""" from sqlalchemy import text # Build WHERE clauses where_clauses = ["ts.flagged = true"] params = {"threshold": threshold, "limit": limit, "offset": offset} if category: where_clauses.append(f"ts.{category} >= :threshold") params["category_threshold"] = threshold if account_id: where_clauses.append("s.account_db_id = :account_id") params["account_id"] = account_id if review_status: if review_status == "unreviewed": where_clauses.append("ts.human_reviewed = false") else: where_clauses.append("ts.review_status = :review_status") params["review_status"] = review_status if date_from: where_clauses.append("s.created_at >= :date_from") params["date_from"] = date_from if date_to: where_clauses.append("s.created_at <= :date_to") params["date_to"] = date_to where_sql = " AND ".join(where_clauses) # Valid sort columns valid_sorts = ["overall", "created_at", "toxic", "threat", "hate_speech", "racism", "antisemitism", "islamophobia", "sexism", "homophobia", "insult", "dehumanization", "extremism", "ableism"] if sort not in valid_sorts: sort = "overall" direction = "DESC" if direction == "desc" else "ASC" # Get total count count_query = f""" SELECT COUNT(*) FROM statuses s JOIN toxicity_scores ts ON ts.status_id = s.id WHERE {where_sql} """ total = session.execute(text(count_query), params).scalar() or 0 # Get items with details query = f""" SELECT s.id, s.status_id, s.content, s.text_content, s.created_at, s.url, s.status_type, ma.username, ma.instance, ts.overall, ts.toxic, ts.threat, ts.hate_speech, ts.racism, ts.antisemitism, ts.islamophobia, ts.sexism, ts.homophobia, ts.insult, ts.dehumanization, ts.extremism, ts.ableism, ts.human_reviewed, ts.review_status, ts.reviewed_at FROM statuses s JOIN toxicity_scores ts ON ts.status_id = s.id JOIN monitored_accounts ma ON ma.id = s.account_db_id WHERE {where_sql} ORDER BY ts.{sort} {direction}, s.created_at DESC LIMIT :limit OFFSET :offset """ result = session.execute(text(query), params) items = [] for r in result: # Find top category scores = { "toxic": r[10], "threat": r[11], "hate_speech": r[12], "racism": r[13], "antisemitism": r[14], "islamophobia": r[15], "sexism": r[16], "homophobia": r[17], "insult": r[18], "dehumanization": r[19], "extremism": r[20], "ableism": r[21] } top_category = max(scores, key=scores.get) if any(scores.values()) else None items.append({ "id": r[0], "status_id": r[1], "item_id": r[0], # Internal database ID for toxicity_scores FK "content": r[2], "text_content": r[3], "text": r[3] or r[2], # Template compatibility "created_at": r[4], "url": r[5], "status_type": r[6], "item_type": r[6], # Template compatibility: post, reply, mention "source_type": "status", # Template compatibility "author_username": r[7], "author_instance": r[8], "author_handle": f"@{r[7]}@{r[8]}", "overall": float(r[9]), "top_category": top_category, "scores": scores, "human_reviewed": r[22], "review_status": r[23], "reviewed_at": r[24], }) return items, total def get_accounts_for_select(session: Session) -> list[dict]: """Get all monitored accounts for dropdowns.""" accounts = session.query(MonitoredAccount.id, MonitoredAccount.username, MonitoredAccount.instance).all() return [{"id": a[0], "handle": f"@{a[1]}@{a[2]}"} for a in accounts]