#!/usr/bin/env python3
"""
Calendar Agent
Fetches events from CalDAV servers and ICS URLs.
Returns a markdown section for the daily briefing.
"""

import sys
from datetime import datetime, timedelta, date
from urllib import request as urlreq

from shared import MT, log_run

AGENT_ID = "calendar"
LOOKAHEAD_DAYS = 2  # Today + 2 days


def fetch_caldav_events(source, start_dt, end_dt):
    """Fetch events from a CalDAV server.

    Args:
        source: calendar source dict; reads "url" plus the optional "name",
            "username" and "password" keys.
        start_dt: start of the search window, passed to ``cal.search``.
        end_dt: end of the search window, passed to ``cal.search``.

    Returns:
        List of event dicts as produced by ``parse_ical_event``. Failures are
        reported on stderr; whatever was collected before the failure is
        still returned.
    """
    import caldav  # third-party; imported on first use

    # Same fallback as run(), so a nameless source cannot raise KeyError from
    # inside an error handler.
    name = source.get("name", "?")
    events = []
    try:
        client = caldav.DAVClient(
            url=source["url"],
            username=source.get("username", ""),
            password=source.get("password", ""),
        )
        principal = client.principal()
        for cal in principal.calendars():
            # One broken calendar must not hide the others on the same server.
            try:
                results = cal.search(
                    start=start_dt,
                    end=end_dt,
                    event=True,
                    expand=True,
                )
                for event in results:
                    events.extend(parse_vevent(event.data, name))
            except Exception as e:
                print(f" Warning: calendar {cal.name} error: {e}", file=sys.stderr)
    except Exception as e:
        print(f" CalDAV error for {name}: {e}", file=sys.stderr)
    return events


def fetch_ics_events(source, start_dt, end_dt):
    """Fetch events from an ICS URL.

    Args:
        source: calendar source dict; reads "url" plus the optional "name",
            "username" and "password" keys. HTTP basic auth is only sent when
            both username and password are non-empty.
        start_dt: start of the window handed to ``recurring_ical_events``.
        end_dt: end of the window handed to ``recurring_ical_events``.

    Returns:
        List of event dicts as produced by ``parse_ical_event``. Failures are
        reported on stderr and yield the events collected so far.
    """
    from icalendar import Calendar
    import recurring_ical_events

    name = source.get("name", "?")
    events = []
    try:
        headers = {}
        if source.get("username") and source.get("password"):
            import base64

            creds = base64.b64encode(
                f"{source['username']}:{source['password']}".encode()
            ).decode()
            headers["Authorization"] = f"Basic {creds}"

        req = urlreq.Request(source["url"], headers=headers)
        with urlreq.urlopen(req, timeout=30) as resp:
            ics_data = resp.read()

        cal = Calendar.from_ical(ics_data)
        for component in recurring_ical_events.of(cal).between(start_dt, end_dt):
            ev = parse_ical_event(component, name)
            if ev:
                events.append(ev)
    except Exception as e:
        print(f" ICS error for {name}: {e}", file=sys.stderr)
    return events


def parse_vevent(ical_str, source_name):
    """Parse iCalendar text into event dicts.

    Args:
        ical_str: iCalendar data accepted by ``Calendar.from_ical``.
        source_name: label stored in each event's "source" field.

    Returns:
        List with one dict per VEVENT component that ``parse_ical_event``
        could parse; empty when the data cannot be read at all.
    """
    from icalendar import Calendar

    events = []
    try:
        cal = Calendar.from_ical(ical_str)
        for component in cal.walk():
            if component.name == "VEVENT":
                ev = parse_ical_event(component, source_name)
                if ev:
                    events.append(ev)
    except Exception as e:
        # Best effort: report instead of silently discarding the data.
        print(
            f" Warning: could not parse event data from {source_name}: {e}",
            file=sys.stderr,
        )
    return events


def _to_local(value):
    """Return *value* (a date or datetime) as a datetime carrying MT."""
    if isinstance(value, datetime):
        if value.tzinfo is None:
            # NOTE(review): replace(tzinfo=...) assumes MT is a zoneinfo/
            # dateutil-style tzinfo; a pytz zone would need localize() - confirm.
            return value.replace(tzinfo=MT)
        return value.astimezone(MT)
    # Plain dates (all-day values) are pinned to local midnight.
    return datetime.combine(value, datetime.min.time()).replace(tzinfo=MT)


def _event_duration(component):
    """Return the component's DURATION as a timedelta, or None if absent."""
    raw = component.get("DURATION")
    # Depending on the icalendar version the wrapper exposes the timedelta as
    # ``.dt`` or ``.td``; accept either, or an already-plain timedelta.
    for attr in ("dt", "td"):
        value = getattr(raw, attr, None)
        if isinstance(value, timedelta):
            return value
    return raw if isinstance(raw, timedelta) else None


def parse_ical_event(component, source_name):
    """Parse an icalendar VEVENT component into a dict.

    Args:
        component: object offering ``.get(name, default)`` for the SUMMARY,
            DTSTART, DTEND, DURATION and LOCATION properties.
        source_name: label stored in the "source" field.

    Returns:
        Dict with keys "summary", "start", "end", "all_day", "location" and
        "source", where "start"/"end" are datetimes in MT. Returns None when
        DTSTART is missing or the component cannot be interpreted.
    """
    try:
        dtstart = component.get("DTSTART")
        if dtstart is None:
            return None

        start = dtstart.dt if hasattr(dtstart, "dt") else dtstart
        dtend = component.get("DTEND")
        end = dtend.dt if dtend and hasattr(dtend, "dt") else None

        # An event may carry DURATION instead of DTEND; without this the
        # event would collapse to zero length.
        if end is None and isinstance(start, date):
            duration = _event_duration(component)
            if duration is not None:
                end = start + duration

        # A bare date (not a datetime) marks an all-day event.
        all_day = isinstance(start, date) and not isinstance(start, datetime)

        start_dt = _to_local(start)
        # _to_local also copes with a date-only end on a timed event, which
        # previously raised and dropped the event.
        end_dt = _to_local(end) if end is not None else start_dt

        raw_location = component.get("LOCATION")
        return {
            "summary": str(component.get("SUMMARY", "No title")),
            "start": start_dt,
            "end": end_dt,
            "all_day": all_day,
            "location": str(raw_location) if raw_location else "",
            "source": source_name,
        }
    except Exception as e:
        print(f" Warning: skipping unparseable event from {source_name}: {e}",
              file=sys.stderr)
        return None


def _days_covered(ev, days):
    """Return the entries of *days* on which event *ev* takes place."""
    first = ev["start"].date()
    last = ev["end"].date()
    # The end is exclusive: an event that finishes exactly at midnight does
    # not occupy the day that begins at that instant.
    if ev["end"] > ev["start"] and ev["end"].time() == datetime.min.time():
        last -= timedelta(days=1)
    # Guard against malformed data where the end precedes the start.
    last = max(first, last)
    return [day for day in days if first <= day <= last]


def _clock(moment):
    """Format a time like '9:05 AM' without the non-portable '%-I' code."""
    return f"{moment.hour % 12 or 12}:{moment.strftime('%M %p')}"


def _cell(text):
    """Make *text* safe for a single markdown table cell."""
    flattened = " ".join(str(text).splitlines())
    return flattened.replace("|", "\\|")


def format_section(events):
    """Format events into a markdown section grouped by day.

    Covers today plus the following LOOKAHEAD_DAYS days. An event is listed
    on every covered day it overlaps, so multi-day events stay visible after
    their first day.

    Args:
        events: iterable of event dicts as produced by ``parse_ical_event``.

    Returns:
        (markdown_section, summary) tuple of strings.
    """
    today = datetime.now(MT).date()
    days = [
        today + timedelta(days=offset)
        for offset in range(max(LOOKAHEAD_DAYS, 0) + 1)
    ]

    # Group events by day
    by_day = {day: [] for day in days}
    for ev in events:
        for day in _days_covered(ev, days):
            by_day[day].append(ev)

    # All-day events first, then chronological order.
    for day_events in by_day.values():
        day_events.sort(key=lambda e: (not e["all_day"], e["start"]))

    parts = ["## Calendar\n\n"]
    if not any(by_day.values()):
        parts.append(f"*No events in the next {len(days)} days.*\n")
        return "".join(parts), "No upcoming events"

    for index, day in enumerate(days):
        if index == 0:
            label = "Today"
        elif index == 1:
            label = "Tomorrow"
        else:
            label = day.strftime("%A")
        parts.append(f"### {label} — {day.strftime('%B %d')}\n\n")

        day_events = by_day[day]
        if not day_events:
            parts.append("*No events*\n\n")
            continue

        parts.append("| Time | Event | Calendar |\n")
        parts.append("|------|-------|----------|\n")
        for ev in day_events:
            if ev["all_day"]:
                time_str = "All day"
            else:
                time_str = f"{_clock(ev['start'])} - {_clock(ev['end'])}"
            title = ev["summary"]
            if ev["location"]:
                title += f" ({ev['location']})"
            parts.append(
                f"| {time_str} | {_cell(title)} | {_cell(ev['source'])} |\n"
            )
        parts.append("\n")

    today_count = len(by_day[days[0]])
    tomorrow_count = len(by_day[days[1]]) if len(days) > 1 else 0
    summary = f"{today_count} event{'s' if today_count != 1 else ''} today"
    if tomorrow_count:
        summary += f", {tomorrow_count} tomorrow"

    return "".join(parts), summary


def run(calendars_config=None):
    """Run the calendar agent.

    Args:
        calendars_config: list of calendar source dicts, each with:
            name, type (caldav/ics), url, username (optional), password (optional)

    Returns:
        (markdown_section, summary)
    """
    calendars = calendars_config or []
    if not calendars:
        return (
            "## Calendar\n\n*No calendar sources configured. Add them in Settings.*\n",
            "No calendars configured",
        )

    # Window: local midnight today through the end of the last lookahead day.
    now = datetime.now(MT)
    start_dt = now.replace(hour=0, minute=0, second=0, microsecond=0)
    end_dt = start_dt + timedelta(days=LOOKAHEAD_DAYS + 1)

    all_events = []
    for source in calendars:
        src_type = source.get("type", "ics")
        print(f" Fetching {src_type}: {source.get('name', '?')}...")
        if src_type == "caldav":
            events = fetch_caldav_events(source, start_dt, end_dt)
        elif src_type == "ics":
            events = fetch_ics_events(source, start_dt, end_dt)
        else:
            print(f" Unknown type: {src_type}", file=sys.stderr)
            continue
        print(f" Got {len(events)} events from {source.get('name', '?')}")
        all_events.extend(events)

    section, summary = format_section(all_events)

    log_run(AGENT_ID, "success", output=summary, metadata={
        "sources": len(calendars),
        "events": len(all_events),
    })

    return section, summary


if __name__ == "__main__":
    # Standalone test with a sample ICS
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("--ics", help="ICS URL to test")
    parser.add_argument("--caldav", help="CalDAV URL to test")
    parser.add_argument("--user", help="Username")
    parser.add_argument("--password", help="Password")
    parser.add_argument("--name", default="Test", help="Calendar name")
    args = parser.parse_args()

    # Credentials apply to both source types; fetch_ics_events only sends
    # basic auth when both values are non-empty.
    credentials = {"username": args.user or "", "password": args.password or ""}

    sources = []
    if args.ics:
        sources.append({"name": args.name, "type": "ics", "url": args.ics,
                        **credentials})
    if args.caldav:
        sources.append({"name": args.name, "type": "caldav", "url": args.caldav,
                        **credentials})

    if sources:
        section, summary = run(sources)
        print(section)
        print(f"\nSummary: {summary}")
    else:
        print("Provide --ics or --caldav URL")