Persistent Python daemon connecting Matrix DM room to AiAgent API. - Config-driven (JSON config file) - Extensible command system (/new_session, /help) - Typing indicators while agent processes - Session auto-naming for identification - Persistent state across restarts - Token refresh, retry logic, error handling - Python stdlib only — no external dependencies
75 lines
2.4 KiB
Python
75 lines
2.4 KiB
Python
"""Persistent state management for the bridge.
|
|
|
|
Saves and loads session ID, Matrix sync batch token, and processed event IDs
|
|
so the bridge survives crashes and restarts without losing context.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
from typing import Optional
|
|
|
|
log = logging.getLogger("matrix-bridge.state")


class BridgeState:
    """Persistent state that survives bridge restarts.

    Attributes:
        session_id: Current session ID, or ``None`` when no session is stored.
        next_batch: Matrix sync batch token, or ``None`` when unset.
        processed_event_ids: IDs of events already handled, ordered from the
            oldest to the most recently marked.
    """

    def __init__(
        self,
        session_id: Optional[str] = None,
        next_batch: Optional[str] = None,
        processed_event_ids: Optional[list[str]] = None,
    ):
        """Create a state object; every field defaults to "nothing stored"."""
        self.session_id = session_id
        self.next_batch = next_batch
        # Copy so that later trimming never mutates the caller's list.
        self.processed_event_ids = list(processed_event_ids or [])

    @classmethod
    def load(cls, path: str) -> "BridgeState":
        """Load state from disk, or return defaults.

        A missing file is the normal first-run case and yields default state
        silently. An unreadable file, malformed JSON, or JSON whose top level
        is not an object is logged as a warning and also yields defaults.

        Args:
            path: Location of the JSON state file.

        Returns:
            The restored state, or a default ``BridgeState``.
        """
        try:
            with open(path, "r", encoding="utf-8") as f:
                data = json.load(f)
        except FileNotFoundError:
            return cls()
        except (ValueError, OSError) as e:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            log.warning(f"Failed to load state file: {e}")
            return cls()

        if not isinstance(data, dict):
            log.warning(
                "Failed to load state file: expected a JSON object, "
                f"got {type(data).__name__}"
            )
            return cls()

        raw_ids = data.get("processed_event_ids")
        if not isinstance(raw_ids, list):
            raw_ids = []
        return cls(
            session_id=data.get("session_id"),
            next_batch=data.get("next_batch"),
            # Drop anything that is not a string so that mark_processed()
            # never meets an unhashable or otherwise bogus entry.
            processed_event_ids=[eid for eid in raw_ids if isinstance(eid, str)],
        )

    def save(self, path: str) -> None:
        """Persist state to disk.

        The JSON is written to a sibling temporary file (``path`` + ".tmp")
        and then moved over ``path`` with ``os.replace``, so an interrupted
        write does not leave a truncated state file behind. I/O failures are
        logged and swallowed: saving is best-effort and must not take the
        bridge down.

        Args:
            path: Location of the JSON state file.
        """
        data = {
            "session_id": self.session_id,
            "next_batch": self.next_batch,
            "processed_event_ids": self.processed_event_ids,
        }
        tmp_path = f"{path}.tmp"
        try:
            with open(tmp_path, "w", encoding="utf-8") as f:
                json.dump(data, f, indent=2)
            os.replace(tmp_path, path)
        except OSError as e:
            log.error(f"Failed to save state: {e}")
            try:
                os.remove(tmp_path)
            except OSError:
                # Nothing to clean up (or no permission); the original error
                # has already been reported above.
                pass

    def mark_processed(self, event_ids: list[str], limit: int = 200) -> None:
        """Record event IDs as processed and trim the history to prevent bloat.

        IDs are de-duplicated while keeping their order; an ID that is marked
        again moves to the most-recent end. When more than ``limit`` IDs are
        held, the oldest ones are dropped.

        Args:
            event_ids: Newly processed event IDs, oldest first.
            limit: Maximum number of IDs to retain. Zero or a negative value
                retains none.
        """
        # A dict keeps insertion order, giving an ordered set for free.
        ordered = dict.fromkeys(self.processed_event_ids)
        for eid in event_ids:
            ordered.pop(eid, None)
            ordered[eid] = None

        keep = max(limit, 0)
        ids = list(ordered)
        # Guard keep == 0 explicitly: ids[-0:] would return the whole list.
        self.processed_event_ids = ids[-keep:] if keep else []

    def __repr__(self) -> str:
        """Return a short summary that does not expose the batch token."""
        return (
            f"BridgeState("
            f"session={self.session_id}, "
            f"batch={'set' if self.next_batch else 'unset'}, "
            f"processed={len(self.processed_event_ids)})"
        )
|