Files
ai-agents/agents/pirate/runtime.py
Eric Jungbauer eac7b64a90 The Pirate — Phase 1.a: conversational read-only media agent
Ships a chat-based agent at /pirate that LLM-routes user questions to media-stack tools and returns natural-language answers grounded in real data. Built on the existing API-tokens + dual-auth infrastructure so other apps (Open WebUI, HA voice, Synap) can consume the same Pirate API.

New subsystem (not the standard trigger/result pattern):
- pirate_conversations + pirate_messages tables
- service_configs table (admin-wide creds shared by media agents)
- /api/pirate/chat + /api/pirate/conversations/* (dual-auth: user session OR Bearer token scoped to user's pirate instance; example call after this list)
- /api/internal/pirate/* endpoints used by runtime subprocess
- /api/admin/services + Services tab in admin.html for cred management
- Auto-seeded service_configs on startup from Media Stack Reference defaults (never overwrite admin edits)
- Auto-seeded pirate catalog entry + per-user pirate instance on startup
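
A minimal sketch of an external client hitting the chat endpoint via the Bearer half of the dual-auth scheme. The endpoint path and auth options come from this commit; the base URL, token value, and the request/response field names ("message" and the printed reply) are illustrative assumptions, not confirmed API shapes.

import json
from urllib import request as urlreq

DASHBOARD = "https://dashboard.example.internal"  # assumption: your dashboard base URL
TOKEN = "pirate-scoped-api-token"                 # assumption: token scoped to the user's pirate instance

req = urlreq.Request(
    f"{DASHBOARD}/api/pirate/chat",
    data=json.dumps({"message": "Any stuck downloads?"}).encode(),  # assumed body shape
    headers={
        "Authorization": f"Bearer {TOKEN}",  # or omit and rely on a logged-in user session instead
        "Content-Type": "application/json",
    },
    method="POST",
)
with urlreq.urlopen(req, timeout=120) as resp:
    print(json.loads(resp.read().decode()))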

Pirate package (agents/pirate/):
- prompts.py: system prompt, enforces read-only in Phase 1
- runtime.py: Anthropic-native tool-use loop (max 8 iterations, persists every turn); consumes the tool-catalog shape sketched after this list
- tools/_common.py: service_configs fetch + qBit session auth
- tools/sonarr.py: queue, upcoming, series_search, library_stats
- tools/radarr.py: queue, movie_search, library_stats
- tools/qbittorrent.py: torrents, transfer_stats, categories
- tools/storage.py: disk_space (via Sonarr diskspace API)
- Default model: claude-sonnet-4-5 (Haiku fumbles multi-step chains)
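
For reference, the catalog-entry shape runtime.py consumes: the "name"/"description"/"input_schema" keys are forwarded verbatim to Anthropic's tools[] array, and "fn" is what chat_turn() executes. The concrete sonarr_queue example below is an illustrative assumption, not the actual tool module's signature.

def sonarr_queue(limit=10):
    """Hypothetical wrapper; the real implementation lives in tools/sonarr.py."""
    ...

CATALOG_ENTRY = {
    "name": "sonarr_queue",
    "description": "List items currently in the Sonarr download queue.",
    "input_schema": {  # JSON Schema, passed through unchanged to the Anthropic API
        "type": "object",
        "properties": {"limit": {"type": "integer"}},
    },
    "fn": sonarr_queue,  # invoked by the runtime as tool_def["fn"](**args)
}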

Dashboard:
- static/pirate.html — full chat UI with conversation sidebar, suggestion chips, inline tool-call visualization, 24h idle reset + New Chat button
- Pirate button added to main dashboard header

Wiki reorg: Agents / Developer Guides / Plans parent docs, per-agent reference docs, The Pirate doc. API Clients + Calling Agents docs moved under Developer Guides.

Working folder: PIRATE_PHASE_1A.md + NEXT_SESSION_PROMPT.md for fast bootstrap.

Smoke tested end-to-end: real tool calls against qBittorrent (13 active torrents correctly reported) and Sonarr disk-space; multi-turn conversation state preserved across follow-up questions.

On deck: Phase 1.b (Lidarr/Whisparr/Overseerr/Plex tools), then 1.d (OWUI pipeline), then 1.c (HA voice).
2026-04-20 19:01:50 +00:00

239 lines
8.7 KiB
Python

"""The Pirate runtime — multi-turn tool-use loop.
Called as a subprocess from the dashboard's /api/pirate/chat endpoint:
PIRATE_CONVERSATION_ID=<id> PIRATE_USER_ID=<id> python3 -c \\
"from pirate.runtime import chat_turn; chat_turn()"
The conversation state (prior messages) lives in the dashboard DB. This runtime
loads the thread, runs the LLM loop, appends assistant + tool messages back via
internal dashboard APIs, and exits.
"""
import json
import os
import sys
from urllib import request as urlreq, error as urlerror
# Ensure agents/ is importable
_here = os.path.dirname(os.path.abspath(__file__))
_agents = os.path.dirname(_here)
sys.path.insert(0, _agents)
from shared import DASHBOARD_API, api_request # noqa: E402
from pirate import prompts # noqa: E402
from pirate.tools import build_catalog, find_tool # noqa: E402
MAX_TOOL_ITERATIONS = 8 # prevent runaway loops

# ---------- LLM provider resolution (reuse the dashboard's BYOLLM plumbing) ----------
def get_llm_config(user_id):
    cfg = api_request(f"{DASHBOARD_API}/api/users/{user_id}/llm", retries=1)
    if cfg.get("source") == "none":
        raise RuntimeError("No LLM configured for this user. Set one up via the LLM button in the dashboard.")
    return cfg

# ---------- Anthropic tool-use call ----------
def call_anthropic_with_tools(api_url, api_key, model, system, messages, tools, max_tokens=2000):
    """Send a messages/tool-use request to Anthropic. Returns the raw response dict."""
    url = f"{api_url.rstrip('/')}/v1/messages"
    body = {
        "model": model,
        "max_tokens": max_tokens,
        "system": system,
        "messages": messages,
    }
    # Convert our tool catalog to Anthropic's schema
    if tools:
        body["tools"] = [
            {"name": t["name"], "description": t["description"], "input_schema": t["input_schema"]}
            for t in tools
        ]
    headers = {
        "x-api-key": api_key,
        "anthropic-version": "2023-06-01",
        "content-type": "application/json",
    }
    req = urlreq.Request(url, data=json.dumps(body).encode(), headers=headers, method="POST")
    try:
        with urlreq.urlopen(req, timeout=120) as resp:
            return json.loads(resp.read().decode())
    except urlerror.HTTPError as e:
        err_body = e.read().decode() if e.fp else ""
        raise RuntimeError(f"Anthropic API error {e.code}: {err_body[:500]}")

# ---------- Conversation history → Anthropic messages ----------
def build_anthropic_messages(thread_messages):
    """Translate our DB message rows into Anthropic messages[] format.

    Our DB: each row has role ('user'|'assistant'|'tool'), content, tool_calls,
    tool_call_id, tool_name, tool_result.

    Anthropic wants:
      - user turn: {"role": "user", "content": "text"} OR content as an array
        of tool_result blocks
      - assistant turn with tools: {"role": "assistant", "content":
        [{"type": "text", ...}, {"type": "tool_use", "id", "name", "input"}]}
      - tool results travel as USER turns with content =
        [{"type": "tool_result", "tool_use_id", "content"}]
    """
    out = []
    pending_tool_results = []

    def flush_tool_results():
        nonlocal pending_tool_results
        if pending_tool_results:
            out.append({"role": "user", "content": pending_tool_results})
            pending_tool_results = []

    for m in thread_messages:
        role = m["role"]
        if role == "user":
            flush_tool_results()
            out.append({"role": "user", "content": m["content"]})
        elif role == "assistant":
            flush_tool_results()
            blocks = []
            if m.get("content"):
                blocks.append({"type": "text", "text": m["content"]})
            for tc in (m.get("tool_calls") or []):
                blocks.append({
                    "type": "tool_use",
                    "id": tc["id"],
                    "name": tc["name"],
                    "input": tc.get("input", {}),
                })
            if not blocks:
                # Anthropic rejects empty assistant content
                blocks = [{"type": "text", "text": "..."}]
            out.append({"role": "assistant", "content": blocks})
        elif role == "tool":
            tool_content = json.dumps(m.get("tool_result")) if m.get("tool_result") is not None else (m.get("content") or "")
            pending_tool_results.append({
                "type": "tool_result",
                "tool_use_id": m.get("tool_call_id", ""),
                "content": tool_content,
            })
    flush_tool_results()
    return out
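
# Worked example of the translation above (illustrative ids/values; the
# qbit_torrents tool name is a hypothetical, not a confirmed catalog name):
#   DB rows:
#     {"role": "user", "content": "Any stuck downloads?"}
#     {"role": "assistant", "content": "", "tool_calls": [{"id": "tu_1", "name": "qbit_torrents", "input": {}}]}
#     {"role": "tool", "tool_call_id": "tu_1", "tool_result": {"active": 13}}
#   translate to:
#     {"role": "user", "content": "Any stuck downloads?"}
#     {"role": "assistant", "content": [{"type": "tool_use", "id": "tu_1", "name": "qbit_torrents", "input": {}}]}
#     {"role": "user", "content": [{"type": "tool_result", "tool_use_id": "tu_1", "content": "{\"active\": 13}"}]}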

# ---------- Internal endpoints for persisting turns ----------
def load_conversation(conv_id):
    return api_request(f"{DASHBOARD_API}/api/internal/pirate/conversation/{conv_id}", retries=1)


def append_message(conv_id, **fields):
    return api_request(
        f"{DASHBOARD_API}/api/internal/pirate/conversation/{conv_id}/messages",
        data=fields, method="POST", retries=1,
    )

# ---------- Main loop ----------
def chat_turn():
    conv_id = int(os.environ["PIRATE_CONVERSATION_ID"])
    user_id = int(os.environ["PIRATE_USER_ID"])
    conv = load_conversation(conv_id)
    thread = conv["messages"]

    llm_cfg = get_llm_config(user_id)
    provider = llm_cfg.get("provider_type", "anthropic")
    if provider not in ("anthropic", "litellm"):
        # Phase 1 supports Anthropic-native tool use. LiteLLM proxies Anthropic cleanly.
        raise RuntimeError(f"Pirate Phase 1 requires an Anthropic-compatible LLM (got '{provider}').")
    api_url = llm_cfg.get("api_url") or "https://api.anthropic.com"
    api_key = llm_cfg.get("api_key", "")
    # Pirate Phase 1 needs a capable tool-use model. Default to Sonnet 4.5;
    # user can override via the LLM settings panel.
    model = llm_cfg.get("default_model") or "claude-sonnet-4-5"

    tools = build_catalog()
    system = prompts.SYSTEM_PROMPT

    for iteration in range(MAX_TOOL_ITERATIONS):
        # Build the request from the current conversation state
        anthropic_messages = build_anthropic_messages(thread)
        if not anthropic_messages:
            print("Empty conversation; nothing to send", file=sys.stderr)
            return

        response = call_anthropic_with_tools(
            api_url, api_key, model, system, anthropic_messages, tools,
        )
        stop_reason = response.get("stop_reason", "")
        usage = response.get("usage", {})
        content_blocks = response.get("content", [])

        # Extract the assistant's text + any tool_use blocks
        assistant_text = ""
        tool_calls = []
        for block in content_blocks:
            if block.get("type") == "text":
                assistant_text += block.get("text", "")
            elif block.get("type") == "tool_use":
                tool_calls.append({
                    "id": block.get("id"),
                    "name": block.get("name"),
                    "input": block.get("input", {}),
                })

        # Persist the assistant turn
        append_message(conv_id, role="assistant",
                       content=assistant_text,
                       tool_calls=tool_calls if tool_calls else None,
                       model=response.get("model", model),
                       input_tokens=usage.get("input_tokens", 0),
                       output_tokens=usage.get("output_tokens", 0))
        # Mirror in-memory so the next iteration sees the new turn
        thread.append({
            "role": "assistant",
            "content": assistant_text,
            "tool_calls": tool_calls,
        })

        if stop_reason != "tool_use" or not tool_calls:
            # Done — LLM returned its final answer
            return

        # Execute each tool call, persist results
        for tc in tool_calls:
            name = tc["name"]
            args = tc.get("input") or {}
            tool_def = find_tool(name)
            if not tool_def:
                result = {"error": f"Unknown tool: {name}"}
            else:
                try:
                    result = tool_def["fn"](**args)
                except Exception as e:
                    result = {"error": f"{type(e).__name__}: {e}"}
            append_message(conv_id, role="tool",
                           content="",
                           tool_call_id=tc["id"],
                           tool_name=name,
                           tool_result=result)
            thread.append({
                "role": "tool",
                "tool_call_id": tc["id"],
                "tool_name": name,
                "tool_result": result,
            })
        # Loop again — feed tool results back to the LLM for the next turn

    # Fell through the iteration cap
    append_message(conv_id, role="assistant",
                   content="[Pirate hit the tool iteration limit. Try asking a more focused question.]")


if __name__ == "__main__":
    chat_turn()