mastodon-collector/app/mastodon_api.py
Pieter 72dbf0d2b6 Initial commit: Mastodon collector application
Add Flask-based application for collecting and archiving Mastodon posts from configured accounts.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2026-02-09 08:05:54 +01:00

226 lines
7.8 KiB
Python

"""Mastodon public API client — no authentication required."""
import logging
import re
import time
from html import unescape
from typing import Optional
from urllib.parse import urljoin
import requests
logger = logging.getLogger(__name__)
# Respect rate limits: Mastodon returns 300 requests per 5 min by default
DEFAULT_TIMEOUT = 30
MAX_RETRIES = 3
RETRY_BACKOFF = 5 # seconds
class MastodonAPIError(Exception):
    """Base error for failures talking to a Mastodon instance's public API."""
class RateLimitError(MastodonAPIError):
    """Raised on HTTP 429; carries the suggested wait time in seconds."""

    def __init__(self, retry_after: float = 60):
        super().__init__(f"Rate limited, retry after {retry_after}s")
        # Callers can sleep for this many seconds before retrying.
        self.retry_after = retry_after
def _strip_html(html: str) -> str:
"""Strip HTML tags and decode entities to get plain text."""
# Replace <br> and </p> with newlines
text = re.sub(r"<br\s*/?>", "\n", html)
text = re.sub(r"</p>", "\n", text)
# Remove all remaining tags
text = re.sub(r"<[^>]+>", "", text)
return unescape(text).strip()
def _api_get(instance: str, path: str, params: Optional[dict] = None) -> requests.Response:
    """Make a GET request to a Mastodon instance's public API.

    Retries transient network failures up to MAX_RETRIES times with
    linear backoff (RETRY_BACKOFF * attempt).

    Raises:
        RateLimitError: on HTTP 429 (retry_after taken from the response
            headers when parseable, otherwise 60s).
        MastodonAPIError: on 404, any other HTTP error status, or after
            exhausting all retries.
    """
    url = f"https://{instance}{path}"
    headers = {"Accept": "application/json", "User-Agent": "MastodonCollector/1.0"}
    for attempt in range(MAX_RETRIES):
        try:
            resp = requests.get(url, params=params, headers=headers, timeout=DEFAULT_TIMEOUT)
            if resp.status_code == 429:
                # Mastodon sends X-RateLimit-Reset as an ISO-8601 timestamp,
                # not a seconds delta — float() on that raises ValueError,
                # which previously escaped this handler uncaught. Parse
                # defensively and fall back to a flat 60s wait.
                raw_reset = resp.headers.get("X-RateLimit-Reset", "60")
                try:
                    retry_after = float(raw_reset)
                except ValueError:
                    retry_after = 60.0
                # A numeric value this large is an epoch timestamp, not a delta.
                if retry_after > 1_000_000:
                    retry_after = 60.0
                logger.warning("Rate limited by %s, waiting %.0fs", instance, retry_after)
                raise RateLimitError(retry_after)
            if resp.status_code == 404:
                raise MastodonAPIError(f"Not found: {url}")
            resp.raise_for_status()
            return resp
        except RateLimitError:
            # Rate limiting is surfaced to the caller, never retried here.
            raise
        except requests.RequestException as e:
            if attempt < MAX_RETRIES - 1:
                wait = RETRY_BACKOFF * (attempt + 1)
                logger.warning("Request to %s failed (attempt %d/%d): %s — retrying in %ds",
                               url, attempt + 1, MAX_RETRIES, e, wait)
                time.sleep(wait)
            else:
                raise MastodonAPIError(f"Failed after {MAX_RETRIES} attempts: {e}") from e
    # Unreachable: the loop either returns or raises on the last attempt.
    raise MastodonAPIError("Unexpected retry exhaustion")
def lookup_account(instance: str, username: str) -> dict:
    """Look up an account on an instance by username. Returns the account JSON."""
    # Preferred path: the dedicated v1 lookup endpoint (present on most servers).
    try:
        return _api_get(instance, "/api/v1/accounts/lookup", {"acct": username}).json()
    except MastodonAPIError:
        pass
    # Fallback path: full account search, take the first (and only) hit.
    search_params = {"q": f"@{username}@{instance}", "type": "accounts", "limit": 1}
    results = _api_get(instance, "/api/v2/search", search_params).json()
    matches = results.get("accounts", [])
    if not matches:
        raise MastodonAPIError(f"Account @{username} not found on {instance}")
    return matches[0]
def get_account_statuses(
    instance: str,
    account_id: str,
    since_id: Optional[str] = None,
    limit: int = 40,
    exclude_reblogs: bool = False,
) -> list[dict]:
    """
    Fetch statuses from an account. Handles pagination to get all new statuses.
    Returns list of status dicts, oldest first.
    """
    endpoint = f"/api/v1/accounts/{account_id}/statuses"
    query: dict = {"limit": min(limit, 40)}  # Mastodon caps page size at 40
    if since_id:
        query["since_id"] = since_id
    if exclude_reblogs:
        query["exclude_reblogs"] = "true"
    collected: list[dict] = []
    # Hard cap on page count so a misbehaving server can't spin us forever.
    for _ in range(25):
        resp = _api_get(instance, endpoint, query)
        batch = resp.json()
        if not batch:
            break
        collected.extend(batch)
        # The Link header advertises the next (older) page via rel="next".
        next_link = re.search(r'<([^>]+)>;\s*rel="next"', resp.headers.get("Link", ""))
        if next_link is None:
            break
        older_cursor = re.search(r"max_id=(\d+)", next_link.group(1))
        if older_cursor is None:
            break
        query["max_id"] = older_cursor.group(1)
        # since_id stays in the query as the floor while we page backwards.
        time.sleep(0.5)  # be polite between pages
    # Oldest first, so callers can process chronologically.
    collected.reverse()
    return collected
def get_status_context(instance: str, status_id: str) -> dict:
    """Get the context (ancestors + descendants) of a status. Useful for threading."""
    context_path = f"/api/v1/statuses/{status_id}/context"
    return _api_get(instance, context_path).json()
def classify_status(status: dict, monitored_account_id: str) -> str:
    """
    Classify a status as: post, reply, mention, or reblog.
    - reblog: the status is a boost of another status
    - reply: the status is in reply to another status
    - mention: the status mentions other accounts (but is not a reply)
    - post: a standalone original post
    """
    # Precedence: reblog beats reply beats mention beats plain post.
    if status.get("reblog"):
        return "reblog"
    if status.get("in_reply_to_id"):
        return "reply"
    # A self-mention alone doesn't count — someone *else* must be tagged.
    others = [
        m for m in status.get("mentions", [])
        if m.get("id") != monitored_account_id
    ]
    return "mention" if others else "post"
def parse_status(status: dict, monitored_account_id: str) -> dict:
    """Parse a raw Mastodon status JSON into a flat dict for storage."""
    # For boosts, content/media/tags come from the boosted status; the
    # wrapper status keeps its own id/uri/visibility/counters.
    actual = status.get("reblog") or status
    content_html = actual.get("content", "")
    conversation = status.get("conversation")
    mention_rows = [
        {
            "mentioned_account_id": m.get("id"),
            "mentioned_username": m.get("username", ""),
            "mentioned_acct": m.get("acct", ""),
            "mentioned_url": m.get("url", ""),
        }
        for m in (actual.get("mentions") or [])
    ]
    media_rows = [
        {
            "media_id": ma.get("id"),
            "media_type": ma.get("type"),
            "url": ma.get("url"),
            "preview_url": ma.get("preview_url"),
            "description": ma.get("description"),
        }
        for ma in (actual.get("media_attachments") or [])
    ]
    tag_rows = [
        {"name": t.get("name", ""), "url": t.get("url", "")}
        for t in (actual.get("tags") or [])
    ]
    return {
        "status_id": status["id"],
        "uri": status.get("uri", ""),
        "url": status.get("url") or actual.get("url", ""),
        "content": content_html,
        "text_content": _strip_html(content_html),
        "visibility": status.get("visibility", "public"),
        "created_at": status.get("created_at"),
        "language": status.get("language") or actual.get("language"),
        "sensitive": status.get("sensitive", False),
        "spoiler_text": status.get("spoiler_text", ""),
        "in_reply_to_id": status.get("in_reply_to_id"),
        "in_reply_to_account_id": status.get("in_reply_to_account_id"),
        "conversation_id": conversation.get("id") if isinstance(conversation, dict) else None,
        "replies_count": status.get("replies_count", 0),
        "reblogs_count": status.get("reblogs_count", 0),
        "favourites_count": status.get("favourites_count", 0),
        "status_type": classify_status(status, monitored_account_id),
        "mentions": mention_rows,
        "media_attachments": media_rows,
        "tags": tag_rows,
        "raw_json": status,
    }