Add calendar agent (CalDAV + ICS), wire into daily briefing
This commit is contained in:
@@ -0,0 +1,277 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Calendar Agent
|
||||
Fetches events from CalDAV servers and ICS URLs.
|
||||
Returns a markdown section for the daily briefing.
|
||||
"""
|
||||
|
||||
import sys
|
||||
from datetime import datetime, timedelta, date
|
||||
from urllib import request as urlreq
|
||||
from shared import MT, log_run
|
||||
|
||||
AGENT_ID = "calendar"
|
||||
LOOKAHEAD_DAYS = 2 # Today + 2 days
|
||||
|
||||
|
||||
def fetch_caldav_events(source, start_dt, end_dt):
|
||||
"""Fetch events from a CalDAV server."""
|
||||
import caldav
|
||||
|
||||
events = []
|
||||
try:
|
||||
client = caldav.DAVClient(
|
||||
url=source["url"],
|
||||
username=source.get("username", ""),
|
||||
password=source.get("password", ""),
|
||||
)
|
||||
principal = client.principal()
|
||||
calendars = principal.calendars()
|
||||
|
||||
for cal in calendars:
|
||||
try:
|
||||
results = cal.search(
|
||||
start=start_dt,
|
||||
end=end_dt,
|
||||
event=True,
|
||||
expand=True,
|
||||
)
|
||||
for event in results:
|
||||
parsed = parse_vevent(event.data, source["name"])
|
||||
if parsed:
|
||||
events.extend(parsed)
|
||||
except Exception as e:
|
||||
print(f" Warning: calendar {cal.name} error: {e}", file=sys.stderr)
|
||||
except Exception as e:
|
||||
print(f" CalDAV error for {source['name']}: {e}", file=sys.stderr)
|
||||
|
||||
return events
|
||||
|
||||
|
||||
def fetch_ics_events(source, start_dt, end_dt):
|
||||
"""Fetch events from an ICS URL."""
|
||||
from icalendar import Calendar
|
||||
import recurring_ical_events
|
||||
|
||||
events = []
|
||||
try:
|
||||
headers = {}
|
||||
if source.get("username") and source.get("password"):
|
||||
import base64
|
||||
creds = base64.b64encode(f"{source['username']}:{source['password']}".encode()).decode()
|
||||
headers["Authorization"] = f"Basic {creds}"
|
||||
|
||||
req = urlreq.Request(source["url"], headers=headers)
|
||||
with urlreq.urlopen(req, timeout=30) as resp:
|
||||
ics_data = resp.read()
|
||||
|
||||
cal = Calendar.from_ical(ics_data)
|
||||
recurring = recurring_ical_events.of(cal).between(start_dt, end_dt)
|
||||
|
||||
for event in recurring:
|
||||
ev = parse_ical_event(event, source["name"])
|
||||
if ev:
|
||||
events.append(ev)
|
||||
except Exception as e:
|
||||
print(f" ICS error for {source['name']}: {e}", file=sys.stderr)
|
||||
|
||||
return events
|
||||
|
||||
|
||||
def parse_vevent(ical_str, source_name):
|
||||
"""Parse a VEVENT string into event dicts."""
|
||||
from icalendar import Calendar
|
||||
|
||||
events = []
|
||||
try:
|
||||
cal = Calendar.from_ical(ical_str)
|
||||
for component in cal.walk():
|
||||
if component.name == "VEVENT":
|
||||
ev = parse_ical_event(component, source_name)
|
||||
if ev:
|
||||
events.append(ev)
|
||||
except Exception:
|
||||
pass
|
||||
return events
|
||||
|
||||
|
||||
def parse_ical_event(component, source_name):
|
||||
"""Parse an icalendar VEVENT component into a dict."""
|
||||
try:
|
||||
summary = str(component.get("SUMMARY", "No title"))
|
||||
dtstart = component.get("DTSTART")
|
||||
dtend = component.get("DTEND")
|
||||
location = str(component.get("LOCATION", "")) if component.get("LOCATION") else ""
|
||||
|
||||
if dtstart is None:
|
||||
return None
|
||||
|
||||
start = dtstart.dt if hasattr(dtstart, "dt") else dtstart
|
||||
end = dtend.dt if dtend and hasattr(dtend, "dt") else None
|
||||
|
||||
# Determine if all-day
|
||||
all_day = isinstance(start, date) and not isinstance(start, datetime)
|
||||
|
||||
if all_day:
|
||||
start_dt = datetime.combine(start, datetime.min.time()).replace(tzinfo=MT)
|
||||
end_dt = datetime.combine(end, datetime.min.time()).replace(tzinfo=MT) if end else start_dt
|
||||
else:
|
||||
if start.tzinfo is None:
|
||||
start_dt = start.replace(tzinfo=MT)
|
||||
else:
|
||||
start_dt = start.astimezone(MT)
|
||||
if end:
|
||||
end_dt = end.astimezone(MT) if end.tzinfo else end.replace(tzinfo=MT)
|
||||
else:
|
||||
end_dt = start_dt
|
||||
|
||||
return {
|
||||
"summary": summary,
|
||||
"start": start_dt,
|
||||
"end": end_dt,
|
||||
"all_day": all_day,
|
||||
"location": location,
|
||||
"source": source_name,
|
||||
}
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
def format_section(events):
|
||||
"""Format events into a markdown section grouped by day."""
|
||||
now = datetime.now(MT)
|
||||
today = now.date()
|
||||
tomorrow = today + timedelta(days=1)
|
||||
day_after = today + timedelta(days=2)
|
||||
|
||||
day_labels = {
|
||||
today: "Today",
|
||||
tomorrow: "Tomorrow",
|
||||
day_after: (today + timedelta(days=2)).strftime("%A"),
|
||||
}
|
||||
|
||||
# Group events by day
|
||||
by_day = {today: [], tomorrow: [], day_after: []}
|
||||
for ev in events:
|
||||
ev_date = ev["start"].date()
|
||||
if ev_date in by_day:
|
||||
by_day[ev_date].append(ev)
|
||||
|
||||
# Sort each day's events
|
||||
for day in by_day:
|
||||
by_day[day].sort(key=lambda e: (not e["all_day"], e["start"]))
|
||||
|
||||
md = "## Calendar\n\n"
|
||||
|
||||
total = sum(len(v) for v in by_day.values())
|
||||
if total == 0:
|
||||
md += "*No events in the next 3 days.*\n"
|
||||
return md, "No upcoming events"
|
||||
|
||||
for day in [today, tomorrow, day_after]:
|
||||
day_events = by_day[day]
|
||||
label = day_labels[day]
|
||||
date_str = day.strftime("%B %d")
|
||||
|
||||
md += f"### {label} — {date_str}\n\n"
|
||||
|
||||
if not day_events:
|
||||
md += "*No events*\n\n"
|
||||
continue
|
||||
|
||||
md += "| Time | Event | Calendar |\n"
|
||||
md += "|------|-------|----------|\n"
|
||||
|
||||
for ev in day_events:
|
||||
if ev["all_day"]:
|
||||
time_str = "All day"
|
||||
else:
|
||||
start_time = ev["start"].strftime("%-I:%M %p")
|
||||
end_time = ev["end"].strftime("%-I:%M %p")
|
||||
time_str = f"{start_time} - {end_time}"
|
||||
|
||||
summary = ev["summary"]
|
||||
if ev["location"]:
|
||||
summary += f" ({ev['location']})"
|
||||
|
||||
md += f"| {time_str} | {summary} | {ev['source']} |\n"
|
||||
|
||||
md += "\n"
|
||||
|
||||
today_count = len(by_day[today])
|
||||
tomorrow_count = len(by_day[tomorrow])
|
||||
summary = f"{today_count} event{'s' if today_count != 1 else ''} today"
|
||||
if tomorrow_count:
|
||||
summary += f", {tomorrow_count} tomorrow"
|
||||
|
||||
return md, summary
|
||||
|
||||
|
||||
def run(calendars_config=None):
|
||||
"""Run the calendar agent.
|
||||
|
||||
Args:
|
||||
calendars_config: list of calendar source dicts, each with:
|
||||
name, type (caldav/ics), url, username (optional), password (optional)
|
||||
|
||||
Returns:
|
||||
(markdown_section, summary)
|
||||
"""
|
||||
calendars = calendars_config or []
|
||||
|
||||
if not calendars:
|
||||
return "## Calendar\n\n*No calendar sources configured. Add them in Settings.*\n", "No calendars configured"
|
||||
|
||||
now = datetime.now(MT)
|
||||
start_dt = now.replace(hour=0, minute=0, second=0, microsecond=0)
|
||||
end_dt = start_dt + timedelta(days=LOOKAHEAD_DAYS + 1)
|
||||
|
||||
all_events = []
|
||||
|
||||
for source in calendars:
|
||||
src_type = source.get("type", "ics")
|
||||
print(f" Fetching {src_type}: {source.get('name', '?')}...")
|
||||
|
||||
if src_type == "caldav":
|
||||
events = fetch_caldav_events(source, start_dt, end_dt)
|
||||
elif src_type == "ics":
|
||||
events = fetch_ics_events(source, start_dt, end_dt)
|
||||
else:
|
||||
print(f" Unknown type: {src_type}", file=sys.stderr)
|
||||
continue
|
||||
|
||||
print(f" Got {len(events)} events from {source.get('name', '?')}")
|
||||
all_events.extend(events)
|
||||
|
||||
section, summary = format_section(all_events)
|
||||
log_run(AGENT_ID, "success", output=summary, metadata={
|
||||
"sources": len(calendars),
|
||||
"events": len(all_events),
|
||||
})
|
||||
return section, summary
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Standalone test with a sample ICS
|
||||
import argparse
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("--ics", help="ICS URL to test")
|
||||
parser.add_argument("--caldav", help="CalDAV URL to test")
|
||||
parser.add_argument("--user", help="Username")
|
||||
parser.add_argument("--password", help="Password")
|
||||
parser.add_argument("--name", default="Test", help="Calendar name")
|
||||
args = parser.parse_args()
|
||||
|
||||
sources = []
|
||||
if args.ics:
|
||||
sources.append({"name": args.name, "type": "ics", "url": args.ics})
|
||||
if args.caldav:
|
||||
sources.append({"name": args.name, "type": "caldav", "url": args.caldav,
|
||||
"username": args.user or "", "password": args.password or ""})
|
||||
|
||||
if sources:
|
||||
section, summary = run(sources)
|
||||
print(section)
|
||||
print(f"\nSummary: {summary}")
|
||||
else:
|
||||
print("Provide --ics or --caldav URL")
|
||||
@@ -33,14 +33,16 @@ def collect_sections(config):
|
||||
print(f" Weather failed: {e}", file=sys.stderr)
|
||||
sections.append(("Weather", "## Weather\n\n*Weather data unavailable.*\n", f"error: {e}"))
|
||||
|
||||
# --- Future sub-agents go here ---
|
||||
# Each can receive config params as needed
|
||||
# try:
|
||||
# from calendar_agent import run as calendar_run
|
||||
# md, summary = calendar_run(calendar_id=config.get("calendar_id"))
|
||||
# sections.append(("Calendar", md, summary))
|
||||
# except Exception as e:
|
||||
# sections.append(("Calendar", "## Calendar\n\n*Unavailable.*\n", f"error: {e}"))
|
||||
# --- Calendar ---
|
||||
calendars = config.get("calendars", [])
|
||||
try:
|
||||
from calendar_agent import run as calendar_run
|
||||
md, summary = calendar_run(calendars_config=calendars)
|
||||
sections.append(("Calendar", md, summary))
|
||||
print(f" Calendar: {summary}")
|
||||
except Exception as e:
|
||||
print(f" Calendar failed: {e}", file=sys.stderr)
|
||||
sections.append(("Calendar", "## Calendar\n\n*Calendar data unavailable.*\n", f"error: {e}"))
|
||||
|
||||
return sections
|
||||
|
||||
|
||||
Reference in New Issue
Block a user