Files
ai-agents/agents/reminders_agent.py

244 lines
8.3 KiB
Python

#!/usr/bin/env python3
"""
Reminders Agent
Fetches reminders (VTODO) from CalDAV servers (iCloud, Google, etc.)
Returns a markdown section for the daily briefing.
"""
import sys
from datetime import datetime, timedelta, date
from shared import MT, log_run
AGENT_ID = "reminders"
# Display mode options (configurable per user in GUI)
MODE_DUE_TODAY = "due_today_overdue"
MODE_DUE_3DAYS = "due_today_3days"
MODE_ALL_INCOMPLETE = "all_incomplete"
def fetch_reminders(source):
"""Fetch incomplete VTODO items from a CalDAV server."""
import caldav
todos = []
try:
client = caldav.DAVClient(
url=source["url"],
username=source.get("username", ""),
password=source.get("password", ""),
)
principal = client.principal()
calendars = principal.calendars()
for cal in calendars:
try:
cal_todos = cal.todos(include_completed=False)
for todo in cal_todos:
parsed = parse_vtodo(todo, source["name"], cal.name)
if parsed:
todos.append(parsed)
except Exception as e:
# Some calendars don't support VTODO
pass
except Exception as e:
print(f" CalDAV error for {source['name']}: {e}", file=sys.stderr)
return todos
def parse_vtodo(todo, source_name, list_name):
"""Parse a CalDAV VTODO into a dict."""
from icalendar import Calendar
try:
cal = Calendar.from_ical(todo.data)
for component in cal.walk():
if component.name != "VTODO":
continue
summary = str(component.get("SUMMARY", "Untitled"))
description = str(component.get("DESCRIPTION", "")) if component.get("DESCRIPTION") else ""
priority_val = component.get("PRIORITY")
priority = int(str(priority_val)) if priority_val else 0
status = str(component.get("STATUS", "NEEDS-ACTION"))
url = str(component.get("URL", "")) if component.get("URL") else ""
# Due date
due = component.get("DUE")
due_dt = None
all_day_due = False
if due:
dt = due.dt if hasattr(due, "dt") else due
if isinstance(dt, date) and not isinstance(dt, datetime):
due_dt = datetime.combine(dt, datetime.min.time()).replace(tzinfo=MT)
all_day_due = True
else:
due_dt = dt.astimezone(MT) if dt.tzinfo else dt.replace(tzinfo=MT)
# Recurring
rrule = component.get("RRULE")
is_recurring = rrule is not None
return {
"summary": summary,
"list": list_name,
"source": source_name,
"due": due_dt,
"all_day_due": all_day_due,
"priority": priority,
"description": description[:200] if description else "",
"url": url,
"recurring": is_recurring,
"status": status,
}
except Exception:
pass
return None
def filter_reminders(todos, mode):
"""Filter reminders based on display mode."""
now = datetime.now(MT)
today = now.date()
today_end = datetime.combine(today + timedelta(days=1), datetime.min.time()).replace(tzinfo=MT)
three_days = datetime.combine(today + timedelta(days=3), datetime.min.time()).replace(tzinfo=MT)
if mode == MODE_ALL_INCOMPLETE:
return todos
filtered = []
for t in todos:
if t["due"] is None:
# No due date — include in "all incomplete" only
if mode == MODE_ALL_INCOMPLETE:
filtered.append(t)
continue
if t["due"] < now:
# Overdue — always include
t["_overdue"] = True
filtered.append(t)
elif mode == MODE_DUE_TODAY and t["due"] < today_end:
filtered.append(t)
elif mode == MODE_DUE_3DAYS and t["due"] < three_days:
filtered.append(t)
return filtered
def format_section(todos, mode):
"""Format reminders into a markdown section."""
now = datetime.now(MT)
today = now.date()
md = "## Reminders\n\n"
if not todos:
if mode == MODE_DUE_TODAY:
md += "*No reminders due today or overdue.*\n"
elif mode == MODE_DUE_3DAYS:
md += "*No reminders due in the next 3 days.*\n"
else:
md += "*No incomplete reminders.*\n"
return md, "No reminders"
# Separate overdue from upcoming
overdue = [t for t in todos if t.get("_overdue")]
upcoming = [t for t in todos if not t.get("_overdue")]
# Sort
overdue.sort(key=lambda t: t["due"] or datetime.min.replace(tzinfo=MT))
upcoming.sort(key=lambda t: t["due"] or datetime.max.replace(tzinfo=MT))
if overdue:
md += "### Overdue\n\n"
md += "| Reminder | List | Due | Priority |\n"
md += "|----------|------|-----|----------|\n"
for t in overdue:
due_str = t["due"].strftime("%b %d") if t["due"] else "-"
pri = ["", "High", "Medium", "", "", "", "", "", "", "Low"][min(t["priority"], 9)] if t["priority"] else ""
recurring = " (recurring)" if t["recurring"] else ""
md += f"| {t['summary']}{recurring} | {t['list']} | {due_str} | {pri} |\n"
md += "\n"
if upcoming:
md += "### Upcoming\n\n"
md += "| Reminder | List | Due | Priority |\n"
md += "|----------|------|-----|----------|\n"
for t in upcoming:
if t["due"]:
if t["due"].date() == today:
due_str = "Today" + ("" if t["all_day_due"] else f" {t['due'].strftime('%-I:%M %p')}")
else:
due_str = t["due"].strftime("%b %d")
if not t["all_day_due"]:
due_str += f" {t['due'].strftime('%-I:%M %p')}"
else:
due_str = "No date"
pri = ["", "High", "Medium", "", "", "", "", "", "", "Low"][min(t["priority"], 9)] if t["priority"] else ""
recurring = " (recurring)" if t["recurring"] else ""
md += f"| {t['summary']}{recurring} | {t['list']} | {due_str} | {pri} |\n"
md += "\n"
overdue_count = len(overdue)
upcoming_count = len(upcoming)
parts = []
if overdue_count:
parts.append(f"{overdue_count} overdue")
if upcoming_count:
parts.append(f"{upcoming_count} upcoming")
summary = ", ".join(parts) if parts else "No reminders"
return md, summary
def run(reminders_config=None, mode=MODE_DUE_3DAYS):
"""Run the reminders agent.
Args:
reminders_config: list of CalDAV source dicts [{name, url, username, password}]
mode: display mode (due_today_overdue, due_today_3days, all_incomplete)
"""
sources = reminders_config or []
if not sources:
return "## Reminders\n\n*No reminder sources configured. Add CalDAV credentials in Settings.*\n", "Not configured"
all_todos = []
for source in sources:
print(f" Fetching reminders: {source.get('name', '?')}...")
todos = fetch_reminders(source)
print(f" Got {len(todos)} incomplete reminders from {source.get('name', '?')}")
all_todos.extend(todos)
filtered = filter_reminders(all_todos, mode)
section, summary = format_section(filtered, mode)
log_run(AGENT_ID, "success", output=summary, metadata={
"sources": len(sources),
"total_incomplete": len(all_todos),
"filtered": len(filtered),
"mode": mode,
})
return section, summary
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--url", required=True, help="CalDAV URL")
parser.add_argument("--user", required=True, help="Username")
parser.add_argument("--password", required=True, help="Password")
parser.add_argument("--name", default="Test", help="Source name")
parser.add_argument("--mode", default=MODE_DUE_3DAYS, choices=[MODE_DUE_TODAY, MODE_DUE_3DAYS, MODE_ALL_INCOMPLETE])
args = parser.parse_args()
section, summary = run(
[{"name": args.name, "url": args.url, "username": args.user, "password": args.password}],
mode=args.mode,
)
print(section)
print(f"\nSummary: {summary}")