"""Agent API client — communicates with the local AI agent framework. Provides: session management, message sending, state polling, response fetching. Uses only Python stdlib — no external dependencies. """ from __future__ import annotations import json import logging import time import urllib.error import urllib.request from typing import Optional from config import Config log = logging.getLogger("matrix-bridge.agent") class AgentClient: """Client for the local agent HTTP API.""" def __init__(self, config: Config): self.base_url = config.agent.base_url self._timeout = config.bridge.agent_timeout_seconds self._retries = config.bridge.agent_retries # ── Raw Request ───────────────────────────────────────────────────── def _request( self, method: str, path: str, body: Optional[dict] = None ) -> Optional[dict]: """Make a request to the agent API with retry logic. Retries on transient connection errors (connection reset, timeout). HTTP errors (4xx, 5xx) are not retried. """ url = f"{self.base_url}{path}" last_error = None for attempt in range(self._retries): data = json.dumps(body).encode("utf-8") if body else None req = urllib.request.Request( url, data=data, headers={"Content-Type": "application/json"} if data else {}, method=method, ) try: with urllib.request.urlopen(req, timeout=self._timeout) as resp: raw = resp.read().decode("utf-8") return json.loads(raw) if raw.strip() else {} except urllib.error.HTTPError as e: err_body = e.read().decode("utf-8", errors="replace") log.error(f"Agent API {method} {path} -> {e.code}: {err_body[:200]}") return None except (urllib.error.URLError, ConnectionError, TimeoutError, OSError) as e: last_error = e if attempt < self._retries - 1: log.warning( f"Agent API {method} {path} transient " f"(attempt {attempt + 1}/{self._retries}): {e}" ) time.sleep(1) else: log.error( f"Agent API {method} {path} failed " f"after {self._retries} attempts: {e}" ) except Exception as e: log.error(f"Agent API {method} {path} unexpected error: {e}") return None return None # ── Session Management ────────────────────────────────────────────── def create_session(self) -> Optional[str]: """Create a new agent session. Returns session ID or None.""" result = self._request("POST", "/api/session/new") if result and "id" in result: log.info(f"Created session: {result['id']}") return result["id"] log.error("Failed to create agent session") return None def rename_session(self, session_id: str, new_name: str) -> bool: """Rename a session to identify its purpose.""" result = self._request( "PATCH", f"/api/session/{session_id}", {"name": new_name} ) if result is not None and result.get("success") is not False: log.info(f"Renamed {session_id} -> '{new_name}'") return True log.warning(f"Failed to rename session {session_id}") return False # ── Messaging ─────────────────────────────────────────────────────── def send_message(self, session_id: str, content: str) -> bool: """Send a message to the agent. Returns True if accepted.""" result = self._request( "POST", f"/api/chat/{session_id}/message", {"message": content}, ) if result and result.get("ok"): log.info(f"Forwarded to session {session_id}") return True log.error(f"Agent rejected message: {result}") return False # ── State & Response Polling ──────────────────────────────────────── def get_session_state(self, session_id: str) -> Optional[dict]: """Get the current processing state of a session.""" return self._request("GET", f"/api/session/{session_id}/state") def get_session_messages(self, session_id: str) -> Optional[list]: """Get the element list for a session.""" result = self._request("GET", f"/api/session/{session_id}/messages") return result if isinstance(result, list) else None def get_message(self, message_id: str) -> Optional[dict]: """Get the full content of a message.""" return self._request("GET", f"/api/message/{message_id}") def wait_for_response( self, session_id: str, timeout: int = 300 ) -> Optional[str]: """Poll until the agent finishes, then return the last assistant message. **Phase 1:** Wait for ``{"streaming": true}`` to appear (agent started). **Phase 2:** Wait for streaming to end (agent finished). **Phase 3:** Fetch and return the last assistant message content. Intermediate tool calls are invisible — only the final response is returned. """ log.info(f"Waiting for agent (session {session_id})...") start = time.time() # Phase 1: Wait for agent to start streaming while time.time() - start < timeout: state = self.get_session_state(session_id) if state is None: log.warning("Session disappeared") return None if state.get("streaming"): log.info("Agent is now streaming") break if time.time() - start >= 3: # Could be a very fast response (no streaming phase) break time.sleep(0.3) # Phase 2: Wait for streaming to end while time.time() - start < timeout: state = self.get_session_state(session_id) if state is None: log.warning("Session disappeared during processing") return None if not state.get("streaming"): break time.sleep(0.5) # Phase 3: Fetch the last assistant message messages = self.get_session_messages(session_id) if not messages: log.warning("No messages in session after agent finished") return None for elem in reversed(messages): if elem.get("type") == "message": msg = self.get_message(elem["id"]) if msg and msg.get("role") == "assistant": return msg.get("content", "") log.warning("No assistant message found") return None