"""Shared utilities for all agents.""" import json import os import sys import time from datetime import datetime from zoneinfo import ZoneInfo from urllib import request, error as urlerror MT = ZoneInfo("America/Denver") DASHBOARD_API = os.environ.get("DASHBOARD_API", "http://localhost:8550") WIKI_API = "https://wiki.jfamily.io/api" WIKI_TOKEN = os.environ.get("WIKI_TOKEN", "ol_api_yHXypRyqf4CscWDzPluGfPev9GhdFg6mwrXwkT") WIKI_COLLECTION_ID = os.environ.get("WIKI_COLLECTION_ID", "9d9e471c-84cd-4ba7-bae5-c70f61805228") WIKI_PARENT_DOC_ID = os.environ.get("WIKI_PARENT_DOC_ID", "2a891fe8-579b-450b-a663-de93915896b7") # Eric's Daily Briefing MONTH_NAMES = [ "", "January", "February", "March", "April", "May", "June", "July", "August", "September", "October", "November", "December", ] # Retry config DEFAULT_RETRIES = 3 DEFAULT_BACKOFF = 2 # seconds, doubles each retry RETRIABLE_CODES = {408, 429, 500, 502, 503, 504} def api_request(url, data=None, headers=None, method="GET", retries=DEFAULT_RETRIES, backoff=DEFAULT_BACKOFF): """HTTP helper with automatic retry on transient failures.""" if data is not None: data = json.dumps(data).encode("utf-8") last_error = None for attempt in range(retries + 1): try: req = request.Request(url, data=data, headers=headers or {}, method=method) if data: req.add_header("Content-Type", "application/json") with request.urlopen(req, timeout=30) as resp: return json.loads(resp.read().decode()) except urlerror.HTTPError as e: last_error = e if e.code in RETRIABLE_CODES and attempt < retries: wait = backoff * (2 ** attempt) print(f" Retry {attempt + 1}/{retries} after {e.code} from {url} (waiting {wait}s)", file=sys.stderr) time.sleep(wait) continue raise except (urlerror.URLError, TimeoutError, ConnectionError, OSError) as e: last_error = e if attempt < retries: wait = backoff * (2 ** attempt) print(f" Retry {attempt + 1}/{retries} after {type(e).__name__} from {url} (waiting {wait}s)", file=sys.stderr) time.sleep(wait) continue raise raise last_error def log_run(agent_id, status, output="", err="", metadata=None, instance_id=None): """Log a run to the dashboard API.""" try: if instance_id: api_request( f"{DASHBOARD_API}/api/instances/{instance_id}/runs", data={"status": status, "output": output, "error": err, "metadata": metadata or {}}, method="POST", retries=1, # Don't retry logging too aggressively ) else: print(f"Warning: no instance_id, run not logged for {agent_id}", file=sys.stderr) except Exception as e: print(f"Warning: failed to log run to dashboard: {e}", file=sys.stderr) def wiki_headers(): return {"Authorization": f"Bearer {WIKI_TOKEN}"} def find_child_doc(parent_id, title): """Search for a child doc by title under a given parent.""" result = api_request( f"{WIKI_API}/documents.search", data={"query": title, "collectionId": WIKI_COLLECTION_ID}, headers=wiki_headers(), method="POST", ) for doc in result.get("data", []): d = doc.get("document", {}) if d.get("title") == title and d.get("parentDocumentId") == parent_id: return d["id"] return None def ensure_child_doc(parent_id, title, body): """Find or create a child doc under a parent. Returns doc ID.""" doc_id = find_child_doc(parent_id, title) if doc_id: return doc_id result = api_request( f"{WIKI_API}/documents.create", data={ "title": title, "text": body, "collectionId": WIKI_COLLECTION_ID, "parentDocumentId": parent_id, "publish": True, }, headers=wiki_headers(), method="POST", ) print(f"Created wiki doc: {title}") return result["data"]["id"]