Files
MatrixBridge/agent_client.py
Lucy 751584c99d Initial commit: Matrix Bridge Daemon
Persistent Python daemon connecting Matrix DM room to AiAgent API.

- Config-driven (JSON config file)
- Extensible command system (/new_session, /help)
- Typing indicators while agent processes
- Session auto-naming for identification
- Persistent state across restarts
- Token refresh, retry logic, error handling
- Python stdlib only — no external dependencies
2026-05-15 13:45:24 +02:00

180 lines
7.1 KiB
Python

"""Agent API client — communicates with the local AI agent framework.
Provides: session management, message sending, state polling, response fetching.
Uses only Python stdlib — no external dependencies.
"""
from __future__ import annotations
import json
import logging
import time
import urllib.error
import urllib.request
from typing import Optional
from config import Config
log = logging.getLogger("matrix-bridge.agent")
class AgentClient:
"""Client for the local agent HTTP API."""
def __init__(self, config: Config):
self.base_url = config.agent.base_url
self._timeout = config.bridge.agent_timeout_seconds
self._retries = config.bridge.agent_retries
# ── Raw Request ─────────────────────────────────────────────────────
def _request(
self, method: str, path: str, body: Optional[dict] = None
) -> Optional[dict]:
"""Make a request to the agent API with retry logic.
Retries on transient connection errors (connection reset, timeout).
HTTP errors (4xx, 5xx) are not retried.
"""
url = f"{self.base_url}{path}"
last_error = None
for attempt in range(self._retries):
data = json.dumps(body).encode("utf-8") if body else None
req = urllib.request.Request(
url, data=data,
headers={"Content-Type": "application/json"} if data else {},
method=method,
)
try:
with urllib.request.urlopen(req, timeout=self._timeout) as resp:
raw = resp.read().decode("utf-8")
return json.loads(raw) if raw.strip() else {}
except urllib.error.HTTPError as e:
err_body = e.read().decode("utf-8", errors="replace")
log.error(f"Agent API {method} {path} -> {e.code}: {err_body[:200]}")
return None
except (urllib.error.URLError, ConnectionError, TimeoutError, OSError) as e:
last_error = e
if attempt < self._retries - 1:
log.warning(
f"Agent API {method} {path} transient "
f"(attempt {attempt + 1}/{self._retries}): {e}"
)
time.sleep(1)
else:
log.error(
f"Agent API {method} {path} failed "
f"after {self._retries} attempts: {e}"
)
except Exception as e:
log.error(f"Agent API {method} {path} unexpected error: {e}")
return None
return None
# ── Session Management ──────────────────────────────────────────────
def create_session(self) -> Optional[str]:
"""Create a new agent session. Returns session ID or None."""
result = self._request("POST", "/api/session/new")
if result and "id" in result:
log.info(f"Created session: {result['id']}")
return result["id"]
log.error("Failed to create agent session")
return None
def rename_session(self, session_id: str, new_name: str) -> bool:
"""Rename a session to identify its purpose."""
result = self._request(
"PATCH", f"/api/session/{session_id}", {"name": new_name}
)
if result is not None and result.get("success") is not False:
log.info(f"Renamed {session_id} -> '{new_name}'")
return True
log.warning(f"Failed to rename session {session_id}")
return False
# ── Messaging ───────────────────────────────────────────────────────
def send_message(self, session_id: str, content: str) -> bool:
"""Send a message to the agent. Returns True if accepted."""
result = self._request(
"POST",
f"/api/chat/{session_id}/message",
{"message": content},
)
if result and result.get("ok"):
log.info(f"Forwarded to session {session_id}")
return True
log.error(f"Agent rejected message: {result}")
return False
# ── State & Response Polling ────────────────────────────────────────
def get_session_state(self, session_id: str) -> Optional[dict]:
"""Get the current processing state of a session."""
return self._request("GET", f"/api/session/{session_id}/state")
def get_session_messages(self, session_id: str) -> Optional[list]:
"""Get the element list for a session."""
result = self._request("GET", f"/api/session/{session_id}/messages")
return result if isinstance(result, list) else None
def get_message(self, message_id: str) -> Optional[dict]:
"""Get the full content of a message."""
return self._request("GET", f"/api/message/{message_id}")
def wait_for_response(
self, session_id: str, timeout: int = 300
) -> Optional[str]:
"""Poll until the agent finishes, then return the last assistant message.
**Phase 1:** Wait for ``{"streaming": true}`` to appear (agent started).
**Phase 2:** Wait for streaming to end (agent finished).
**Phase 3:** Fetch and return the last assistant message content.
Intermediate tool calls are invisible — only the final response is returned.
"""
log.info(f"Waiting for agent (session {session_id})...")
start = time.time()
# Phase 1: Wait for agent to start streaming
while time.time() - start < timeout:
state = self.get_session_state(session_id)
if state is None:
log.warning("Session disappeared")
return None
if state.get("streaming"):
log.info("Agent is now streaming")
break
if time.time() - start >= 3:
# Could be a very fast response (no streaming phase)
break
time.sleep(0.3)
# Phase 2: Wait for streaming to end
while time.time() - start < timeout:
state = self.get_session_state(session_id)
if state is None:
log.warning("Session disappeared during processing")
return None
if not state.get("streaming"):
break
time.sleep(0.5)
# Phase 3: Fetch the last assistant message
messages = self.get_session_messages(session_id)
if not messages:
log.warning("No messages in session after agent finished")
return None
for elem in reversed(messages):
if elem.get("type") == "message":
msg = self.get_message(elem["id"])
if msg and msg.get("role") == "assistant":
return msg.get("content", "")
log.warning("No assistant message found")
return None