Persistent Python daemon connecting Matrix DM room to AiAgent API. - Config-driven (JSON config file) - Extensible command system (/new_session, /help) - Typing indicators while agent processes - Session auto-naming for identification - Persistent state across restarts - Token refresh, retry logic, error handling - Python stdlib only — no external dependencies
207 lines
7.6 KiB
Python
207 lines
7.6 KiB
Python
"""Matrix API client — handles all communication with the Matrix server.
|
|
|
|
Provides: sync, send messages, typing indicators, token refresh.
|
|
Uses only Python stdlib — no external dependencies.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
import time
|
|
import urllib.error
|
|
import urllib.request
|
|
from typing import Optional
|
|
from urllib.parse import urlencode
|
|
|
|
from config import Config
|
|
|
|
log = logging.getLogger("matrix-bridge.matrix")
|
|
|
|
|
|
class MatrixClient:
|
|
"""Client for communicating with a Matrix homeserver."""
|
|
|
|
def __init__(self, config: Config):
|
|
self.server = config.matrix.server
|
|
self.user_id = config.matrix.user_id
|
|
self.room_id = config.matrix.room_id
|
|
self._creds_file = config.matrix.credentials_file
|
|
self._access_token: Optional[str] = None
|
|
self._load_credentials()
|
|
|
|
# ── Credential Management ───────────────────────────────────────────
|
|
|
|
def _load_credentials(self) -> None:
|
|
"""Read access token from the credentials file."""
|
|
creds = self._parse_creds_file()
|
|
self._access_token = creds.get("Access Token")
|
|
if not self._access_token:
|
|
raise RuntimeError("No Access Token found in credentials file")
|
|
|
|
def _parse_creds_file(self) -> dict[str, str]:
|
|
"""Parse key: value pairs from the credentials file."""
|
|
creds = {}
|
|
with open(self._creds_file, "r") as f:
|
|
for line in f:
|
|
if ":" in line:
|
|
key, _, value = line.partition(":")
|
|
creds[key.strip()] = value.strip()
|
|
return creds
|
|
|
|
def _refresh_token(self) -> bool:
|
|
"""Re-authenticate to Matrix and update the credentials file."""
|
|
creds = self._parse_creds_file()
|
|
password = creds.get("Password")
|
|
username = creds.get("Username")
|
|
if not password or not username:
|
|
log.error("Cannot refresh token: missing username or password")
|
|
return False
|
|
|
|
url = f"https://{self.server}/_matrix/client/v3/login"
|
|
payload = {
|
|
"type": "m.login.password",
|
|
"user": username,
|
|
"password": password,
|
|
}
|
|
data = json.dumps(payload).encode("utf-8")
|
|
req = urllib.request.Request(
|
|
url, data=data,
|
|
headers={"Content-Type": "application/json"},
|
|
)
|
|
|
|
try:
|
|
with urllib.request.urlopen(req) as resp:
|
|
result = json.loads(resp.read())
|
|
new_token = result["access_token"]
|
|
|
|
# Persist to credentials file
|
|
with open(self._creds_file, "r") as f:
|
|
content = f.read()
|
|
content = content.replace(
|
|
creds.get("Access Token", ""), new_token
|
|
)
|
|
with open(self._creds_file, "w") as f:
|
|
f.write(content)
|
|
|
|
self._access_token = new_token
|
|
log.info("Matrix token refreshed")
|
|
return True
|
|
except Exception as e:
|
|
log.error(f"Matrix re-login failed: {e}")
|
|
return False
|
|
|
|
# ── Raw Request ─────────────────────────────────────────────────────
|
|
|
|
def _request(
|
|
self, method: str, path: str, body: Optional[dict] = None
|
|
) -> Optional[dict]:
|
|
"""Make an authenticated request to the Matrix API.
|
|
|
|
Handles token expiry by re-authenticating and retrying once.
|
|
Returns parsed JSON on success, None on failure.
|
|
"""
|
|
url = f"https://{self.server}{path}"
|
|
data = json.dumps(body).encode("utf-8") if body else None
|
|
|
|
req = urllib.request.Request(
|
|
url, data=data,
|
|
headers={
|
|
"Content-Type": "application/json" if data else "",
|
|
"Authorization": f"Bearer {self._access_token}",
|
|
},
|
|
method=method,
|
|
)
|
|
|
|
try:
|
|
with urllib.request.urlopen(req, timeout=10) as resp:
|
|
raw = resp.read().decode("utf-8")
|
|
return json.loads(raw) if raw.strip() else {}
|
|
except urllib.error.HTTPError as e:
|
|
err_body = e.read().decode("utf-8", errors="replace")
|
|
if "M_UNKNOWN_TOKEN" in err_body:
|
|
log.info("Token expired, refreshing...")
|
|
if self._refresh_token():
|
|
return self._request(method, path, body)
|
|
log.error(f"Matrix {method} {path} -> HTTP {e.code}: {err_body[:200]}")
|
|
return None
|
|
except Exception as e:
|
|
log.error(f"Matrix {method} {path} failed: {e}")
|
|
return None
|
|
|
|
# ── Sync ────────────────────────────────────────────────────────────
|
|
|
|
def sync(
|
|
self, since: Optional[str] = None, timeout: int = 0
|
|
) -> tuple[Optional[str], list[dict]]:
|
|
"""Poll the Matrix sync API.
|
|
|
|
Args:
|
|
since: Batch token from the previous sync (None for initial).
|
|
timeout: Server-side timeout in ms (0 = return immediately).
|
|
|
|
Returns:
|
|
(next_batch_token, list_of_events) or (None, []) on error.
|
|
"""
|
|
params = {"timeout": str(timeout)}
|
|
if since:
|
|
params["since"] = since
|
|
|
|
result = self._request(
|
|
"GET", f"/_matrix/client/v3/sync?{urlencode(params)}"
|
|
)
|
|
if result is None:
|
|
return None, []
|
|
|
|
next_batch = result.get("next_batch")
|
|
|
|
# Collect all timeline events from joined rooms
|
|
events = []
|
|
for room_data in result.get("rooms", {}).get("join", {}).values():
|
|
events.extend(
|
|
room_data.get("timeline", {}).get("events", [])
|
|
)
|
|
|
|
return next_batch, events
|
|
|
|
# ── Send Message ────────────────────────────────────────────────────
|
|
|
|
def send_message(self, text: str) -> bool:
|
|
"""Send a text message to the configured room. Returns True on success."""
|
|
txn_id = f"lucy-bridge-{int(time.time())}-{os.getpid()}"
|
|
path = (
|
|
f"/_matrix/client/v3/rooms/"
|
|
f"{self.room_id}/send/m.room.message/{txn_id}"
|
|
)
|
|
payload = {"msgtype": "m.text", "body": text}
|
|
|
|
result = self._request("PUT", path, payload)
|
|
if result and "event_id" in result:
|
|
log.info(f"Sent to Matrix, event_id: {result['event_id']}")
|
|
return True
|
|
return False
|
|
|
|
# ── Typing Indicator ────────────────────────────────────────────────
|
|
|
|
def set_typing(self, typing: bool, timeout_ms: int = 20000) -> bool:
|
|
"""Show or hide the typing indicator in the configured room.
|
|
|
|
This is best-effort — failures are logged at DEBUG level since
|
|
typing indicators are non-critical.
|
|
"""
|
|
path = (
|
|
f"/_matrix/client/v3/rooms/"
|
|
f"{self.room_id}/typing/{self.user_id}"
|
|
)
|
|
payload: dict = {"typing": typing}
|
|
if typing:
|
|
payload["timeout"] = timeout_ms
|
|
|
|
result = self._request("PUT", path, payload)
|
|
if result is not None:
|
|
log.debug(f"Typing {'on' if typing else 'off'}")
|
|
return True
|
|
log.debug("Typing indicator failed (non-critical)")
|
|
return False
|