Persistent Python daemon connecting a Matrix DM room to the AiAgent API. Features: config-driven (JSON config file); extensible command system (/new_session, /help); typing indicators while the agent processes; session auto-naming for identification; persistent state across restarts; token refresh, retry logic, and error handling; Python stdlib only — no external dependencies.
330 lines
12 KiB
Python
330 lines
12 KiB
Python
"""Core bridge logic — connects Matrix messages to the agent API.
|
|
|
|
Orchestrates the polling loop, message dispatching, command handling,
|
|
and response delivery. Designed to be extensible: add new commands by
|
|
adding entries to the COMMANDS registry.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import signal
|
|
import time
|
|
from typing import Optional
|
|
|
|
from config import Config
|
|
from state import BridgeState
|
|
from matrix_client import MatrixClient
|
|
from agent_client import AgentClient
|
|
|
|
# Module-level logger used by every Bridge method in this file.
log = logging.getLogger("matrix-bridge.bridge")
|
|
|
|
# ── User identity ─────────────────────────────────────────────────────
|
|
# Messages from this user are forwarded to the agent.
|
|
# Compared against each event's "sender" field in Bridge._is_from_daniel.
DANIEL_USER_ID = "@daniel:matrix.daedalus706.de"
|
|
|
|
|
|
class Bridge:
|
|
"""Main bridge that connects Matrix messages to the agent API."""
|
|
|
|
def __init__(self, config: Config):
    """Build the clients, load persisted state and install shutdown hooks.

    Args:
        config: Bridge configuration. It is passed to both clients and
            provides ``config.bridge.state_file``, the path the saved
            state is loaded from.
    """
    self.config = config
    self.matrix = MatrixClient(config)
    self.agent = AgentClient(config)
    self.state = BridgeState.load(config.bridge.state_file)

    # Loop flag: run() keeps polling while this is true; the signal
    # handler clears it.
    self._running = True

    # Both termination signals share one handler.
    for shutdown_signal in (signal.SIGTERM, signal.SIGINT):
        signal.signal(shutdown_signal, self._signal_handler)
|
|
|
|
# ── Lifecycle ───────────────────────────────────────────────────────
|
|
|
|
def _signal_handler(self, signum, frame) -> None:
    """Request a graceful stop when a registered signal is delivered.

    Args:
        signum: Number of the received signal; only used in the log line.
        frame: Interrupted stack frame; unused.
    """
    notice = f"Received signal {signum}, shutting down..."
    log.info(notice)
    # run() tests this flag at the top of each loop iteration.
    self._running = False
|
|
|
|
def run(self) -> None:
    """Enter the main polling loop. Blocks until shutdown.

    Startup: logs the current state, room and agent URL, fetches an
    initial sync token when none is stored, and validates the saved
    agent session.

    Each loop iteration performs one Matrix sync. A sync that returns a
    batch token counts as success: the token is stored, the error counter
    is reset and the batch is handed to ``_handle_sync_batch``. A sync
    without a token, or any exception raised during the iteration, counts
    as an error. After more than 10 consecutive errors of either kind the
    loop sleeps an extra 10 seconds and restarts the count.

    The loop exits as soon as ``self._running`` is cleared, without
    sleeping for another poll interval first.
    """
    log.info("=" * 50)
    log.info("Matrix Bridge Daemon starting")
    log.info("=" * 50)
    log.info(f"State: {self.state}")
    log.info(f"Room: {self.config.matrix.room_id}")
    log.info(f"Agent: {self.config.agent.base_url}")

    # Bootstrap the Matrix sync batch token if needed
    if not self.state.next_batch:
        self._bootstrap_sync()

    # Validate saved session (framework may have restarted)
    self._validate_session()

    log.info(
        f"Entering main loop (poll every "
        f"{self.config.bridge.poll_interval_seconds}s)"
    )
    consecutive_errors = 0

    while self._running:
        try:
            nb, events = self.matrix.sync(
                since=self.state.next_batch,
                timeout=self.config.bridge.matrix_poll_timeout,
            )
            if nb:
                self.state.next_batch = nb
                consecutive_errors = 0
                # Tolerate a client that reports "no events" as None.
                self._handle_sync_batch(nb, events or [])
            else:
                consecutive_errors += 1
        except Exception as e:
            # Top-level boundary: log and keep the daemon alive.
            log.error(f"Unexpected error in main loop: {e}", exc_info=True)
            consecutive_errors += 1

        # A shutdown request should not wait out another sleep.
        if not self._running:
            break

        # Checked after both failure paths so that repeated exceptions
        # back off just like repeated failed syncs.
        if consecutive_errors > 10:
            log.warning(
                f"{consecutive_errors} consecutive errors, "
                f"backing off"
            )
            time.sleep(10)
            consecutive_errors = 0

        time.sleep(self.config.bridge.poll_interval_seconds)

    log.info("Bridge stopped gracefully")


def _handle_sync_batch(self, nb: str, events: list[dict]) -> None:
    """Deduplicate one sync batch, persist progress and process new events.

    Events are recorded as processed and the state is saved *before* they
    are handed to ``_process_events``, so an event whose processing fails
    is not picked up again by a later sync.

    Args:
        nb: Batch token returned by the sync (used for logging only).
        events: Events returned by the sync.
    """
    known = set(self.state.processed_event_ids)
    new_events = [e for e in events if e.get("event_id") not in known]

    if not new_events:
        log.debug(f"Sync OK, 0 new events (batch: {nb[:35]}...)")
        return

    # Skip building the per-event debug strings unless they will be emitted.
    if log.isEnabledFor(logging.DEBUG):
        log.debug(
            f"Sync: {len(events)} event(s), "
            f"{len(new_events)} new"
        )
        for e in new_events:
            log.debug(
                f"  Event: type={e.get('type')} "
                f"sender={e.get('sender')} "
                f"id={e.get('event_id')}"
            )
            if e.get("type") == "m.room.message":
                body = str((e.get("content") or {}).get("body", ""))
                log.debug(f"    Body: {body[:80]}")

    self.state.mark_processed(
        [e["event_id"] for e in new_events if e.get("event_id")],
        limit=self.config.bridge.processed_ids_limit,
    )
    self.state.save(self.config.bridge.state_file)

    self._process_events(new_events)
|
|
|
|
# ── Initialization Helpers ──────────────────────────────────────────
|
|
|
|
def _bootstrap_sync(self) -> None:
    """Run one sync without a ``since`` token to obtain a first batch token.

    The events returned by this sync are discarded; only the token is
    stored and saved. When no token comes back, nothing is stored and a
    warning is logged.
    """
    log.info("No batch token — performing initial sync...")
    token, _discarded = self.matrix.sync(
        timeout=self.config.bridge.matrix_poll_timeout
    )
    if not token:
        log.warning("Initial sync failed, will retry in main loop")
        return

    self.state.next_batch = token
    self.state.save(self.config.bridge.state_file)
    log.info(f"Got initial batch: {token[:40]}...")
|
|
|
|
def _validate_session(self) -> None:
    """Forget the saved session when the agent no longer reports it.

    Does nothing when no session ID is stored or when the agent returns a
    state for it. Otherwise the stored session ID and the list of
    processed event IDs are both reset and the state is saved.
    """
    saved_id = self.state.session_id
    if not saved_id:
        return
    if self.agent.get_session_state(saved_id) is not None:
        return

    log.warning(f"Saved session {saved_id} no longer valid, clearing")
    self.state.session_id = None
    self.state.processed_event_ids = []
    self.state.save(self.config.bridge.state_file)
|
|
|
|
# ── Event Processing ────────────────────────────────────────────────
|
|
|
|
# Class-level alias of the module constant, reachable as Bridge.DANIEL_USER_ID.
# Note: _is_from_daniel reads the module-level name, not this alias.
DANIEL_USER_ID = DANIEL_USER_ID
|
|
|
|
@staticmethod
def _is_from_daniel(event: dict) -> bool:
    """Tell whether ``event`` is a plain-text room message sent by Daniel.

    Args:
        event: A Matrix event as a dict.

    Returns:
        True only when the event type is ``m.room.message``, its content
        ``msgtype`` is ``m.text`` and its sender equals ``DANIEL_USER_ID``.
    """
    if event.get("type") != "m.room.message":
        return False
    if event.get("content", {}).get("msgtype") != "m.text":
        return False
    return event.get("sender") == DANIEL_USER_ID
|
|
|
|
def _process_events(self, events: list[dict]) -> None:
    """Handle one batch of new Matrix events.

    Keeps only Daniel's text messages, orders them by
    ``origin_server_ts`` (missing timestamps sort as 0), runs any
    commands among them, and forwards whatever is left to the agent.

    Args:
        events: New Matrix events as dicts.
    """
    from_daniel = sorted(
        (event for event in events if self._is_from_daniel(event)),
        key=lambda event: event.get("origin_server_ts", 0),
    )
    if not from_daniel:
        return

    bodies = [event.get("content", {}).get("body", "") for event in from_daniel]
    log.info(f"Received {len(bodies)} message(s) from Daniel")

    # Commands are consumed first; only the leftovers reach the agent.
    leftovers = self._dispatch_commands(bodies)
    if leftovers:
        self._forward_to_agent(leftovers)
|
|
|
|
# ── Command System ──────────────────────────────────────────────────
|
|
|
|
# Chat commands mapped to the description that /help prints for each.
COMMANDS = {
    "/new_session": (
        "Create a fresh agent session (clears conversation history)"
    ),
    "/help": (
        "Show this list of available commands"
    ),
}
|
|
|
|
def _dispatch_commands(self, texts: list[str]) -> list[str]:
    """Run the registered commands found in ``texts`` and return the rest.

    A message is a command when, after stripping surrounding whitespace,
    it exactly equals a key of ``COMMANDS``. Its handler is the method
    named ``_cmd_<name>`` (``/new_session`` -> ``_cmd_new_session``), so a
    new command needs a ``COMMANDS`` entry plus a matching ``_cmd_``
    method and no change here.

    Handler lookup is restricted to ``COMMANDS`` keys, so message text
    never selects an arbitrary attribute.

    Args:
        texts: Message bodies in the order they should be handled.

    Returns:
        The texts that were NOT handled as commands, unmodified and in
        their original order, so they can be forwarded to the agent.
    """
    remaining = []
    for text in texts:
        stripped = text.strip()
        handler = None
        if stripped in self.COMMANDS:
            handler = getattr(self, "_cmd_" + stripped.lstrip("/"), None)
            if not callable(handler):
                # Registered but not implemented: treat like ordinary text
                # rather than silently dropping the message.
                log.warning(
                    f"Command {stripped} is registered but has no handler"
                )
                handler = None

        if handler is None:
            remaining.append(text)
        else:
            handler()
    return remaining
|
|
|
|
def _cmd_new_session(self) -> None:
    """Handle /new_session: start a fresh agent session and report back.

    On success the session is renamed to ``Matrix <id>``, stored as the
    active session, saved, and a confirmation is sent to the room. On
    failure only an error message is sent; the stored session is left
    untouched.
    """
    log.info("Executing /new_session command")
    new_id = self.agent.create_session()
    if not new_id:
        self.matrix.send_message(
            "❌ Failed to create new session. Check logs."
        )
        return

    self.agent.rename_session(new_id, f"Matrix {new_id}")
    self.state.session_id = new_id
    self.state.save(self.config.bridge.state_file)
    self.matrix.send_message("🔄 New session created!")
|
|
|
|
def _cmd_help(self) -> None:
    """Handle /help: send one message listing every ``COMMANDS`` entry."""
    entries = [
        f"- `{command}` — {description}"
        for command, description in self.COMMANDS.items()
    ]
    help_text = "\n".join(["**Available commands:**", *entries])
    self.matrix.send_message(help_text)
|
|
|
|
# ── Agent Forwarding ────────────────────────────────────────────────
|
|
|
|
def _ensure_session(self) -> Optional[str]:
    """Return a usable agent session ID, creating a session when needed.

    The stored session is reused when the agent still returns a state for
    it. When nothing is stored, or the stored session is gone, a new
    session is created, renamed to ``Matrix <id>``, stored and saved.

    Returns:
        The session ID to use, or None when a session could not be
        created (the stored ID is left unchanged in that case).
    """
    current = self.state.session_id

    if current:
        # Verify the session still exists
        if self.agent.get_session_state(current) is not None:
            return current
        log.warning(f"Session {current} gone, creating new one")
    else:
        log.info("No active session, creating one")

    fresh = self.agent.create_session()
    if not fresh:
        return None

    self.agent.rename_session(fresh, f"Matrix {fresh}")
    self.state.session_id = fresh
    self.state.save(self.config.bridge.state_file)
    return fresh
|
|
|
|
@staticmethod
def _bundle_messages(texts: list[str]) -> str:
    """Merge message bodies into one prompt for the agent.

    Args:
        texts: Message bodies in the order they should appear.

    Returns:
        The body itself when exactly one is given; otherwise a fixed
        header followed by the bodies, each under a numbered
        ``--- Message N ---`` marker (numbering starts at 1).
    """
    if len(texts) == 1:
        return texts[0]

    header = "Your human sent these messages at once:\n\n"
    numbered = []
    for number, body in enumerate(texts, start=1):
        numbered.append(f"--- Message {number} ---\n{body}")
    return header + "\n\n".join(numbered)
|
|
|
|
def _forward_to_agent(self, texts: list[str]) -> None:
    """Send ``texts`` to the agent and relay its reply to the Matrix room.

    Error messages are sent to the room when no session can be obtained
    or the agent does not accept the message. The typing indicator is
    switched on before the message is sent and always switched off
    again, including when the agent call raises. When the agent yields
    no reply, a warning is logged and nothing is sent to the room.

    Args:
        texts: Message bodies to forward; several are bundled into one
            prompt.
    """
    session_id = self._ensure_session()
    if not session_id:
        self.matrix.send_message(
            "❌ Failed to create agent session. Check logs."
        )
        return

    prompt = self._bundle_messages(texts)
    log.info(
        f"Sending {len(texts)} message(s) to agent: "
        f"{prompt[:120]}..."
    )

    # Show typing indicator while processing
    self.matrix.set_typing(True)
    try:
        accepted = self.agent.send_message(session_id, prompt)
        if not accepted:
            self.matrix.send_message(
                "❌ Failed to send message to agent."
            )
            return

        reply = self.agent.wait_for_response(
            session_id,
            timeout=self.config.bridge.agent_response_timeout,
        )
    finally:
        self.matrix.set_typing(False)

    if not reply:
        log.warning("Agent returned no response — not sending to Matrix")
        return
    self._send_response(reply)
|
|
|
|
def _send_response(self, text: str) -> None:
    """Send the agent's response to Matrix, splitting long messages.

    Every message sent stays within
    ``config.bridge.max_message_length`` characters, including the
    ``"(continued)\\n"`` marker placed in front of each follow-up chunk.
    Concatenating the chunks (markers removed) gives back ``text``
    exactly; splitting is by character count only.

    Args:
        text: The complete response text.
    """
    limit = self.config.bridge.max_message_length

    # A non-positive limit cannot be used as a chunk size; treat it as
    # "no limit" rather than failing or silently sending nothing.
    if limit <= 0 or len(text) <= limit:
        self.matrix.send_message(text)
        return

    prefix = "(continued)\n"
    budget = limit - len(prefix)
    if budget <= 0:
        # The limit is too small to hold the marker plus any content:
        # drop the marker so the length limit is still respected.
        prefix = ""
        budget = limit

    # The first chunk carries no marker and may use the full limit.
    self.matrix.send_message(text[:limit])
    for start in range(limit, len(text), budget):
        self.matrix.send_message(prefix + text[start:start + budget])
|