Files
ai-agents/agents/daily_briefing.py
T

140 lines
4.1 KiB
Python

#!/usr/bin/env python3
"""
Daily Briefing Agent (Orchestrator)
Calls sub-agents (weather, etc.), collates their output into a single
markdown briefing, and posts it to the Outline wiki.
Hierarchy: Eric's Daily Briefing → {Year} → {Month} → Daily Briefing — {date}
"""
import sys
from datetime import datetime
from shared import (
MT, WIKI_API, WIKI_COLLECTION_ID, WIKI_PARENT_DOC_ID, MONTH_NAMES,
api_request, log_run, wiki_headers, find_child_doc, ensure_child_doc,
)
# Agent identifier; main() passes it as the first argument to log_run for
# both the "success" and the "failed" outcome.
AGENT_ID = "daily-briefing"
def collect_sections():
    """Gather one markdown section per sub-agent.

    Every sub-agent is imported lazily and invoked inside its own ``try``,
    so an import error or a runtime failure in one agent is reported on
    stderr and replaced by a placeholder section instead of aborting the
    whole briefing.

    Returns:
        A list of ``(section_name, markdown, summary)`` tuples in the order
        the sub-agents ran. For a failed sub-agent the markdown is a
        placeholder and the summary is ``"error: <exception text>"``.
    """
    collected = []

    # --- Weather ---
    try:
        # Imported here (not at module top) so a broken or missing
        # weather_agent module is handled like any other weather failure.
        from weather_agent import run as fetch_weather

        body, note = fetch_weather()
        collected.append(("Weather", body, note))
        print(f" Weather: {note}")
    except Exception as exc:
        print(f" Weather failed: {exc}", file=sys.stderr)
        placeholder = "## Weather\n\n*Weather data unavailable.*\n"
        collected.append(("Weather", placeholder, f"error: {exc}"))

    # --- Future sub-agents go here ---
    # Follow the same shape as the weather block above, e.g.:
    #
    # try:
    #     from calendar_agent import run as calendar_run
    #     md, summary = calendar_run()
    #     collected.append(("Calendar", md, summary))
    # except Exception as e:
    #     collected.append(
    #         ("Calendar", "## Calendar\n\n*Calendar data unavailable.*\n", f"error: {e}")
    #     )

    return collected
def compose_briefing(sections, now=None):
    """Compose the full daily briefing markdown from sub-agent sections.

    Args:
        sections: Iterable of ``(section_name, markdown, summary)`` tuples.
            Only the markdown element is rendered; each section is followed
            by a blank line.
        now: Optional ``datetime`` used for the heading date and the
            "Generated at" footer. Defaults to the current time in ``MT``.
            Passing it explicitly lets a caller keep the heading consistent
            with a date it computed elsewhere and makes the output
            deterministic. The footer always labels the time "MT", so a
            caller-supplied value should already be in that zone.

    Returns:
        The briefing as a single markdown string: title, date line,
        horizontal rule, the sections in order, a closing rule and the
        generation footer.
    """
    if now is None:
        now = datetime.now(MT)
    date_str = now.strftime("%A, %B %d, %Y")

    # Build the pieces in a list and join once instead of repeated `+=`.
    parts = [
        "# Daily Briefing\n",
        f"**{date_str}** | Providence, Utah\n\n",
        "---\n\n",
    ]
    parts.extend(section_md + "\n\n" for _name, section_md, _summary in sections)
    parts.append("---\n")
    parts.append(
        f"*Generated at {now.strftime('%I:%M %p MT')} by Daily Briefing Agent*\n"
    )
    return "".join(parts)
def post_to_wiki(markdown, date_str):
    """Post the briefing to the wiki under a Year → Month hierarchy.

    The year document is ensured under ``WIKI_PARENT_DOC_ID`` and the month
    document under the year, both via ``ensure_child_doc``. A document
    titled ``"Daily Briefing — {date_str}"`` is then looked up under the
    month with ``find_child_doc``: if found its text is replaced, otherwise
    a new published document is created there.

    Args:
        markdown: Full briefing text to store as the document body.
        date_str: Date used in the document title. When it is an ISO
            ``YYYY-MM-DD`` string (what ``main`` passes) it also selects the
            year and month parents; any other value falls back to the
            current date in ``MT``.

    Returns:
        ``(doc_id, action)`` where ``action`` is ``"updated"`` or
        ``"created"``.

    Raises:
        Whatever ``ensure_child_doc``, ``find_child_doc`` or ``api_request``
        raise, plus ``KeyError``/``TypeError`` if the create response lacks
        ``["data"]["id"]``.
    """
    # File the briefing under the year/month of its own date. Reading the
    # clock a second time here could disagree with date_str when the run
    # straddles midnight at a month or year boundary, which would put e.g.
    # "Daily Briefing — 2024-12-31" under 2025/January.
    try:
        briefing_date = datetime.strptime(date_str, "%Y-%m-%d")
    except (TypeError, ValueError):
        # Not an ISO date: keep the previous behavior of using "now".
        briefing_date = datetime.now(MT)

    year_str = str(briefing_date.year)
    month_str = MONTH_NAMES[briefing_date.month]

    year_id = ensure_child_doc(
        WIKI_PARENT_DOC_ID,
        year_str,
        f"Daily briefings for {year_str}.",
    )
    month_id = ensure_child_doc(
        year_id,
        month_str,
        f"Daily briefings for {month_str} {year_str}.",
    )

    title = f"Daily Briefing — {date_str}"
    doc_id = find_child_doc(month_id, title)

    if doc_id:
        # A briefing for this date already exists: overwrite its text so a
        # re-run replaces it rather than creating a duplicate.
        api_request(
            f"{WIKI_API}/documents.update",
            data={"id": doc_id, "text": markdown, "publish": True},
            headers=wiki_headers(),
            method="POST",
        )
        return doc_id, "updated"

    result = api_request(
        f"{WIKI_API}/documents.create",
        data={
            "title": title,
            "text": markdown,
            "collectionId": WIKI_COLLECTION_ID,
            "parentDocumentId": month_id,
            "publish": True,
        },
        headers=wiki_headers(),
        method="POST",
    )
    return result["data"]["id"], "created"
def main():
    """Run the briefing pipeline end to end and record the outcome.

    Collects the sub-agent sections, composes the markdown, posts it to the
    wiki under today's date (in ``MT``) and reports the run via ``log_run``.
    Any exception raised along the way is printed to stderr, logged as a
    failed run, and the process exits with status 1.
    """
    try:
        print("Collecting sub-agent data...")
        sections = collect_sections()

        print("Composing briefing...")
        briefing_md = compose_briefing(sections)

        today = datetime.now(MT).strftime("%Y-%m-%d")
        print("Posting to wiki...")
        doc_id, action = post_to_wiki(briefing_md, today)
        print(f"Wiki doc {action}: {doc_id}")

        # One pass over the sections yields both the agent names for the
        # metadata and the "name: summary" fragments for the log line.
        agent_names = []
        summary_parts = []
        for name, _md, summary in sections:
            agent_names.append(name)
            summary_parts.append(f"{name}: {summary}")

        output = f"Briefing {action}. " + "; ".join(summary_parts)
        run_metadata = {
            "wiki_doc_id": doc_id,
            "action": action,
            "sub_agents": agent_names,
        }
        log_run(AGENT_ID, "success", output=output, metadata=run_metadata)
        print(f"Done: {output}")
    except Exception as exc:
        # Top-level boundary: report, record the failure, exit non-zero.
        failure = f"{type(exc).__name__}: {exc}"
        print(f"Error: {failure}", file=sys.stderr)
        log_run(AGENT_ID, "failed", err=failure)
        sys.exit(1)


if __name__ == "__main__":
    main()