A persistent Python daemon connecting a Matrix DM room to the AiAgent API. - Config-driven (JSON config file) - Extensible command system (/new_session, /help) - Typing indicators while the agent processes - Session auto-naming for identification - Persistent state across restarts - Token refresh, retry logic, error handling - Python stdlib only — no external dependencies
180 lines
7.1 KiB
Python
180 lines
7.1 KiB
Python
"""Agent API client — communicates with the local AI agent framework.
|
|
|
|
Provides: session management, message sending, state polling, response fetching.
|
|
Uses only Python stdlib — no external dependencies.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import time
|
|
import urllib.error
|
|
import urllib.request
|
|
from typing import Optional
|
|
|
|
from config import Config
|
|
|
|
log = logging.getLogger("matrix-bridge.agent")
|
|
|
|
|
|
class AgentClient:
    """Client for the local agent HTTP API.

    Wraps the JSON-over-HTTP endpoints used by this module: session creation
    and renaming, message submission, state polling and message retrieval.
    All methods report failure through their return value (``None`` or
    ``False``) and log the reason; they do not raise on HTTP or connection
    errors.
    """

    # Seconds to wait between retries of a transient connection failure.
    _RETRY_DELAY_SECONDS = 1
    # How long Phase 1 of wait_for_response waits for streaming to begin
    # before assuming the agent already answered.
    _START_GRACE_SECONDS = 3
    # Poll intervals (seconds) for Phase 1 and Phase 2 of wait_for_response.
    _START_POLL_SECONDS = 0.3
    _FINISH_POLL_SECONDS = 0.5

    def __init__(self, config: "Config"):
        """Read the base URL, request timeout and retry count from *config*."""
        self.base_url = config.agent.base_url
        self._timeout = config.bridge.agent_timeout_seconds
        self._retries = config.bridge.agent_retries

    # ── Raw Request ─────────────────────────────────────────────────────

    def _request(
        self, method: str, path: str, body: Optional[dict] = None
    ) -> Optional[dict]:
        """Make a request to the agent API with retry logic.

        Retries on transient connection errors (connection reset, timeout).
        HTTP errors (4xx, 5xx) are not retried.

        Args:
            method: HTTP method, e.g. ``"GET"`` or ``"POST"``.
            path: Path appended to ``self.base_url`` (should start with ``/``).
            body: Optional JSON-serialisable payload. When given (even if
                empty) it is sent as a JSON body with a JSON content type.

        Returns:
            The decoded JSON response (``{}`` for an empty response body), or
            ``None`` if the request failed. Note that an endpoint returning a
            JSON array yields a ``list`` here despite the annotation.
        """
        # Tolerate a base URL configured with a trailing slash.
        url = f"{self.base_url.rstrip('/')}{path}"
        # `is not None` (not truthiness) so an explicit empty dict is still sent.
        data = json.dumps(body).encode("utf-8") if body is not None else None
        headers = {"Content-Type": "application/json"} if data is not None else {}
        # Always try at least once, even if retries is configured as 0.
        attempts = max(1, self._retries)

        for attempt in range(1, attempts + 1):
            req = urllib.request.Request(
                url, data=data, headers=headers, method=method
            )
            try:
                with urllib.request.urlopen(req, timeout=self._timeout) as resp:
                    raw = resp.read().decode("utf-8")
                return json.loads(raw) if raw.strip() else {}
            except urllib.error.HTTPError as e:
                # HTTPError subclasses URLError, so it must be handled first.
                err_body = e.read().decode("utf-8", errors="replace")
                log.error(f"Agent API {method} {path} -> {e.code}: {err_body[:200]}")
                return None
            except (urllib.error.URLError, ConnectionError, TimeoutError, OSError) as e:
                if attempt < attempts:
                    log.warning(
                        f"Agent API {method} {path} transient "
                        f"(attempt {attempt}/{attempts}): {e}"
                    )
                    time.sleep(self._RETRY_DELAY_SECONDS)
                else:
                    log.error(
                        f"Agent API {method} {path} failed "
                        f"after {attempts} attempts: {e}"
                    )
            except Exception as e:
                # Includes malformed JSON / undecodable response bodies.
                log.error(f"Agent API {method} {path} unexpected error: {e}")
                return None

        return None

    # ── Session Management ──────────────────────────────────────────────

    def create_session(self) -> Optional[str]:
        """Create a new agent session. Returns session ID or None."""
        result = self._request("POST", "/api/session/new")
        if isinstance(result, dict) and "id" in result:
            log.info(f"Created session: {result['id']}")
            return result["id"]
        log.error("Failed to create agent session")
        return None

    def rename_session(self, session_id: str, new_name: str) -> bool:
        """Rename a session to identify its purpose.

        Returns True unless the request failed or the response explicitly
        reports ``"success": false``.
        """
        result = self._request(
            "PATCH", f"/api/session/{session_id}", {"name": new_name}
        )
        explicit_failure = (
            isinstance(result, dict) and result.get("success") is False
        )
        if result is not None and not explicit_failure:
            log.info(f"Renamed {session_id} -> '{new_name}'")
            return True
        log.warning(f"Failed to rename session {session_id}")
        return False

    # ── Messaging ───────────────────────────────────────────────────────

    def send_message(self, session_id: str, content: str) -> bool:
        """Send a message to the agent. Returns True if accepted.

        "Accepted" means the response is a JSON object with a truthy ``ok``.
        """
        result = self._request(
            "POST",
            f"/api/chat/{session_id}/message",
            {"message": content},
        )
        if isinstance(result, dict) and result.get("ok"):
            log.info(f"Forwarded to session {session_id}")
            return True
        log.error(f"Agent rejected message: {result}")
        return False

    # ── State & Response Polling ────────────────────────────────────────

    def get_session_state(self, session_id: str) -> Optional[dict]:
        """Get the current processing state of a session.

        Returns None if the request failed or the response is not an object.
        """
        result = self._request("GET", f"/api/session/{session_id}/state")
        return result if isinstance(result, dict) else None

    def get_session_messages(self, session_id: str) -> Optional[list]:
        """Get the element list for a session (None unless a JSON array)."""
        result = self._request("GET", f"/api/session/{session_id}/messages")
        return result if isinstance(result, list) else None

    def get_message(self, message_id: str) -> Optional[dict]:
        """Get the full content of a message.

        Returns None if the request failed or the response is not an object.
        """
        result = self._request("GET", f"/api/message/{message_id}")
        return result if isinstance(result, dict) else None

    def wait_for_response(
        self, session_id: str, timeout: int = 300
    ) -> Optional[str]:
        """Poll until the agent finishes, then return the last assistant message.

        **Phase 1:** Wait for ``{"streaming": true}`` to appear (agent started).
        **Phase 2:** Wait for streaming to end (agent finished).
        **Phase 3:** Fetch and return the last assistant message content.

        Intermediate tool calls are invisible — only the final response is returned.

        Args:
            session_id: Session to poll.
            timeout: Overall budget in seconds for Phases 1 and 2.

        Returns:
            The content of the most recent assistant message, or ``None`` if
            the session state could not be fetched, the agent was still
            streaming when the timeout expired, or no assistant message exists.
        """
        log.info(f"Waiting for agent (session {session_id})...")
        # Monotonic clock: elapsed-time maths must not be affected by
        # wall-clock adjustments.
        start = time.monotonic()
        deadline = start + timeout

        # Phase 1: Wait for agent to start streaming
        while time.monotonic() < deadline:
            state = self.get_session_state(session_id)
            if state is None:
                log.warning("Session disappeared")
                return None
            if state.get("streaming"):
                log.info("Agent is now streaming")
                break
            if time.monotonic() - start >= self._START_GRACE_SECONDS:
                # Could be a very fast response (no streaming phase)
                break
            time.sleep(self._START_POLL_SECONDS)

        # Phase 2: Wait for streaming to end. The state is checked at least
        # once so that we never read messages without knowing whether the
        # agent is still producing output.
        while True:
            state = self.get_session_state(session_id)
            if state is None:
                log.warning("Session disappeared during processing")
                return None
            if not state.get("streaming"):
                break
            if time.monotonic() >= deadline:
                # Still streaming: whatever is in the session now is stale or
                # partial, so do not report it as the final answer.
                log.warning(
                    f"Timed out after {timeout}s waiting for agent "
                    f"(session {session_id})"
                )
                return None
            time.sleep(self._FINISH_POLL_SECONDS)

        # Phase 3: Fetch the last assistant message
        messages = self.get_session_messages(session_id)
        if not messages:
            log.warning("No messages in session after agent finished")
            return None

        for elem in reversed(messages):
            # Skip anything that is not a well-formed message element.
            if not isinstance(elem, dict) or elem.get("type") != "message":
                continue
            message_id = elem.get("id")
            if message_id is None:
                continue
            msg = self.get_message(message_id)
            if msg and msg.get("role") == "assistant":
                return msg.get("content", "")

        log.warning("No assistant message found")
        return None
|