Files
Eric Jungbauer 043aa18f3f API Clients + structured JSON results: app-level tokens for Synap/WSIT integration
- New api_clients + api_client_scopes tables; tokens scoped per-instance
- Admin UI tab at /admin for token create/rotate/revoke/delete with one-time reveal
- Dual-auth dependency (user session OR Bearer app token) on trigger + runs endpoints
- /api/instances/{id}/trigger pre-creates a run and returns run_id + cached last_result instantly
- New GET /api/runs/{id} for polling
- Generic trigger path for sub-agent instances (weather, calendar, etc.)
- runs.result column for structured JSON alongside markdown output
- agent_catalog.result_schema describes each agent's result shape
- Weather, daily-briefing, project-monitor retrofitted to emit structured results
- log_run: env INSTANCE_ID/RUN_ID only used when target matches, so nested sub-agents don't clobber parent runs
- Wiki docs: API Clients & Token Scoping + Calling Agents From Your Apps
2026-04-20 17:54:32 +00:00

155 lines
5.6 KiB
Python

"""Shared utilities for all agents."""
import json
import os
import sys
import time
from datetime import datetime
from zoneinfo import ZoneInfo
from urllib import request, error as urlerror
MT = ZoneInfo("America/Denver")
DASHBOARD_API = os.environ.get("DASHBOARD_API", "http://localhost:8550")
WIKI_API = "https://wiki.jfamily.io/api"
WIKI_TOKEN = os.environ.get("WIKI_TOKEN", "ol_api_yHXypRyqf4CscWDzPluGfPev9GhdFg6mwrXwkT")
WIKI_COLLECTION_ID = os.environ.get("WIKI_COLLECTION_ID", "9d9e471c-84cd-4ba7-bae5-c70f61805228")
WIKI_PARENT_DOC_ID = os.environ.get("WIKI_PARENT_DOC_ID", "2a891fe8-579b-450b-a663-de93915896b7") # Eric's Daily Briefing
MONTH_NAMES = [
"", "January", "February", "March", "April", "May", "June",
"July", "August", "September", "October", "November", "December",
]
# Retry config
DEFAULT_RETRIES = 3
DEFAULT_BACKOFF = 2 # seconds, doubles each retry
RETRIABLE_CODES = {408, 429, 500, 502, 503, 504}
def api_request(url, data=None, headers=None, method="GET", retries=DEFAULT_RETRIES, backoff=DEFAULT_BACKOFF):
"""HTTP helper with automatic retry on transient failures."""
if data is not None:
data = json.dumps(data).encode("utf-8")
last_error = None
for attempt in range(retries + 1):
try:
req = request.Request(url, data=data, headers=headers or {}, method=method)
if data:
req.add_header("Content-Type", "application/json")
with request.urlopen(req, timeout=30) as resp:
return json.loads(resp.read().decode())
except urlerror.HTTPError as e:
last_error = e
if e.code in RETRIABLE_CODES and attempt < retries:
wait = backoff * (2 ** attempt)
print(f" Retry {attempt + 1}/{retries} after {e.code} from {url} (waiting {wait}s)", file=sys.stderr)
time.sleep(wait)
continue
raise
except (urlerror.URLError, TimeoutError, ConnectionError, OSError) as e:
last_error = e
if attempt < retries:
wait = backoff * (2 ** attempt)
print(f" Retry {attempt + 1}/{retries} after {type(e).__name__} from {url} (waiting {wait}s)", file=sys.stderr)
time.sleep(wait)
continue
raise
raise last_error
def log_run(agent_id, status, output="", err="", metadata=None, instance_id=None, result=None, run_id=None):
"""Log a run to the dashboard API.
Resolution rules:
* `instance_id`: explicit wins; else env INSTANCE_ID; else skip (no run logged).
* `run_id`: explicit wins; else env RUN_ID — but ONLY when the target instance matches
env INSTANCE_ID (the subprocess was triggered for THIS instance). A sub-agent called
inside a briefing (e.g. project_monitor with its own instance_id) does NOT inherit
the briefing's RUN_ID and will create a fresh run row instead.
Pass `result` (dict) to populate the structured-output column that API consumers read.
"""
try:
env_inst = int(os.environ["INSTANCE_ID"]) if os.environ.get("INSTANCE_ID") else None
env_run = int(os.environ["RUN_ID"]) if os.environ.get("RUN_ID") else None
target_instance_id = instance_id if instance_id is not None else env_inst
if target_instance_id is None:
print(f"Warning: no instance_id, run not logged for {agent_id}", file=sys.stderr)
return
target_run_id = run_id
if target_run_id is None and env_run is not None and env_inst == target_instance_id:
target_run_id = env_run
payload = {
"status": status,
"output": output,
"error": err,
"metadata": metadata or {},
}
if result is not None:
payload["result"] = result
if target_run_id:
payload["run_id"] = target_run_id
api_request(
f"{DASHBOARD_API}/api/instances/{target_instance_id}/runs",
data=payload,
method="POST",
retries=1, # Don't retry logging too aggressively
)
except Exception as e:
print(f"Warning: failed to log run to dashboard: {e}", file=sys.stderr)
def get_instance_config(instance_id):
"""Fetch an instance's config from the dashboard API. Used by agents run from /trigger."""
return api_request(f"{DASHBOARD_API}/api/instances/{instance_id}/config")
def wiki_headers():
return {"Authorization": f"Bearer {WIKI_TOKEN}"}
def find_child_doc(parent_id, title, collection_id=None):
"""Search for a child doc by title under a given parent."""
coll = collection_id or WIKI_COLLECTION_ID
result = api_request(
f"{WIKI_API}/documents.search",
data={"query": title, "collectionId": coll},
headers=wiki_headers(),
method="POST",
)
for doc in result.get("data", []):
d = doc.get("document", {})
if d.get("title") == title and d.get("parentDocumentId") == parent_id:
return d["id"]
return None
def ensure_child_doc(parent_id, title, body, collection_id=None):
"""Find or create a child doc under a parent. Returns doc ID."""
coll = collection_id or WIKI_COLLECTION_ID
doc_id = find_child_doc(parent_id, title, collection_id=coll)
if doc_id:
return doc_id
result = api_request(
f"{WIKI_API}/documents.create",
data={
"title": title,
"text": body,
"collectionId": coll,
"parentDocumentId": parent_id,
"publish": True,
},
headers=wiki_headers(),
method="POST",
)
print(f"Created wiki doc: {title}")
return result["data"]["id"]