Files
ai-agents/agents/shared.py
T

118 lines
4.1 KiB
Python

"""Shared utilities for all agents."""
import json
import os
import sys
import time
from datetime import datetime
from zoneinfo import ZoneInfo
from urllib import request, error as urlerror
MT = ZoneInfo("America/Denver")
DASHBOARD_API = os.environ.get("DASHBOARD_API", "http://localhost:8550")
WIKI_API = "https://wiki.jfamily.io/api"
WIKI_TOKEN = os.environ.get("WIKI_TOKEN", "ol_api_yHXypRyqf4CscWDzPluGfPev9GhdFg6mwrXwkT")
WIKI_COLLECTION_ID = os.environ.get("WIKI_COLLECTION_ID", "9d9e471c-84cd-4ba7-bae5-c70f61805228")
WIKI_PARENT_DOC_ID = os.environ.get("WIKI_PARENT_DOC_ID", "2a891fe8-579b-450b-a663-de93915896b7") # Eric's Daily Briefing
MONTH_NAMES = [
"", "January", "February", "March", "April", "May", "June",
"July", "August", "September", "October", "November", "December",
]
# Retry config
DEFAULT_RETRIES = 3
DEFAULT_BACKOFF = 2 # seconds, doubles each retry
RETRIABLE_CODES = {408, 429, 500, 502, 503, 504}
def api_request(url, data=None, headers=None, method="GET", retries=DEFAULT_RETRIES, backoff=DEFAULT_BACKOFF):
"""HTTP helper with automatic retry on transient failures."""
if data is not None:
data = json.dumps(data).encode("utf-8")
last_error = None
for attempt in range(retries + 1):
try:
req = request.Request(url, data=data, headers=headers or {}, method=method)
if data:
req.add_header("Content-Type", "application/json")
with request.urlopen(req, timeout=30) as resp:
return json.loads(resp.read().decode())
except urlerror.HTTPError as e:
last_error = e
if e.code in RETRIABLE_CODES and attempt < retries:
wait = backoff * (2 ** attempt)
print(f" Retry {attempt + 1}/{retries} after {e.code} from {url} (waiting {wait}s)", file=sys.stderr)
time.sleep(wait)
continue
raise
except (urlerror.URLError, TimeoutError, ConnectionError, OSError) as e:
last_error = e
if attempt < retries:
wait = backoff * (2 ** attempt)
print(f" Retry {attempt + 1}/{retries} after {type(e).__name__} from {url} (waiting {wait}s)", file=sys.stderr)
time.sleep(wait)
continue
raise
raise last_error
def log_run(agent_id, status, output="", err="", metadata=None, instance_id=None):
"""Log a run to the dashboard API."""
try:
if instance_id:
api_request(
f"{DASHBOARD_API}/api/instances/{instance_id}/runs",
data={"status": status, "output": output, "error": err, "metadata": metadata or {}},
method="POST",
retries=1, # Don't retry logging too aggressively
)
else:
print(f"Warning: no instance_id, run not logged for {agent_id}", file=sys.stderr)
except Exception as e:
print(f"Warning: failed to log run to dashboard: {e}", file=sys.stderr)
def wiki_headers():
return {"Authorization": f"Bearer {WIKI_TOKEN}"}
def find_child_doc(parent_id, title):
"""Search for a child doc by title under a given parent."""
result = api_request(
f"{WIKI_API}/documents.search",
data={"query": title, "collectionId": WIKI_COLLECTION_ID},
headers=wiki_headers(),
method="POST",
)
for doc in result.get("data", []):
d = doc.get("document", {})
if d.get("title") == title and d.get("parentDocumentId") == parent_id:
return d["id"]
return None
def ensure_child_doc(parent_id, title, body):
"""Find or create a child doc under a parent. Returns doc ID."""
doc_id = find_child_doc(parent_id, title)
if doc_id:
return doc_id
result = api_request(
f"{WIKI_API}/documents.create",
data={
"title": title,
"text": body,
"collectionId": WIKI_COLLECTION_ID,
"parentDocumentId": parent_id,
"publish": True,
},
headers=wiki_headers(),
method="POST",
)
print(f"Created wiki doc: {title}")
return result["data"]["id"]