"""
Collector service — periodically polls Mastodon for new statuses from monitored accounts.

Runs as a standalone process via `python -m app.collector`.
"""

import logging
import os
import sys
import time
from datetime import datetime, timezone
from pathlib import Path

from apscheduler.schedulers.blocking import BlockingScheduler

from app.db import (
    init_db,
    get_session,
    MonitoredAccount,
    Status,
    Mention,
    MediaAttachment,
    Tag,
    CollectionLog,
)
from app.mastodon_api import (
    lookup_account,
    get_account_statuses,
    parse_status,
    MastodonAPIError,
    RateLimitError,
)

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    handlers=[logging.StreamHandler(sys.stdout)],
)
logger = logging.getLogger("collector")

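# Defaults: poll every 14400 seconds (4 hours) and read handles from
# /app/accounts.txt; both can be overridden via the environment.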
POLL_INTERVAL = int(os.environ.get("POLL_INTERVAL_SECONDS", 14400))
ACCOUNTS_FILE = os.environ.get("ACCOUNTS_FILE", "/app/accounts.txt")


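# The accounts file holds one fediverse handle per line; blank lines and
# "#" comments are ignored, and a leading "@" is optional. For example
# (hypothetical handles):
#
#   @alice@mastodon.social
#   bob@hachyderm.io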
def load_accounts_from_file(filepath: str) -> list[tuple[str, str]]:
    """Parse accounts.txt and return list of (username, instance) tuples."""
    accounts = []
    path = Path(filepath)
    if not path.exists():
        logger.warning("Accounts file not found: %s", filepath)
        return accounts

    for line in path.read_text().splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        # Expected format: @user@instance.social or user@instance.social
        line = line.lstrip("@")
        if "@" not in line:
            logger.warning("Skipping malformed account line: %s", line)
            continue
        parts = line.split("@", 1)
        if len(parts) == 2 and parts[0] and parts[1]:
            accounts.append((parts[0], parts[1]))
        else:
            logger.warning("Skipping malformed account line: %s", line)
    return accounts


def sync_monitored_accounts(session) -> list[MonitoredAccount]:
    """
    Sync monitored accounts from the accounts file and the database.

    Accounts added via the web UI are already in the DB; accounts listed in
    the file are added if missing. Returns all active monitored accounts.
    """
    file_accounts = load_accounts_from_file(ACCOUNTS_FILE)

    for username, instance in file_accounts:
        existing = (
            session.query(MonitoredAccount)
            .filter_by(username=username, instance=instance)
            .first()
        )
        if not existing:
            logger.info("Adding account from file: @%s@%s", username, instance)
            acct = MonitoredAccount(username=username, instance=instance, is_active=True)
            session.add(acct)

    session.commit()
    return session.query(MonitoredAccount).filter_by(is_active=True).all()


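# lookup_account() presumably wraps Mastodon's GET /api/v1/accounts/lookup
# endpoint, which resolves a username on an instance to its server-side ID.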
def resolve_account(session, account: MonitoredAccount) -> bool:
    """Look up the Mastodon account ID if we don't have it yet."""
    if account.account_id:
        return True

    try:
        data = lookup_account(account.instance, account.username)
        account.account_id = data["id"]
        account.display_name = data.get("display_name", "")
        account.avatar_url = data.get("avatar", "")
        account.note = data.get("note", "")
        session.commit()
        logger.info("Resolved %s → account_id=%s", account.handle, account.account_id)
        return True
    except MastodonAPIError as e:
        logger.error("Failed to resolve %s: %s", account.handle, e)
        return False


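# store_status() fans one parsed status out into a Status row plus child
# Mention, MediaAttachment, and Tag rows keyed by the new row's primary key.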
def store_status(session, account: MonitoredAccount, parsed: dict) -> bool:
    """Store a parsed status in the database. Returns True if new, False if duplicate."""
    # Check for duplicate
    existing = (
        session.query(Status)
        .filter_by(status_id=parsed["status_id"], account_db_id=account.id)
        .first()
    )
    if existing:
        # Update interaction counts in case they changed
        existing.replies_count = parsed["replies_count"]
        existing.reblogs_count = parsed["reblogs_count"]
        existing.favourites_count = parsed["favourites_count"]
        return False

    status = Status(
        status_id=parsed["status_id"],
        account_db_id=account.id,
        uri=parsed["uri"],
        url=parsed["url"],
        content=parsed["content"],
        text_content=parsed["text_content"],
        visibility=parsed["visibility"],
        created_at=parsed["created_at"],
        language=parsed["language"],
        sensitive=parsed["sensitive"],
        spoiler_text=parsed["spoiler_text"],
        in_reply_to_id=parsed["in_reply_to_id"],
        in_reply_to_account_id=parsed["in_reply_to_account_id"],
        conversation_id=parsed["conversation_id"],
        replies_count=parsed["replies_count"],
        reblogs_count=parsed["reblogs_count"],
        favourites_count=parsed["favourites_count"],
        status_type=parsed["status_type"],
        raw_json=parsed["raw_json"],
    )
    session.add(status)
    session.flush()  # get status.id

    # Store mentions
    for m in parsed["mentions"]:
        session.add(Mention(
            status_db_id=status.id,
            mentioned_account_id=m["mentioned_account_id"],
            mentioned_username=m["mentioned_username"],
            mentioned_acct=m["mentioned_acct"],
            mentioned_url=m["mentioned_url"],
        ))

    # Store media
    for ma in parsed["media_attachments"]:
        session.add(MediaAttachment(
            status_db_id=status.id,
            media_id=ma["media_id"],
            media_type=ma["media_type"],
            url=ma["url"],
            preview_url=ma["preview_url"],
            description=ma["description"],
        ))

    # Store tags
    for t in parsed["tags"]:
        session.add(Tag(
            status_db_id=status.id,
            name=t["name"],
            url=t["url"],
        ))

    return True


def collect_account(session, account: MonitoredAccount) -> int:
    """Collect new statuses for a single account. Returns count of new statuses."""
    log = CollectionLog(account_db_id=account.id, status="running")
    session.add(log)
    session.commit()

    try:
        if not resolve_account(session, account):
            log.status = "error"
            log.error = "Could not resolve account ID"
            log.finished_at = datetime.now(timezone.utc)
            session.commit()
            return 0

logger.info("Collecting statuses for %s (since_id=%s)", account.handle, account.last_status_id)
|
||
|
|
|
||
|
|
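        # since_id asks the API to return only statuses newer than the last
        # one we stored, so each cycle fetches just the new delta.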
        raw_statuses = get_account_statuses(
            instance=account.instance,
            account_id=account.account_id,
            since_id=account.last_status_id,
        )

        new_count = 0
        newest_id = account.last_status_id

        for raw in raw_statuses:
            parsed = parse_status(raw, account.account_id)
            is_new = store_status(session, account, parsed)
            if is_new:
                new_count += 1

            # Track the newest status ID. IDs are numeric strings, so compare
            # them as integers to avoid lexicographic ordering bugs.
            sid = parsed["status_id"]
            if newest_id is None or int(sid) > int(newest_id):
                newest_id = sid

        if newest_id:
            account.last_status_id = newest_id
            account.last_collected_at = datetime.now(timezone.utc)

        log.statuses_collected = new_count
        log.status = "success"
        log.finished_at = datetime.now(timezone.utc)
        session.commit()

        logger.info("Collected %d new statuses for %s (total fetched: %d)",
                    new_count, account.handle, len(raw_statuses))
        return new_count

    except RateLimitError as e:
        log.status = "error"
        log.error = f"Rate limited: {e}"
        log.finished_at = datetime.now(timezone.utc)
        session.commit()
        logger.warning("Rate limited while collecting %s: %s", account.handle, e)
        time.sleep(e.retry_after)
        return 0

    except MastodonAPIError as e:
        log.status = "error"
        log.error = str(e)
        log.finished_at = datetime.now(timezone.utc)
        session.commit()
        logger.error("API error collecting %s: %s", account.handle, e)
        return 0

    except Exception as e:
        # A failed flush can leave the session needing a rollback before the
        # error log can be committed.
        session.rollback()
        log.status = "error"
        log.error = str(e)
        log.finished_at = datetime.now(timezone.utc)
        session.commit()
        logger.exception("Unexpected error collecting %s", account.handle)
        return 0


def run_collection_cycle():
    """Run one full collection cycle across all monitored accounts."""
    logger.info("=== Starting collection cycle ===")
    session = get_session()

    try:
        accounts = sync_monitored_accounts(session)
        logger.info("Monitoring %d active accounts", len(accounts))

        total_new = 0
        for account in accounts:
            new = collect_account(session, account)
            total_new += new
            time.sleep(1)  # Brief pause between accounts to be polite

        logger.info("=== Collection cycle complete: %d new statuses across %d accounts ===",
                    total_new, len(accounts))

    except Exception:
        logger.exception("Fatal error in collection cycle")
    finally:
        session.close()


def main():
    """Entry point: initialize DB and start the scheduler."""
    logger.info("Mastodon Collector starting up...")
    logger.info("Poll interval: %d seconds (%d hours)", POLL_INTERVAL, POLL_INTERVAL // 3600)

    init_db()
    logger.info("Database initialized")

    # Run one collection immediately on startup
    run_collection_cycle()

    # Schedule recurring collection
    scheduler = BlockingScheduler()
    scheduler.add_job(run_collection_cycle, "interval", seconds=POLL_INTERVAL)
    logger.info("Scheduler started — next run in %d seconds", POLL_INTERVAL)

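    # BlockingScheduler.start() blocks this thread until the process is
    # interrupted, so nothing after this call runs during normal operation.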
    try:
        scheduler.start()
    except (KeyboardInterrupt, SystemExit):
        logger.info("Collector shutting down")
        scheduler.shutdown()


if __name__ == "__main__":
    main()
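
# Example invocation (hypothetical values):
#   POLL_INTERVAL_SECONDS=3600 ACCOUNTS_FILE=./accounts.txt python -m app.collector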