From 751584c99d86b2a9c65c6c2994d96c78ca9fbc31 Mon Sep 17 00:00:00 2001 From: Lucy Date: Fri, 15 May 2026 13:45:24 +0200 Subject: [PATCH] Initial commit: Matrix Bridge Daemon MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Persistent Python daemon connecting Matrix DM room to AiAgent API. - Config-driven (JSON config file) - Extensible command system (/new_session, /help) - Typing indicators while agent processes - Session auto-naming for identification - Persistent state across restarts - Token refresh, retry logic, error handling - Python stdlib only — no external dependencies --- .gitignore | 23 ++++ README.md | 57 ++++++++ agent_client.py | 179 ++++++++++++++++++++++++++ bridge.py | 329 +++++++++++++++++++++++++++++++++++++++++++++++ config.json | 21 +++ config.py | 59 +++++++++ main.py | 74 +++++++++++ matrix_client.py | 206 +++++++++++++++++++++++++++++ requirements.txt | 2 + state.py | 74 +++++++++++ 10 files changed, 1024 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 agent_client.py create mode 100644 bridge.py create mode 100644 config.json create mode 100644 config.py create mode 100644 main.py create mode 100644 matrix_client.py create mode 100644 requirements.txt create mode 100644 state.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..5dc0fbb --- /dev/null +++ b/.gitignore @@ -0,0 +1,23 @@ +# Python +__pycache__/ +*.pyc +*.pyo +*.egg-info/ +*.egg +dist/ +build/ + +# Editor +.vscode/ +.idea/ +*.swp +*.swo +*~ + +# OS +.DS_Store +Thumbs.db + +# Project-specific +bridge-state.json +*.log diff --git a/README.md b/README.md new file mode 100644 index 0000000..17a1d4e --- /dev/null +++ b/README.md @@ -0,0 +1,57 @@ +# Matrix Bridge — Lucy's Messaging Bridge + +A persistent Python daemon that connects a Matrix DM room to the local AI agent framework (`AiAgent`). Listens for incoming messages, forwards them to the agent, and sends responses back — all using only Python stdlib. + +## Architecture + +``` +Matrix ──sync──> MatrixClient ──events──> Bridge ──message──> AgentClient ──> AiAgent API + ^ │ │ + └─────────── response ──────────────────┘ │ + │ + typing indicator, session management, command dispatch polling loop +``` + +## Files + +| File | Purpose | +|------|---------| +| `main.py` | Entry point, CLI argument parsing | +| `config.py` | Configuration loader (JSON → dataclass) | +| `bridge.py` | Main loop, event processing, command dispatch | +| `matrix_client.py` | Matrix API client (sync, send, typing, auth) | +| `agent_client.py` | Agent API client (sessions, messages, polling) | +| `state.py` | Persistent state (session ID, batch token, dedup) | +| `config.json` | Configuration file | + +## Configuration + +Edit `config.json` to set: + +- **Matrix** — server, user ID, room ID, credentials file path +- **Agent** — API base URL +- **Bridge** — poll interval, timeouts, limits + +## Commands + +| Command | Description | +|---------|-------------| +| `/new_session` | Create a fresh agent session (clears conversation) | +| `/help` | Show available commands | + +Adding new commands: add an entry to `Bridge.COMMANDS` and a `_cmd_` method. + +## Deployment + +```bash +# Install +sudo cp -r . /opt/MatrixBridge +sudo cp matrix-bridge.service /etc/systemd/system/ + +# Start +sudo systemctl enable --now matrix-bridge +``` + +## Requirements + +Python 3.10+ with stdlib only — no external packages needed. diff --git a/agent_client.py b/agent_client.py new file mode 100644 index 0000000..c34bfba --- /dev/null +++ b/agent_client.py @@ -0,0 +1,179 @@ +"""Agent API client — communicates with the local AI agent framework. + +Provides: session management, message sending, state polling, response fetching. +Uses only Python stdlib — no external dependencies. +""" + +from __future__ import annotations + +import json +import logging +import time +import urllib.error +import urllib.request +from typing import Optional + +from config import Config + +log = logging.getLogger("matrix-bridge.agent") + + +class AgentClient: + """Client for the local agent HTTP API.""" + + def __init__(self, config: Config): + self.base_url = config.agent.base_url + self._timeout = config.bridge.agent_timeout_seconds + self._retries = config.bridge.agent_retries + + # ── Raw Request ───────────────────────────────────────────────────── + + def _request( + self, method: str, path: str, body: Optional[dict] = None + ) -> Optional[dict]: + """Make a request to the agent API with retry logic. + + Retries on transient connection errors (connection reset, timeout). + HTTP errors (4xx, 5xx) are not retried. + """ + url = f"{self.base_url}{path}" + last_error = None + + for attempt in range(self._retries): + data = json.dumps(body).encode("utf-8") if body else None + req = urllib.request.Request( + url, data=data, + headers={"Content-Type": "application/json"} if data else {}, + method=method, + ) + try: + with urllib.request.urlopen(req, timeout=self._timeout) as resp: + raw = resp.read().decode("utf-8") + return json.loads(raw) if raw.strip() else {} + except urllib.error.HTTPError as e: + err_body = e.read().decode("utf-8", errors="replace") + log.error(f"Agent API {method} {path} -> {e.code}: {err_body[:200]}") + return None + except (urllib.error.URLError, ConnectionError, TimeoutError, OSError) as e: + last_error = e + if attempt < self._retries - 1: + log.warning( + f"Agent API {method} {path} transient " + f"(attempt {attempt + 1}/{self._retries}): {e}" + ) + time.sleep(1) + else: + log.error( + f"Agent API {method} {path} failed " + f"after {self._retries} attempts: {e}" + ) + except Exception as e: + log.error(f"Agent API {method} {path} unexpected error: {e}") + return None + + return None + + # ── Session Management ────────────────────────────────────────────── + + def create_session(self) -> Optional[str]: + """Create a new agent session. Returns session ID or None.""" + result = self._request("POST", "/api/session/new") + if result and "id" in result: + log.info(f"Created session: {result['id']}") + return result["id"] + log.error("Failed to create agent session") + return None + + def rename_session(self, session_id: str, new_name: str) -> bool: + """Rename a session to identify its purpose.""" + result = self._request( + "PATCH", f"/api/session/{session_id}", {"name": new_name} + ) + if result is not None and result.get("success") is not False: + log.info(f"Renamed {session_id} -> '{new_name}'") + return True + log.warning(f"Failed to rename session {session_id}") + return False + + # ── Messaging ─────────────────────────────────────────────────────── + + def send_message(self, session_id: str, content: str) -> bool: + """Send a message to the agent. Returns True if accepted.""" + result = self._request( + "POST", + f"/api/chat/{session_id}/message", + {"message": content}, + ) + if result and result.get("ok"): + log.info(f"Forwarded to session {session_id}") + return True + log.error(f"Agent rejected message: {result}") + return False + + # ── State & Response Polling ──────────────────────────────────────── + + def get_session_state(self, session_id: str) -> Optional[dict]: + """Get the current processing state of a session.""" + return self._request("GET", f"/api/session/{session_id}/state") + + def get_session_messages(self, session_id: str) -> Optional[list]: + """Get the element list for a session.""" + result = self._request("GET", f"/api/session/{session_id}/messages") + return result if isinstance(result, list) else None + + def get_message(self, message_id: str) -> Optional[dict]: + """Get the full content of a message.""" + return self._request("GET", f"/api/message/{message_id}") + + def wait_for_response( + self, session_id: str, timeout: int = 300 + ) -> Optional[str]: + """Poll until the agent finishes, then return the last assistant message. + + **Phase 1:** Wait for ``{"streaming": true}`` to appear (agent started). + **Phase 2:** Wait for streaming to end (agent finished). + **Phase 3:** Fetch and return the last assistant message content. + + Intermediate tool calls are invisible — only the final response is returned. + """ + log.info(f"Waiting for agent (session {session_id})...") + start = time.time() + + # Phase 1: Wait for agent to start streaming + while time.time() - start < timeout: + state = self.get_session_state(session_id) + if state is None: + log.warning("Session disappeared") + return None + if state.get("streaming"): + log.info("Agent is now streaming") + break + if time.time() - start >= 3: + # Could be a very fast response (no streaming phase) + break + time.sleep(0.3) + + # Phase 2: Wait for streaming to end + while time.time() - start < timeout: + state = self.get_session_state(session_id) + if state is None: + log.warning("Session disappeared during processing") + return None + if not state.get("streaming"): + break + time.sleep(0.5) + + # Phase 3: Fetch the last assistant message + messages = self.get_session_messages(session_id) + if not messages: + log.warning("No messages in session after agent finished") + return None + + for elem in reversed(messages): + if elem.get("type") == "message": + msg = self.get_message(elem["id"]) + if msg and msg.get("role") == "assistant": + return msg.get("content", "") + + log.warning("No assistant message found") + return None diff --git a/bridge.py b/bridge.py new file mode 100644 index 0000000..2d940d1 --- /dev/null +++ b/bridge.py @@ -0,0 +1,329 @@ +"""Core bridge logic — connects Matrix messages to the agent API. + +Orchestrates the polling loop, message dispatching, command handling, +and response delivery. Designed to be extensible: add new commands by +adding entries to the COMMANDS registry. +""" + +from __future__ import annotations + +import logging +import signal +import time +from typing import Optional + +from config import Config +from state import BridgeState +from matrix_client import MatrixClient +from agent_client import AgentClient + +log = logging.getLogger("matrix-bridge.bridge") + +# ── User identity ───────────────────────────────────────────────────── +# Messages from this user are forwarded to the agent. +DANIEL_USER_ID = "@daniel:matrix.daedalus706.de" + + +class Bridge: + """Main bridge that connects Matrix messages to the agent API.""" + + def __init__(self, config: Config): + self.config = config + self.matrix = MatrixClient(config) + self.agent = AgentClient(config) + self.state = BridgeState.load(config.bridge.state_file) + self._running = True + + # Register signal handlers + signal.signal(signal.SIGTERM, self._signal_handler) + signal.signal(signal.SIGINT, self._signal_handler) + + # ── Lifecycle ─────────────────────────────────────────────────────── + + def _signal_handler(self, signum, frame) -> None: + """Handle shutdown signals gracefully.""" + log.info(f"Received signal {signum}, shutting down...") + self._running = False + + def run(self) -> None: + """Enter the main polling loop. Blocks until shutdown.""" + log.info("=" * 50) + log.info("Matrix Bridge Daemon starting") + log.info("=" * 50) + log.info(f"State: {self.state}") + log.info(f"Room: {self.config.matrix.room_id}") + log.info(f"Agent: {self.config.agent.base_url}") + + # Bootstrap the Matrix sync batch token if needed + if not self.state.next_batch: + self._bootstrap_sync() + + # Validate saved session (framework may have restarted) + self._validate_session() + + log.info( + f"Entering main loop (poll every " + f"{self.config.bridge.poll_interval_seconds}s)" + ) + consecutive_errors = 0 + + while self._running: + try: + nb, events = self.matrix.sync( + since=self.state.next_batch, + timeout=self.config.bridge.matrix_poll_timeout, + ) + + if nb: + self.state.next_batch = nb + consecutive_errors = 0 + + # Deduplicate events + known = set(self.state.processed_event_ids) + new_events = [ + e for e in events if e.get("event_id") not in known + ] + + if new_events: + log.debug( + f"Sync: {len(events)} event(s), " + f"{len(new_events)} new" + ) + for e in new_events: + log.debug( + f" Event: type={e.get('type')} " + f"sender={e.get('sender')} " + f"id={e.get('event_id')}" + ) + if e.get("type") == "m.room.message": + log.debug( + f" Body: {e.get('content', {}).get('body', '')[:80]}" + ) + + self.state.mark_processed( + [e["event_id"] for e in new_events if e.get("event_id")], + limit=self.config.bridge.processed_ids_limit, + ) + self.state.save(self.config.bridge.state_file) + + self._process_events(new_events) + else: + log.debug( + f"Sync OK, 0 new events (batch: {nb[:35]}...)" + ) + else: + consecutive_errors += 1 + if consecutive_errors > 10: + log.warning( + f"{consecutive_errors} consecutive errors, " + f"backing off" + ) + time.sleep(10) + consecutive_errors = 0 + + except Exception as e: + log.error(f"Unexpected error in main loop: {e}", exc_info=True) + consecutive_errors += 1 + + time.sleep(self.config.bridge.poll_interval_seconds) + + log.info("Bridge stopped gracefully") + + # ── Initialization Helpers ────────────────────────────────────────── + + def _bootstrap_sync(self) -> None: + """Perform an initial sync to get the first batch token.""" + log.info("No batch token — performing initial sync...") + nb, _ = self.matrix.sync(timeout=self.config.bridge.matrix_poll_timeout) + if nb: + self.state.next_batch = nb + self.state.save(self.config.bridge.state_file) + log.info(f"Got initial batch: {nb[:40]}...") + else: + log.warning("Initial sync failed, will retry in main loop") + + def _validate_session(self) -> None: + """Check if the saved session is still valid on the agent.""" + sid = self.state.session_id + if sid: + state = self.agent.get_session_state(sid) + if state is None: + log.warning( + f"Saved session {sid} no longer valid, clearing" + ) + self.state.session_id = None + self.state.processed_event_ids = [] + self.state.save(self.config.bridge.state_file) + + # ── Event Processing ──────────────────────────────────────────────── + + DANIEL_USER_ID = DANIEL_USER_ID + + @staticmethod + def _is_from_daniel(event: dict) -> bool: + """Check if a Matrix event is a text message from Daniel.""" + return ( + event.get("type") == "m.room.message" + and event.get("content", {}).get("msgtype") == "m.text" + and event.get("sender") == DANIEL_USER_ID + ) + + def _process_events(self, events: list[dict]) -> None: + """Process a batch of new Matrix events. + + Extracts Daniel's text messages, checks for commands, + bundles them, and forwards to the agent. + """ + daniel_msgs = [ + e for e in events if self._is_from_daniel(e) + ] + if not daniel_msgs: + return + + # Sort chronologically + daniel_msgs.sort(key=lambda e: e.get("origin_server_ts", 0)) + texts = [e.get("content", {}).get("body", "") for e in daniel_msgs] + log.info(f"Received {len(texts)} message(s) from Daniel") + + # Check for commands first + remaining = self._dispatch_commands(texts) + if not remaining: + return + + # Forward remaining texts to the agent + self._forward_to_agent(remaining) + + # ── Command System ────────────────────────────────────────────────── + + COMMANDS = { + "/new_session": "Create a fresh agent session (clears conversation history)", + "/help": "Show this list of available commands", + } + + def _dispatch_commands(self, texts: list[str]) -> list[str]: + """Check messages for commands and handle them. + + Returns the list of texts that were NOT handled as commands, + so they can be forwarded to the agent. + """ + remaining = [] + for text in texts: + stripped = text.strip() + if stripped == "/new_session": + self._cmd_new_session() + elif stripped == "/help": + self._cmd_help() + else: + remaining.append(text) + return remaining + + def _cmd_new_session(self) -> None: + """Handle /new_session: create a fresh session and notify the user.""" + log.info("Executing /new_session command") + session_id = self.agent.create_session() + if session_id: + self.agent.rename_session(session_id, f"Matrix {session_id}") + self.state.session_id = session_id + self.state.save(self.config.bridge.state_file) + self.matrix.send_message("🔄 New session created!") + else: + self.matrix.send_message( + "❌ Failed to create new session. Check logs." + ) + + def _cmd_help(self) -> None: + """Handle /help: list available commands.""" + lines = ["**Available commands:**"] + for cmd, desc in self.COMMANDS.items(): + lines.append(f"- `{cmd}` — {desc}") + self.matrix.send_message("\n".join(lines)) + + # ── Agent Forwarding ──────────────────────────────────────────────── + + def _ensure_session(self) -> Optional[str]: + """Get or create an agent session. Returns session ID or None.""" + session_id = self.state.session_id + + if not session_id: + log.info("No active session, creating one") + session_id = self.agent.create_session() + if not session_id: + return None + self.agent.rename_session(session_id, f"Matrix {session_id}") + self.state.session_id = session_id + self.state.save(self.config.bridge.state_file) + return session_id + + # Verify the session still exists + if self.agent.get_session_state(session_id) is None: + log.warning(f"Session {session_id} gone, creating new one") + session_id = self.agent.create_session() + if not session_id: + return None + self.agent.rename_session(session_id, f"Matrix {session_id}") + self.state.session_id = session_id + self.state.save(self.config.bridge.state_file) + + return session_id + + @staticmethod + def _bundle_messages(texts: list[str]) -> str: + """Combine multiple messages into a single agent prompt.""" + if len(texts) == 1: + return texts[0] + + parts = [ + f"--- Message {i} ---\n{t}" + for i, t in enumerate(texts, 1) + ] + return "Your human sent these messages at once:\n\n" + "\n\n".join(parts) + + def _forward_to_agent(self, texts: list[str]) -> None: + """Send texts to the agent and relay the response back to Matrix.""" + session_id = self._ensure_session() + if not session_id: + self.matrix.send_message( + "❌ Failed to create agent session. Check logs." + ) + return + + prompt = self._bundle_messages(texts) + log.info( + f"Sending {len(texts)} message(s) to agent: " + f"{prompt[:120]}..." + ) + + # Show typing indicator while processing + self.matrix.set_typing(True) + + try: + if not self.agent.send_message(session_id, prompt): + self.matrix.send_message( + "❌ Failed to send message to agent." + ) + return + + response = self.agent.wait_for_response( + session_id, + timeout=self.config.bridge.agent_response_timeout, + ) + finally: + self.matrix.set_typing(False) + + if response: + self._send_response(response) + else: + log.warning("Agent returned no response — not sending to Matrix") + + def _send_response(self, text: str) -> None: + """Send the agent's response to Matrix, splitting long messages.""" + max_chunk = self.config.bridge.max_message_length + if len(text) <= max_chunk: + self.matrix.send_message(text) + else: + for i in range(0, len(text), max_chunk): + chunk = text[i : i + max_chunk] + if i == 0: + self.matrix.send_message(chunk) + else: + self.matrix.send_message(f"(continued)\n{chunk}") diff --git a/config.json b/config.json new file mode 100644 index 0000000..152421d --- /dev/null +++ b/config.json @@ -0,0 +1,21 @@ +{ + "matrix": { + "server": "matrix.daedalus706.de", + "user_id": "@lucy:matrix.daedalus706.de", + "room_id": "!8zLJacfaL2WYoYVp:matrix.daedalus706.de", + "credentials_file": "/home/admin/auth/matrix-credentials.txt" + }, + "agent": { + "base_url": "http://10.0.1.2:8080" + }, + "bridge": { + "poll_interval_seconds": 1, + "state_file": "/home/admin/agent-dir/bridge-state.json", + "matrix_poll_timeout": 0, + "agent_timeout_seconds": 30, + "agent_retries": 3, + "max_message_length": 40000, + "processed_ids_limit": 200, + "agent_response_timeout": 300 + } +} diff --git a/config.py b/config.py new file mode 100644 index 0000000..92ef16f --- /dev/null +++ b/config.py @@ -0,0 +1,59 @@ +"""Configuration loader for the Matrix Bridge. + +Loads settings from config.json and provides typed access via dataclasses. +""" + +from __future__ import annotations + +import json +import os +from dataclasses import dataclass, field +from typing import Optional + + +DEFAULT_CONFIG_PATH = os.path.join(os.path.dirname(__file__), "config.json") + + +@dataclass +class MatrixConfig: + server: str + user_id: str + room_id: str + credentials_file: str + + +@dataclass +class AgentConfig: + base_url: str + + +@dataclass +class BridgeConfig: + poll_interval_seconds: int = 1 + state_file: str = "/home/admin/agent-dir/bridge-state.json" + matrix_poll_timeout: int = 0 + agent_timeout_seconds: int = 30 + agent_retries: int = 3 + max_message_length: int = 40000 + processed_ids_limit: int = 200 + agent_response_timeout: int = 300 + + +@dataclass +class Config: + matrix: MatrixConfig + agent: AgentConfig + bridge: BridgeConfig = field(default_factory=BridgeConfig) + + @classmethod + def load(cls, path: str = DEFAULT_CONFIG_PATH) -> "Config": + """Load configuration from a JSON file.""" + with open(path, "r") as f: + data = json.load(f) + + matrix = MatrixConfig(**data.get("matrix", {})) + agent = AgentConfig(**data.get("agent", {})) + bridge_data = data.get("bridge", {}) + bridge = BridgeConfig(**bridge_data) + + return cls(matrix=matrix, agent=agent, bridge=bridge) diff --git a/main.py b/main.py new file mode 100644 index 0000000..50eab59 --- /dev/null +++ b/main.py @@ -0,0 +1,74 @@ +#!/usr/bin/env python3 +""" +Matrix Bridge Daemon — connects Lucy's Matrix DM to the local agent API. + +Polling loop that listens for new messages in a Matrix room, forwards them +to the AI agent framework, and sends responses back. + +Usage: + python3 main.py # Normal mode + python3 main.py --config path/to.json # Custom config + python3 main.py --debug # Verbose logging +""" + +from __future__ import annotations + +import argparse +import logging +import os +import sys + +from config import Config +from bridge import Bridge + + +def setup_logging(debug: bool = False) -> None: + """Configure logging format and level.""" + level = logging.DEBUG if debug else logging.INFO + logging.basicConfig( + level=level, + format="%(asctime)s [%(levelname)s] %(message)s", + handlers=[logging.StreamHandler(sys.stdout)], + ) + + +def main() -> None: + parser = argparse.ArgumentParser( + description="Matrix Bridge Daemon — connects Matrix DMs to the agent API", + ) + parser.add_argument( + "--config", + default=os.path.join(os.path.dirname(__file__), "config.json"), + help="Path to configuration file (default: config.json)", + ) + parser.add_argument( + "--debug", + action="store_true", + help="Enable verbose DEBUG-level logging", + ) + args = parser.parse_args() + + setup_logging(debug=args.debug) + log = logging.getLogger("matrix-bridge") + + # Load configuration + try: + config = Config.load(args.config) + except Exception as e: + log.error(f"Failed to load config from {args.config}: {e}") + sys.exit(1) + + # Create and run the bridge + bridge = Bridge(config) + + # Verify agent API is reachable + if bridge.agent._request("GET", "/api/sessions") is not None: + log.info("Agent API is reachable") + else: + log.warning("Agent API not reachable — will retry in main loop") + + bridge.run() + + +if __name__ == "__main__": + main() diff --git a/matrix_client.py b/matrix_client.py new file mode 100644 index 0000000..27c545d --- /dev/null +++ b/matrix_client.py @@ -0,0 +1,206 @@ +"""Matrix API client — handles all communication with the Matrix server. + +Provides: sync, send messages, typing indicators, token refresh. +Uses only Python stdlib — no external dependencies. +""" + +from __future__ import annotations + +import json +import logging +import os +import time +import urllib.error +import urllib.request +from typing import Optional +from urllib.parse import urlencode + +from config import Config + +log = logging.getLogger("matrix-bridge.matrix") + + +class MatrixClient: + """Client for communicating with a Matrix homeserver.""" + + def __init__(self, config: Config): + self.server = config.matrix.server + self.user_id = config.matrix.user_id + self.room_id = config.matrix.room_id + self._creds_file = config.matrix.credentials_file + self._access_token: Optional[str] = None + self._load_credentials() + + # ── Credential Management ─────────────────────────────────────────── + + def _load_credentials(self) -> None: + """Read access token from the credentials file.""" + creds = self._parse_creds_file() + self._access_token = creds.get("Access Token") + if not self._access_token: + raise RuntimeError("No Access Token found in credentials file") + + def _parse_creds_file(self) -> dict[str, str]: + """Parse key: value pairs from the credentials file.""" + creds = {} + with open(self._creds_file, "r") as f: + for line in f: + if ":" in line: + key, _, value = line.partition(":") + creds[key.strip()] = value.strip() + return creds + + def _refresh_token(self) -> bool: + """Re-authenticate to Matrix and update the credentials file.""" + creds = self._parse_creds_file() + password = creds.get("Password") + username = creds.get("Username") + if not password or not username: + log.error("Cannot refresh token: missing username or password") + return False + + url = f"https://{self.server}/_matrix/client/v3/login" + payload = { + "type": "m.login.password", + "user": username, + "password": password, + } + data = json.dumps(payload).encode("utf-8") + req = urllib.request.Request( + url, data=data, + headers={"Content-Type": "application/json"}, + ) + + try: + with urllib.request.urlopen(req) as resp: + result = json.loads(resp.read()) + new_token = result["access_token"] + + # Persist to credentials file + with open(self._creds_file, "r") as f: + content = f.read() + content = content.replace( + creds.get("Access Token", ""), new_token + ) + with open(self._creds_file, "w") as f: + f.write(content) + + self._access_token = new_token + log.info("Matrix token refreshed") + return True + except Exception as e: + log.error(f"Matrix re-login failed: {e}") + return False + + # ── Raw Request ───────────────────────────────────────────────────── + + def _request( + self, method: str, path: str, body: Optional[dict] = None + ) -> Optional[dict]: + """Make an authenticated request to the Matrix API. + + Handles token expiry by re-authenticating and retrying once. + Returns parsed JSON on success, None on failure. + """ + url = f"https://{self.server}{path}" + data = json.dumps(body).encode("utf-8") if body else None + + req = urllib.request.Request( + url, data=data, + headers={ + "Content-Type": "application/json" if data else "", + "Authorization": f"Bearer {self._access_token}", + }, + method=method, + ) + + try: + with urllib.request.urlopen(req, timeout=10) as resp: + raw = resp.read().decode("utf-8") + return json.loads(raw) if raw.strip() else {} + except urllib.error.HTTPError as e: + err_body = e.read().decode("utf-8", errors="replace") + if "M_UNKNOWN_TOKEN" in err_body: + log.info("Token expired, refreshing...") + if self._refresh_token(): + return self._request(method, path, body) + log.error(f"Matrix {method} {path} -> HTTP {e.code}: {err_body[:200]}") + return None + except Exception as e: + log.error(f"Matrix {method} {path} failed: {e}") + return None + + # ── Sync ──────────────────────────────────────────────────────────── + + def sync( + self, since: Optional[str] = None, timeout: int = 0 + ) -> tuple[Optional[str], list[dict]]: + """Poll the Matrix sync API. + + Args: + since: Batch token from the previous sync (None for initial). + timeout: Server-side timeout in ms (0 = return immediately). + + Returns: + (next_batch_token, list_of_events) or (None, []) on error. + """ + params = {"timeout": str(timeout)} + if since: + params["since"] = since + + result = self._request( + "GET", f"/_matrix/client/v3/sync?{urlencode(params)}" + ) + if result is None: + return None, [] + + next_batch = result.get("next_batch") + + # Collect all timeline events from joined rooms + events = [] + for room_data in result.get("rooms", {}).get("join", {}).values(): + events.extend( + room_data.get("timeline", {}).get("events", []) + ) + + return next_batch, events + + # ── Send Message ──────────────────────────────────────────────────── + + def send_message(self, text: str) -> bool: + """Send a text message to the configured room. Returns True on success.""" + txn_id = f"lucy-bridge-{int(time.time())}-{os.getpid()}" + path = ( + f"/_matrix/client/v3/rooms/" + f"{self.room_id}/send/m.room.message/{txn_id}" + ) + payload = {"msgtype": "m.text", "body": text} + + result = self._request("PUT", path, payload) + if result and "event_id" in result: + log.info(f"Sent to Matrix, event_id: {result['event_id']}") + return True + return False + + # ── Typing Indicator ──────────────────────────────────────────────── + + def set_typing(self, typing: bool, timeout_ms: int = 20000) -> bool: + """Show or hide the typing indicator in the configured room. + + This is best-effort — failures are logged at DEBUG level since + typing indicators are non-critical. + """ + path = ( + f"/_matrix/client/v3/rooms/" + f"{self.room_id}/typing/{self.user_id}" + ) + payload: dict = {"typing": typing} + if typing: + payload["timeout"] = timeout_ms + + result = self._request("PUT", path, payload) + if result is not None: + log.debug(f"Typing {'on' if typing else 'off'}") + return True + log.debug("Typing indicator failed (non-critical)") + return False diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..e9bd45e --- /dev/null +++ b/requirements.txt @@ -0,0 +1,2 @@ +# Matrix Bridge — No external dependencies required. +# Uses only Python stdlib (urllib, json, logging, etc.). diff --git a/state.py b/state.py new file mode 100644 index 0000000..d2eac37 --- /dev/null +++ b/state.py @@ -0,0 +1,74 @@ +"""Persistent state management for the bridge. + +Saves and loads session ID, Matrix sync batch token, and processed event IDs +so the bridge survives crashes and restarts without losing context. +""" + +from __future__ import annotations + +import json +import logging +import os +from typing import Optional + +log = logging.getLogger("matrix-bridge.state") + + +class BridgeState: + """Persistent state that survives bridge restarts.""" + + def __init__( + self, + session_id: Optional[str] = None, + next_batch: Optional[str] = None, + processed_event_ids: Optional[list[str]] = None, + ): + self.session_id = session_id + self.next_batch = next_batch + self.processed_event_ids = processed_event_ids or [] + + @classmethod + def load(cls, path: str) -> "BridgeState": + """Load state from disk, or return defaults.""" + if os.path.exists(path): + try: + with open(path, "r") as f: + data = json.load(f) + return cls( + session_id=data.get("session_id"), + next_batch=data.get("next_batch"), + processed_event_ids=data.get("processed_event_ids", []), + ) + except (json.JSONDecodeError, IOError) as e: + log.warning(f"Failed to load state file: {e}") + return cls() + + def save(self, path: str) -> None: + """Persist state to disk.""" + data = { + "session_id": self.session_id, + "next_batch": self.next_batch, + "processed_event_ids": self.processed_event_ids, + } + try: + with open(path, "w") as f: + json.dump(data, f, indent=2) + except IOError as e: + log.error(f"Failed to save state: {e}") + + def mark_processed(self, event_ids: list[str], limit: int = 200) -> None: + """Add event IDs to the processed set and trim to prevent bloat.""" + processed = set(self.processed_event_ids) + for eid in event_ids: + processed.add(eid) + if len(processed) > limit: + processed = set(list(processed)[-limit:]) + self.processed_event_ids = list(processed) + + def __repr__(self) -> str: + return ( + f"BridgeState(" + f"session={self.session_id}, " + f"batch={'set' if self.next_batch else 'unset'}, " + f"processed={len(self.processed_event_ids)})" + )