Add Flask-based application for collecting and archiving Mastodon posts from configured accounts. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
186 lines
6.9 KiB
Python
186 lines
6.9 KiB
Python
"""Database models and session management."""
|
|
|
|
import os
|
|
from datetime import datetime, timezone
|
|
|
|
from sqlalchemy import (
|
|
create_engine,
|
|
Column,
|
|
Integer,
|
|
BigInteger,
|
|
String,
|
|
Text,
|
|
Boolean,
|
|
DateTime,
|
|
ForeignKey,
|
|
Index,
|
|
UniqueConstraint,
|
|
JSON,
|
|
)
|
|
from sqlalchemy.orm import declarative_base, sessionmaker, relationship
|
|
|
|
# Connection string: read from the DATABASE_URL environment variable, falling
# back to a PostgreSQL instance on localhost when the variable is unset.
# NOTE(review): the fallback embeds a username and password; deployments
# should presumably always set DATABASE_URL explicitly -- confirm.
DATABASE_URL = os.getenv(
    "DATABASE_URL",
    "postgresql://collector:collector_secret@localhost:5432/mastodon_collector",
)

# Declarative base class that every model in this module inherits from.
Base = declarative_base()

# Module-wide engine: pre-ping enabled, pool of 5 connections with up to 10
# overflow connections.
engine = create_engine(
    DATABASE_URL,
    pool_pre_ping=True,
    pool_size=5,
    max_overflow=10,
)

# Session factory bound to the engine above.
SessionLocal = sessionmaker(bind=engine)
|
|
|
|
|
|
class MonitoredAccount(Base):
    """A Mastodon account whose posts are being monitored.

    Each row is unique per ``(username, instance)`` pair, enforced by the
    ``uq_account_handle`` constraint.
    """

    __tablename__ = "monitored_accounts"
    __table_args__ = (
        UniqueConstraint("username", "instance", name="uq_account_handle"),
    )

    id = Column(Integer, primary_key=True, autoincrement=True)
    # Local part of the handle, e.g. "user".
    username = Column(String(255), nullable=False)
    # Host part of the handle, e.g. "mastodon.social".
    instance = Column(String(255), nullable=False)
    # Mastodon numeric account ID on that instance.
    account_id = Column(String(64), nullable=True)
    display_name = Column(String(512), nullable=True)
    avatar_url = Column(Text, nullable=True)
    is_active = Column(Boolean, default=True, nullable=False)
    last_collected_at = Column(DateTime(timezone=True), nullable=True)
    # For pagination: newest status ID we've seen.
    last_status_id = Column(String(64), nullable=True)
    # Defaults to the current UTC time when the row is inserted.
    created_at = Column(
        DateTime(timezone=True),
        default=lambda: datetime.now(tz=timezone.utc),
    )
    # Bio / description.
    note = Column(Text, nullable=True)

    # lazy="dynamic": attribute access yields a query object rather than a
    # fully loaded list.
    statuses = relationship(
        "Status",
        back_populates="account",
        lazy="dynamic",
    )

    @property
    def handle(self) -> str:
        """Return the full handle in ``@username@instance`` form."""
        return f"@{self.username}@{self.instance}"

    def __repr__(self) -> str:
        """Return a short debug representation containing the handle."""
        return f"<MonitoredAccount {self.handle}>"
|
|
|
|
|
|
class Status(Base):
    """A single post / toot collected from Mastodon.

    A status is unique per ``(status_id, account_db_id)`` pair
    (``uq_status_per_account``). Indexes exist on creation time, status type,
    owning account and conversation ID.
    """

    __tablename__ = "statuses"
    __table_args__ = (
        UniqueConstraint("status_id", "account_db_id", name="uq_status_per_account"),
        Index("ix_status_created", "created_at"),
        Index("ix_status_type", "status_type"),
        Index("ix_status_account", "account_db_id"),
        Index("ix_status_conversation", "conversation_id"),
    )

    id = Column(Integer, primary_key=True, autoincrement=True)
    # Mastodon status ID.
    status_id = Column(String(64), nullable=False)
    # Owning row in monitored_accounts.
    account_db_id = Column(
        Integer,
        ForeignKey("monitored_accounts.id"),
        nullable=False,
    )
    # Canonical ActivityPub URI.
    uri = Column(Text, nullable=False)
    # Human-readable URL.
    url = Column(Text, nullable=True)
    # HTML content.
    content = Column(Text, nullable=False)
    # Stripped plain-text content.
    text_content = Column(Text, nullable=True)
    # One of: public, unlisted, private, direct.
    visibility = Column(String(32), nullable=True)
    created_at = Column(DateTime(timezone=True), nullable=False)
    # Defaults to the current UTC time when the row is inserted.
    collected_at = Column(
        DateTime(timezone=True),
        default=lambda: datetime.now(tz=timezone.utc),
    )
    language = Column(String(16), nullable=True)
    sensitive = Column(Boolean, default=False)
    spoiler_text = Column(Text, nullable=True)

    # --- Reply / conversation tracking ---
    # Status ID being replied to.
    in_reply_to_id = Column(String(64), nullable=True)
    in_reply_to_account_id = Column(String(64), nullable=True)
    conversation_id = Column(String(64), nullable=True)

    # --- Interaction counts ---
    replies_count = Column(Integer, default=0)
    reblogs_count = Column(Integer, default=0)
    favourites_count = Column(Integer, default=0)

    # Classification for the analysis pipeline: post, reply, mention, reblog.
    status_type = Column(String(32), nullable=False, default="post")

    # Full JSON payload, stored for future reference.
    raw_json = Column(JSON, nullable=True)

    # --- Relationships ---
    account = relationship("MonitoredAccount", back_populates="statuses")
    # Child collections use cascade="all, delete-orphan", so the ORM removes
    # them together with their parent status.
    mentions = relationship(
        "Mention",
        back_populates="status",
        cascade="all, delete-orphan",
    )
    media_attachments = relationship(
        "MediaAttachment",
        back_populates="status",
        cascade="all, delete-orphan",
    )
    tags = relationship(
        "Tag",
        back_populates="status",
        cascade="all, delete-orphan",
    )

    def __repr__(self) -> str:
        """Return a short debug representation with the status ID and type."""
        return f"<Status {self.status_id} type={self.status_type}>"
|
|
|
|
|
|
class Mention(Base):
    """A mention within a status (who was @-mentioned)."""

    __tablename__ = "mentions"

    id = Column(Integer, primary_key=True, autoincrement=True)
    # Parent status; the foreign key is declared with ON DELETE CASCADE.
    status_db_id = Column(
        Integer,
        ForeignKey("statuses.id", ondelete="CASCADE"),
        nullable=False,
    )
    mentioned_account_id = Column(String(64), nullable=True)
    mentioned_username = Column(String(255), nullable=False)
    # Full user@instance.
    mentioned_acct = Column(String(512), nullable=False)
    mentioned_url = Column(Text, nullable=True)

    # Back-reference to the status containing this mention.
    status = relationship("Status", back_populates="mentions")
|
|
|
|
|
|
class MediaAttachment(Base):
    """Media attached to a status."""

    __tablename__ = "media_attachments"

    id = Column(Integer, primary_key=True, autoincrement=True)
    # Parent status; the foreign key is declared with ON DELETE CASCADE.
    status_db_id = Column(
        Integer,
        ForeignKey("statuses.id", ondelete="CASCADE"),
        nullable=False,
    )
    media_id = Column(String(64), nullable=True)
    # One of: image, video, gifv, audio.
    media_type = Column(String(32), nullable=True)
    url = Column(Text, nullable=True)
    preview_url = Column(Text, nullable=True)
    # Alt text.
    description = Column(Text, nullable=True)

    # Back-reference to the status this media belongs to.
    status = relationship("Status", back_populates="media_attachments")
|
|
|
|
|
|
class Tag(Base):
    """A hashtag used in a status.

    The ``name`` column is indexed (``ix_tag_name``).
    """

    __tablename__ = "tags"
    __table_args__ = (
        Index("ix_tag_name", "name"),
    )

    id = Column(Integer, primary_key=True, autoincrement=True)
    # Parent status; the foreign key is declared with ON DELETE CASCADE.
    status_db_id = Column(
        Integer,
        ForeignKey("statuses.id", ondelete="CASCADE"),
        nullable=False,
    )
    name = Column(String(255), nullable=False)
    url = Column(Text, nullable=True)

    # Back-reference to the status that carries this tag.
    status = relationship("Status", back_populates="tags")
|
|
|
|
|
|
class CollectionLog(Base):
    """Log of each collection run, kept for monitoring."""

    __tablename__ = "collection_logs"

    id = Column(Integer, primary_key=True, autoincrement=True)
    # Account the run was for; the column is nullable.
    account_db_id = Column(
        Integer,
        ForeignKey("monitored_accounts.id"),
        nullable=True,
    )
    # Defaults to the current UTC time when the row is inserted.
    started_at = Column(
        DateTime(timezone=True),
        default=lambda: datetime.now(tz=timezone.utc),
    )
    finished_at = Column(DateTime(timezone=True), nullable=True)
    statuses_collected = Column(Integer, default=0)
    error = Column(Text, nullable=True)
    # One of: running, success, error.
    status = Column(String(32), default="running")

    # One-way relationship to the monitored account (no back-reference).
    account = relationship("MonitoredAccount")
|
|
|
|
|
|
def init_db() -> None:
    """Create all tables.

    Issues CREATE statements on the module-level ``engine`` for every table
    registered on ``Base``'s metadata.
    """
    metadata = Base.metadata
    metadata.create_all(engine)
|
|
|
|
|
|
def get_session():
    """Get a new database session.

    Returns:
        A fresh session produced by the module-level ``SessionLocal`` factory.
    """
    session = SessionLocal()
    return session
|