Add Flask-based application for collecting and archiving Mastodon posts from configured accounts. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
383 lines
13 KiB
Python
383 lines
13 KiB
Python
"""Flask web application for managing monitored accounts and viewing collected data."""
|
|
|
|
import os
|
|
import logging
|
|
from datetime import datetime, timezone
|
|
|
|
from flask import Flask, render_template, request, redirect, url_for, flash, jsonify
|
|
from sqlalchemy import func, desc
|
|
|
|
from app.db import (
|
|
init_db,
|
|
get_session,
|
|
MonitoredAccount,
|
|
Status,
|
|
Mention,
|
|
CollectionLog,
|
|
)
|
|
from app.mastodon_api import lookup_account, MastodonAPIError
|
|
|
|
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = Flask(__name__)

# The literal fallback keeps local development working without any
# configuration, but it is a publicly known value, so falling back to it is
# logged loudly instead of happening silently.
_DEFAULT_SECRET_KEY = "dev-secret-key"
app.secret_key = os.environ.get("FLASK_SECRET_KEY", _DEFAULT_SECRET_KEY)
if app.secret_key == _DEFAULT_SECRET_KEY:
    logger.warning(
        "FLASK_SECRET_KEY is not set; using the built-in development key. "
        "Set FLASK_SECRET_KEY before exposing this application."
    )

# Initialize database on startup
with app.app_context():
    init_db()
|
|
|
|
|
|
@app.route("/")
def index():
    """Dashboard overview.

    Renders ``index.html`` with the overall status total, totals for the
    ``post``/``reply``/``mention``/``reblog`` status types, one entry per
    monitored account (status count plus most recent collection log), and the
    20 most recent collection logs across all accounts.
    """
    session = get_session()
    try:
        accounts = (
            session.query(MonitoredAccount)
            .order_by(MonitoredAccount.instance, MonitoredAccount.username)
            .all()
        )

        # One grouped query instead of a separate COUNT per status type.
        type_counts = {
            status_type: count
            for status_type, count in (
                session.query(Status.status_type, func.count(Status.id))
                .group_by(Status.status_type)
                .all()
            )
        }
        # Every row falls into exactly one group (NULL and unexpected types
        # included), so the sum of the groups is the unfiltered total.
        total_statuses = sum(type_counts.values())

        # Likewise one grouped query instead of a COUNT per monitored account.
        statuses_per_account = {
            account_db_id: count
            for account_db_id, count in (
                session.query(Status.account_db_id, func.count(Status.id))
                .group_by(Status.account_db_id)
                .all()
            )
        }

        # Per-account stats
        account_stats = []
        for acct in accounts:
            last_log = (
                session.query(CollectionLog)
                .filter_by(account_db_id=acct.id)
                .order_by(desc(CollectionLog.started_at))
                .first()
            )
            account_stats.append({
                "account": acct,
                "status_count": statuses_per_account.get(acct.id, 0),
                "last_log": last_log,
            })

        # Recent collection logs
        recent_logs = (
            session.query(CollectionLog)
            .order_by(desc(CollectionLog.started_at))
            .limit(20)
            .all()
        )

        return render_template(
            "index.html",
            account_stats=account_stats,
            total_statuses=total_statuses,
            total_posts=type_counts.get("post", 0),
            total_replies=type_counts.get("reply", 0),
            total_mentions=type_counts.get("mention", 0),
            total_reblogs=type_counts.get("reblog", 0),
            recent_logs=recent_logs,
        )
    finally:
        session.close()
|
|
|
|
|
|
@app.route("/accounts")
def accounts_list():
    """Render ``accounts.html`` with every monitored account.

    Accounts are ordered by instance, then by username.
    """
    session = get_session()
    try:
        ordering = (MonitoredAccount.instance, MonitoredAccount.username)
        monitored = session.query(MonitoredAccount).order_by(*ordering).all()
        return render_template("accounts.html", accounts=monitored)
    finally:
        session.close()
|
|
|
|
|
|
@app.route("/accounts/add", methods=["POST"])
def accounts_add():
    """Add a new account to monitor.

    Reads the ``handle`` form field, expected as ``@user@instance`` (leading
    "@" characters are optional). If a matching account already exists it is
    re-activated when inactive, otherwise the user is told it is already
    monitored. For a new account, profile details are taken from
    ``lookup_account``; if that raises ``MastodonAPIError`` the account is
    still added with only its username and instance.

    Every path redirects to the accounts list after flashing the outcome.
    """
    handle = request.form.get("handle", "").strip().lstrip("@")
    username, _, instance = handle.partition("@")

    # Exactly one "@" must separate two non-empty parts. A second "@", a "/"
    # or any whitespace cannot be part of a valid handle, and the instance is
    # handed to lookup_account(), so reject such input up front.
    malformed = (
        not username
        or not instance
        or "@" in instance
        or "/" in instance
        or any(ch.isspace() for ch in handle)
    )
    if malformed:
        flash("Invalid handle format. Use @user@instance.social", "error")
        return redirect(url_for("accounts_list"))

    session = get_session()
    try:
        existing = (
            session.query(MonitoredAccount)
            .filter_by(username=username, instance=instance)
            .first()
        )
        if existing:
            if not existing.is_active:
                existing.is_active = True
                session.commit()
                flash(f"Re-activated {existing.handle}", "success")
            else:
                flash(f"{existing.handle} is already being monitored", "info")
            return redirect(url_for("accounts_list"))

        # Try to resolve the account first
        try:
            data = lookup_account(instance, username)
            acct = MonitoredAccount(
                username=username,
                instance=instance,
                account_id=data["id"],
                display_name=data.get("display_name", ""),
                avatar_url=data.get("avatar", ""),
                note=data.get("note", ""),
                is_active=True,
            )
        except MastodonAPIError as e:
            # Deliberate best-effort: an unresolved account is still recorded.
            logger.warning("Could not resolve account @%s@%s: %s — adding anyway", username, instance, e)
            acct = MonitoredAccount(
                username=username,
                instance=instance,
                is_active=True,
            )

        session.add(acct)
        session.commit()
        flash(f"Added {acct.handle} to monitoring list", "success")
        return redirect(url_for("accounts_list"))

    finally:
        session.close()
|
|
|
|
|
|
@app.route("/accounts/<int:account_id>/toggle", methods=["POST"])
def accounts_toggle(account_id):
    """Toggle an account's active status.

    Flips ``is_active`` on the account with primary key ``account_id`` and
    flashes whether monitoring was activated or paused. An unknown id flashes
    an error instead of silently doing nothing. Always redirects to the
    accounts list.
    """
    session = get_session()
    try:
        acct = session.query(MonitoredAccount).get(account_id)
        if not acct:
            flash("Account not found", "error")
            return redirect(url_for("accounts_list"))

        acct.is_active = not acct.is_active
        session.commit()
        state = "activated" if acct.is_active else "paused"
        flash(f"{state.capitalize()} monitoring for {acct.handle}", "success")
        return redirect(url_for("accounts_list"))
    finally:
        session.close()
|
|
|
|
|
|
@app.route("/accounts/<int:account_id>/delete", methods=["POST"])
def accounts_delete(account_id):
    """Delete an account and all its collected data.

    Removes the account's statuses, its collection logs and the account row
    itself in one transaction, then redirects to the accounts list. An
    unknown id flashes an error instead of silently doing nothing.
    """
    session = get_session()
    try:
        acct = session.query(MonitoredAccount).get(account_id)
        if not acct:
            flash("Account not found", "error")
            return redirect(url_for("accounts_list"))

        handle = acct.handle

        # Delete statuses through the session rather than Query.delete():
        # a bulk DELETE bypasses ORM relationship cascades, so dependent
        # rows (mentions, media, tags) would only disappear if the database
        # itself enforces ON DELETE CASCADE.
        # NOTE(review): relies on Status declaring delete cascades on its
        # relationships — confirm in app.db. This loads every status of the
        # account into memory first.
        for status in session.query(Status).filter_by(account_db_id=acct.id).all():
            session.delete(status)

        session.query(CollectionLog).filter_by(account_db_id=acct.id).delete()
        session.delete(acct)
        session.commit()
        flash(f"Deleted {handle} and all collected data", "success")
        return redirect(url_for("accounts_list"))
    finally:
        session.close()
|
|
|
|
|
|
@app.route("/statuses")
def statuses_list():
    """Browse collected statuses with filters.

    Query parameters:
        page: 1-based page number (default 1; values below 1 become 1).
        per_page: page size (default 50; clamped to 1..500).
        account_id: restrict to one monitored account's statuses.
        type: restrict to one ``status_type`` value.
        q: case-insensitive substring match on ``text_content``.

    Renders ``statuses.html`` with the page of statuses (newest first), the
    account list for the filter control, and pagination details.
    """
    max_per_page = 500  # same upper bound the JSON statuses endpoint applies

    session = get_session()
    try:
        # Clamp paging input: per_page=0 would divide by zero below, and a
        # page below 1 would produce a negative OFFSET.
        page = max(1, request.args.get("page", 1, type=int))
        per_page = min(max(1, request.args.get("per_page", 50, type=int)), max_per_page)
        account_id = request.args.get("account_id", type=int)
        status_type = request.args.get("type", "")
        search = request.args.get("q", "").strip()

        query = session.query(Status).join(MonitoredAccount)

        if account_id:
            query = query.filter(Status.account_db_id == account_id)
        if status_type:
            query = query.filter(Status.status_type == status_type)
        if search:
            query = query.filter(Status.text_content.ilike(f"%{search}%"))

        total = query.count()
        statuses = (
            query.order_by(desc(Status.created_at))
            .offset((page - 1) * per_page)
            .limit(per_page)
            .all()
        )

        accounts = session.query(MonitoredAccount).order_by(MonitoredAccount.username).all()
        # Ceiling division; an empty result still reports one (empty) page.
        total_pages = max(1, (total + per_page - 1) // per_page)

        return render_template(
            "statuses.html",
            statuses=statuses,
            accounts=accounts,
            page=page,
            per_page=per_page,
            total=total,
            total_pages=total_pages,
            current_account_id=account_id,
            current_type=status_type,
            search=search,
        )
    finally:
        session.close()
|
|
|
|
|
|
@app.route("/statuses/<int:status_db_id>")
def status_detail(status_db_id):
    """Show the detail page for one status, looked up by primary key.

    If no such status exists, flashes an error and redirects to the
    status list instead.
    """
    session = get_session()
    try:
        found = session.query(Status).get(status_db_id)
        if found:
            return render_template("status_detail.html", status=found)

        flash("Status not found", "error")
        return redirect(url_for("statuses_list"))
    finally:
        session.close()
|
|
|
|
|
|
@app.route("/api/stats")
def api_stats():
    """JSON API endpoint for stats (useful for your analysis pipeline).

    Returns a JSON object with:
        total_statuses: count of all stored statuses.
        by_type: counts for ``post``, ``reply``, ``mention`` and ``reblog``.
        accounts: one entry per *active* monitored account with its handle,
            status count and ISO-formatted ``last_collected_at`` (or null).
    """
    session = get_session()
    try:
        # One grouped query instead of a separate COUNT per status type.
        type_counts = {
            status_type: count
            for status_type, count in (
                session.query(Status.status_type, func.count(Status.id))
                .group_by(Status.status_type)
                .all()
            )
        }
        # One grouped query instead of a COUNT per account.
        statuses_per_account = {
            account_db_id: count
            for account_db_id, count in (
                session.query(Status.account_db_id, func.count(Status.id))
                .group_by(Status.account_db_id)
                .all()
            )
        }

        stats = {
            # Every row belongs to exactly one group, so the groups sum to
            # the unfiltered total.
            "total_statuses": sum(type_counts.values()),
            "by_type": {
                stype: type_counts.get(stype, 0)
                for stype in ["post", "reply", "mention", "reblog"]
            },
            "accounts": [],
        }

        accounts = session.query(MonitoredAccount).filter_by(is_active=True).all()
        for acct in accounts:
            stats["accounts"].append({
                "handle": acct.handle,
                "status_count": statuses_per_account.get(acct.id, 0),
                "last_collected": acct.last_collected_at.isoformat() if acct.last_collected_at else None,
            })

        return jsonify(stats)
    finally:
        session.close()
|
|
|
|
|
|
@app.route("/api/statuses")
def api_statuses():
    """JSON API endpoint for statuses (for your analysis pipeline).

    Query parameters:
        page: 1-based page number (default 1; values below 1 become 1).
        per_page: page size (default 100; clamped to 1..500).
        account_id: restrict to one monitored account's statuses.
        type: restrict to one ``status_type`` value.
        since: ISO 8601 datetime; only statuses created at or after it.

    Returns a JSON object with ``total``, ``page``, ``per_page`` and the page
    of ``statuses`` (newest first), or a 400 response with an ``error``
    message when ``since`` is not a valid ISO 8601 datetime.
    """
    since_raw = request.args.get("since", "").strip()  # ISO datetime
    since = None
    if since_raw:
        # Parse to a real datetime rather than comparing the column against
        # a raw string, whose behaviour depends on the database backend.
        # A trailing "Z" is only understood by fromisoformat() on newer
        # Python versions, so spell it as an explicit UTC offset.
        if since_raw[-1] in "Zz":
            since_raw = since_raw[:-1] + "+00:00"
        try:
            since = datetime.fromisoformat(since_raw)
        except ValueError:
            return jsonify({"error": "Invalid 'since' value; expected an ISO 8601 datetime"}), 400
        if since.tzinfo is not None:
            # NOTE(review): assumes created_at is stored in UTC — confirm.
            since = since.astimezone(timezone.utc)

    session = get_session()
    try:
        # Lower bounds prevent a negative OFFSET or a zero/negative LIMIT.
        page = max(1, request.args.get("page", 1, type=int))
        per_page = min(max(1, request.args.get("per_page", 100, type=int)), 500)
        account_id = request.args.get("account_id", type=int)
        status_type = request.args.get("type", "")

        query = session.query(Status)

        if account_id:
            query = query.filter(Status.account_db_id == account_id)
        if status_type:
            query = query.filter(Status.status_type == status_type)
        if since is not None:
            query = query.filter(Status.created_at >= since)

        total = query.count()
        statuses = (
            query.order_by(desc(Status.created_at))
            .offset((page - 1) * per_page)
            .limit(per_page)
            .all()
        )

        return jsonify({
            "total": total,
            "page": page,
            "per_page": per_page,
            "statuses": [
                {
                    "id": s.id,
                    "status_id": s.status_id,
                    "account": s.account.handle,
                    "url": s.url,
                    "content": s.content,
                    "text_content": s.text_content,
                    "visibility": s.visibility,
                    "created_at": s.created_at.isoformat() if s.created_at else None,
                    "language": s.language,
                    "status_type": s.status_type,
                    "in_reply_to_id": s.in_reply_to_id,
                    "replies_count": s.replies_count,
                    "reblogs_count": s.reblogs_count,
                    "favourites_count": s.favourites_count,
                    "mentions": [
                        {"acct": m.mentioned_acct, "url": m.mentioned_url}
                        for m in s.mentions
                    ],
                    "tags": [t.name for t in s.tags],
                }
                for s in statuses
            ],
        })
    finally:
        session.close()
|
|
|
|
|
|
@app.route("/export")
def export_csv():
    """Download collected statuses as a CSV attachment.

    Optional query parameters ``account_id`` and ``type`` narrow the export
    to one account and/or one status type. Rows are ordered newest first and
    served as ``mastodon_statuses.csv``.
    """
    from io import StringIO
    import csv
    from flask import Response

    column_names = [
        "id", "account", "status_type", "created_at", "url",
        "text_content", "language", "visibility", "in_reply_to_id",
        "replies_count", "reblogs_count", "favourites_count",
        "mentions", "tags", "sensitive", "spoiler_text",
    ]

    session = get_session()
    try:
        wanted_account = request.args.get("account_id", type=int)
        wanted_type = request.args.get("type", "")

        selection = session.query(Status).join(MonitoredAccount)
        if wanted_account:
            selection = selection.filter(Status.account_db_id == wanted_account)
        if wanted_type:
            selection = selection.filter(Status.status_type == wanted_type)
        rows = selection.order_by(desc(Status.created_at)).all()

        buffer = StringIO()
        sheet = csv.writer(buffer)
        sheet.writerow(column_names)

        for s in rows:
            # Multi-valued fields are flattened into one "; "-separated cell.
            mention_cell = "; ".join(m.mentioned_acct for m in s.mentions)
            tag_cell = "; ".join(t.name for t in s.tags)
            created_cell = s.created_at.isoformat() if s.created_at else ""
            sheet.writerow([
                s.status_id,
                s.account.handle,
                s.status_type,
                created_cell,
                s.url,
                s.text_content,
                s.language,
                s.visibility,
                s.in_reply_to_id,
                s.replies_count,
                s.reblogs_count,
                s.favourites_count,
                mention_cell,
                tag_cell,
                s.sensitive,
                s.spoiler_text,
            ])

        download_headers = {"Content-Disposition": "attachment; filename=mastodon_statuses.csv"}
        return Response(buffer.getvalue(), mimetype="text/csv", headers=download_headers)
    finally:
        session.close()
|
|
|
|
|
|
if __name__ == "__main__":
    # The server binds to all interfaces, and Flask's debug mode enables an
    # interactive debugger that can execute code from the browser. Debug is
    # therefore opt-in via FLASK_DEBUG rather than always on.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in {"1", "true", "yes", "on"}
    app.run(host="0.0.0.0", port=5000, debug=debug_enabled)
|