mastodon-collector/app/collector.py
"""
Collector service — periodically polls Mastodon for new statuses from monitored accounts.
Runs as a standalone process via `python -m app.collector`.
"""
import logging
import os
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from apscheduler.schedulers.blocking import BlockingScheduler

from app.db import (
    init_db,
    get_session,
    MonitoredAccount,
    Status,
    Mention,
    MediaAttachment,
    Tag,
    CollectionLog,
)
from app.mastodon_api import (
    lookup_account,
    get_account_statuses,
    parse_status,
    MastodonAPIError,
    RateLimitError,
)

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    handlers=[logging.StreamHandler(sys.stdout)],
)
logger = logging.getLogger("collector")

POLL_INTERVAL = int(os.environ.get("POLL_INTERVAL_SECONDS", "14400"))  # default: 4 hours
ACCOUNTS_FILE = os.environ.get("ACCOUNTS_FILE", "/app/accounts.txt")
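
# Example invocation (values are illustrative):
#   POLL_INTERVAL_SECONDS=3600 ACCOUNTS_FILE=./accounts.txt python -m app.collector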


def load_accounts_from_file(filepath: str) -> list[tuple[str, str]]:
    """Parse accounts.txt and return a list of (username, instance) tuples."""
    accounts = []
    path = Path(filepath)
    if not path.exists():
        logger.warning("Accounts file not found: %s", filepath)
        return accounts
    for line in path.read_text().splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        # Expected format: @user@instance.social or user@instance.social
        line = line.lstrip("@")
        if "@" not in line:
            logger.warning("Skipping malformed account line: %s", line)
            continue
        parts = line.split("@", 1)
        if len(parts) == 2 and parts[0] and parts[1]:
            accounts.append((parts[0], parts[1]))
        else:
            logger.warning("Skipping malformed account line: %s", line)
    return accounts


def sync_monitored_accounts(session) -> list[MonitoredAccount]:
    """
    Merge file-based accounts into the database.

    Accounts added via the web UI are already in the DB; accounts listed in
    the file are inserted if missing. Returns all active monitored accounts.
    """
    file_accounts = load_accounts_from_file(ACCOUNTS_FILE)
    for username, instance in file_accounts:
        existing = (
            session.query(MonitoredAccount)
            .filter_by(username=username, instance=instance)
            .first()
        )
        if not existing:
            logger.info("Adding account from file: @%s@%s", username, instance)
            acct = MonitoredAccount(username=username, instance=instance, is_active=True)
            session.add(acct)
            session.commit()
    return session.query(MonitoredAccount).filter_by(is_active=True).all()


def resolve_account(session, account: MonitoredAccount) -> bool:
    """Look up the Mastodon account ID if we don't have it yet."""
    if account.account_id:
        return True
    try:
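        # Expected (abridged) payload from Mastodon's GET /api/v1/accounts/lookup,
        # assuming lookup_account returns the decoded account entity:
        #   {"id": "...", "display_name": "...", "avatar": "https://...", "note": "..."}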
        data = lookup_account(account.instance, account.username)
        account.account_id = data["id"]
        account.display_name = data.get("display_name", "")
        account.avatar_url = data.get("avatar", "")
        account.note = data.get("note", "")
        session.commit()
        logger.info("Resolved %s → account_id=%s", account.handle, account.account_id)
        return True
    except MastodonAPIError as e:
        logger.error("Failed to resolve %s: %s", account.handle, e)
        return False


def store_status(session, account: MonitoredAccount, parsed: dict) -> bool:
    """Store a parsed status in the database. Returns True if new, False if duplicate."""
    # Check for duplicate
    existing = (
        session.query(Status)
        .filter_by(status_id=parsed["status_id"], account_db_id=account.id)
        .first()
    )
    if existing:
        # Update interaction counts in case they changed
        existing.replies_count = parsed["replies_count"]
        existing.reblogs_count = parsed["reblogs_count"]
        existing.favourites_count = parsed["favourites_count"]
        return False
    status = Status(
        status_id=parsed["status_id"],
        account_db_id=account.id,
        uri=parsed["uri"],
        url=parsed["url"],
        content=parsed["content"],
        text_content=parsed["text_content"],
        visibility=parsed["visibility"],
        created_at=parsed["created_at"],
        language=parsed["language"],
        sensitive=parsed["sensitive"],
        spoiler_text=parsed["spoiler_text"],
        in_reply_to_id=parsed["in_reply_to_id"],
        in_reply_to_account_id=parsed["in_reply_to_account_id"],
        conversation_id=parsed["conversation_id"],
        replies_count=parsed["replies_count"],
        reblogs_count=parsed["reblogs_count"],
        favourites_count=parsed["favourites_count"],
        status_type=parsed["status_type"],
        raw_json=parsed["raw_json"],
    )
    session.add(status)
    session.flush()  # assign status.id so the child rows below can reference it
    # Store mentions
    for m in parsed["mentions"]:
        session.add(Mention(
            status_db_id=status.id,
            mentioned_account_id=m["mentioned_account_id"],
            mentioned_username=m["mentioned_username"],
            mentioned_acct=m["mentioned_acct"],
            mentioned_url=m["mentioned_url"],
        ))
    # Store media
    for ma in parsed["media_attachments"]:
        session.add(MediaAttachment(
            status_db_id=status.id,
            media_id=ma["media_id"],
            media_type=ma["media_type"],
            url=ma["url"],
            preview_url=ma["preview_url"],
            description=ma["description"],
        ))
    # Store tags
    for t in parsed["tags"]:
        session.add(Tag(
            status_db_id=status.id,
            name=t["name"],
            url=t["url"],
        ))
    return True


def collect_account(session, account: MonitoredAccount) -> int:
    """Collect new statuses for a single account. Returns count of new statuses."""
    log = CollectionLog(account_db_id=account.id, status="running")
    session.add(log)
    session.commit()
    try:
        if not resolve_account(session, account):
            log.status = "error"
            log.error = "Could not resolve account ID"
            log.finished_at = datetime.now(timezone.utc)
            session.commit()
            return 0
        logger.info("Collecting statuses for %s (since_id=%s)", account.handle, account.last_status_id)
        raw_statuses = get_account_statuses(
            instance=account.instance,
            account_id=account.account_id,
            since_id=account.last_status_id,
        )
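        # get_account_statuses is assumed to page through GET
        # /api/v1/accounts/:id/statuses and return a flat list of raw status
        # dicts newer than since_id.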
        new_count = 0
        newest_id = account.last_status_id
        for raw in raw_statuses:
            parsed = parse_status(raw, account.account_id)
            is_new = store_status(session, account, parsed)
            if is_new:
                new_count += 1
            # Track the newest status ID; IDs are numeric strings, so compare
            # as integers to keep shorter legacy IDs from sorting above longer ones
            sid = parsed["status_id"]
            if newest_id is None or int(sid) > int(newest_id):
                newest_id = sid
        if newest_id:
            account.last_status_id = newest_id
        account.last_collected_at = datetime.now(timezone.utc)
        log.statuses_collected = new_count
        log.status = "success"
        log.finished_at = datetime.now(timezone.utc)
        session.commit()
        logger.info("Collected %d new statuses for %s (total fetched: %d)",
                    new_count, account.handle, len(raw_statuses))
        return new_count
    except RateLimitError as e:
        log.status = "error"
        log.error = f"Rate limited: {e}"
        log.finished_at = datetime.now(timezone.utc)
        session.commit()
        logger.warning("Rate limited while collecting %s: %s", account.handle, e)
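        # RateLimitError is assumed to carry a retry_after value in seconds,
        # parsed from the Retry-After / X-RateLimit-Reset response headers.
        # Note this blocks the whole cycle until the limit window passes.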
        time.sleep(e.retry_after)
        return 0
    except MastodonAPIError as e:
        log.status = "error"
        log.error = str(e)
        log.finished_at = datetime.now(timezone.utc)
        session.commit()
        logger.error("API error collecting %s: %s", account.handle, e)
        return 0
    except Exception as e:
        log.status = "error"
        log.error = str(e)
        log.finished_at = datetime.now(timezone.utc)
        session.commit()
        logger.exception("Unexpected error collecting %s", account.handle)
        return 0


def run_collection_cycle():
    """Run one full collection cycle across all monitored accounts."""
    logger.info("=== Starting collection cycle ===")
    session = get_session()
    try:
        accounts = sync_monitored_accounts(session)
        logger.info("Monitoring %d active accounts", len(accounts))
        total_new = 0
        for account in accounts:
            new = collect_account(session, account)
            total_new += new
            time.sleep(1)  # brief pause between accounts to be polite
        logger.info("=== Collection cycle complete: %d new statuses across %d accounts ===",
                    total_new, len(accounts))
    except Exception:
        logger.exception("Fatal error in collection cycle")
    finally:
        session.close()


def main():
    """Entry point: initialize DB and start the scheduler."""
    logger.info("Mastodon Collector starting up...")
    logger.info("Poll interval: %d seconds (%.1f hours)", POLL_INTERVAL, POLL_INTERVAL / 3600)
    init_db()
    logger.info("Database initialized")
    # Run one collection immediately on startup
    run_collection_cycle()
    # Schedule recurring collection
    scheduler = BlockingScheduler()
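    # APScheduler's default max_instances=1 skips a run that would overlap a
    # still-running cycle (e.g., one delayed by a rate-limit sleep).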
    scheduler.add_job(run_collection_cycle, "interval", seconds=POLL_INTERVAL)
    logger.info("Starting scheduler — next run in %d seconds", POLL_INTERVAL)
    try:
        scheduler.start()
    except (KeyboardInterrupt, SystemExit):
        logger.info("Collector shutting down")
        scheduler.shutdown()


if __name__ == "__main__":
    main()