"""Persistent state management for the bridge. Saves and loads session ID, Matrix sync batch token, and processed event IDs so the bridge survives crashes and restarts without losing context. """ from __future__ import annotations import json import logging import os from typing import Optional log = logging.getLogger("matrix-bridge.state") class BridgeState: """Persistent state that survives bridge restarts.""" def __init__( self, session_id: Optional[str] = None, next_batch: Optional[str] = None, processed_event_ids: Optional[list[str]] = None, ): self.session_id = session_id self.next_batch = next_batch self.processed_event_ids = processed_event_ids or [] @classmethod def load(cls, path: str) -> "BridgeState": """Load state from disk, or return defaults.""" if os.path.exists(path): try: with open(path, "r") as f: data = json.load(f) return cls( session_id=data.get("session_id"), next_batch=data.get("next_batch"), processed_event_ids=data.get("processed_event_ids", []), ) except (json.JSONDecodeError, IOError) as e: log.warning(f"Failed to load state file: {e}") return cls() def save(self, path: str) -> None: """Persist state to disk.""" data = { "session_id": self.session_id, "next_batch": self.next_batch, "processed_event_ids": self.processed_event_ids, } try: with open(path, "w") as f: json.dump(data, f, indent=2) except IOError as e: log.error(f"Failed to save state: {e}") def mark_processed(self, event_ids: list[str], limit: int = 200) -> None: """Add event IDs to the processed set and trim to prevent bloat.""" processed = set(self.processed_event_ids) for eid in event_ids: processed.add(eid) if len(processed) > limit: processed = set(list(processed)[-limit:]) self.processed_event_ids = list(processed) def __repr__(self) -> str: return ( f"BridgeState(" f"session={self.session_id}, " f"batch={'set' if self.next_batch else 'unset'}, " f"processed={len(self.processed_event_ids)})" )