"""Simple HTTP API for the Matrix Bridge. Provides endpoints for agents and scheduled tasks to interact with the bridge without going through the Matrix protocol. Endpoints: GET /session-id — Return the current bridge session ID POST /send — Send a text message to the configured Matrix room """ from __future__ import annotations import json import logging import threading from http.server import HTTPServer, BaseHTTPRequestHandler from typing import TYPE_CHECKING, Optional if TYPE_CHECKING: from bridge import Bridge log = logging.getLogger("matrix-bridge.api") class _Handler(BaseHTTPRequestHandler): """HTTP handler that delegates to a shared bridge reference.""" # Set once before the server starts — shared across all requests. bridge: Optional["Bridge"] = None # ── Helpers ─────────────────────────────────────────────────────── def _send_json(self, status: int, data: dict) -> None: """Send a JSON response with the given status code.""" body = json.dumps(data).encode("utf-8") self.send_response(status) self.send_header("Content-Type", "application/json") self.send_header("Content-Length", str(len(body))) self.end_headers() self.wfile.write(body) def _read_body(self) -> Optional[dict]: """Read and parse the request body as JSON.""" try: length = int(self.headers.get("Content-Length", 0)) except (ValueError, TypeError): return None if length <= 0: return None raw = self.rfile.read(length) try: return json.loads(raw) except json.JSONDecodeError: return None # ── Routes ──────────────────────────────────────────────────────── def do_GET(self) -> None: if self.path == "/session-id": if self.bridge is None: self._send_json(503, {"error": "Bridge not initialized"}) return sid = self.bridge.state.session_id self._send_json(200, {"session_id": sid}) else: self._send_json(404, {"error": f"Not found: {self.path}"}) def do_POST(self) -> None: if self.path == "/send": if self.bridge is None: self._send_json(503, {"error": "Bridge not initialized"}) return body = self._read_body() if body is None: self._send_json(400, {"error": "Invalid or missing JSON body"}) return message = body.get("message") if not message or not isinstance(message, str): self._send_json(400, {"error": "Missing or invalid 'message' field"}) return success = self.bridge.matrix.send_message(message) if success: self._send_json(200, {"ok": True}) else: self._send_json(500, {"error": "Failed to send Matrix message"}) else: self._send_json(404, {"error": f"Not found: {self.path}"}) # ── Silence default logging ─────────────────────────────────────── def log_message(self, format: str, *args) -> None: log.debug(f"HTTP: {format % args}") def start_api_server( bridge: "Bridge", host: str = "127.0.0.1", port: int = 8082 ) -> None: """Start the API HTTP server in a background daemon thread. The server runs until the main process exits. It provides a lightweight JSON API on ``host:port`` for other components to query the bridge state or trigger Matrix messages. """ _Handler.bridge = bridge server = HTTPServer((host, port), _Handler) server.timeout = 1.0 # Allows clean shutdown without dedicated stop thread = threading.Thread( target=server.serve_forever, name=f"api-{host}:{port}", daemon=True, ) thread.start() log.info(f"API server listening on http://{host}:{port}")