Add Flask-based application for collecting and archiving Mastodon posts from configured accounts.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
"""Mastodon public API client — no authentication required."""
|
|
|
|
import logging
|
|
import re
|
|
import time
|
|
from html import unescape
|
|
from typing import Optional
|
|
from urllib.parse import urljoin
|
|
|
|
import requests
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Respect rate limits: Mastodon returns 300 requests per 5 min by default
|
|
DEFAULT_TIMEOUT = 30
|
|
MAX_RETRIES = 3
|
|
RETRY_BACKOFF = 5 # seconds
|
|
|
|
|
|
class MastodonAPIError(Exception):
|
|
pass
|
|
|
|
|
|
class RateLimitError(MastodonAPIError):
|
|
def __init__(self, retry_after: float = 60):
|
|
self.retry_after = retry_after
|
|
super().__init__(f"Rate limited, retry after {retry_after}s")
|
|
|
|
|
|
def _strip_html(html: str) -> str:
|
|
"""Strip HTML tags and decode entities to get plain text."""
|
|
# Replace <br> and </p> with newlines
|
|
text = re.sub(r"<br\s*/?>", "\n", html)
|
|
text = re.sub(r"</p>", "\n", text)
|
|
# Remove all remaining tags
|
|
text = re.sub(r"<[^>]+>", "", text)
|
|
return unescape(text).strip()
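
# Illustrative behavior (input assumed, not from a real status):
#     _strip_html('<p>Hi <a href="https://a.example">there</a><br>bye</p>')
# returns 'Hi there\nbye'.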


def _api_get(instance: str, path: str, params: Optional[dict] = None) -> requests.Response:
    """Make a GET request to a Mastodon instance's public API."""
    url = f"https://{instance}{path}"
    headers = {"Accept": "application/json", "User-Agent": "MastodonCollector/1.0"}

    for attempt in range(MAX_RETRIES):
        try:
            resp = requests.get(url, params=params, headers=headers, timeout=DEFAULT_TIMEOUT)

            if resp.status_code == 429:
                # Mastodon sends X-RateLimit-Reset as an ISO 8601 timestamp,
                # not a number of seconds; compute the delta and fall back to
                # 60s if the header is missing or unparseable.
                reset_raw = resp.headers.get("X-RateLimit-Reset", "")
                try:
                    reset_at = datetime.fromisoformat(reset_raw.replace("Z", "+00:00"))
                    retry_after = max((reset_at - datetime.now(timezone.utc)).total_seconds(), 1.0)
                except ValueError:
                    retry_after = 60.0
                logger.warning("Rate limited by %s, waiting %.0fs", instance, retry_after)
                raise RateLimitError(retry_after)

            if resp.status_code == 404:
                raise MastodonAPIError(f"Not found: {url}")

            resp.raise_for_status()
            return resp

        except RateLimitError:
            raise
        except requests.RequestException as e:
            if attempt < MAX_RETRIES - 1:
                wait = RETRY_BACKOFF * (attempt + 1)
                logger.warning("Request to %s failed (attempt %d/%d): %s — retrying in %ds",
                               url, attempt + 1, MAX_RETRIES, e, wait)
                time.sleep(wait)
            else:
                raise MastodonAPIError(f"Failed after {MAX_RETRIES} attempts: {e}") from e

    raise MastodonAPIError("Unexpected retry exhaustion")


def lookup_account(instance: str, username: str) -> dict:
    """Look up an account on an instance by username. Returns the account JSON."""
    # Try the v1 lookup endpoint first (available on most instances)
    try:
        resp = _api_get(instance, "/api/v1/accounts/lookup", {"acct": username})
        return resp.json()
    except RateLimitError:
        raise  # don't mask rate limiting by falling through to search
    except MastodonAPIError:
        pass

    # Fallback: search for the account
    resp = _api_get(
        instance,
        "/api/v2/search",
        {"q": f"@{username}@{instance}", "type": "accounts", "limit": 1},
    )
    data = resp.json()
    accounts = data.get("accounts", [])
    if not accounts:
        raise MastodonAPIError(f"Account @{username} not found on {instance}")
    return accounts[0]
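
# Illustrative use (instance and username are placeholders):
#
#     account = lookup_account("mastodon.social", "Gargron")
#     account_id = account["id"]  # needed by get_account_statuses below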


def get_account_statuses(
    instance: str,
    account_id: str,
    since_id: Optional[str] = None,
    limit: int = 40,
    exclude_reblogs: bool = False,
) -> list[dict]:
    """
    Fetch statuses from an account. Handles pagination to get all new statuses.
    Returns list of status dicts, oldest first.
    """
    all_statuses = []
    params = {"limit": min(limit, 40)}
    if since_id:
        params["since_id"] = since_id
    if exclude_reblogs:
        params["exclude_reblogs"] = "true"

    path = f"/api/v1/accounts/{account_id}/statuses"

    # Paginate through results
    max_pages = 25  # safety limit
    page = 0

    while page < max_pages:
        resp = _api_get(instance, path, params)
        statuses = resp.json()

        if not statuses:
            break

        all_statuses.extend(statuses)
        page += 1

        # Check Link header for next page
        link_header = resp.headers.get("Link", "")
        next_match = re.search(r'<([^>]+)>;\s*rel="next"', link_header)
        if not next_match:
            break

        # Parse the next URL for max_id
        next_url = next_match.group(1)
        max_id_match = re.search(r"max_id=(\d+)", next_url)
        if not max_id_match:
            break

        # max_id walks the window backwards through older statuses, while
        # since_id stays in params as the floor so we never refetch statuses
        # we already have.
        params["max_id"] = max_id_match.group(1)
        time.sleep(0.5)  # Be polite between pages

    # Return oldest first so we can process chronologically
    all_statuses.reverse()
    return all_statuses
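
# Illustrative use, continuing the sketch above (account_id and last_seen_id
# are placeholders):
#
#     statuses = get_account_statuses("mastodon.social", account_id,
#                                     since_id=last_seen_id, exclude_reblogs=True)
#     for status in statuses:  # oldest first
#         handle(status)       # hypothetical consumer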


def get_status_context(instance: str, status_id: str) -> dict:
    """Get the context (ancestors + descendants) of a status. Useful for threading."""
    resp = _api_get(instance, f"/api/v1/statuses/{status_id}/context")
    return resp.json()
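
# Illustrative use (status id is a placeholder); the context endpoint returns
# {"ancestors": [...], "descendants": [...]}:
#
#     ctx = get_status_context("mastodon.social", "123456789")
#     thread = ctx["ancestors"] + ctx["descendants"]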


def classify_status(status: dict, monitored_account_id: str) -> str:
    """
    Classify a status as: post, reply, mention, or reblog.

    - reblog: the status is a boost of another status
    - reply: the status is in reply to another status
    - mention: the status mentions other accounts (but is not a reply)
    - post: a standalone original post
    """
    if status.get("reblog"):
        return "reblog"
    if status.get("in_reply_to_id"):
        return "reply"
    mentions = status.get("mentions", [])
    if mentions:
        # Only classify as "mention" if it mentions someone other than self
        other_mentions = [m for m in mentions if m.get("id") != monitored_account_id]
        if other_mentions:
            return "mention"
    return "post"


def parse_status(status: dict, monitored_account_id: str) -> dict:
    """Parse a raw Mastodon status JSON into a flat dict for storage."""
    # If it's a reblog, we store the original content but flag it
    actual = status.get("reblog") or status
    content_html = actual.get("content", "")

    return {
        "status_id": status["id"],
        "uri": status.get("uri", ""),
        "url": status.get("url") or actual.get("url", ""),
        "content": content_html,
        "text_content": _strip_html(content_html),
        "visibility": status.get("visibility", "public"),
        "created_at": status.get("created_at"),
        "language": status.get("language") or actual.get("language"),
        "sensitive": status.get("sensitive", False),
        "spoiler_text": status.get("spoiler_text", ""),
        "in_reply_to_id": status.get("in_reply_to_id"),
        "in_reply_to_account_id": status.get("in_reply_to_account_id"),
        "conversation_id": (
            status.get("conversation", {}).get("id")
            if isinstance(status.get("conversation"), dict)
            else None
        ),
        "replies_count": status.get("replies_count", 0),
        "reblogs_count": status.get("reblogs_count", 0),
        "favourites_count": status.get("favourites_count", 0),
        "status_type": classify_status(status, monitored_account_id),
        "mentions": [
            {
                "mentioned_account_id": m.get("id"),
                "mentioned_username": m.get("username", ""),
                "mentioned_acct": m.get("acct", ""),
                "mentioned_url": m.get("url", ""),
            }
            for m in (actual.get("mentions") or [])
        ],
        "media_attachments": [
            {
                "media_id": ma.get("id"),
                "media_type": ma.get("type"),
                "url": ma.get("url"),
                "preview_url": ma.get("preview_url"),
                "description": ma.get("description"),
            }
            for ma in (actual.get("media_attachments") or [])
        ],
        "tags": [
            {"name": t.get("name", ""), "url": t.get("url", "")}
            for t in (actual.get("tags") or [])
        ],
        "raw_json": status,
    }
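

if __name__ == "__main__":
    # Minimal end-to-end sketch. The instance and username are placeholders;
    # this block is illustrative, not part of the collector itself, and may
    # fetch several pages of statuses.
    logging.basicConfig(level=logging.INFO)

    demo_instance = "mastodon.social"  # placeholder instance
    demo_username = "Gargron"          # placeholder username

    account = lookup_account(demo_instance, demo_username)
    try:
        statuses = get_account_statuses(demo_instance, account["id"], exclude_reblogs=True)
    except RateLimitError as err:
        time.sleep(err.retry_after)
        statuses = get_account_statuses(demo_instance, account["id"], exclude_reblogs=True)

    for status in statuses[-5:]:
        parsed = parse_status(status, account["id"])
        print(parsed["status_type"], parsed["created_at"], parsed["text_content"][:80])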