Files
MatrixBridge/api_server.py

118 lines
4.2 KiB
Python

"""Simple HTTP API for the Matrix Bridge.

Provides endpoints for agents and scheduled tasks to interact
with the bridge without going through the Matrix protocol.

Endpoints:
    GET  /session-id — Return the current bridge session ID
    POST /send       — Send a text message to the configured Matrix room
                       (JSON body: {"message": "<text>"})
"""
from __future__ import annotations
import json
import logging
import threading
from http.server import HTTPServer, BaseHTTPRequestHandler
from typing import TYPE_CHECKING, Optional
if TYPE_CHECKING:
from bridge import Bridge
# Module-level logger shared by the request handler and start_api_server().
log = logging.getLogger("matrix-bridge.api")
class _Handler(BaseHTTPRequestHandler):
"""HTTP handler that delegates to a shared bridge reference."""
# Set once before the server starts — shared across all requests.
bridge: Optional["Bridge"] = None
# ── Helpers ───────────────────────────────────────────────────────
def _send_json(self, status: int, data: dict) -> None:
"""Send a JSON response with the given status code."""
body = json.dumps(data).encode("utf-8")
self.send_response(status)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def _read_body(self) -> Optional[dict]:
"""Read and parse the request body as JSON."""
try:
length = int(self.headers.get("Content-Length", 0))
except (ValueError, TypeError):
return None
if length <= 0:
return None
raw = self.rfile.read(length)
try:
return json.loads(raw)
except json.JSONDecodeError:
return None
# ── Routes ────────────────────────────────────────────────────────
def do_GET(self) -> None:
if self.path == "/session-id":
if self.bridge is None:
self._send_json(503, {"error": "Bridge not initialized"})
return
sid = self.bridge.state.session_id
self._send_json(200, {"session_id": sid})
else:
self._send_json(404, {"error": f"Not found: {self.path}"})
def do_POST(self) -> None:
if self.path == "/send":
if self.bridge is None:
self._send_json(503, {"error": "Bridge not initialized"})
return
body = self._read_body()
if body is None:
self._send_json(400, {"error": "Invalid or missing JSON body"})
return
message = body.get("message")
if not message or not isinstance(message, str):
self._send_json(400, {"error": "Missing or invalid 'message' field"})
return
success = self.bridge.matrix.send_message(message)
if success:
self._send_json(200, {"ok": True})
else:
self._send_json(500, {"error": "Failed to send Matrix message"})
else:
self._send_json(404, {"error": f"Not found: {self.path}"})
# ── Silence default logging ───────────────────────────────────────
def log_message(self, format: str, *args) -> None:
log.debug(f"HTTP: {format % args}")
def start_api_server(
    bridge: "Bridge", host: str = "127.0.0.1", port: int = 8082
) -> HTTPServer:
    """Start the API HTTP server in a background daemon thread.

    The server provides a lightweight JSON API on ``host:port`` for other
    components to query the bridge state or trigger Matrix messages. It
    runs in a daemon thread, so it does not keep the process alive.

    Args:
        bridge: Bridge instance stored on ``_Handler.bridge`` and used by
            every request.
        host: Address to bind.
        port: TCP port to bind; ``0`` lets the OS pick a free port.

    Returns:
        The running server. Callers may ignore it, or call ``shutdown()``
        followed by ``server_close()`` to stop it before process exit.

    Raises:
        OSError: If the address cannot be bound (for example, the port is
            already in use).
    """
    _Handler.bridge = bridge
    server = HTTPServer((host, port), _Handler)

    # Report the port that was actually bound; it differs from *port* only
    # when 0 was requested.
    bound_port = server.server_address[1]

    # BaseServer.timeout is intentionally not set: it only applies to
    # handle_request(). serve_forever() already checks for shutdown()
    # requests on its own poll interval.
    thread = threading.Thread(
        target=server.serve_forever,
        name=f"api-{host}:{bound_port}",
        daemon=True,
    )
    thread.start()
    log.info("API server listening on http://%s:%s", host, bound_port)
    return server