Files
MatrixBridge/bridge.py
Lucy 751584c99d Initial commit: Matrix Bridge Daemon
Persistent Python daemon connecting Matrix DM room to AiAgent API.

- Config-driven (JSON config file)
- Extensible command system (/new_session, /help)
- Typing indicators while agent processes
- Session auto-naming for identification
- Persistent state across restarts
- Token refresh, retry logic, error handling
- Python stdlib only — no external dependencies
2026-05-15 13:45:24 +02:00

330 lines
12 KiB
Python

"""Core bridge logic — connects Matrix messages to the agent API.
Orchestrates the polling loop, message dispatching, command handling,
and response delivery. Designed to be extensible: add new commands by
adding entries to the COMMANDS registry.
"""
from __future__ import annotations
import logging
import signal
import time
from typing import Optional
from config import Config
from state import BridgeState
from matrix_client import MatrixClient
from agent_client import AgentClient
log = logging.getLogger("matrix-bridge.bridge")

# ── User identity ─────────────────────────────────────────────────────
# Messages from this user are forwarded to the agent.
DANIEL_USER_ID = "@daniel:matrix.daedalus706.de"


class Bridge:
    """Main bridge that connects Matrix messages to the agent API.

    Repeatedly syncs with Matrix, handles slash commands locally, forwards
    every other text message sent by ``DANIEL_USER_ID`` to the agent, and
    relays the agent's reply back through the Matrix client.
    """

    # Number of failed polls in a row tolerated before an extra pause.
    _MAX_CONSECUTIVE_ERRORS = 10
    # Length of that extra pause, in seconds.
    _BACKOFF_SECONDS = 10
    # Marker prepended to every chunk after the first of a split response.
    _CONTINUATION_PREFIX = "(continued)\n"

    # Class-level alias of the module constant.
    DANIEL_USER_ID = DANIEL_USER_ID

    # Command name -> description shown by /help.
    COMMANDS = {
        "/new_session": "Create a fresh agent session (clears conversation history)",
        "/help": "Show this list of available commands",
    }

    # Command name -> name of the zero-argument method that executes it.
    # A new command needs an entry here and a description in COMMANDS.
    _COMMAND_HANDLERS = {
        "/new_session": "_cmd_new_session",
        "/help": "_cmd_help",
    }

    def __init__(self, config: Config):
        """Build the clients, load persisted state and install signal handlers.

        Args:
            config: Bridge configuration; ``config.bridge.state_file`` is the
                path the persisted state is loaded from.
        """
        self.config = config
        self.matrix = MatrixClient(config)
        self.agent = AgentClient(config)
        self.state = BridgeState.load(config.bridge.state_file)
        self._running = True
        # Register signal handlers so SIGTERM/SIGINT end the loop cleanly.
        signal.signal(signal.SIGTERM, self._signal_handler)
        signal.signal(signal.SIGINT, self._signal_handler)

    # ── Lifecycle ───────────────────────────────────────────────────────

    def _signal_handler(self, signum, frame) -> None:
        """Handle shutdown signals by asking the main loop to stop."""
        log.info(f"Received signal {signum}, shutting down...")
        self._running = False

    def run(self) -> None:
        """Enter the main polling loop. Blocks until shutdown.

        Each iteration syncs once, processes any new events and then sleeps
        for ``poll_interval_seconds``. A failed poll is either an empty batch
        token or an exception; once more than ``_MAX_CONSECUTIVE_ERRORS`` of
        them occur in a row the loop pauses for ``_BACKOFF_SECONDS`` and
        resets the counter.
        """
        banner = "=" * 50
        log.info(banner)
        log.info("Matrix Bridge Daemon starting")
        log.info(banner)
        log.info(f"State: {self.state}")
        log.info(f"Room: {self.config.matrix.room_id}")
        log.info(f"Agent: {self.config.agent.base_url}")

        # Bootstrap the Matrix sync batch token if needed
        if not self.state.next_batch:
            self._bootstrap_sync()

        # Validate saved session (framework may have restarted)
        self._validate_session()

        log.info(
            f"Entering main loop (poll every "
            f"{self.config.bridge.poll_interval_seconds}s)"
        )

        consecutive_errors = 0
        while self._running:
            try:
                nb, events = self.matrix.sync(
                    since=self.state.next_batch,
                    timeout=self.config.bridge.matrix_poll_timeout,
                )
                if nb:
                    self.state.next_batch = nb
                    consecutive_errors = 0
                    self._handle_sync_events(events, nb)
                else:
                    consecutive_errors += 1
            except Exception as exc:
                # Top-level boundary: log with traceback and keep the daemon alive.
                log.exception(f"Unexpected error in main loop: {exc}")
                consecutive_errors += 1

            # Checked after every iteration so that exceptions back off too,
            # not only empty sync results.
            if consecutive_errors > self._MAX_CONSECUTIVE_ERRORS:
                log.warning(
                    f"{consecutive_errors} consecutive errors, "
                    f"backing off"
                )
                time.sleep(self._BACKOFF_SECONDS)
                consecutive_errors = 0

            time.sleep(self.config.bridge.poll_interval_seconds)

        log.info("Bridge stopped gracefully")

    def _handle_sync_events(self, events: list[dict], batch_token: str) -> None:
        """Deduplicate one sync result, record it, and process what is new.

        Args:
            events: Events returned by the sync call.
            batch_token: Batch token returned by the same call (logging only).
        """
        known = set(self.state.processed_event_ids)
        new_events = [e for e in events if e.get("event_id") not in known]
        if not new_events:
            log.debug(
                f"Sync OK, 0 new events (batch: {batch_token[:35]}...)"
            )
            return

        log.debug(
            f"Sync: {len(events)} event(s), "
            f"{len(new_events)} new"
        )
        # Skip building the per-event messages unless they will be emitted.
        if log.isEnabledFor(logging.DEBUG):
            for e in new_events:
                log.debug(
                    f"  Event: type={e.get('type')} "
                    f"sender={e.get('sender')} "
                    f"id={e.get('event_id')}"
                )
                if e.get("type") == "m.room.message":
                    body = e.get("content", {}).get("body", "")
                    # str() keeps a malformed body from breaking the slice.
                    log.debug(f"  Body: {str(body)[:80]}")

        # IDs are recorded and saved before processing, so a failure while
        # processing does not cause the same events to be handled again.
        self.state.mark_processed(
            [e["event_id"] for e in new_events if e.get("event_id")],
            limit=self.config.bridge.processed_ids_limit,
        )
        self.state.save(self.config.bridge.state_file)
        self._process_events(new_events)

    # ── Initialization Helpers ──────────────────────────────────────────

    def _bootstrap_sync(self) -> None:
        """Perform an initial sync to get the first batch token.

        The events returned by this first sync are discarded; only the batch
        token is stored and saved.
        """
        log.info("No batch token — performing initial sync...")
        nb, _ = self.matrix.sync(timeout=self.config.bridge.matrix_poll_timeout)
        if not nb:
            log.warning("Initial sync failed, will retry in main loop")
            return
        self.state.next_batch = nb
        self.state.save(self.config.bridge.state_file)
        log.info(f"Got initial batch: {nb[:40]}...")

    def _validate_session(self) -> None:
        """Check if the saved session is still valid on the agent.

        When the agent reports no state for the saved session ID, the session
        ID and the processed-event list are cleared and the state is saved.
        """
        sid = self.state.session_id
        if not sid:
            return
        if self.agent.get_session_state(sid) is not None:
            return
        log.warning(
            f"Saved session {sid} no longer valid, clearing"
        )
        self.state.session_id = None
        self.state.processed_event_ids = []
        self.state.save(self.config.bridge.state_file)

    # ── Event Processing ────────────────────────────────────────────────

    @staticmethod
    def _is_from_daniel(event: dict) -> bool:
        """Check if a Matrix event is a text message from Daniel."""
        return (
            event.get("type") == "m.room.message"
            and event.get("content", {}).get("msgtype") == "m.text"
            and event.get("sender") == DANIEL_USER_ID
        )

    def _process_events(self, events: list[dict]) -> None:
        """Process a batch of new Matrix events.

        Extracts Daniel's text messages, sorts them by ``origin_server_ts``,
        runs any commands among them, and forwards the rest to the agent as
        one bundle.
        """
        daniel_msgs = [e for e in events if self._is_from_daniel(e)]
        if not daniel_msgs:
            return

        # Sort chronologically
        daniel_msgs.sort(key=lambda e: e.get("origin_server_ts", 0))
        texts = [e.get("content", {}).get("body", "") for e in daniel_msgs]
        log.info(f"Received {len(texts)} message(s) from Daniel")

        # Commands are handled first; whatever is left goes to the agent.
        remaining = self._dispatch_commands(texts)
        if remaining:
            self._forward_to_agent(remaining)

    # ── Command System ──────────────────────────────────────────────────

    def _dispatch_commands(self, texts: list[str]) -> list[str]:
        """Check messages for commands and handle them.

        A message is a command when, after stripping surrounding whitespace,
        it exactly equals a key of ``_COMMAND_HANDLERS``.

        Returns:
            The texts that were NOT handled as commands, unmodified and in
            their original order, so they can be forwarded to the agent.
        """
        remaining = []
        for text in texts:
            handler_name = self._COMMAND_HANDLERS.get(text.strip())
            if handler_name is None:
                remaining.append(text)
            else:
                getattr(self, handler_name)()
        return remaining

    def _cmd_new_session(self) -> None:
        """Handle /new_session: create a fresh session and notify the user."""
        log.info("Executing /new_session command")
        if self._create_named_session():
            self.matrix.send_message("🔄 New session created!")
        else:
            self.matrix.send_message(
                "❌ Failed to create new session. Check logs."
            )

    def _cmd_help(self) -> None:
        """Handle /help: list available commands."""
        lines = ["**Available commands:**"]
        lines.extend(
            f"- `{cmd}` — {desc}" for cmd, desc in self.COMMANDS.items()
        )
        self.matrix.send_message("\n".join(lines))

    # ── Agent Forwarding ────────────────────────────────────────────────

    def _create_named_session(self) -> Optional[str]:
        """Create an agent session, name it, and persist it as the active one.

        Returns:
            The new session ID, or None when the agent returned no ID (in
            which case the stored state is left unchanged).
        """
        session_id = self.agent.create_session()
        if not session_id:
            return None
        # The name embeds the ID so the session can be identified later.
        self.agent.rename_session(session_id, f"Matrix {session_id}")
        self.state.session_id = session_id
        self.state.save(self.config.bridge.state_file)
        return session_id

    def _ensure_session(self) -> Optional[str]:
        """Get or create an agent session. Returns session ID or None.

        A stored session is reused only if the agent still reports state for
        it; otherwise a replacement is created.
        """
        session_id = self.state.session_id
        if not session_id:
            log.info("No active session, creating one")
            return self._create_named_session()

        # Verify the session still exists
        if self.agent.get_session_state(session_id) is None:
            log.warning(f"Session {session_id} gone, creating new one")
            return self._create_named_session()
        return session_id

    @staticmethod
    def _bundle_messages(texts: list[str]) -> str:
        """Combine multiple messages into a single agent prompt.

        A single message is returned unchanged; several messages are numbered
        from 1 and joined under a short explanatory header.
        """
        if len(texts) == 1:
            return texts[0]
        parts = [
            f"--- Message {i} ---\n{t}"
            for i, t in enumerate(texts, 1)
        ]
        return "Your human sent these messages at once:\n\n" + "\n\n".join(parts)

    def _forward_to_agent(self, texts: list[str]) -> None:
        """Send texts to the agent and relay the response back to Matrix.

        Reports a failure to Matrix when no session can be obtained or the
        message cannot be sent. An empty agent response is only logged.
        """
        session_id = self._ensure_session()
        if not session_id:
            self.matrix.send_message(
                "❌ Failed to create agent session. Check logs."
            )
            return

        prompt = self._bundle_messages(texts)
        log.info(
            f"Sending {len(texts)} message(s) to agent: "
            f"{prompt[:120]}..."
        )

        # Show typing indicator while processing
        self.matrix.set_typing(True)
        try:
            if not self.agent.send_message(session_id, prompt):
                self.matrix.send_message(
                    "❌ Failed to send message to agent."
                )
                return
            response = self.agent.wait_for_response(
                session_id,
                timeout=self.config.bridge.agent_response_timeout,
            )
        finally:
            # Always clear the indicator, including on early return or error.
            self.matrix.set_typing(False)

        if response:
            self._send_response(response)
        else:
            log.warning("Agent returned no response — not sending to Matrix")

    def _send_response(self, text: str) -> None:
        """Send the agent's response to Matrix, splitting long messages.

        Text no longer than ``max_message_length`` is sent as one message.
        Longer text is cut into consecutive slices; every slice after the
        first is prefixed with ``(continued)``, and its payload is shortened
        so that the prefixed message still fits the limit.
        """
        limit = self.config.bridge.max_message_length
        if len(text) <= limit:
            self.matrix.send_message(text)
            return

        # Never step by less than one character, even with a degenerate limit.
        first_size = max(1, limit)
        self.matrix.send_message(text[:first_size])

        prefix = self._CONTINUATION_PREFIX
        payload_size = limit - len(prefix)
        if payload_size < 1:
            # The limit cannot hold the prefix plus any text: drop the prefix.
            prefix = ""
            payload_size = first_size

        for start in range(first_size, len(text), payload_size):
            self.matrix.send_message(
                prefix + text[start : start + payload_size]
            )