API Clients + structured JSON results: app-level tokens for Synap/WSIT integration

- New api_clients + api_client_scopes tables; tokens scoped per-instance
- Admin UI tab at /admin for token create/rotate/revoke/delete with one-time reveal
- Dual-auth dependency (user session OR Bearer app token) on trigger + runs endpoints
- /api/instances/{id}/trigger pre-creates a run and returns run_id + cached last_result instantly
- New GET /api/runs/{id} for polling
- Generic trigger path for sub-agent instances (weather, calendar, etc.)
- runs.result column for structured JSON alongside markdown output
- agent_catalog.result_schema describes each agent's result shape
- Weather, daily-briefing, project-monitor retrofitted to emit structured results
- log_run: env INSTANCE_ID/RUN_ID only used when target matches, so nested sub-agents don't clobber parent runs
- Wiki docs: API Clients & Token Scoping + Calling Agents From Your Apps
This commit is contained in:
Eric Jungbauer
2026-04-20 17:54:32 +00:00
parent f01553c511
commit 043aa18f3f
8 changed files with 983 additions and 111 deletions
+116 -44
View File
@@ -14,10 +14,29 @@ from shared import (
)
def collect_sections(config):
"""Run each sub-agent and collect markdown sections.
def _invoke_sub_agent(module_name, **kwargs):
"""Call a sub-agent, preferring its run_structured() contract if available.
Returns a list of (section_name, markdown, summary) tuples.
Returns dict with keys: markdown, summary, result (result may be None for
agents that haven't been retrofitted yet). Raises on agent failure.
"""
mod = __import__(module_name)
if hasattr(mod, "run_structured"):
out = mod.run_structured(**kwargs)
return {
"markdown": out.get("markdown", ""),
"summary": out.get("summary", ""),
"result": out.get("result"),
}
# Legacy contract: returns (markdown, summary)
md, summary = mod.run(**kwargs)
return {"markdown": md, "summary": summary, "result": None}
def collect_sections(config):
"""Run each sub-agent and collect output.
Returns a list of section dicts: {name, key, markdown, summary, result}.
Failed sub-agents are logged but don't stop the briefing.
"""
sections = []
@@ -25,88 +44,114 @@ def collect_sections(config):
# --- Weather ---
try:
from weather_agent import run as weather_run
md, summary = weather_run(location=location)
sections.append(("Weather", md, summary))
print(f" Weather: {summary}")
out = _invoke_sub_agent("weather_agent", location=location)
sections.append({"name": "Weather", "key": "weather", **out})
print(f" Weather: {out['summary']}")
except Exception as e:
print(f" Weather failed: {e}", file=sys.stderr)
sections.append(("Weather", "## Weather\n\n*Weather data unavailable.*\n", f"error: {e}"))
sections.append({"name": "Weather", "key": "weather",
"markdown": "## Weather\n\n*Weather data unavailable.*\n",
"summary": f"error: {e}", "result": None, "error": str(e)})
# --- Calendar ---
calendars = config.get("calendars", [])
try:
from calendar_agent import run as calendar_run
md, summary = calendar_run(calendars_config=calendars)
sections.append(("Calendar", md, summary))
print(f" Calendar: {summary}")
out = _invoke_sub_agent("calendar_agent", calendars_config=calendars)
sections.append({"name": "Calendar", "key": "calendar", **out})
print(f" Calendar: {out['summary']}")
except Exception as e:
print(f" Calendar failed: {e}", file=sys.stderr)
sections.append(("Calendar", "## Calendar\n\n*Calendar data unavailable.*\n", f"error: {e}"))
sections.append({"name": "Calendar", "key": "calendar",
"markdown": "## Calendar\n\n*Calendar data unavailable.*\n",
"summary": f"error: {e}", "result": None, "error": str(e)})
# --- Reminders (CalDAV VTODO) ---
reminder_sources = config.get("reminder_sources", [])
reminder_mode = config.get("reminder_mode", "due_today_3days")
if reminder_sources:
try:
from reminders_agent import run as reminders_run
md, summary = reminders_run(reminders_config=reminder_sources, mode=reminder_mode)
sections.append(("Reminders", md, summary))
print(f" Reminders: {summary}")
out = _invoke_sub_agent("reminders_agent", reminders_config=reminder_sources, mode=reminder_mode)
sections.append({"name": "Reminders", "key": "reminders", **out})
print(f" Reminders: {out['summary']}")
except Exception as e:
print(f" Reminders failed: {e}", file=sys.stderr)
sections.append(("Reminders", "## Reminders\n\n*Unavailable.*\n", f"error: {e}"))
sections.append({"name": "Reminders", "key": "reminders",
"markdown": "## Reminders\n\n*Unavailable.*\n",
"summary": f"error: {e}", "result": None, "error": str(e)})
# --- Notes (via Mac bridge) ---
notes_config = config.get("notes", {})
if notes_config.get("enabled", False):
try:
from notes_agent import run as notes_run
md, summary = notes_run(config=notes_config)
sections.append(("Notes", md, summary))
print(f" Notes: {summary}")
out = _invoke_sub_agent("notes_agent", config=notes_config)
sections.append({"name": "Notes", "key": "notes", **out})
print(f" Notes: {out['summary']}")
except Exception as e:
print(f" Notes failed: {e}", file=sys.stderr)
sections.append(("Notes", "## Recent Notes\n\n*Unavailable.*\n", f"error: {e}"))
sections.append({"name": "Notes", "key": "notes",
"markdown": "## Recent Notes\n\n*Unavailable.*\n",
"summary": f"error: {e}", "result": None, "error": str(e)})
# --- Reading List (via Mac bridge) ---
reading_config = config.get("reading_list", {})
if reading_config.get("enabled", False):
try:
from reading_list_agent import run as reading_list_run
md, summary = reading_list_run(config=reading_config)
sections.append(("Reading List", md, summary))
print(f" Reading List: {summary}")
out = _invoke_sub_agent("reading_list_agent", config=reading_config)
sections.append({"name": "Reading List", "key": "reading_list", **out})
print(f" Reading List: {out['summary']}")
except Exception as e:
print(f" Reading List failed: {e}", file=sys.stderr)
sections.append(("Reading List", "## Reading List\n\n*Unavailable.*\n", f"error: {e}"))
sections.append({"name": "Reading List", "key": "reading_list",
"markdown": "## Reading List\n\n*Unavailable.*\n",
"summary": f"error: {e}", "result": None, "error": str(e)})
# --- Project Monitors (LLM-powered) ---
instance_id = config.get("instance_id", 0)
user_id = config.get("user_id", 0)
if user_id:
try:
# Fetch this user's project-monitor instances that are set to include in briefing
pm_instances = api_request(
f"{DASHBOARD_API}/api/instances/by-user/{user_id}?catalog_id=project-monitor",
retries=1,
)
project_sections = []
project_sections_md = []
project_results = []
for pm in pm_instances:
pm_config = pm.get("config", {})
if str(pm_config.get("include_in_briefing", "false")).lower() != "true":
continue
try:
from project_monitor import run as pm_run
md, summary = pm_run(pm_config, user_id=user_id, instance_id=pm.get("id"))
project_sections.append(md)
print(f" Project [{pm_config.get('project_name', '?')}]: {summary[:80]}")
pm_out = pm_run(pm_config, user_id=user_id, instance_id=pm.get("id"))
# project_monitor.run may return (md, summary) or {markdown, summary, result}
if isinstance(pm_out, dict):
project_sections_md.append(pm_out.get("markdown", ""))
project_results.append({
"project_name": pm_config.get("project_name", ""),
"summary": pm_out.get("summary", ""),
"result": pm_out.get("result"),
})
print(f" Project [{pm_config.get('project_name', '?')}]: {pm_out.get('summary','')[:80]}")
else:
md, summary = pm_out
project_sections_md.append(md)
project_results.append({
"project_name": pm_config.get("project_name", ""),
"summary": summary,
"result": None,
})
print(f" Project [{pm_config.get('project_name', '?')}]: {summary[:80]}")
except Exception as e:
print(f" Project [{pm_config.get('project_name', '?')}] failed: {e}", file=sys.stderr)
if project_sections:
combined = "## Projects\n\n" + "\n\n".join(project_sections)
sections.append(("Projects", combined, f"{len(project_sections)} project(s)"))
if project_sections_md:
combined = "## Projects\n\n" + "\n\n".join(project_sections_md)
sections.append({
"name": "Projects", "key": "projects",
"markdown": combined,
"summary": f"{len(project_sections_md)} project(s)",
"result": {"projects": project_results},
})
except Exception as e:
print(f" Project monitors skipped: {e}", file=sys.stderr)
@@ -127,8 +172,8 @@ def compose_briefing(config, sections):
md += f"**{date_str}** | {loc_label}\n\n"
md += "---\n\n"
for _name, section_md, _summary in sections:
md += section_md + "\n\n"
for s in sections:
md += s["markdown"] + "\n\n"
md += "---\n"
md += f"*Generated at {now.strftime('%I:%M %p MT')} by {person}'s Daily Briefing Agent*\n"
@@ -136,6 +181,29 @@ def compose_briefing(config, sections):
return md
def compose_result(config, sections, wiki_doc_id=None, wiki_action=None):
"""Compose the structured result dict that API consumers read. Mirrors the markdown but
as data — each sub-agent's result is nested by key."""
now = datetime.now(MT)
return {
"date": now.strftime("%Y-%m-%d"),
"generated_at": now.isoformat(),
"person": config.get("person", ""),
"location": config.get("location", {}),
"sections": {
s["key"]: {
"name": s["name"],
"summary": s.get("summary", ""),
"result": s.get("result"), # may be None for not-yet-retrofitted agents
"error": s.get("error"),
}
for s in sections
},
"wiki_doc_id": wiki_doc_id,
"wiki_action": wiki_action,
}
def post_to_wiki(config, markdown, date_str):
"""Post the briefing to wiki under Year/Month hierarchy."""
wiki_collection = config.get("wiki_collection_id", DEFAULT_WIKI_COLLECTION)
@@ -218,13 +286,17 @@ def run(config):
doc_id, action = post_to_wiki(config, markdown, date_str)
print(f"Wiki doc {action}: {doc_id}")
summaries = "; ".join(f"{name}: {s}" for name, _, s in sections)
result = compose_result(config, sections, wiki_doc_id=doc_id, wiki_action=action)
summaries = "; ".join(f"{s['name']}: {s.get('summary','')}" for s in sections)
output = f"Briefing {action}. {summaries}"
log_run(agent_id, "success", output=output, instance_id=instance_id, metadata={
"wiki_doc_id": doc_id,
"action": action,
"sub_agents": [name for name, _, _ in sections],
})
log_run(agent_id, "success", output=markdown, instance_id=instance_id,
result=result,
metadata={
"wiki_doc_id": doc_id,
"action": action,
"sub_agents": [s["name"] for s in sections],
"summary": summaries,
})
print(f"Done: {output}")
except Exception as e:
+35 -17
View File
@@ -283,7 +283,8 @@ def run(config, user_id=None, instance_id=None):
project_name = config.get("project_name", "Unknown Project")
if not user_id:
return f"## {project_name}\n\n*No user context for LLM.*\n", "error: no user_id"
md = f"## {project_name}\n\n*No user context for LLM.*\n"
return {"markdown": md, "summary": "error: no user_id", "result": None}
print(f" Collecting data for {project_name}...")
@@ -313,16 +314,17 @@ def run(config, user_id=None, instance_id=None):
print(f" Calling LLM for analysis...")
try:
result = llm_complete(user_id, prompt, system=SYSTEM_PROMPT, max_tokens=2000)
llm_result = llm_complete(user_id, prompt, system=SYSTEM_PROMPT, max_tokens=2000)
except RuntimeError as e:
err = str(e)
log_run(AGENT_ID, "failed", err=err, instance_id=instance_id)
return f"## {project_name}\n\n*LLM error: {err}*\n", f"error: {err}"
md = f"## {project_name}\n\n*LLM error: {err}*\n"
return {"markdown": md, "summary": f"error: {err}", "result": None}
report_md = result["text"]
model = result["model"]
tokens_in = result["input_tokens"]
tokens_out = result["output_tokens"]
report_md = llm_result["text"]
model = llm_result["model"]
tokens_in = llm_result["input_tokens"]
tokens_out = llm_result["output_tokens"]
print(f" LLM: {model}, {tokens_in}+{tokens_out} tokens")
@@ -372,15 +374,31 @@ def run(config, user_id=None, instance_id=None):
if links_md:
section += f"\n{links_md}\n"
log_run(AGENT_ID, "success", output=f"{project_name}: {summary[:100]}", instance_id=instance_id, metadata={
"project": project_name,
"model": model,
"tokens_in": tokens_in,
"tokens_out": tokens_out,
structured = {
"project_name": project_name,
"app_url": app_url or None,
"wiki_collection_id": wiki_collection or None,
"wiki_report_id": doc_id,
})
"gitea_repo": gitea_repo or None,
"summary": summary,
"report_markdown": report_md,
"model": model,
"tokens": {"input": tokens_in, "output": tokens_out},
"generated_at": datetime.now(MT).isoformat(),
}
return section, summary
log_run(AGENT_ID, "success", output=f"{project_name}: {summary[:100]}",
instance_id=instance_id,
result=structured,
metadata={
"project": project_name,
"model": model,
"tokens_in": tokens_in,
"tokens_out": tokens_out,
"wiki_report_id": doc_id,
})
return {"markdown": section, "summary": summary, "result": structured}
if __name__ == "__main__":
@@ -398,6 +416,6 @@ if __name__ == "__main__":
"wiki_collection_id": args.wiki_collection,
"gitea_repo": args.gitea_repo,
}
section, summary = run(config, user_id=args.user_id, instance_id=args.instance_id)
print(section)
print(f"\nSummary: {summary}")
out = run(config, user_id=args.user_id, instance_id=args.instance_id)
print(out["markdown"])
print(f"\nSummary: {out['summary']}")
+45 -10
View File
@@ -61,22 +61,57 @@ def api_request(url, data=None, headers=None, method="GET", retries=DEFAULT_RETR
raise last_error
def log_run(agent_id, status, output="", err="", metadata=None, instance_id=None):
"""Log a run to the dashboard API."""
def log_run(agent_id, status, output="", err="", metadata=None, instance_id=None, result=None, run_id=None):
"""Log a run to the dashboard API.
Resolution rules:
* `instance_id`: explicit wins; else env INSTANCE_ID; else skip (no run logged).
* `run_id`: explicit wins; else env RUN_ID — but ONLY when the target instance matches
env INSTANCE_ID (the subprocess was triggered for THIS instance). A sub-agent called
inside a briefing (e.g. project_monitor with its own instance_id) does NOT inherit
the briefing's RUN_ID and will create a fresh run row instead.
Pass `result` (dict) to populate the structured-output column that API consumers read.
"""
try:
if instance_id:
api_request(
f"{DASHBOARD_API}/api/instances/{instance_id}/runs",
data={"status": status, "output": output, "error": err, "metadata": metadata or {}},
method="POST",
retries=1, # Don't retry logging too aggressively
)
else:
env_inst = int(os.environ["INSTANCE_ID"]) if os.environ.get("INSTANCE_ID") else None
env_run = int(os.environ["RUN_ID"]) if os.environ.get("RUN_ID") else None
target_instance_id = instance_id if instance_id is not None else env_inst
if target_instance_id is None:
print(f"Warning: no instance_id, run not logged for {agent_id}", file=sys.stderr)
return
target_run_id = run_id
if target_run_id is None and env_run is not None and env_inst == target_instance_id:
target_run_id = env_run
payload = {
"status": status,
"output": output,
"error": err,
"metadata": metadata or {},
}
if result is not None:
payload["result"] = result
if target_run_id:
payload["run_id"] = target_run_id
api_request(
f"{DASHBOARD_API}/api/instances/{target_instance_id}/runs",
data=payload,
method="POST",
retries=1, # Don't retry logging too aggressively
)
except Exception as e:
print(f"Warning: failed to log run to dashboard: {e}", file=sys.stderr)
def get_instance_config(instance_id):
"""Fetch an instance's config from the dashboard API. Used by agents run from /trigger."""
return api_request(f"{DASHBOARD_API}/api/instances/{instance_id}/config")
def wiki_headers():
return {"Authorization": f"Bearer {WIKI_TOKEN}"}
+101 -8
View File
@@ -5,9 +5,10 @@ Fetches weather for a configurable location and returns structured data + markdo
Called by Daily Briefing agents with location config. Can also run standalone.
"""
import os
import sys
from datetime import datetime
from shared import MT, api_request, log_run
from shared import MT, api_request, log_run, get_instance_config
AGENT_ID = "weather"
@@ -116,18 +117,106 @@ def format_section(weather, location=None):
return md, summary
def run(location=None):
"""Run the weather agent. Returns (markdown_section, summary) or raises."""
def build_result(weather, location):
"""Extract the structured 'result' dict. This is what API consumers (Synap, WSIT) read."""
loc = location or DEFAULT_LOCATION
current = weather["current"]
daily = weather["daily"]
condition = WMO_CODES.get(current["weather_code"], "Unknown")
forecast = []
for i in range(len(daily["time"])):
forecast.append({
"date": daily["time"][i],
"weekday": DAY_NAMES[datetime.strptime(daily["time"][i], "%Y-%m-%d").weekday()],
"condition": WMO_CODES.get(daily["weather_code"][i], "Unknown"),
"weather_code": daily["weather_code"][i],
"temp_high_f": round(daily["temperature_2m_max"][i]),
"temp_low_f": round(daily["temperature_2m_min"][i]),
"precip_in": daily["precipitation_sum"][i],
"wind_max_mph": round(daily["wind_speed_10m_max"][i]),
"sunrise": daily["sunrise"][i],
"sunset": daily["sunset"][i],
})
return {
"location": {
"name": loc.get("name", ""),
"state": loc.get("state", ""),
"country": loc.get("country", ""),
"lat": loc["lat"],
"lon": loc["lon"],
"label": location_label(loc),
},
"current": {
"condition": condition,
"weather_code": current["weather_code"],
"temperature_f": round(current["temperature_2m"]),
"feels_like_f": round(current["apparent_temperature"]),
"wind_mph": round(current["wind_speed_10m"]),
"humidity_pct": current["relative_humidity_2m"],
},
"forecast": forecast,
"fetched_at": datetime.now(MT).isoformat(),
}
def run_structured(location=None):
"""Fetch weather and return {result, markdown, summary}. This is the new contract —
structured data for API consumers, markdown for the wiki."""
loc = location or DEFAULT_LOCATION
weather = fetch_weather(loc)
section, summary = format_section(weather, loc)
log_run(AGENT_ID, "success", output=summary, metadata={"location": location_label(loc)})
return section, summary
result = build_result(weather, loc)
return {"result": result, "markdown": section, "summary": summary}
def run(location=None):
"""Legacy entrypoint: returns (markdown_section, summary). Kept for backward compat
with daily_briefing.py's older call sites. Prefer run_structured().
Deliberately does NOT call log_run — logging is the caller's responsibility.
"""
out = run_structured(location)
return out["markdown"], out["summary"]
def _location_from_config(cfg):
"""Map an instance config dict to a location dict. Config keys: name, state, country, lat, lon."""
if not cfg or "lat" not in cfg or "lon" not in cfg:
return DEFAULT_LOCATION
return {
"name": cfg.get("name", "Custom"),
"state": cfg.get("state", ""),
"country": cfg.get("country", "US"),
"lat": cfg["lat"],
"lon": cfg["lon"],
}
def _main_from_api():
"""Entry point when invoked via /api/instances/{id}/trigger. Reads INSTANCE_ID + RUN_ID
from env, fetches config, runs, and posts structured result back to the dashboard."""
instance_id = int(os.environ["INSTANCE_ID"])
try:
cfg = get_instance_config(instance_id)
loc = _location_from_config(cfg)
out = run_structured(loc)
log_run(AGENT_ID, "success", output=out["markdown"],
result=out["result"],
metadata={"location": out["result"]["location"]["label"], "summary": out["summary"]})
print(out["summary"])
except Exception as e:
err_msg = f"{type(e).__name__}: {e}"
print(f"Error: {err_msg}", file=sys.stderr)
log_run(AGENT_ID, "failed", err=err_msg)
sys.exit(1)
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="Weather Agent")
parser.add_argument("--from-api", action="store_true",
help="Run from /trigger: read INSTANCE_ID/RUN_ID from env, fetch config, post result")
parser.add_argument("--lat", type=float, help="Latitude")
parser.add_argument("--lon", type=float, help="Longitude")
parser.add_argument("--name", type=str, help="City/location name")
@@ -135,6 +224,10 @@ if __name__ == "__main__":
parser.add_argument("--country", type=str, default="US", help="Country code")
args = parser.parse_args()
if args.from_api:
_main_from_api()
sys.exit(0)
loc = DEFAULT_LOCATION
if args.lat and args.lon:
loc = {
@@ -146,9 +239,9 @@ if __name__ == "__main__":
}
try:
section, summary = run(loc)
print(section)
print(f"\nSummary: {summary}")
out = run_structured(loc)
print(out["markdown"])
print(f"\nSummary: {out['summary']}")
except Exception as e:
err_msg = f"{type(e).__name__}: {e}"
print(f"Error: {err_msg}", file=sys.stderr)