Files
MatrixBridge/matrix_client.py
Lucy 751584c99d Initial commit: Matrix Bridge Daemon
Persistent Python daemon connecting Matrix DM room to AiAgent API.

- Config-driven (JSON config file)
- Extensible command system (/new_session, /help)
- Typing indicators while agent processes
- Session auto-naming for identification
- Persistent state across restarts
- Token refresh, retry logic, error handling
- Python stdlib only — no external dependencies
2026-05-15 13:45:24 +02:00

207 lines
7.6 KiB
Python

"""Matrix API client — handles all communication with the Matrix server.
Provides: sync, send messages, typing indicators, token refresh.
Uses only Python stdlib — no external dependencies.
"""
from __future__ import annotations
import json
import logging
import os
import time
import urllib.error
import urllib.request
from typing import Optional
from urllib.parse import urlencode
from config import Config
log = logging.getLogger("matrix-bridge.matrix")


class MatrixClient:
    """Client for communicating with a Matrix homeserver.

    Reads an access token from a ``key: value`` credentials file, performs
    authenticated JSON requests against ``https://<server>``, and re-logs in
    with the stored username/password when the server reports
    ``M_UNKNOWN_TOKEN``.
    """

    # Client-side socket timeout (seconds) for ordinary API calls. /sync
    # adds its long-poll duration on top of this value.
    _DEFAULT_TIMEOUT = 10.0

    # Number of messages sent so far. The class-level default lets the first
    # ``+=`` in send_message() create the per-instance counter.
    _txn_counter = 0

    def __init__(self, config: "Config"):
        """Copy connection settings from *config* and load the access token.

        Raises:
            RuntimeError: if the credentials file has no access token.
            OSError: if the credentials file cannot be read.
        """
        self.server = config.matrix.server
        self.user_id = config.matrix.user_id
        self.room_id = config.matrix.room_id
        self._creds_file = config.matrix.credentials_file
        self._access_token: Optional[str] = None
        self._load_credentials()

    # ── Credential Management ───────────────────────────────────────────
    def _load_credentials(self) -> None:
        """Read the access token from the credentials file.

        Raises:
            RuntimeError: if the "Access Token" entry is missing or empty.
        """
        creds = self._parse_creds_file()
        self._access_token = creds.get("Access Token")
        if not self._access_token:
            raise RuntimeError("No Access Token found in credentials file")

    def _parse_creds_file(self) -> dict[str, str]:
        """Parse ``key: value`` pairs from the credentials file.

        Lines are split on the first colon only, so values may themselves
        contain colons. Lines without a colon are ignored. When a key occurs
        more than once, the last occurrence wins.
        """
        creds = {}
        with open(self._creds_file, "r") as f:
            for line in f:
                key, sep, value = line.partition(":")
                if sep:
                    creds[key.strip()] = value.strip()
        return creds

    def _persist_token(self, new_token: str) -> None:
        """Write *new_token* into the "Access Token" line of the credentials file.

        Only lines whose key is "Access Token" are touched; every other line
        is written back unchanged. If no such line exists, one is appended.

        Raises:
            OSError: if the file cannot be read or written.
        """
        with open(self._creds_file, "r") as f:
            lines = f.readlines()

        found = False
        for i, line in enumerate(lines):
            key, sep, rest = line.partition(":")
            if not sep or key.strip() != "Access Token":
                continue
            found = True
            old_value = rest.strip()
            if old_value:
                # Swap only the value so surrounding whitespace is preserved.
                rest = rest.replace(old_value, new_token, 1)
            else:
                # Blank value: str.replace("", ...) would insert the token
                # between every character, so build the tail explicitly.
                ending = "\n" if rest.endswith("\n") else ""
                rest = f" {new_token}{ending}"
            lines[i] = key + sep + rest

        if not found:
            if lines and not lines[-1].endswith("\n"):
                lines[-1] += "\n"
            lines.append(f"Access Token: {new_token}\n")

        with open(self._creds_file, "w") as f:
            f.writelines(lines)

    def _refresh_token(self) -> bool:
        """Re-authenticate to Matrix and update the credentials file.

        Logs in with the "Username"/"Password" entries of the credentials
        file. On success the new token is used immediately; a failure to
        write it back to the file is logged but does not fail the refresh.

        Returns:
            True if a new access token was obtained, False otherwise.
        """
        try:
            creds = self._parse_creds_file()
        except OSError as e:
            log.error(f"Cannot refresh token: credentials file unreadable: {e}")
            return False
        password = creds.get("Password")
        username = creds.get("Username")
        if not password or not username:
            log.error("Cannot refresh token: missing username or password")
            return False

        url = f"https://{self.server}/_matrix/client/v3/login"
        payload = {
            "type": "m.login.password",
            "user": username,
            "password": password,
        }
        req = urllib.request.Request(
            url,
            data=json.dumps(payload).encode("utf-8"),
            headers={"Content-Type": "application/json"},
        )
        try:
            # Bounded wait: without a timeout a hung login blocks forever.
            with urllib.request.urlopen(
                req, timeout=self._DEFAULT_TIMEOUT
            ) as resp:
                result = json.loads(resp.read())
            new_token = result["access_token"]
        except Exception as e:
            # Deliberately broad: any network, HTTP or decoding problem is
            # reported as a failed refresh instead of raising to the caller.
            log.error(f"Matrix re-login failed: {e}")
            return False

        # Adopt the token before touching the file so that a write failure
        # does not throw away a login the server has already accepted.
        self._access_token = new_token
        try:
            self._persist_token(new_token)
        except OSError as e:
            log.warning(f"Could not persist refreshed Matrix token: {e}")
        log.info("Matrix token refreshed")
        return True

    # ── Raw Request ─────────────────────────────────────────────────────
    def _request(
        self,
        method: str,
        path: str,
        body: Optional[dict] = None,
        timeout: float = 10.0,
        _allow_refresh: bool = True,
    ) -> Optional[dict]:
        """Make an authenticated request to the Matrix API.

        Handles token expiry by re-authenticating and retrying once.

        Args:
            method: HTTP method, e.g. "GET" or "PUT".
            path: URL path (including any query string) appended to
                ``https://<server>``.
            body: JSON-serialisable request body, or None for no body.
            timeout: Client-side socket timeout in seconds.
            _allow_refresh: Internal flag; False on the retry so that a
                token rejected again after a refresh cannot loop forever.

        Returns:
            Parsed JSON on success ({} for an empty response body),
            None on failure.
        """
        url = f"https://{self.server}{path}"
        headers = {"Authorization": f"Bearer {self._access_token}"}
        data = None
        if body is not None:
            # "is not None" so that an explicit empty object is still sent.
            data = json.dumps(body).encode("utf-8")
            headers["Content-Type"] = "application/json"
        req = urllib.request.Request(
            url, data=data, headers=headers, method=method
        )
        try:
            with urllib.request.urlopen(req, timeout=timeout) as resp:
                raw = resp.read().decode("utf-8")
            return json.loads(raw) if raw.strip() else {}
        except urllib.error.HTTPError as e:
            err_body = e.read().decode("utf-8", errors="replace")
            if _allow_refresh and "M_UNKNOWN_TOKEN" in err_body:
                log.info("Token expired, refreshing...")
                if self._refresh_token():
                    return self._request(
                        method, path, body,
                        timeout=timeout, _allow_refresh=False,
                    )
            log.error(
                f"Matrix {method} {path} -> HTTP {e.code}: {err_body[:200]}"
            )
            return None
        except Exception as e:
            log.error(f"Matrix {method} {path} failed: {e}")
            return None

    # ── Sync ────────────────────────────────────────────────────────────
    def sync(
        self, since: Optional[str] = None, timeout: int = 0
    ) -> tuple[Optional[str], list[dict]]:
        """Poll the Matrix sync API.

        Args:
            since: Batch token from the previous sync (None for initial).
            timeout: Server-side timeout in ms (0 = return immediately).

        Returns:
            (next_batch_token, list_of_events) or (None, []) on error.
        """
        params = {"timeout": str(timeout)}
        if since:
            params["since"] = since

        # The socket timeout must outlast the long-poll, otherwise every
        # sync with timeout >= 10000 ms is aborted client-side.
        socket_timeout = self._DEFAULT_TIMEOUT + max(timeout, 0) / 1000.0
        result = self._request(
            "GET",
            f"/_matrix/client/v3/sync?{urlencode(params)}",
            timeout=socket_timeout,
        )
        if result is None:
            return None, []

        next_batch = result.get("next_batch")
        # Collect all timeline events from joined rooms.
        # NOTE(review): events from every joined room are merged into one
        # list and the room they came from is not recorded here; confirm
        # the caller does not need to filter by self.room_id.
        events = []
        for room_data in result.get("rooms", {}).get("join", {}).values():
            events.extend(room_data.get("timeline", {}).get("events", []))
        return next_batch, events

    # ── Send Message ────────────────────────────────────────────────────
    def send_message(self, text: str) -> bool:
        """Send a text message to the configured room.

        Returns:
            True if the response contains an event_id, False otherwise.
        """
        # The timestamp has one-second resolution, so a counter is appended
        # to keep transaction IDs distinct for sends within the same second.
        self._txn_counter += 1
        txn_id = (
            f"lucy-bridge-{int(time.time())}-{os.getpid()}"
            f"-{self._txn_counter}"
        )
        path = (
            f"/_matrix/client/v3/rooms/"
            f"{self.room_id}/send/m.room.message/{txn_id}"
        )
        payload = {"msgtype": "m.text", "body": text}
        result = self._request("PUT", path, payload)
        if result and "event_id" in result:
            log.info(f"Sent to Matrix, event_id: {result['event_id']}")
            return True
        return False

    # ── Typing Indicator ────────────────────────────────────────────────
    def set_typing(self, typing: bool, timeout_ms: int = 20000) -> bool:
        """Show or hide the typing indicator in the configured room.

        This is best-effort: the outcome is logged at DEBUG level here
        since typing indicators are non-critical.

        Args:
            typing: True to show the indicator, False to hide it.
            timeout_ms: Value sent as "timeout" when *typing* is True.

        Returns:
            True if the request succeeded, False otherwise.
        """
        path = (
            f"/_matrix/client/v3/rooms/"
            f"{self.room_id}/typing/{self.user_id}"
        )
        payload: dict = {"typing": typing}
        if typing:
            payload["timeout"] = timeout_ms
        result = self._request("PUT", path, payload)
        if result is not None:
            log.debug(f"Typing {'on' if typing else 'off'}")
            return True
        log.debug("Typing indicator failed (non-critical)")
        return False