"""Matrix API client — handles all communication with the Matrix server. Provides: sync, send messages, typing indicators, token refresh. Uses only Python stdlib — no external dependencies. """ from __future__ import annotations import json import logging import os import time import urllib.error import urllib.request from typing import Optional from urllib.parse import urlencode from config import Config log = logging.getLogger("matrix-bridge.matrix") class MatrixClient: """Client for communicating with a Matrix homeserver.""" def __init__(self, config: Config): self.server = config.matrix.server self.user_id = config.matrix.user_id self.room_id = config.matrix.room_id self._creds_file = config.matrix.credentials_file self._access_token: Optional[str] = None self._load_credentials() # ── Credential Management ─────────────────────────────────────────── def _load_credentials(self) -> None: """Read access token from the credentials file.""" creds = self._parse_creds_file() self._access_token = creds.get("Access Token") if not self._access_token: raise RuntimeError("No Access Token found in credentials file") def _parse_creds_file(self) -> dict[str, str]: """Parse key: value pairs from the credentials file.""" creds = {} with open(self._creds_file, "r") as f: for line in f: if ":" in line: key, _, value = line.partition(":") creds[key.strip()] = value.strip() return creds def _refresh_token(self) -> bool: """Re-authenticate to Matrix and update the credentials file.""" creds = self._parse_creds_file() password = creds.get("Password") username = creds.get("Username") if not password or not username: log.error("Cannot refresh token: missing username or password") return False url = f"https://{self.server}/_matrix/client/v3/login" payload = { "type": "m.login.password", "user": username, "password": password, } data = json.dumps(payload).encode("utf-8") req = urllib.request.Request( url, data=data, headers={"Content-Type": "application/json"}, ) try: with urllib.request.urlopen(req) as resp: result = json.loads(resp.read()) new_token = result["access_token"] # Persist to credentials file with open(self._creds_file, "r") as f: content = f.read() content = content.replace( creds.get("Access Token", ""), new_token ) with open(self._creds_file, "w") as f: f.write(content) self._access_token = new_token log.info("Matrix token refreshed") return True except Exception as e: log.error(f"Matrix re-login failed: {e}") return False # ── Raw Request ───────────────────────────────────────────────────── def _request( self, method: str, path: str, body: Optional[dict] = None ) -> Optional[dict]: """Make an authenticated request to the Matrix API. Handles token expiry by re-authenticating and retrying once. Returns parsed JSON on success, None on failure. """ url = f"https://{self.server}{path}" data = json.dumps(body).encode("utf-8") if body else None req = urllib.request.Request( url, data=data, headers={ "Content-Type": "application/json" if data else "", "Authorization": f"Bearer {self._access_token}", }, method=method, ) try: with urllib.request.urlopen(req, timeout=10) as resp: raw = resp.read().decode("utf-8") return json.loads(raw) if raw.strip() else {} except urllib.error.HTTPError as e: err_body = e.read().decode("utf-8", errors="replace") if "M_UNKNOWN_TOKEN" in err_body: log.info("Token expired, refreshing...") if self._refresh_token(): return self._request(method, path, body) log.error(f"Matrix {method} {path} -> HTTP {e.code}: {err_body[:200]}") return None except Exception as e: log.error(f"Matrix {method} {path} failed: {e}") return None # ── Sync ──────────────────────────────────────────────────────────── def sync( self, since: Optional[str] = None, timeout: int = 0 ) -> tuple[Optional[str], list[dict]]: """Poll the Matrix sync API. Args: since: Batch token from the previous sync (None for initial). timeout: Server-side timeout in ms (0 = return immediately). Returns: (next_batch_token, list_of_events) or (None, []) on error. """ params = {"timeout": str(timeout)} if since: params["since"] = since result = self._request( "GET", f"/_matrix/client/v3/sync?{urlencode(params)}" ) if result is None: return None, [] next_batch = result.get("next_batch") # Collect all timeline events from joined rooms events = [] for room_data in result.get("rooms", {}).get("join", {}).values(): events.extend( room_data.get("timeline", {}).get("events", []) ) return next_batch, events # ── Send Message ──────────────────────────────────────────────────── def send_message(self, text: str) -> bool: """Send a text message to the configured room. Returns True on success.""" txn_id = f"lucy-bridge-{int(time.time())}-{os.getpid()}" path = ( f"/_matrix/client/v3/rooms/" f"{self.room_id}/send/m.room.message/{txn_id}" ) payload = {"msgtype": "m.text", "body": text} result = self._request("PUT", path, payload) if result and "event_id" in result: log.info(f"Sent to Matrix, event_id: {result['event_id']}") return True return False # ── Typing Indicator ──────────────────────────────────────────────── def set_typing(self, typing: bool, timeout_ms: int = 20000) -> bool: """Show or hide the typing indicator in the configured room. This is best-effort — failures are logged at DEBUG level since typing indicators are non-critical. """ path = ( f"/_matrix/client/v3/rooms/" f"{self.room_id}/typing/{self.user_id}" ) payload: dict = {"typing": typing} if typing: payload["timeout"] = timeout_ms result = self._request("PUT", path, payload) if result is not None: log.debug(f"Typing {'on' if typing else 'off'}") return True log.debug("Typing indicator failed (non-critical)") return False