"""The Pirate runtime — multi-turn tool-use loop.

Called as a subprocess from the dashboard's /api/pirate/chat endpoint:

    PIRATE_CONVERSATION_ID= PIRATE_USER_ID= python3 -c \\
        "from pirate.runtime import chat_turn; chat_turn()"

The conversation state (prior messages) lives in the dashboard DB. This
runtime loads the thread, runs the LLM loop, appends assistant + tool
messages back via internal dashboard APIs, and exits.
"""
import json
import os
import sys
from urllib import request as urlreq, error as urlerror

# Ensure agents/ is importable
_here = os.path.dirname(os.path.abspath(__file__))
_agents = os.path.dirname(_here)
sys.path.insert(0, _agents)

from shared import DASHBOARD_API, api_request  # noqa: E402
from pirate import prompts  # noqa: E402
from pirate.tools import build_catalog, find_tool  # noqa: E402

MAX_TOOL_ITERATIONS = 8  # prevent runaway loops


# ---------- LLM provider resolution (reuse the dashboard's BYOLLM plumbing) ----------

def get_llm_config(user_id):
    """Fetch the user's LLM configuration from the dashboard.

    Raises RuntimeError if the user has no LLM configured
    (``source == "none"`` in the dashboard's response).
    """
    cfg = api_request(f"{DASHBOARD_API}/api/users/{user_id}/llm", retries=1)
    if cfg.get("source") == "none":
        raise RuntimeError(
            "No LLM configured for this user. Set one up via the LLM button in the dashboard."
        )
    return cfg


# ---------- Anthropic tool-use call ----------

def call_anthropic_with_tools(api_url, api_key, model, system, messages, tools, max_tokens=2000):
    """Send a messages/tool-use request to Anthropic.

    Parameters mirror the Anthropic Messages API: ``system`` is the system
    prompt string, ``messages`` the conversation turns, ``tools`` our tool
    catalog (converted to Anthropic's ``input_schema`` format below).

    Returns the raw response dict.
    Raises RuntimeError on HTTP errors or network failures.
    """
    url = f"{api_url.rstrip('/')}/v1/messages"
    body = {
        "model": model,
        "max_tokens": max_tokens,
        "system": system,
        "messages": messages,
    }
    # Convert our tool catalog to Anthropic's schema
    if tools:
        body["tools"] = [
            {"name": t["name"], "description": t["description"], "input_schema": t["input_schema"]}
            for t in tools
        ]
    headers = {
        "x-api-key": api_key,
        "anthropic-version": "2023-06-01",
        "content-type": "application/json",
    }
    req = urlreq.Request(url, data=json.dumps(body).encode(), headers=headers, method="POST")
    try:
        with urlreq.urlopen(req, timeout=120) as resp:
            return json.loads(resp.read().decode())
    except urlerror.HTTPError as e:
        # errors="replace" so a non-UTF-8 error body can't mask the real failure;
        # "from e" preserves the exception chain for debugging.
        err_body = e.read().decode(errors="replace") if e.fp else ""
        raise RuntimeError(f"Anthropic API error {e.code}: {err_body[:500]}") from e
    except urlerror.URLError as e:
        # HTTPError subclasses URLError, so this only catches pure network
        # failures (DNS, connection refused, timeout).
        raise RuntimeError(f"Anthropic API unreachable: {e.reason}") from e


# ---------- Conversation history → Anthropic messages ----------

def build_anthropic_messages(thread_messages):
    """Translate our DB message rows into Anthropic messages[] format.

    Our DB: each row has role ('user'|'assistant'|'tool'), content,
    tool_calls, tool_call_id, tool_name, tool_result.

    Anthropic wants:
    - user turn: {"role": "user", "content": "text"} OR content as array of
      tool_result blocks
    - assistant turn with tools: {"role": "assistant", "content":
      [{"type":"text",...}, {"type":"tool_use","id","name","input"}]}
    - tool results travel as USER turns with content =
      [{"type":"tool_result","tool_use_id","content"}]
    """
    out = []
    pending_tool_results = []

    def flush_tool_results():
        # Consecutive tool rows are batched into a single user turn, as the
        # Anthropic API expects all results for one assistant turn together.
        nonlocal pending_tool_results
        if pending_tool_results:
            out.append({"role": "user", "content": pending_tool_results})
            pending_tool_results = []

    for m in thread_messages:
        role = m["role"]
        if role == "user":
            flush_tool_results()
            out.append({"role": "user", "content": m["content"]})
        elif role == "assistant":
            flush_tool_results()
            blocks = []
            if m.get("content"):
                blocks.append({"type": "text", "text": m["content"]})
            for tc in (m.get("tool_calls") or []):
                blocks.append({
                    "type": "tool_use",
                    "id": tc["id"],
                    "name": tc["name"],
                    "input": tc.get("input", {}),
                })
            if not blocks:
                # Anthropic rejects empty assistant content
                blocks = [{"type": "text", "text": "..."}]
            out.append({"role": "assistant", "content": blocks})
        elif role == "tool":
            # Prefer the structured tool_result (JSON-encoded); fall back to
            # raw content for legacy rows.
            tool_content = (
                json.dumps(m.get("tool_result"))
                if m.get("tool_result") is not None
                else (m.get("content") or "")
            )
            pending_tool_results.append({
                "type": "tool_result",
                "tool_use_id": m.get("tool_call_id", ""),
                "content": tool_content,
            })
    flush_tool_results()
    return out


# ---------- Internal endpoints for persisting turns ----------

def load_conversation(conv_id):
    """Load a conversation (metadata + message rows) from the dashboard."""
    return api_request(f"{DASHBOARD_API}/api/internal/pirate/conversation/{conv_id}", retries=1)


def append_message(conv_id, **fields):
    """Persist one message row (role/content/tool_* fields) to the conversation."""
    return api_request(
        f"{DASHBOARD_API}/api/internal/pirate/conversation/{conv_id}/messages",
        data=fields,
        method="POST",
        retries=1,
    )


# ---------- Main loop ----------

def chat_turn():
    """Run one user turn: LLM call(s) + tool execution until a final answer.

    Reads PIRATE_CONVERSATION_ID and PIRATE_USER_ID from the environment,
    loads the thread from the dashboard, and loops: call the LLM, persist
    the assistant turn, execute any requested tools, persist their results,
    and feed them back — up to MAX_TOOL_ITERATIONS rounds.
    """
    conv_id = int(os.environ["PIRATE_CONVERSATION_ID"])
    user_id = int(os.environ["PIRATE_USER_ID"])

    conv = load_conversation(conv_id)
    thread = conv["messages"]

    llm_cfg = get_llm_config(user_id)
    provider = llm_cfg.get("provider_type", "anthropic")
    if provider not in ("anthropic", "litellm"):
        # Phase 1 supports Anthropic-native tool use. LiteLLM proxies Anthropic cleanly.
        raise RuntimeError(f"Pirate Phase 1 requires an Anthropic-compatible LLM (got '{provider}').")
    api_url = llm_cfg.get("api_url") or "https://api.anthropic.com"
    api_key = llm_cfg.get("api_key", "")
    # Pirate Phase 1 needs a capable tool-use model. Default to Sonnet 4.5;
    # user can override via the LLM settings panel.
    model = llm_cfg.get("default_model") or "claude-sonnet-4-5"

    tools = build_catalog()
    system = prompts.SYSTEM_PROMPT

    for iteration in range(MAX_TOOL_ITERATIONS):
        # Build the request from the current conversation state
        anthropic_messages = build_anthropic_messages(thread)
        if not anthropic_messages:
            print("Empty conversation; nothing to send", file=sys.stderr)
            return

        response = call_anthropic_with_tools(
            api_url, api_key, model, system, anthropic_messages, tools,
        )
        stop_reason = response.get("stop_reason", "")
        usage = response.get("usage", {})
        content_blocks = response.get("content", [])

        # Extract the assistant's text + any tool_use blocks
        assistant_text = ""
        tool_calls = []
        for block in content_blocks:
            if block.get("type") == "text":
                assistant_text += block.get("text", "")
            elif block.get("type") == "tool_use":
                tool_calls.append({
                    "id": block.get("id"),
                    "name": block.get("name"),
                    "input": block.get("input", {}),
                })

        # Persist the assistant turn
        append_message(conv_id,
                       role="assistant",
                       content=assistant_text,
                       tool_calls=tool_calls or None,
                       model=response.get("model", model),
                       input_tokens=usage.get("input_tokens", 0),
                       output_tokens=usage.get("output_tokens", 0))
        # Mirror in-memory so the next iteration sees the new turn
        thread.append({
            "role": "assistant",
            "content": assistant_text,
            "tool_calls": tool_calls,
        })

        if stop_reason != "tool_use" or not tool_calls:
            # Done — LLM returned its final answer
            return

        # Execute each tool call, persist results
        for tc in tool_calls:
            name = tc["name"]
            args = tc.get("input") or {}
            tool_def = find_tool(name)
            if not tool_def:
                result = {"error": f"Unknown tool: {name}"}
            else:
                try:
                    result = tool_def["fn"](**args)
                except Exception as e:
                    # Tool failures are reported back to the LLM, not fatal.
                    result = {"error": f"{type(e).__name__}: {e}"}
            append_message(conv_id,
                           role="tool",
                           content="",
                           tool_call_id=tc["id"],
                           tool_name=name,
                           tool_result=result)
            thread.append({
                "role": "tool",
                "tool_call_id": tc["id"],
                "tool_name": name,
                "tool_result": result,
            })
        # Loop again — feed tool results back to the LLM for the next turn

    # Fell through the iteration cap
    append_message(conv_id,
                   role="assistant",
                   content="[Pirate hit the tool iteration limit. Try asking a more focused question.]")


if __name__ == "__main__":
    chat_turn()