Files
ai-agents/agents/daily_briefing.py
T

235 lines
8.9 KiB
Python

#!/usr/bin/env python3
"""
Daily Briefing Engine (Reusable)
Collects sub-agent data, composes a markdown briefing, and posts to wiki.
Called by person-specific wrappers (eric_briefing.py, angela_briefing.py, etc.)
with a config dict specifying location, wiki parent, and agent ID.
"""
import sys
from datetime import datetime
from shared import (
MT, DASHBOARD_API, WIKI_API, WIKI_COLLECTION_ID as DEFAULT_WIKI_COLLECTION, MONTH_NAMES,
api_request, log_run, wiki_headers, find_child_doc, ensure_child_doc,
)
def _run_sub_agent(sections, name, module_name, fallback_md, agent_kwargs):
    """Run one sub-agent and append its result to *sections*.

    Imports ``module_name`` lazily and calls its ``run(**agent_kwargs)``, which
    must return a ``(markdown, summary)`` pair. Any exception (a failed import
    included) is reported on stderr and replaced by ``fallback_md`` so that a
    single broken sub-agent cannot stop the briefing.

    Args:
        sections: List that receives the (name, markdown, summary) tuple.
        name: Section name used in the tuple and in log output.
        module_name: Name of the module exposing ``run``.
        fallback_md: Markdown used for the section when the sub-agent fails.
        agent_kwargs: Keyword arguments forwarded to the sub-agent's ``run``.
    """
    # Function-scope import keeps this block self-contained.
    import importlib

    try:
        agent_run = importlib.import_module(module_name).run
        md, summary = agent_run(**agent_kwargs)
    except Exception as e:
        print(f" {name} failed: {e}", file=sys.stderr)
        sections.append((name, fallback_md, f"error: {e}"))
    else:
        sections.append((name, md, summary))
        print(f" {name}: {summary}")


def _collect_project_sections(user_id):
    """Return the markdown of each project monitor flagged for the briefing.

    Fetches the user's ``project-monitor`` instances from the dashboard API and
    runs every instance whose config has ``include_in_briefing`` equal to
    "true" (compared as a lower-cased string). A monitor that raises is
    reported on stderr and skipped; errors from the API request itself
    propagate to the caller.
    """
    pm_instances = api_request(
        f"{DASHBOARD_API}/api/instances/by-user/{user_id}?catalog_id=project-monitor",
        retries=1,
    )
    project_sections = []
    for pm in pm_instances:
        # ``or {}`` also covers an instance whose config is explicitly null,
        # so one malformed instance cannot abort the remaining monitors.
        pm_config = pm.get("config") or {}
        if str(pm_config.get("include_in_briefing", "false")).lower() != "true":
            continue
        project_name = pm_config.get("project_name", "?")
        try:
            from project_monitor import run as pm_run
            md, summary = pm_run(pm_config, user_id=user_id, instance_id=pm.get("id"))
            project_sections.append(md)
            print(f" Project [{project_name}]: {summary[:80]}")
        except Exception as e:
            print(f" Project [{project_name}] failed: {e}", file=sys.stderr)
    return project_sections


def collect_sections(config):
    """Run each sub-agent and collect markdown sections.

    Weather and Calendar always run. Reminders runs only when
    ``reminder_sources`` is non-empty, Notes and Reading List only when their
    config dict has ``enabled`` set, and Projects only when ``user_id`` is
    truthy.

    Returns a list of (section_name, markdown, summary) tuples.
    Failed sub-agents are logged but don't stop the briefing: Weather,
    Calendar, Reminders, Notes and Reading List fall back to an "unavailable"
    placeholder section, while a failed project monitor is simply omitted.
    """
    sections = []

    # --- Weather ---
    _run_sub_agent(
        sections, "Weather", "weather_agent",
        "## Weather\n\n*Weather data unavailable.*\n",
        {"location": config.get("location")},
    )

    # --- Calendar ---
    _run_sub_agent(
        sections, "Calendar", "calendar_agent",
        "## Calendar\n\n*Calendar data unavailable.*\n",
        {"calendars_config": config.get("calendars", [])},
    )

    # --- Reminders (CalDAV VTODO) ---
    reminder_sources = config.get("reminder_sources", [])
    if reminder_sources:
        _run_sub_agent(
            sections, "Reminders", "reminders_agent",
            "## Reminders\n\n*Unavailable.*\n",
            {
                "reminders_config": reminder_sources,
                "mode": config.get("reminder_mode", "due_today_3days"),
            },
        )

    # --- Notes (via Mac bridge) ---
    # ``or {}`` treats an explicit None the same as a missing key.
    notes_config = config.get("notes") or {}
    if notes_config.get("enabled", False):
        _run_sub_agent(
            sections, "Notes", "notes_agent",
            "## Recent Notes\n\n*Unavailable.*\n",
            {"config": notes_config},
        )

    # --- Reading List (via Mac bridge) ---
    reading_config = config.get("reading_list") or {}
    if reading_config.get("enabled", False):
        _run_sub_agent(
            sections, "Reading List", "reading_list_agent",
            "## Reading List\n\n*Unavailable.*\n",
            {"config": reading_config},
        )

    # --- Project Monitors (LLM-powered) ---
    user_id = config.get("user_id", 0)
    if user_id:
        try:
            project_sections = _collect_project_sections(user_id)
            if project_sections:
                combined = "## Projects\n\n" + "\n\n".join(project_sections)
                sections.append(("Projects", combined, f"{len(project_sections)} project(s)"))
        except Exception as e:
            print(f" Project monitors skipped: {e}", file=sys.stderr)

    return sections
def compose_briefing(config, sections):
    """Compose the full daily briefing markdown from sub-agent sections.

    Args:
        config: Briefing config. Reads ``person`` and ``location`` (a dict
            with optional ``name`` and ``state``); both may be missing.
        sections: Iterable of (name, markdown, summary) tuples. Only the
            markdown element is used, in the order given.

    Returns:
        One markdown string: title, a date/location line, a rule, every
        section followed by a blank line, a rule, and a generation footer.
    """
    now = datetime.now(MT)
    person = config.get("person", "")

    # ``or {}`` treats an explicit None location like a missing one.
    location = config.get("location") or {}
    # Join only the parts that are present, so a state without a name does
    # not render with a leading ", ".
    loc_label = ", ".join(
        str(part) for part in (location.get("name"), location.get("state")) if part
    )

    parts = [
        f"# {person}'s Daily Briefing\n",
        f"**{now.strftime('%A, %B %d, %Y')}** | {loc_label}\n\n",
        "---\n\n",
    ]
    parts.extend(section_md + "\n\n" for _name, section_md, _summary in sections)
    parts.append("---\n")
    parts.append(
        f"*Generated at {now.strftime('%I:%M %p MT')} by {person}'s Daily Briefing Agent*\n"
    )
    return "".join(parts)
def post_to_wiki(config, markdown, date_str):
    """Post the briefing to wiki under Year/Month hierarchy.

    The document is titled ``Daily Briefing — <date_str>`` and placed under
    ``<wiki_parent_doc_id>/<year>/<month>``. If a document with that title
    already exists under the month, its text is replaced; otherwise a new
    document is created.

    Args:
        config: Briefing config. Requires ``wiki_parent_doc_id``; optional
            ``wiki_collection_id`` overrides the default collection.
        markdown: Full briefing text to publish.
        date_str: Briefing date used in the title. When it is in
            ``YYYY-MM-DD`` form it also selects the Year/Month folder.

    Returns:
        ``(doc_id, "updated")`` or ``(doc_id, "created")``.

    Raises:
        KeyError: If ``wiki_parent_doc_id`` is missing from ``config``.
    """
    wiki_collection = config.get("wiki_collection_id", DEFAULT_WIKI_COLLECTION)
    wiki_parent_id = config["wiki_parent_doc_id"]

    # Derive the folder from the same date that goes into the title. Reading
    # the clock again here could disagree with ``date_str`` when a run
    # straddles midnight at a month or year boundary.
    try:
        briefing_date = datetime.strptime(date_str, "%Y-%m-%d")
    except (TypeError, ValueError):
        # Unknown date format: fall back to the current time.
        briefing_date = datetime.now(MT)
    year_str = str(briefing_date.year)
    month_str = MONTH_NAMES[briefing_date.month]

    year_id = ensure_child_doc(
        wiki_parent_id, year_str,
        f"Daily briefings for {year_str}.",
        collection_id=wiki_collection,
    )
    month_id = ensure_child_doc(
        year_id, month_str,
        f"Daily briefings for {month_str} {year_str}.",
        collection_id=wiki_collection,
    )

    title = f"Daily Briefing — {date_str}"
    doc_id = find_child_doc(month_id, title, collection_id=wiki_collection)

    if doc_id:
        # Re-running on the same day overwrites the existing briefing.
        api_request(
            f"{WIKI_API}/documents.update",
            data={"id": doc_id, "text": markdown, "publish": True},
            headers=wiki_headers(),
            method="POST",
        )
        return doc_id, "updated"

    result = api_request(
        f"{WIKI_API}/documents.create",
        data={
            "title": title,
            "text": markdown,
            "collectionId": wiki_collection,
            "parentDocumentId": month_id,
            "publish": True,
        },
        headers=wiki_headers(),
        method="POST",
    )
    return result["data"]["id"], "created"
def run(config):
    """Run the full daily briefing pipeline for a given config.

    Steps: optionally overlay live settings from the dashboard, collect the
    sub-agent sections, compose the markdown, post it to the wiki, and log
    the run. On any pipeline failure the error is logged as a failed run and
    the process exits with status 1.

    Config keys:
        person (str): Person's name (e.g., "Eric")
        agent_id (str): Dashboard agent ID (e.g., "eric-daily-briefing")
        wiki_parent_doc_id (str): Outline doc ID for this person's briefing root
        location (dict): {name, state, country, lat, lon}
        instance_id (optional): Dashboard instance; when truthy, its live
            ``location`` and ``calendars`` override the values given here.
        user_id (optional): Enables the project-monitor section when truthy.
        calendars, reminder_sources, reminder_mode, notes, reading_list
            (optional): Passed through to the matching sub-agents.
        wiki_collection_id (optional): Overrides the default wiki collection.

    The caller's dict is not modified; live overrides are applied to a copy.
    """
    # Shallow copy so the live-config overlay below does not leak into the
    # caller's (possibly module-level) config dict.
    config = dict(config)
    agent_id = config["agent_id"]
    instance_id = config.get("instance_id", 0)

    # Fetch live config from dashboard API, merge over defaults
    if instance_id:
        try:
            live_config = api_request(f"{DASHBOARD_API}/api/instances/{instance_id}/config")
            if live_config.get("location"):
                config["location"] = live_config["location"]
                print(f"Using live config location: {config['location'].get('name', '?')}")
            if live_config.get("calendars"):
                config["calendars"] = live_config["calendars"]
        except Exception as e:
            # Best-effort: a dashboard outage must not block the briefing.
            print(f"Could not fetch live config, using defaults: {e}", file=sys.stderr)

    try:
        print(f"Collecting sub-agent data for {config['person']}...")
        sections = collect_sections(config)

        print("Composing briefing...")
        markdown = compose_briefing(config, sections)
        date_str = datetime.now(MT).strftime("%Y-%m-%d")

        print("Posting to wiki...")
        doc_id, action = post_to_wiki(config, markdown, date_str)
        print(f"Wiki doc {action}: {doc_id}")

        summaries = "; ".join(f"{name}: {s}" for name, _, s in sections)
        output = f"Briefing {action}. {summaries}"
        log_run(agent_id, "success", output=output, instance_id=instance_id, metadata={
            "wiki_doc_id": doc_id,
            "action": action,
            "sub_agents": [name for name, _, _ in sections],
        })
        print(f"Done: {output}")
    except Exception as e:
        err_msg = f"{type(e).__name__}: {e}"
        print(f"Error: {err_msg}", file=sys.stderr)
        log_run(agent_id, "failed", err=err_msg, instance_id=instance_id)
        sys.exit(1)