#!/usr/bin/env python3 """ Reminders Agent Fetches reminders (VTODO) from CalDAV servers (iCloud, Google, etc.) Returns a markdown section for the daily briefing. """ import sys from datetime import datetime, timedelta, date from shared import MT, log_run AGENT_ID = "reminders" # Display mode options (configurable per user in GUI) MODE_DUE_TODAY = "due_today_overdue" MODE_DUE_3DAYS = "due_today_3days" MODE_ALL_INCOMPLETE = "all_incomplete" def fetch_reminders(source): """Fetch incomplete VTODO items from a CalDAV server.""" import caldav todos = [] try: client = caldav.DAVClient( url=source["url"], username=source.get("username", ""), password=source.get("password", ""), ) principal = client.principal() calendars = principal.calendars() for cal in calendars: try: cal_todos = cal.todos(include_completed=False) for todo in cal_todos: parsed = parse_vtodo(todo, source["name"], cal.name) if parsed: todos.append(parsed) except Exception as e: # Some calendars don't support VTODO pass except Exception as e: print(f" CalDAV error for {source['name']}: {e}", file=sys.stderr) return todos def parse_vtodo(todo, source_name, list_name): """Parse a CalDAV VTODO into a dict.""" from icalendar import Calendar try: cal = Calendar.from_ical(todo.data) for component in cal.walk(): if component.name != "VTODO": continue summary = str(component.get("SUMMARY", "Untitled")) description = str(component.get("DESCRIPTION", "")) if component.get("DESCRIPTION") else "" priority_val = component.get("PRIORITY") priority = int(str(priority_val)) if priority_val else 0 status = str(component.get("STATUS", "NEEDS-ACTION")) url = str(component.get("URL", "")) if component.get("URL") else "" # Due date due = component.get("DUE") due_dt = None all_day_due = False if due: dt = due.dt if hasattr(due, "dt") else due if isinstance(dt, date) and not isinstance(dt, datetime): due_dt = datetime.combine(dt, datetime.min.time()).replace(tzinfo=MT) all_day_due = True else: due_dt = dt.astimezone(MT) if dt.tzinfo else dt.replace(tzinfo=MT) # Recurring rrule = component.get("RRULE") is_recurring = rrule is not None return { "summary": summary, "list": list_name, "source": source_name, "due": due_dt, "all_day_due": all_day_due, "priority": priority, "description": description[:200] if description else "", "url": url, "recurring": is_recurring, "status": status, } except Exception: pass return None def filter_reminders(todos, mode): """Filter reminders based on display mode.""" now = datetime.now(MT) today = now.date() today_end = datetime.combine(today + timedelta(days=1), datetime.min.time()).replace(tzinfo=MT) three_days = datetime.combine(today + timedelta(days=3), datetime.min.time()).replace(tzinfo=MT) if mode == MODE_ALL_INCOMPLETE: return todos filtered = [] for t in todos: if t["due"] is None: # No due date — include in "all incomplete" only if mode == MODE_ALL_INCOMPLETE: filtered.append(t) continue if t["due"] < now: # Overdue — always include t["_overdue"] = True filtered.append(t) elif mode == MODE_DUE_TODAY and t["due"] < today_end: filtered.append(t) elif mode == MODE_DUE_3DAYS and t["due"] < three_days: filtered.append(t) return filtered def format_section(todos, mode): """Format reminders into a markdown section.""" now = datetime.now(MT) today = now.date() md = "## Reminders\n\n" if not todos: if mode == MODE_DUE_TODAY: md += "*No reminders due today or overdue.*\n" elif mode == MODE_DUE_3DAYS: md += "*No reminders due in the next 3 days.*\n" else: md += "*No incomplete reminders.*\n" return md, "No reminders" # Separate overdue from upcoming overdue = [t for t in todos if t.get("_overdue")] upcoming = [t for t in todos if not t.get("_overdue")] # Sort overdue.sort(key=lambda t: t["due"] or datetime.min.replace(tzinfo=MT)) upcoming.sort(key=lambda t: t["due"] or datetime.max.replace(tzinfo=MT)) if overdue: md += "### Overdue\n\n" md += "| Reminder | List | Due | Priority |\n" md += "|----------|------|-----|----------|\n" for t in overdue: due_str = t["due"].strftime("%b %d") if t["due"] else "-" pri = ["", "High", "Medium", "", "", "", "", "", "", "Low"][min(t["priority"], 9)] if t["priority"] else "" recurring = " (recurring)" if t["recurring"] else "" md += f"| {t['summary']}{recurring} | {t['list']} | {due_str} | {pri} |\n" md += "\n" if upcoming: md += "### Upcoming\n\n" md += "| Reminder | List | Due | Priority |\n" md += "|----------|------|-----|----------|\n" for t in upcoming: if t["due"]: if t["due"].date() == today: due_str = "Today" + ("" if t["all_day_due"] else f" {t['due'].strftime('%-I:%M %p')}") else: due_str = t["due"].strftime("%b %d") if not t["all_day_due"]: due_str += f" {t['due'].strftime('%-I:%M %p')}" else: due_str = "No date" pri = ["", "High", "Medium", "", "", "", "", "", "", "Low"][min(t["priority"], 9)] if t["priority"] else "" recurring = " (recurring)" if t["recurring"] else "" md += f"| {t['summary']}{recurring} | {t['list']} | {due_str} | {pri} |\n" md += "\n" overdue_count = len(overdue) upcoming_count = len(upcoming) parts = [] if overdue_count: parts.append(f"{overdue_count} overdue") if upcoming_count: parts.append(f"{upcoming_count} upcoming") summary = ", ".join(parts) if parts else "No reminders" return md, summary def run(reminders_config=None, mode=MODE_DUE_3DAYS): """Run the reminders agent. Args: reminders_config: list of CalDAV source dicts [{name, url, username, password}] mode: display mode (due_today_overdue, due_today_3days, all_incomplete) """ sources = reminders_config or [] if not sources: return "## Reminders\n\n*No reminder sources configured. Add CalDAV credentials in Settings.*\n", "Not configured" all_todos = [] for source in sources: print(f" Fetching reminders: {source.get('name', '?')}...") todos = fetch_reminders(source) print(f" Got {len(todos)} incomplete reminders from {source.get('name', '?')}") all_todos.extend(todos) filtered = filter_reminders(all_todos, mode) section, summary = format_section(filtered, mode) log_run(AGENT_ID, "success", output=summary, metadata={ "sources": len(sources), "total_incomplete": len(all_todos), "filtered": len(filtered), "mode": mode, }) return section, summary if __name__ == "__main__": import argparse parser = argparse.ArgumentParser() parser.add_argument("--url", required=True, help="CalDAV URL") parser.add_argument("--user", required=True, help="Username") parser.add_argument("--password", required=True, help="Password") parser.add_argument("--name", default="Test", help="Source name") parser.add_argument("--mode", default=MODE_DUE_3DAYS, choices=[MODE_DUE_TODAY, MODE_DUE_3DAYS, MODE_ALL_INCOMPLETE]) args = parser.parse_args() section, summary = run( [{"name": args.name, "url": args.url, "username": args.user, "password": args.password}], mode=args.mode, ) print(section) print(f"\nSummary: {summary}")