"""Core bridge logic — connects Matrix messages to the agent API. Orchestrates the polling loop, message dispatching, command handling, and response delivery. Designed to be extensible: add new commands by adding entries to the COMMANDS registry. """ from __future__ import annotations import logging import signal import time from typing import Optional from config import Config from state import BridgeState from matrix_client import MatrixClient from agent_client import AgentClient log = logging.getLogger("matrix-bridge.bridge") # ── User identity ───────────────────────────────────────────────────── # Messages from this user are forwarded to the agent. DANIEL_USER_ID = "@daniel:matrix.daedalus706.de" class Bridge: """Main bridge that connects Matrix messages to the agent API.""" def __init__(self, config: Config): self.config = config self.matrix = MatrixClient(config) self.agent = AgentClient(config) self.state = BridgeState.load(config.bridge.state_file) self._running = True # Register signal handlers signal.signal(signal.SIGTERM, self._signal_handler) signal.signal(signal.SIGINT, self._signal_handler) # ── Lifecycle ─────────────────────────────────────────────────────── def _signal_handler(self, signum, frame) -> None: """Handle shutdown signals gracefully.""" log.info(f"Received signal {signum}, shutting down...") self._running = False def run(self) -> None: """Enter the main polling loop. Blocks until shutdown.""" log.info("=" * 50) log.info("Matrix Bridge Daemon starting") log.info("=" * 50) log.info(f"State: {self.state}") log.info(f"Room: {self.config.matrix.room_id}") log.info(f"Agent: {self.config.agent.base_url}") # Bootstrap the Matrix sync batch token if needed if not self.state.next_batch: self._bootstrap_sync() # Validate saved session (framework may have restarted) self._validate_session() log.info( f"Entering main loop (poll every " f"{self.config.bridge.poll_interval_seconds}s)" ) consecutive_errors = 0 while self._running: try: nb, events = self.matrix.sync( since=self.state.next_batch, timeout=self.config.bridge.matrix_poll_timeout, ) if nb: self.state.next_batch = nb consecutive_errors = 0 # Deduplicate events known = set(self.state.processed_event_ids) new_events = [ e for e in events if e.get("event_id") not in known ] if new_events: log.debug( f"Sync: {len(events)} event(s), " f"{len(new_events)} new" ) for e in new_events: log.debug( f" Event: type={e.get('type')} " f"sender={e.get('sender')} " f"id={e.get('event_id')}" ) if e.get("type") == "m.room.message": log.debug( f" Body: {e.get('content', {}).get('body', '')[:80]}" ) self.state.mark_processed( [e["event_id"] for e in new_events if e.get("event_id")], limit=self.config.bridge.processed_ids_limit, ) self.state.save(self.config.bridge.state_file) self._process_events(new_events) else: log.debug( f"Sync OK, 0 new events (batch: {nb[:35]}...)" ) else: consecutive_errors += 1 if consecutive_errors > 10: log.warning( f"{consecutive_errors} consecutive errors, " f"backing off" ) time.sleep(10) consecutive_errors = 0 except Exception as e: log.error(f"Unexpected error in main loop: {e}", exc_info=True) consecutive_errors += 1 time.sleep(self.config.bridge.poll_interval_seconds) log.info("Bridge stopped gracefully") # ── Initialization Helpers ────────────────────────────────────────── def _bootstrap_sync(self) -> None: """Perform an initial sync to get the first batch token.""" log.info("No batch token — performing initial sync...") nb, _ = self.matrix.sync(timeout=self.config.bridge.matrix_poll_timeout) if nb: self.state.next_batch = nb self.state.save(self.config.bridge.state_file) log.info(f"Got initial batch: {nb[:40]}...") else: log.warning("Initial sync failed, will retry in main loop") def _validate_session(self) -> None: """Check if the saved session is still valid on the agent.""" sid = self.state.session_id if sid: state = self.agent.get_session_state(sid) if state is None: log.warning( f"Saved session {sid} no longer valid, clearing" ) self.state.session_id = None self.state.processed_event_ids = [] self.state.save(self.config.bridge.state_file) # ── Event Processing ──────────────────────────────────────────────── DANIEL_USER_ID = DANIEL_USER_ID @staticmethod def _is_from_daniel(event: dict) -> bool: """Check if a Matrix event is a text message from Daniel.""" return ( event.get("type") == "m.room.message" and event.get("content", {}).get("msgtype") == "m.text" and event.get("sender") == DANIEL_USER_ID ) def _process_events(self, events: list[dict]) -> None: """Process a batch of new Matrix events. Extracts Daniel's text messages, checks for commands, bundles them, and forwards to the agent. """ daniel_msgs = [ e for e in events if self._is_from_daniel(e) ] if not daniel_msgs: return # Sort chronologically daniel_msgs.sort(key=lambda e: e.get("origin_server_ts", 0)) texts = [e.get("content", {}).get("body", "") for e in daniel_msgs] log.info(f"Received {len(texts)} message(s) from Daniel") # Check for commands first remaining = self._dispatch_commands(texts) if not remaining: return # Forward remaining texts to the agent self._forward_to_agent(remaining) # ── Command System ────────────────────────────────────────────────── COMMANDS = { "/new_session": "Create a fresh agent session (clears conversation history)", "/help": "Show this list of available commands", } def _dispatch_commands(self, texts: list[str]) -> list[str]: """Check messages for commands and handle them. Returns the list of texts that were NOT handled as commands, so they can be forwarded to the agent. """ remaining = [] for text in texts: stripped = text.strip() if stripped == "/new_session": self._cmd_new_session() elif stripped == "/help": self._cmd_help() else: remaining.append(text) return remaining def _cmd_new_session(self) -> None: """Handle /new_session: create a fresh session and notify the user.""" log.info("Executing /new_session command") session_id = self.agent.create_session() if session_id: self.agent.rename_session(session_id, f"Matrix {session_id}") self.state.session_id = session_id self.state.save(self.config.bridge.state_file) self.matrix.send_message("🔄 New session created!") else: self.matrix.send_message( "❌ Failed to create new session. Check logs." ) def _cmd_help(self) -> None: """Handle /help: list available commands.""" lines = ["**Available commands:**"] for cmd, desc in self.COMMANDS.items(): lines.append(f"- `{cmd}` — {desc}") self.matrix.send_message("\n".join(lines)) # ── Agent Forwarding ──────────────────────────────────────────────── def _ensure_session(self) -> Optional[str]: """Get or create an agent session. Returns session ID or None.""" session_id = self.state.session_id if not session_id: log.info("No active session, creating one") session_id = self.agent.create_session() if not session_id: return None self.agent.rename_session(session_id, f"Matrix {session_id}") self.state.session_id = session_id self.state.save(self.config.bridge.state_file) return session_id # Verify the session still exists if self.agent.get_session_state(session_id) is None: log.warning(f"Session {session_id} gone, creating new one") session_id = self.agent.create_session() if not session_id: return None self.agent.rename_session(session_id, f"Matrix {session_id}") self.state.session_id = session_id self.state.save(self.config.bridge.state_file) return session_id @staticmethod def _bundle_messages(texts: list[str]) -> str: """Combine multiple messages into a single agent prompt.""" if len(texts) == 1: return texts[0] parts = [ f"--- Message {i} ---\n{t}" for i, t in enumerate(texts, 1) ] return "Your human sent these messages at once:\n\n" + "\n\n".join(parts) def _forward_to_agent(self, texts: list[str]) -> None: """Send texts to the agent and relay the response back to Matrix.""" session_id = self._ensure_session() if not session_id: self.matrix.send_message( "❌ Failed to create agent session. Check logs." ) return prompt = self._bundle_messages(texts) log.info( f"Sending {len(texts)} message(s) to agent: " f"{prompt[:120]}..." ) # Show typing indicator while processing self.matrix.set_typing(True) try: if not self.agent.send_message(session_id, prompt): self.matrix.send_message( "❌ Failed to send message to agent." ) return response = self.agent.wait_for_response( session_id, timeout=self.config.bridge.agent_response_timeout, ) finally: self.matrix.set_typing(False) if response: self._send_response(response) else: log.warning("Agent returned no response — not sending to Matrix") def _send_response(self, text: str) -> None: """Send the agent's response to Matrix, splitting long messages.""" max_chunk = self.config.bridge.max_message_length if len(text) <= max_chunk: self.matrix.send_message(text) else: for i in range(0, len(text), max_chunk): chunk = text[i : i + max_chunk] if i == 0: self.matrix.send_message(chunk) else: self.matrix.send_message(f"(continued)\n{chunk}")