Files
ai-agents/agents/calendar_agent.py
T

278 lines
8.5 KiB
Python

#!/usr/bin/env python3
"""
Calendar Agent
Fetches events from CalDAV servers and ICS URLs.
Returns a markdown section for the daily briefing.
"""
import sys
from datetime import datetime, timedelta, date
from urllib import request as urlreq
from shared import MT, log_run
AGENT_ID = "calendar"
LOOKAHEAD_DAYS = 2 # Today + 2 days
def fetch_caldav_events(source, start_dt, end_dt):
"""Fetch events from a CalDAV server."""
import caldav
events = []
try:
client = caldav.DAVClient(
url=source["url"],
username=source.get("username", ""),
password=source.get("password", ""),
)
principal = client.principal()
calendars = principal.calendars()
for cal in calendars:
try:
results = cal.search(
start=start_dt,
end=end_dt,
event=True,
expand=True,
)
for event in results:
parsed = parse_vevent(event.data, source["name"])
if parsed:
events.extend(parsed)
except Exception as e:
print(f" Warning: calendar {cal.name} error: {e}", file=sys.stderr)
except Exception as e:
print(f" CalDAV error for {source['name']}: {e}", file=sys.stderr)
return events
def fetch_ics_events(source, start_dt, end_dt):
"""Fetch events from an ICS URL."""
from icalendar import Calendar
import recurring_ical_events
events = []
try:
headers = {}
if source.get("username") and source.get("password"):
import base64
creds = base64.b64encode(f"{source['username']}:{source['password']}".encode()).decode()
headers["Authorization"] = f"Basic {creds}"
req = urlreq.Request(source["url"], headers=headers)
with urlreq.urlopen(req, timeout=30) as resp:
ics_data = resp.read()
cal = Calendar.from_ical(ics_data)
recurring = recurring_ical_events.of(cal).between(start_dt, end_dt)
for event in recurring:
ev = parse_ical_event(event, source["name"])
if ev:
events.append(ev)
except Exception as e:
print(f" ICS error for {source['name']}: {e}", file=sys.stderr)
return events
def parse_vevent(ical_str, source_name):
"""Parse a VEVENT string into event dicts."""
from icalendar import Calendar
events = []
try:
cal = Calendar.from_ical(ical_str)
for component in cal.walk():
if component.name == "VEVENT":
ev = parse_ical_event(component, source_name)
if ev:
events.append(ev)
except Exception:
pass
return events
def parse_ical_event(component, source_name):
"""Parse an icalendar VEVENT component into a dict."""
try:
summary = str(component.get("SUMMARY", "No title"))
dtstart = component.get("DTSTART")
dtend = component.get("DTEND")
location = str(component.get("LOCATION", "")) if component.get("LOCATION") else ""
if dtstart is None:
return None
start = dtstart.dt if hasattr(dtstart, "dt") else dtstart
end = dtend.dt if dtend and hasattr(dtend, "dt") else None
# Determine if all-day
all_day = isinstance(start, date) and not isinstance(start, datetime)
if all_day:
start_dt = datetime.combine(start, datetime.min.time()).replace(tzinfo=MT)
end_dt = datetime.combine(end, datetime.min.time()).replace(tzinfo=MT) if end else start_dt
else:
if start.tzinfo is None:
start_dt = start.replace(tzinfo=MT)
else:
start_dt = start.astimezone(MT)
if end:
end_dt = end.astimezone(MT) if end.tzinfo else end.replace(tzinfo=MT)
else:
end_dt = start_dt
return {
"summary": summary,
"start": start_dt,
"end": end_dt,
"all_day": all_day,
"location": location,
"source": source_name,
}
except Exception:
return None
def format_section(events):
"""Format events into a markdown section grouped by day."""
now = datetime.now(MT)
today = now.date()
tomorrow = today + timedelta(days=1)
day_after = today + timedelta(days=2)
day_labels = {
today: "Today",
tomorrow: "Tomorrow",
day_after: (today + timedelta(days=2)).strftime("%A"),
}
# Group events by day
by_day = {today: [], tomorrow: [], day_after: []}
for ev in events:
ev_date = ev["start"].date()
if ev_date in by_day:
by_day[ev_date].append(ev)
# Sort each day's events
for day in by_day:
by_day[day].sort(key=lambda e: (not e["all_day"], e["start"]))
md = "## Calendar\n\n"
total = sum(len(v) for v in by_day.values())
if total == 0:
md += "*No events in the next 3 days.*\n"
return md, "No upcoming events"
for day in [today, tomorrow, day_after]:
day_events = by_day[day]
label = day_labels[day]
date_str = day.strftime("%B %d")
md += f"### {label}{date_str}\n\n"
if not day_events:
md += "*No events*\n\n"
continue
md += "| Time | Event | Calendar |\n"
md += "|------|-------|----------|\n"
for ev in day_events:
if ev["all_day"]:
time_str = "All day"
else:
start_time = ev["start"].strftime("%-I:%M %p")
end_time = ev["end"].strftime("%-I:%M %p")
time_str = f"{start_time} - {end_time}"
summary = ev["summary"]
if ev["location"]:
summary += f" ({ev['location']})"
md += f"| {time_str} | {summary} | {ev['source']} |\n"
md += "\n"
today_count = len(by_day[today])
tomorrow_count = len(by_day[tomorrow])
summary = f"{today_count} event{'s' if today_count != 1 else ''} today"
if tomorrow_count:
summary += f", {tomorrow_count} tomorrow"
return md, summary
def run(calendars_config=None):
"""Run the calendar agent.
Args:
calendars_config: list of calendar source dicts, each with:
name, type (caldav/ics), url, username (optional), password (optional)
Returns:
(markdown_section, summary)
"""
calendars = calendars_config or []
if not calendars:
return "## Calendar\n\n*No calendar sources configured. Add them in Settings.*\n", "No calendars configured"
now = datetime.now(MT)
start_dt = now.replace(hour=0, minute=0, second=0, microsecond=0)
end_dt = start_dt + timedelta(days=LOOKAHEAD_DAYS + 1)
all_events = []
for source in calendars:
src_type = source.get("type", "ics")
print(f" Fetching {src_type}: {source.get('name', '?')}...")
if src_type == "caldav":
events = fetch_caldav_events(source, start_dt, end_dt)
elif src_type == "ics":
events = fetch_ics_events(source, start_dt, end_dt)
else:
print(f" Unknown type: {src_type}", file=sys.stderr)
continue
print(f" Got {len(events)} events from {source.get('name', '?')}")
all_events.extend(events)
section, summary = format_section(all_events)
log_run(AGENT_ID, "success", output=summary, metadata={
"sources": len(calendars),
"events": len(all_events),
})
return section, summary
if __name__ == "__main__":
# Standalone test with a sample ICS
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--ics", help="ICS URL to test")
parser.add_argument("--caldav", help="CalDAV URL to test")
parser.add_argument("--user", help="Username")
parser.add_argument("--password", help="Password")
parser.add_argument("--name", default="Test", help="Calendar name")
args = parser.parse_args()
sources = []
if args.ics:
sources.append({"name": args.name, "type": "ics", "url": args.ics})
if args.caldav:
sources.append({"name": args.name, "type": "caldav", "url": args.caldav,
"username": args.user or "", "password": args.password or ""})
if sources:
section, summary = run(sources)
print(section)
print(f"\nSummary: {summary}")
else:
print("Provide --ics or --caldav URL")