diff --git a/agents/daily_briefing.py b/agents/daily_briefing.py index 982ec22..fa1f02b 100644 --- a/agents/daily_briefing.py +++ b/agents/daily_briefing.py @@ -14,10 +14,29 @@ from shared import ( ) -def collect_sections(config): - """Run each sub-agent and collect markdown sections. +def _invoke_sub_agent(module_name, **kwargs): + """Call a sub-agent, preferring its run_structured() contract if available. - Returns a list of (section_name, markdown, summary) tuples. + Returns dict with keys: markdown, summary, result (result may be None for + agents that haven't been retrofitted yet). Raises on agent failure. + """ + mod = __import__(module_name) + if hasattr(mod, "run_structured"): + out = mod.run_structured(**kwargs) + return { + "markdown": out.get("markdown", ""), + "summary": out.get("summary", ""), + "result": out.get("result"), + } + # Legacy contract: returns (markdown, summary) + md, summary = mod.run(**kwargs) + return {"markdown": md, "summary": summary, "result": None} + + +def collect_sections(config): + """Run each sub-agent and collect output. + + Returns a list of section dicts: {name, key, markdown, summary, result}. Failed sub-agents are logged but don't stop the briefing. """ sections = [] @@ -25,88 +44,114 @@ def collect_sections(config): # --- Weather --- try: - from weather_agent import run as weather_run - md, summary = weather_run(location=location) - sections.append(("Weather", md, summary)) - print(f" Weather: {summary}") + out = _invoke_sub_agent("weather_agent", location=location) + sections.append({"name": "Weather", "key": "weather", **out}) + print(f" Weather: {out['summary']}") except Exception as e: print(f" Weather failed: {e}", file=sys.stderr) - sections.append(("Weather", "## Weather\n\n*Weather data unavailable.*\n", f"error: {e}")) + sections.append({"name": "Weather", "key": "weather", + "markdown": "## Weather\n\n*Weather data unavailable.*\n", + "summary": f"error: {e}", "result": None, "error": str(e)}) # --- Calendar --- calendars = config.get("calendars", []) try: - from calendar_agent import run as calendar_run - md, summary = calendar_run(calendars_config=calendars) - sections.append(("Calendar", md, summary)) - print(f" Calendar: {summary}") + out = _invoke_sub_agent("calendar_agent", calendars_config=calendars) + sections.append({"name": "Calendar", "key": "calendar", **out}) + print(f" Calendar: {out['summary']}") except Exception as e: print(f" Calendar failed: {e}", file=sys.stderr) - sections.append(("Calendar", "## Calendar\n\n*Calendar data unavailable.*\n", f"error: {e}")) + sections.append({"name": "Calendar", "key": "calendar", + "markdown": "## Calendar\n\n*Calendar data unavailable.*\n", + "summary": f"error: {e}", "result": None, "error": str(e)}) # --- Reminders (CalDAV VTODO) --- reminder_sources = config.get("reminder_sources", []) reminder_mode = config.get("reminder_mode", "due_today_3days") if reminder_sources: try: - from reminders_agent import run as reminders_run - md, summary = reminders_run(reminders_config=reminder_sources, mode=reminder_mode) - sections.append(("Reminders", md, summary)) - print(f" Reminders: {summary}") + out = _invoke_sub_agent("reminders_agent", reminders_config=reminder_sources, mode=reminder_mode) + sections.append({"name": "Reminders", "key": "reminders", **out}) + print(f" Reminders: {out['summary']}") except Exception as e: print(f" Reminders failed: {e}", file=sys.stderr) - sections.append(("Reminders", "## Reminders\n\n*Unavailable.*\n", f"error: {e}")) + sections.append({"name": "Reminders", "key": "reminders", + "markdown": "## Reminders\n\n*Unavailable.*\n", + "summary": f"error: {e}", "result": None, "error": str(e)}) # --- Notes (via Mac bridge) --- notes_config = config.get("notes", {}) if notes_config.get("enabled", False): try: - from notes_agent import run as notes_run - md, summary = notes_run(config=notes_config) - sections.append(("Notes", md, summary)) - print(f" Notes: {summary}") + out = _invoke_sub_agent("notes_agent", config=notes_config) + sections.append({"name": "Notes", "key": "notes", **out}) + print(f" Notes: {out['summary']}") except Exception as e: print(f" Notes failed: {e}", file=sys.stderr) - sections.append(("Notes", "## Recent Notes\n\n*Unavailable.*\n", f"error: {e}")) + sections.append({"name": "Notes", "key": "notes", + "markdown": "## Recent Notes\n\n*Unavailable.*\n", + "summary": f"error: {e}", "result": None, "error": str(e)}) # --- Reading List (via Mac bridge) --- reading_config = config.get("reading_list", {}) if reading_config.get("enabled", False): try: - from reading_list_agent import run as reading_list_run - md, summary = reading_list_run(config=reading_config) - sections.append(("Reading List", md, summary)) - print(f" Reading List: {summary}") + out = _invoke_sub_agent("reading_list_agent", config=reading_config) + sections.append({"name": "Reading List", "key": "reading_list", **out}) + print(f" Reading List: {out['summary']}") except Exception as e: print(f" Reading List failed: {e}", file=sys.stderr) - sections.append(("Reading List", "## Reading List\n\n*Unavailable.*\n", f"error: {e}")) + sections.append({"name": "Reading List", "key": "reading_list", + "markdown": "## Reading List\n\n*Unavailable.*\n", + "summary": f"error: {e}", "result": None, "error": str(e)}) # --- Project Monitors (LLM-powered) --- instance_id = config.get("instance_id", 0) user_id = config.get("user_id", 0) if user_id: try: - # Fetch this user's project-monitor instances that are set to include in briefing pm_instances = api_request( f"{DASHBOARD_API}/api/instances/by-user/{user_id}?catalog_id=project-monitor", retries=1, ) - project_sections = [] + project_sections_md = [] + project_results = [] for pm in pm_instances: pm_config = pm.get("config", {}) if str(pm_config.get("include_in_briefing", "false")).lower() != "true": continue try: from project_monitor import run as pm_run - md, summary = pm_run(pm_config, user_id=user_id, instance_id=pm.get("id")) - project_sections.append(md) - print(f" Project [{pm_config.get('project_name', '?')}]: {summary[:80]}") + pm_out = pm_run(pm_config, user_id=user_id, instance_id=pm.get("id")) + # project_monitor.run may return (md, summary) or {markdown, summary, result} + if isinstance(pm_out, dict): + project_sections_md.append(pm_out.get("markdown", "")) + project_results.append({ + "project_name": pm_config.get("project_name", ""), + "summary": pm_out.get("summary", ""), + "result": pm_out.get("result"), + }) + print(f" Project [{pm_config.get('project_name', '?')}]: {pm_out.get('summary','')[:80]}") + else: + md, summary = pm_out + project_sections_md.append(md) + project_results.append({ + "project_name": pm_config.get("project_name", ""), + "summary": summary, + "result": None, + }) + print(f" Project [{pm_config.get('project_name', '?')}]: {summary[:80]}") except Exception as e: print(f" Project [{pm_config.get('project_name', '?')}] failed: {e}", file=sys.stderr) - if project_sections: - combined = "## Projects\n\n" + "\n\n".join(project_sections) - sections.append(("Projects", combined, f"{len(project_sections)} project(s)")) + if project_sections_md: + combined = "## Projects\n\n" + "\n\n".join(project_sections_md) + sections.append({ + "name": "Projects", "key": "projects", + "markdown": combined, + "summary": f"{len(project_sections_md)} project(s)", + "result": {"projects": project_results}, + }) except Exception as e: print(f" Project monitors skipped: {e}", file=sys.stderr) @@ -127,8 +172,8 @@ def compose_briefing(config, sections): md += f"**{date_str}** | {loc_label}\n\n" md += "---\n\n" - for _name, section_md, _summary in sections: - md += section_md + "\n\n" + for s in sections: + md += s["markdown"] + "\n\n" md += "---\n" md += f"*Generated at {now.strftime('%I:%M %p MT')} by {person}'s Daily Briefing Agent*\n" @@ -136,6 +181,29 @@ def compose_briefing(config, sections): return md +def compose_result(config, sections, wiki_doc_id=None, wiki_action=None): + """Compose the structured result dict that API consumers read. Mirrors the markdown but + as data — each sub-agent's result is nested by key.""" + now = datetime.now(MT) + return { + "date": now.strftime("%Y-%m-%d"), + "generated_at": now.isoformat(), + "person": config.get("person", ""), + "location": config.get("location", {}), + "sections": { + s["key"]: { + "name": s["name"], + "summary": s.get("summary", ""), + "result": s.get("result"), # may be None for not-yet-retrofitted agents + "error": s.get("error"), + } + for s in sections + }, + "wiki_doc_id": wiki_doc_id, + "wiki_action": wiki_action, + } + + def post_to_wiki(config, markdown, date_str): """Post the briefing to wiki under Year/Month hierarchy.""" wiki_collection = config.get("wiki_collection_id", DEFAULT_WIKI_COLLECTION) @@ -218,13 +286,17 @@ def run(config): doc_id, action = post_to_wiki(config, markdown, date_str) print(f"Wiki doc {action}: {doc_id}") - summaries = "; ".join(f"{name}: {s}" for name, _, s in sections) + result = compose_result(config, sections, wiki_doc_id=doc_id, wiki_action=action) + summaries = "; ".join(f"{s['name']}: {s.get('summary','')}" for s in sections) output = f"Briefing {action}. {summaries}" - log_run(agent_id, "success", output=output, instance_id=instance_id, metadata={ - "wiki_doc_id": doc_id, - "action": action, - "sub_agents": [name for name, _, _ in sections], - }) + log_run(agent_id, "success", output=markdown, instance_id=instance_id, + result=result, + metadata={ + "wiki_doc_id": doc_id, + "action": action, + "sub_agents": [s["name"] for s in sections], + "summary": summaries, + }) print(f"Done: {output}") except Exception as e: diff --git a/agents/project_monitor.py b/agents/project_monitor.py index 7e97f41..3aefbbb 100644 --- a/agents/project_monitor.py +++ b/agents/project_monitor.py @@ -283,7 +283,8 @@ def run(config, user_id=None, instance_id=None): project_name = config.get("project_name", "Unknown Project") if not user_id: - return f"## {project_name}\n\n*No user context for LLM.*\n", "error: no user_id" + md = f"## {project_name}\n\n*No user context for LLM.*\n" + return {"markdown": md, "summary": "error: no user_id", "result": None} print(f" Collecting data for {project_name}...") @@ -313,16 +314,17 @@ def run(config, user_id=None, instance_id=None): print(f" Calling LLM for analysis...") try: - result = llm_complete(user_id, prompt, system=SYSTEM_PROMPT, max_tokens=2000) + llm_result = llm_complete(user_id, prompt, system=SYSTEM_PROMPT, max_tokens=2000) except RuntimeError as e: err = str(e) log_run(AGENT_ID, "failed", err=err, instance_id=instance_id) - return f"## {project_name}\n\n*LLM error: {err}*\n", f"error: {err}" + md = f"## {project_name}\n\n*LLM error: {err}*\n" + return {"markdown": md, "summary": f"error: {err}", "result": None} - report_md = result["text"] - model = result["model"] - tokens_in = result["input_tokens"] - tokens_out = result["output_tokens"] + report_md = llm_result["text"] + model = llm_result["model"] + tokens_in = llm_result["input_tokens"] + tokens_out = llm_result["output_tokens"] print(f" LLM: {model}, {tokens_in}+{tokens_out} tokens") @@ -372,15 +374,31 @@ def run(config, user_id=None, instance_id=None): if links_md: section += f"\n{links_md}\n" - log_run(AGENT_ID, "success", output=f"{project_name}: {summary[:100]}", instance_id=instance_id, metadata={ - "project": project_name, - "model": model, - "tokens_in": tokens_in, - "tokens_out": tokens_out, + structured = { + "project_name": project_name, + "app_url": app_url or None, + "wiki_collection_id": wiki_collection or None, "wiki_report_id": doc_id, - }) + "gitea_repo": gitea_repo or None, + "summary": summary, + "report_markdown": report_md, + "model": model, + "tokens": {"input": tokens_in, "output": tokens_out}, + "generated_at": datetime.now(MT).isoformat(), + } - return section, summary + log_run(AGENT_ID, "success", output=f"{project_name}: {summary[:100]}", + instance_id=instance_id, + result=structured, + metadata={ + "project": project_name, + "model": model, + "tokens_in": tokens_in, + "tokens_out": tokens_out, + "wiki_report_id": doc_id, + }) + + return {"markdown": section, "summary": summary, "result": structured} if __name__ == "__main__": @@ -398,6 +416,6 @@ if __name__ == "__main__": "wiki_collection_id": args.wiki_collection, "gitea_repo": args.gitea_repo, } - section, summary = run(config, user_id=args.user_id, instance_id=args.instance_id) - print(section) - print(f"\nSummary: {summary}") + out = run(config, user_id=args.user_id, instance_id=args.instance_id) + print(out["markdown"]) + print(f"\nSummary: {out['summary']}") diff --git a/agents/shared.py b/agents/shared.py index dfd9251..ffceceb 100644 --- a/agents/shared.py +++ b/agents/shared.py @@ -61,22 +61,57 @@ def api_request(url, data=None, headers=None, method="GET", retries=DEFAULT_RETR raise last_error -def log_run(agent_id, status, output="", err="", metadata=None, instance_id=None): - """Log a run to the dashboard API.""" +def log_run(agent_id, status, output="", err="", metadata=None, instance_id=None, result=None, run_id=None): + """Log a run to the dashboard API. + + Resolution rules: + * `instance_id`: explicit wins; else env INSTANCE_ID; else skip (no run logged). + * `run_id`: explicit wins; else env RUN_ID — but ONLY when the target instance matches + env INSTANCE_ID (the subprocess was triggered for THIS instance). A sub-agent called + inside a briefing (e.g. project_monitor with its own instance_id) does NOT inherit + the briefing's RUN_ID and will create a fresh run row instead. + + Pass `result` (dict) to populate the structured-output column that API consumers read. + """ try: - if instance_id: - api_request( - f"{DASHBOARD_API}/api/instances/{instance_id}/runs", - data={"status": status, "output": output, "error": err, "metadata": metadata or {}}, - method="POST", - retries=1, # Don't retry logging too aggressively - ) - else: + env_inst = int(os.environ["INSTANCE_ID"]) if os.environ.get("INSTANCE_ID") else None + env_run = int(os.environ["RUN_ID"]) if os.environ.get("RUN_ID") else None + + target_instance_id = instance_id if instance_id is not None else env_inst + if target_instance_id is None: print(f"Warning: no instance_id, run not logged for {agent_id}", file=sys.stderr) + return + + target_run_id = run_id + if target_run_id is None and env_run is not None and env_inst == target_instance_id: + target_run_id = env_run + + payload = { + "status": status, + "output": output, + "error": err, + "metadata": metadata or {}, + } + if result is not None: + payload["result"] = result + if target_run_id: + payload["run_id"] = target_run_id + + api_request( + f"{DASHBOARD_API}/api/instances/{target_instance_id}/runs", + data=payload, + method="POST", + retries=1, # Don't retry logging too aggressively + ) except Exception as e: print(f"Warning: failed to log run to dashboard: {e}", file=sys.stderr) +def get_instance_config(instance_id): + """Fetch an instance's config from the dashboard API. Used by agents run from /trigger.""" + return api_request(f"{DASHBOARD_API}/api/instances/{instance_id}/config") + + def wiki_headers(): return {"Authorization": f"Bearer {WIKI_TOKEN}"} diff --git a/agents/weather_agent.py b/agents/weather_agent.py index 384f5ee..c45d996 100644 --- a/agents/weather_agent.py +++ b/agents/weather_agent.py @@ -5,9 +5,10 @@ Fetches weather for a configurable location and returns structured data + markdo Called by Daily Briefing agents with location config. Can also run standalone. """ +import os import sys from datetime import datetime -from shared import MT, api_request, log_run +from shared import MT, api_request, log_run, get_instance_config AGENT_ID = "weather" @@ -116,18 +117,106 @@ def format_section(weather, location=None): return md, summary -def run(location=None): - """Run the weather agent. Returns (markdown_section, summary) or raises.""" +def build_result(weather, location): + """Extract the structured 'result' dict. This is what API consumers (Synap, WSIT) read.""" + loc = location or DEFAULT_LOCATION + current = weather["current"] + daily = weather["daily"] + condition = WMO_CODES.get(current["weather_code"], "Unknown") + + forecast = [] + for i in range(len(daily["time"])): + forecast.append({ + "date": daily["time"][i], + "weekday": DAY_NAMES[datetime.strptime(daily["time"][i], "%Y-%m-%d").weekday()], + "condition": WMO_CODES.get(daily["weather_code"][i], "Unknown"), + "weather_code": daily["weather_code"][i], + "temp_high_f": round(daily["temperature_2m_max"][i]), + "temp_low_f": round(daily["temperature_2m_min"][i]), + "precip_in": daily["precipitation_sum"][i], + "wind_max_mph": round(daily["wind_speed_10m_max"][i]), + "sunrise": daily["sunrise"][i], + "sunset": daily["sunset"][i], + }) + + return { + "location": { + "name": loc.get("name", ""), + "state": loc.get("state", ""), + "country": loc.get("country", ""), + "lat": loc["lat"], + "lon": loc["lon"], + "label": location_label(loc), + }, + "current": { + "condition": condition, + "weather_code": current["weather_code"], + "temperature_f": round(current["temperature_2m"]), + "feels_like_f": round(current["apparent_temperature"]), + "wind_mph": round(current["wind_speed_10m"]), + "humidity_pct": current["relative_humidity_2m"], + }, + "forecast": forecast, + "fetched_at": datetime.now(MT).isoformat(), + } + + +def run_structured(location=None): + """Fetch weather and return {result, markdown, summary}. This is the new contract — + structured data for API consumers, markdown for the wiki.""" loc = location or DEFAULT_LOCATION weather = fetch_weather(loc) section, summary = format_section(weather, loc) - log_run(AGENT_ID, "success", output=summary, metadata={"location": location_label(loc)}) - return section, summary + result = build_result(weather, loc) + return {"result": result, "markdown": section, "summary": summary} + + +def run(location=None): + """Legacy entrypoint: returns (markdown_section, summary). Kept for backward compat + with daily_briefing.py's older call sites. Prefer run_structured(). + Deliberately does NOT call log_run — logging is the caller's responsibility. + """ + out = run_structured(location) + return out["markdown"], out["summary"] + + +def _location_from_config(cfg): + """Map an instance config dict to a location dict. Config keys: name, state, country, lat, lon.""" + if not cfg or "lat" not in cfg or "lon" not in cfg: + return DEFAULT_LOCATION + return { + "name": cfg.get("name", "Custom"), + "state": cfg.get("state", ""), + "country": cfg.get("country", "US"), + "lat": cfg["lat"], + "lon": cfg["lon"], + } + + +def _main_from_api(): + """Entry point when invoked via /api/instances/{id}/trigger. Reads INSTANCE_ID + RUN_ID + from env, fetches config, runs, and posts structured result back to the dashboard.""" + instance_id = int(os.environ["INSTANCE_ID"]) + try: + cfg = get_instance_config(instance_id) + loc = _location_from_config(cfg) + out = run_structured(loc) + log_run(AGENT_ID, "success", output=out["markdown"], + result=out["result"], + metadata={"location": out["result"]["location"]["label"], "summary": out["summary"]}) + print(out["summary"]) + except Exception as e: + err_msg = f"{type(e).__name__}: {e}" + print(f"Error: {err_msg}", file=sys.stderr) + log_run(AGENT_ID, "failed", err=err_msg) + sys.exit(1) if __name__ == "__main__": import argparse parser = argparse.ArgumentParser(description="Weather Agent") + parser.add_argument("--from-api", action="store_true", + help="Run from /trigger: read INSTANCE_ID/RUN_ID from env, fetch config, post result") parser.add_argument("--lat", type=float, help="Latitude") parser.add_argument("--lon", type=float, help="Longitude") parser.add_argument("--name", type=str, help="City/location name") @@ -135,6 +224,10 @@ if __name__ == "__main__": parser.add_argument("--country", type=str, default="US", help="Country code") args = parser.parse_args() + if args.from_api: + _main_from_api() + sys.exit(0) + loc = DEFAULT_LOCATION if args.lat and args.lon: loc = { @@ -146,9 +239,9 @@ if __name__ == "__main__": } try: - section, summary = run(loc) - print(section) - print(f"\nSummary: {summary}") + out = run_structured(loc) + print(out["markdown"]) + print(f"\nSummary: {out['summary']}") except Exception as e: err_msg = f"{type(e).__name__}: {e}" print(f"Error: {err_msg}", file=sys.stderr) diff --git a/dashboard/app.py b/dashboard/app.py index b3938e9..3fa9f76 100644 --- a/dashboard/app.py +++ b/dashboard/app.py @@ -1,4 +1,4 @@ -from fastapi import FastAPI, Depends, HTTPException, Response, Cookie +from fastapi import FastAPI, Depends, HTTPException, Response, Cookie, Header from fastapi.staticfiles import StaticFiles from fastapi.responses import FileResponse, RedirectResponse from sqlalchemy.orm import Session @@ -13,8 +13,8 @@ import smtplib from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart -from database import get_db, init_db -from models import User, AgentCatalog, AgentInstance, Run, LLMProvider, Bridge, RouteLog +from database import get_db, init_db, SessionLocal +from models import User, AgentCatalog, AgentInstance, Run, LLMProvider, Bridge, RouteLog, APIClient, APIClientScope, APIClientCall app = FastAPI(title="Agent Command Center", version="2026.04.12.01") @@ -70,6 +70,94 @@ def require_admin(session: Optional[str] = Cookie(None)) -> dict: return user +# --- API Client auth (for external apps like Synap) --- + +def _hash_api_token(token: str) -> str: + return hashlib.sha256(token.encode()).hexdigest() + + +def _load_api_client(authorization: Optional[str], db: Session) -> Optional[APIClient]: + """Resolve an API client from a Bearer token. Returns None if absent/invalid/revoked.""" + if not authorization or not authorization.lower().startswith("bearer "): + return None + token = authorization[7:].strip() + if not token: + return None + th = _hash_api_token(token) + client = db.query(APIClient).filter( + APIClient.token_hash == th, + APIClient.revoked_at.is_(None), + ).first() + if client: + client.last_used_at = datetime.now(timezone.utc) + db.commit() + return client + + +def require_user_or_api( + session: Optional[str] = Cookie(None), + authorization: Optional[str] = Header(None), + db: Session = Depends(get_db), +) -> dict: + """Accept either a logged-in user session OR a Bearer API token. + Returns a dict describing the caller: + {"kind": "user", "user_id": int, "username": str, "role": str, "allowed_instance_ids": None} + {"kind": "api", "api_client_id": int, "api_client_name": str, "allowed_instance_ids": set[int]} + """ + u = get_current_user(session) + if u: + return { + "kind": "user", + "user_id": u["user_id"], + "username": u["username"], + "role": u["role"], + "allowed_instance_ids": None, # user auth is scoped by instance ownership + } + client = _load_api_client(authorization, db) + if client: + allowed = {s.instance_id for s in client.scopes} + return { + "kind": "api", + "api_client_id": client.id, + "api_client_name": client.name, + "allowed_instance_ids": allowed, + } + raise HTTPException(status_code=401, detail="Not authenticated") + + +def caller_can_access_instance(caller: dict, inst: AgentInstance) -> bool: + """Authorization check: does this caller have rights to read/trigger this instance?""" + if caller["kind"] == "user": + # Admins can access any instance; users only their own + return caller["role"] == "admin" or inst.user_id == caller["user_id"] + # API client: must have instance in scope + return inst.id in caller["allowed_instance_ids"] + + +def caller_label(caller: dict) -> str: + """Short identifier for logging: 'user:eric' or 'api_client:synap'.""" + if caller["kind"] == "user": + return f"user:{caller['username']}" + return f"api_client:{caller['api_client_name']}" + + +def log_api_client_call(db: Session, caller: dict, endpoint: str, instance_id: Optional[int], status_code: int): + """Record an audit entry for API client calls (no-op for user sessions).""" + if caller["kind"] != "api": + return + try: + entry = APIClientCall( + api_client_id=caller["api_client_id"], + instance_id=instance_id, + endpoint=endpoint, + status_code=status_code, + ) + db.add(entry) + db.commit() + except Exception: + db.rollback() + + # --- Schemas --- class LoginRequest(BaseModel): @@ -96,6 +184,8 @@ class RunCreate(BaseModel): output: str = "" error: str = "" metadata: dict = {} + result: Optional[dict] = None # structured data for API consumers + run_id: Optional[int] = None # if set, update this run instead of creating a new one class UserCreate(BaseModel): username: str @@ -126,6 +216,16 @@ class LLMProviderUpdate(BaseModel): default_model: Optional[str] = None is_default: Optional[bool] = None +class APIClientCreate(BaseModel): + name: str + description: str = "" + instance_ids: list[int] = [] # which instances this client can trigger/read + +class APIClientUpdate(BaseModel): + name: Optional[str] = None + description: Optional[str] = None + instance_ids: Optional[list[int]] = None # replaces existing scopes if provided + # --- Auth Routes --- @@ -329,6 +429,7 @@ def list_catalog(user: dict = Depends(require_auth), db: Session = Depends(get_d "default_config": e.default_config or {}, "supports_schedule": e.supports_schedule, "is_sub_agent": e.is_sub_agent, "requires_llm": e.requires_llm, + "result_schema": e.result_schema or {}, "enabled": e.id in user_instance_ids, } for e in entries] @@ -343,6 +444,8 @@ def get_catalog_entry(catalog_id: str, user: dict = Depends(require_auth), db: S "category": entry.category, "config_schema": entry.config_schema or {}, "default_config": entry.default_config or {}, "supports_schedule": entry.supports_schedule, "is_sub_agent": entry.is_sub_agent, + "requires_llm": entry.requires_llm, + "result_schema": entry.result_schema or {}, } @@ -462,54 +565,108 @@ def delete_instance(instance_id: int, user: dict = Depends(require_auth), db: Se return {"status": "deleted"} +SUB_AGENT_SCRIPTS = { + "weather": "weather_agent.py", + "calendar": "calendar_agent.py", + "reminders": "reminders_agent.py", + "notes": "notes_agent.py", + "reading-list": "reading_list_agent.py", +} + + @app.post("/api/instances/{instance_id}/trigger") -def trigger_instance(instance_id: int, user: dict = Depends(require_auth), db: Session = Depends(get_db)): - """Trigger a manual run of an agent instance. Runs async via subprocess.""" - inst = db.query(AgentInstance).filter( - AgentInstance.id == instance_id, AgentInstance.user_id == user["user_id"] - ).first() +def trigger_instance( + instance_id: int, + caller: dict = Depends(require_user_or_api), + db: Session = Depends(get_db), +): + """Trigger a manual run of an agent instance. + Accepts either a user session cookie or a Bearer app token (scoped to this instance). + Returns immediately with a run_id to poll + the last successful run's cached result.""" + inst = db.query(AgentInstance).filter(AgentInstance.id == instance_id).first() if not inst: raise HTTPException(status_code=404) + if not caller_can_access_instance(caller, inst): + log_api_client_call(db, caller, f"POST /api/instances/{instance_id}/trigger", instance_id, 403) + raise HTTPException(status_code=403, detail="Not authorized for this instance") - # Determine which script to run based on catalog type and user catalog_id = inst.catalog_id - u = db.query(User).filter(User.id == user["user_id"]).first() + u = db.query(User).filter(User.id == inst.user_id).first() + + # Look up last successful run — we return its cached result immediately + last = db.query(Run).filter( + Run.instance_id == instance_id, + Run.status == "success", + ).order_by(Run.started_at.desc()).first() + last_result = last.result if last else None + last_run_at = last.started_at.isoformat() if last and last.started_at else None + + # Create a placeholder run row so the caller gets an id to poll + new_run = Run( + instance_id=instance_id, + user_id=inst.user_id, + status="queued", + triggered_by=caller_label(caller), + ) + db.add(new_run) + db.commit() + db.refresh(new_run) import subprocess agent_dir = "/app/agents" - env = {**dict(os.environ), "PYTHONPATH": agent_dir} + env = { + **dict(os.environ), + "PYTHONPATH": agent_dir, + "RUN_ID": str(new_run.id), + "INSTANCE_ID": str(instance_id), + } + cmd = None if catalog_id == "daily-briefing": script_map = {"eric": "eric_briefing.py", "angela": "angela_briefing.py"} - script = script_map.get(u.username, None) + script = script_map.get(u.username) env_key = f"{u.username.upper().replace('.', '_')}_INSTANCE_ID" env[env_key] = str(instance_id) if script: cmd = ["python3", f"{agent_dir}/{script}"] else: - # Generic: run the engine directly with instance config cmd = ["python3", "-c", f"import sys; sys.path.insert(0, '{agent_dir}'); " f"from daily_briefing import run; " f"run({{'person': '{u.display_name}', 'agent_id': '{catalog_id}', " f"'instance_id': {instance_id}, 'wiki_parent_doc_id': '', 'location': {{}}}})"] - subprocess.Popen(cmd, env=env, cwd=agent_dir) - return {"status": "triggered", "message": f"Running {catalog_id} for {u.display_name}"} - - if catalog_id == "project-monitor": - # Write config to a temp file to avoid shell escaping issues + elif catalog_id == "project-monitor": config_path = f"/tmp/pm_config_{instance_id}.json" with open(config_path, "w") as f: - json.dump({"config": inst.config or {}, "user_id": user["user_id"], "instance_id": instance_id}, f) + json.dump({"config": inst.config or {}, "user_id": inst.user_id, "instance_id": instance_id}, f) cmd = ["python3", "-c", f"import sys, json; sys.path.insert(0, '{agent_dir}'); " f"d = json.load(open('{config_path}')); " f"from project_monitor import run; " f"run(d['config'], user_id=d['user_id'], instance_id=d['instance_id'])"] - subprocess.Popen(cmd, env=env, cwd=agent_dir) - return {"status": "triggered", "message": f"Running project monitor: {inst.name}"} + elif catalog_id in SUB_AGENT_SCRIPTS: + cmd = ["python3", f"{agent_dir}/{SUB_AGENT_SCRIPTS[catalog_id]}", "--from-api"] - return {"status": "error", "message": f"Manual trigger not yet supported for {catalog_id}"} + if not cmd: + new_run.status = "failed" + new_run.error = f"No runner configured for catalog_id={catalog_id}" + new_run.finished_at = datetime.now(timezone.utc) + db.commit() + log_api_client_call(db, caller, f"POST /api/instances/{instance_id}/trigger", instance_id, 400) + raise HTTPException(status_code=400, detail=f"Manual trigger not supported for {catalog_id}") + + subprocess.Popen(cmd, env=env, cwd=agent_dir) + log_api_client_call(db, caller, f"POST /api/instances/{instance_id}/trigger", instance_id, 200) + + return { + "run_id": new_run.id, + "status": "queued", + "instance_id": instance_id, + "catalog_id": catalog_id, + "triggered_by": new_run.triggered_by, + "last_result": last_result, + "last_run_at": last_run_at, + } # --- Internal endpoints (no auth, for agent scripts) --- @@ -533,9 +690,31 @@ def get_instance_config(instance_id: int, db: Session = Depends(get_db)): @app.post("/api/instances/{instance_id}/runs") def create_run(instance_id: int, run: RunCreate, db: Session = Depends(get_db)): + """Internal endpoint: agents POST here when they start and finish. + If run.run_id is set, update that existing row (used when /trigger pre-created a run). + Otherwise create a new row (used by cron-launched agents that weren't pre-triggered).""" inst = db.query(AgentInstance).filter(AgentInstance.id == instance_id).first() if not inst: raise HTTPException(status_code=404) + + if run.run_id: + existing = db.query(Run).filter(Run.id == run.run_id, Run.instance_id == instance_id).first() + if not existing: + raise HTTPException(status_code=404, detail=f"Run {run.run_id} not found for this instance") + existing.status = run.status + if run.output: + existing.output = run.output + if run.error: + existing.error = run.error + if run.metadata: + existing.metadata_ = run.metadata + if run.result is not None: + existing.result = run.result + if run.status in ("success", "failed"): + existing.finished_at = datetime.now(timezone.utc) + db.commit() + return {"id": existing.id, "status": existing.status} + new_run = Run( instance_id=instance_id, user_id=inst.user_id, @@ -543,6 +722,7 @@ def create_run(instance_id: int, run: RunCreate, db: Session = Depends(get_db)): output=run.output, error=run.error, metadata_=run.metadata, + result=run.result, ) if run.status in ("success", "failed"): new_run.finished_at = datetime.now(timezone.utc) @@ -551,17 +731,46 @@ def create_run(instance_id: int, run: RunCreate, db: Session = Depends(get_db)): return {"id": new_run.id, "status": new_run.status} -# --- Runs (user-scoped) --- +def _serialize_run(r: Run) -> dict: + return { + "id": r.id, + "instance_id": r.instance_id, + "status": r.status, + "result": r.result, + "output": r.output, + "error": r.error, + "started_at": r.started_at.isoformat() if r.started_at else None, + "finished_at": r.finished_at.isoformat() if r.finished_at else None, + "triggered_by": getattr(r, "triggered_by", "") or "", + "metadata": r.metadata_, + } + + +# --- Runs (user or api-client scoped) --- @app.get("/api/runs") def list_runs(limit: int = 50, user: dict = Depends(require_auth), db: Session = Depends(get_db)): runs = db.query(Run).filter(Run.user_id == user["user_id"]).order_by(Run.started_at.desc()).limit(limit).all() - return [{ - "id": r.id, "instance_id": r.instance_id, - "started_at": r.started_at.isoformat() if r.started_at else None, - "finished_at": r.finished_at.isoformat() if r.finished_at else None, - "status": r.status, "output": r.output, "error": r.error, "metadata": r.metadata_, - } for r in runs] + return [_serialize_run(r) for r in runs] + + +@app.get("/api/runs/{run_id}") +def get_run( + run_id: int, + caller: dict = Depends(require_user_or_api), + db: Session = Depends(get_db), +): + """Fetch a single run. Used by callers polling a triggered run until it finishes. + Accepts user session or Bearer app token (scoped to the run's instance).""" + run = db.query(Run).filter(Run.id == run_id).first() + if not run: + raise HTTPException(status_code=404) + inst = db.query(AgentInstance).filter(AgentInstance.id == run.instance_id).first() + if not inst or not caller_can_access_instance(caller, inst): + log_api_client_call(db, caller, f"GET /api/runs/{run_id}", run.instance_id, 403) + raise HTTPException(status_code=403, detail="Not authorized for this run") + log_api_client_call(db, caller, f"GET /api/runs/{run_id}", run.instance_id, 200) + return _serialize_run(run) # --- Admin: Users --- @@ -671,6 +880,146 @@ def admin_delete_provider(provider_id: int, admin: dict = Depends(require_admin) return {"status": "deleted"} +# --- Admin: API Clients (external app tokens) --- + +def _serialize_api_client(client: APIClient, include_token: Optional[str] = None) -> dict: + return { + "id": client.id, + "name": client.name, + "description": client.description or "", + "token_prefix": client.token_prefix or "", + "token": include_token, # only set once, on creation + "instance_ids": [s.instance_id for s in client.scopes], + "created_at": client.created_at.isoformat() if client.created_at else None, + "last_used_at": client.last_used_at.isoformat() if client.last_used_at else None, + "revoked_at": client.revoked_at.isoformat() if client.revoked_at else None, + "revoked": client.revoked_at is not None, + } + + +@app.get("/api/admin/instances") +def admin_list_all_instances(admin: dict = Depends(require_admin), db: Session = Depends(get_db)): + """Admin-only: list every instance across all users. Used by the API Clients UI to + pick which instances a token can access.""" + rows = db.query(AgentInstance, User).join(User, AgentInstance.user_id == User.id)\ + .order_by(User.username, AgentInstance.catalog_id).all() + return [{ + "id": inst.id, + "name": inst.name, + "catalog_id": inst.catalog_id, + "status": inst.status, + "user_id": u.id, + "username": u.username, + "display_name": u.display_name, + } for inst, u in rows] + + +@app.get("/api/admin/api-clients") +def admin_list_api_clients(admin: dict = Depends(require_admin), db: Session = Depends(get_db)): + clients = db.query(APIClient).order_by(APIClient.created_at.desc()).all() + return [_serialize_api_client(c) for c in clients] + + +@app.post("/api/admin/api-clients") +def admin_create_api_client(data: APIClientCreate, admin: dict = Depends(require_admin), db: Session = Depends(get_db)): + if db.query(APIClient).filter(APIClient.name == data.name).first(): + raise HTTPException(status_code=409, detail="A client with that name already exists") + # Verify every instance_id exists + for iid in data.instance_ids: + if not db.query(AgentInstance).filter(AgentInstance.id == iid).first(): + raise HTTPException(status_code=400, detail=f"Instance {iid} does not exist") + # Generate a token: 'acc_' + 40 hex chars (no ambiguity with OAuth/other schemes) + plaintext = "acc_" + secrets.token_hex(20) + client = APIClient( + name=data.name, + description=data.description, + token_hash=_hash_api_token(plaintext), + token_prefix=plaintext[:12], # "acc_" + 8 hex + ) + db.add(client) + db.flush() + for iid in data.instance_ids: + db.add(APIClientScope(api_client_id=client.id, instance_id=iid)) + db.commit() + db.refresh(client) + return _serialize_api_client(client, include_token=plaintext) + + +@app.put("/api/admin/api-clients/{client_id}") +def admin_update_api_client(client_id: int, update: APIClientUpdate, admin: dict = Depends(require_admin), db: Session = Depends(get_db)): + client = db.query(APIClient).filter(APIClient.id == client_id).first() + if not client: + raise HTTPException(status_code=404) + if update.name is not None: + dup = db.query(APIClient).filter(APIClient.name == update.name, APIClient.id != client_id).first() + if dup: + raise HTTPException(status_code=409, detail="A client with that name already exists") + client.name = update.name + if update.description is not None: + client.description = update.description + if update.instance_ids is not None: + for iid in update.instance_ids: + if not db.query(AgentInstance).filter(AgentInstance.id == iid).first(): + raise HTTPException(status_code=400, detail=f"Instance {iid} does not exist") + # Replace scope set + db.query(APIClientScope).filter(APIClientScope.api_client_id == client_id).delete() + for iid in update.instance_ids: + db.add(APIClientScope(api_client_id=client_id, instance_id=iid)) + db.commit() + db.refresh(client) + return _serialize_api_client(client) + + +@app.post("/api/admin/api-clients/{client_id}/revoke") +def admin_revoke_api_client(client_id: int, admin: dict = Depends(require_admin), db: Session = Depends(get_db)): + client = db.query(APIClient).filter(APIClient.id == client_id).first() + if not client: + raise HTTPException(status_code=404) + client.revoked_at = datetime.now(timezone.utc) + db.commit() + return {"id": client.id, "status": "revoked"} + + +@app.post("/api/admin/api-clients/{client_id}/rotate") +def admin_rotate_api_client(client_id: int, admin: dict = Depends(require_admin), db: Session = Depends(get_db)): + """Issue a new token for an existing client (invalidates the old one).""" + client = db.query(APIClient).filter(APIClient.id == client_id).first() + if not client: + raise HTTPException(status_code=404) + plaintext = "acc_" + secrets.token_hex(20) + client.token_hash = _hash_api_token(plaintext) + client.token_prefix = plaintext[:12] + client.revoked_at = None # un-revoke if it was + db.commit() + db.refresh(client) + return _serialize_api_client(client, include_token=plaintext) + + +@app.delete("/api/admin/api-clients/{client_id}") +def admin_delete_api_client(client_id: int, admin: dict = Depends(require_admin), db: Session = Depends(get_db)): + client = db.query(APIClient).filter(APIClient.id == client_id).first() + if not client: + raise HTTPException(status_code=404) + db.delete(client) + db.commit() + return {"status": "deleted"} + + +@app.get("/api/admin/api-clients/{client_id}/calls") +def admin_api_client_calls(client_id: int, limit: int = 50, admin: dict = Depends(require_admin), db: Session = Depends(get_db)): + if not db.query(APIClient).filter(APIClient.id == client_id).first(): + raise HTTPException(status_code=404) + calls = db.query(APIClientCall).filter(APIClientCall.api_client_id == client_id)\ + .order_by(APIClientCall.called_at.desc()).limit(limit).all() + return [{ + "id": c.id, + "endpoint": c.endpoint, + "instance_id": c.instance_id, + "status_code": c.status_code, + "called_at": c.called_at.isoformat() if c.called_at else None, + } for c in calls] + + # --- Admin: Catalog Management --- class CatalogCreate(BaseModel): @@ -682,6 +1031,8 @@ class CatalogCreate(BaseModel): default_config: dict = {} supports_schedule: bool = True is_sub_agent: bool = False + requires_llm: bool = False + result_schema: dict = {} class CatalogUpdate(BaseModel): name: Optional[str] = None @@ -691,6 +1042,8 @@ class CatalogUpdate(BaseModel): default_config: Optional[dict] = None supports_schedule: Optional[bool] = None is_sub_agent: Optional[bool] = None + requires_llm: Optional[bool] = None + result_schema: Optional[dict] = None @app.post("/api/admin/catalog") @@ -1142,8 +1495,88 @@ def root(session: Optional[str] = Cookie(None)): return FileResponse("static/index.html") +# --- Result schemas (what each agent's structured result looks like) --- + +RESULT_SCHEMAS = { + "weather": { + "description": "Current conditions + 7-day forecast for a configured location.", + "shape": { + "location": { + "name": "string", "state": "string", "country": "string", + "lat": "number", "lon": "number", "label": "string", + }, + "current": { + "condition": "string", "weather_code": "int", + "temperature_f": "int", "feels_like_f": "int", + "wind_mph": "int", "humidity_pct": "int", + }, + "forecast": "array of 7 day objects: {date, weekday, condition, weather_code, temp_high_f, temp_low_f, precip_in, wind_max_mph, sunrise, sunset}", + "fetched_at": "ISO datetime string (Mountain Time)", + }, + }, + "calendar": { + "description": "Upcoming calendar events across configured sources. Structured result not yet populated — markdown only.", + "shape": {"note": "result_schema not yet implemented for this agent"}, + }, + "reminders": { + "description": "Pending/overdue reminders from configured CalDAV sources. Structured result not yet populated — markdown only.", + "shape": {"note": "result_schema not yet implemented for this agent"}, + }, + "notes": { + "description": "Recent Apple Notes via Mac bridge. Structured result not yet populated — markdown only.", + "shape": {"note": "result_schema not yet implemented for this agent"}, + }, + "reading-list": { + "description": "Safari Reading List items via Mac bridge. Structured result not yet populated — markdown only.", + "shape": {"note": "result_schema not yet implemented for this agent"}, + }, + "daily-briefing": { + "description": "Aggregated daily briefing: wiki doc posted + each sub-agent's result nested under sections.", + "shape": { + "date": "YYYY-MM-DD", + "generated_at": "ISO datetime string", + "person": "string", + "location": "object (same shape as weather.location)", + "sections": "object keyed by sub-agent (weather|calendar|reminders|notes|reading_list|projects), each value is {name, summary, result, error?}", + "wiki_doc_id": "string (Outline doc id) or null", + "wiki_action": "'created' | 'updated' | null", + }, + }, + "project-monitor": { + "description": "LLM-generated status report for a tracked project.", + "shape": { + "project_name": "string", + "app_url": "string or null", + "wiki_collection_id": "string or null", + "wiki_report_id": "string or null (Outline doc id of the full report)", + "gitea_repo": "string or null", + "summary": "string (first paragraph of the report)", + "report_markdown": "string (full LLM-generated report)", + "model": "string (LLM model used)", + "tokens": {"input": "int", "output": "int"}, + "generated_at": "ISO datetime string", + }, + }, +} + + +def _seed_result_schemas(db: Session): + """Populate agent_catalog.result_schema for known agents. Idempotent — only fills empty.""" + for catalog_id, schema in RESULT_SCHEMAS.items(): + entry = db.query(AgentCatalog).filter(AgentCatalog.id == catalog_id).first() + if entry and not entry.result_schema: + entry.result_schema = schema + db.commit() + + # --- Startup --- @app.on_event("startup") def startup(): init_db() + # Seed result schemas for catalog entries that don't have them yet + db = SessionLocal() + try: + _seed_result_schemas(db) + finally: + db.close() diff --git a/dashboard/database.py b/dashboard/database.py index b82b630..75fbedc 100644 --- a/dashboard/database.py +++ b/dashboard/database.py @@ -1,4 +1,4 @@ -from sqlalchemy import create_engine +from sqlalchemy import create_engine, inspect, text from sqlalchemy.orm import sessionmaker, DeclarativeBase import os @@ -19,5 +19,19 @@ def get_db(): db.close() +def _ensure_column(conn, table: str, column: str, ddl: str): + """Add a column to an existing table if it doesn't exist (SQLite idempotent migration).""" + insp = inspect(conn) + cols = {c["name"] for c in insp.get_columns(table)} + if column not in cols: + conn.execute(text(f"ALTER TABLE {table} ADD COLUMN {column} {ddl}")) + print(f" migration: added {table}.{column}") + + def init_db(): Base.metadata.create_all(bind=engine) + with engine.begin() as conn: + # Additive columns on existing tables (safe to re-run) + _ensure_column(conn, "runs", "result", "JSON") + _ensure_column(conn, "runs", "triggered_by", "VARCHAR DEFAULT ''") + _ensure_column(conn, "agent_catalog", "result_schema", "JSON") diff --git a/dashboard/models.py b/dashboard/models.py index 8912773..30e8f4a 100644 --- a/dashboard/models.py +++ b/dashboard/models.py @@ -31,6 +31,7 @@ class AgentCatalog(Base): supports_schedule = Column(Boolean, default=True) is_sub_agent = Column(Boolean, default=False) requires_llm = Column(Boolean, default=False) + result_schema = Column(JSON, default=dict) # shape of the agent's structured result instances = relationship("AgentInstance", back_populates="catalog_entry") @@ -61,9 +62,11 @@ class Run(Base): started_at = Column(DateTime, default=lambda: datetime.now(timezone.utc)) finished_at = Column(DateTime, nullable=True) status = Column(String, default="running") - output = Column(Text, default="") + output = Column(Text, default="") # markdown rendering (for wiki) + result = Column(JSON, nullable=True) # structured data for API consumers error = Column(Text, default="") metadata_ = Column("metadata", JSON, default=dict) + triggered_by = Column(String, default="") # "user:eric", "api_client:synap", "cron" instance = relationship("AgentInstance", back_populates="runs") @@ -109,3 +112,43 @@ class LLMProvider(Base): api_key = Column(String, default="") default_model = Column(String, default="") is_default = Column(Boolean, default=False) + + +class APIClient(Base): + """App-level API client. Each external app (Synap, WSIT, etc.) gets one. + Scoped to specific agent instances — see APIClientScope.""" + __tablename__ = "api_clients" + + id = Column(Integer, primary_key=True, autoincrement=True) + name = Column(String, nullable=False, unique=True) # "Synap", "WSIT" + token_hash = Column(String, nullable=False, unique=True) # SHA-256 of the plaintext token + token_prefix = Column(String, default="") # first 8 chars of token, shown in admin UI + description = Column(Text, default="") + created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc)) + last_used_at = Column(DateTime, nullable=True) + revoked_at = Column(DateTime, nullable=True) + + scopes = relationship("APIClientScope", back_populates="client", cascade="all, delete-orphan") + + +class APIClientScope(Base): + """Join table: which instances can an API client trigger/read?""" + __tablename__ = "api_client_scopes" + + id = Column(Integer, primary_key=True, autoincrement=True) + api_client_id = Column(Integer, ForeignKey("api_clients.id", ondelete="CASCADE"), nullable=False) + instance_id = Column(Integer, ForeignKey("agent_instances.id", ondelete="CASCADE"), nullable=False) + + client = relationship("APIClient", back_populates="scopes") + + +class APIClientCall(Base): + """Audit log for every authenticated API call by an API client.""" + __tablename__ = "api_client_calls" + + id = Column(Integer, primary_key=True, autoincrement=True) + api_client_id = Column(Integer, ForeignKey("api_clients.id", ondelete="CASCADE"), nullable=False) + instance_id = Column(Integer, nullable=True) # may be null for non-instance endpoints + endpoint = Column(String, default="") # e.g. "POST /api/instances/2/trigger" + status_code = Column(Integer, default=0) + called_at = Column(DateTime, default=lambda: datetime.now(timezone.utc)) diff --git a/dashboard/static/admin.html b/dashboard/static/admin.html index ec8c17d..23690a3 100644 --- a/dashboard/static/admin.html +++ b/dashboard/static/admin.html @@ -65,6 +65,7 @@ tr:hover td{background:var(--surface2)}
| User | Hostname | URL | Platform | Status | Last Heartbeat | Capabilities |
|---|
+ Issues a bearer token that an external app (Synap, WSIT, etc.) can use to trigger agent instances and read results. + Tokens are scoped to specific instances — see the + token scoping doc. +
+| Name | Prefix | Scopes | Created | Last Used | Status | Actions |
|---|
${c.token_prefix||'-'}…