244 lines
8.3 KiB
Python
244 lines
8.3 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Reminders Agent
|
|
Fetches reminders (VTODO) from CalDAV servers (iCloud, Google, etc.)
|
|
Returns a markdown section for the daily briefing.
|
|
"""
|
|
|
|
import sys
|
|
from datetime import datetime, timedelta, date
|
|
from shared import MT, log_run
|
|
|
|
AGENT_ID = "reminders"
|
|
|
|
# Display mode options (configurable per user in GUI)
|
|
MODE_DUE_TODAY = "due_today_overdue"
|
|
MODE_DUE_3DAYS = "due_today_3days"
|
|
MODE_ALL_INCOMPLETE = "all_incomplete"
|
|
|
|
|
|
def fetch_reminders(source):
|
|
"""Fetch incomplete VTODO items from a CalDAV server."""
|
|
import caldav
|
|
|
|
todos = []
|
|
try:
|
|
client = caldav.DAVClient(
|
|
url=source["url"],
|
|
username=source.get("username", ""),
|
|
password=source.get("password", ""),
|
|
)
|
|
principal = client.principal()
|
|
calendars = principal.calendars()
|
|
|
|
for cal in calendars:
|
|
try:
|
|
cal_todos = cal.todos(include_completed=False)
|
|
for todo in cal_todos:
|
|
parsed = parse_vtodo(todo, source["name"], cal.name)
|
|
if parsed:
|
|
todos.append(parsed)
|
|
except Exception as e:
|
|
# Some calendars don't support VTODO
|
|
pass
|
|
except Exception as e:
|
|
print(f" CalDAV error for {source['name']}: {e}", file=sys.stderr)
|
|
|
|
return todos
|
|
|
|
|
|
def parse_vtodo(todo, source_name, list_name):
|
|
"""Parse a CalDAV VTODO into a dict."""
|
|
from icalendar import Calendar
|
|
|
|
try:
|
|
cal = Calendar.from_ical(todo.data)
|
|
for component in cal.walk():
|
|
if component.name != "VTODO":
|
|
continue
|
|
|
|
summary = str(component.get("SUMMARY", "Untitled"))
|
|
description = str(component.get("DESCRIPTION", "")) if component.get("DESCRIPTION") else ""
|
|
priority_val = component.get("PRIORITY")
|
|
priority = int(str(priority_val)) if priority_val else 0
|
|
status = str(component.get("STATUS", "NEEDS-ACTION"))
|
|
url = str(component.get("URL", "")) if component.get("URL") else ""
|
|
|
|
# Due date
|
|
due = component.get("DUE")
|
|
due_dt = None
|
|
all_day_due = False
|
|
if due:
|
|
dt = due.dt if hasattr(due, "dt") else due
|
|
if isinstance(dt, date) and not isinstance(dt, datetime):
|
|
due_dt = datetime.combine(dt, datetime.min.time()).replace(tzinfo=MT)
|
|
all_day_due = True
|
|
else:
|
|
due_dt = dt.astimezone(MT) if dt.tzinfo else dt.replace(tzinfo=MT)
|
|
|
|
# Recurring
|
|
rrule = component.get("RRULE")
|
|
is_recurring = rrule is not None
|
|
|
|
return {
|
|
"summary": summary,
|
|
"list": list_name,
|
|
"source": source_name,
|
|
"due": due_dt,
|
|
"all_day_due": all_day_due,
|
|
"priority": priority,
|
|
"description": description[:200] if description else "",
|
|
"url": url,
|
|
"recurring": is_recurring,
|
|
"status": status,
|
|
}
|
|
except Exception:
|
|
pass
|
|
return None
|
|
|
|
|
|
def filter_reminders(todos, mode):
|
|
"""Filter reminders based on display mode."""
|
|
now = datetime.now(MT)
|
|
today = now.date()
|
|
today_end = datetime.combine(today + timedelta(days=1), datetime.min.time()).replace(tzinfo=MT)
|
|
three_days = datetime.combine(today + timedelta(days=3), datetime.min.time()).replace(tzinfo=MT)
|
|
|
|
if mode == MODE_ALL_INCOMPLETE:
|
|
return todos
|
|
|
|
filtered = []
|
|
for t in todos:
|
|
if t["due"] is None:
|
|
# No due date — include in "all incomplete" only
|
|
if mode == MODE_ALL_INCOMPLETE:
|
|
filtered.append(t)
|
|
continue
|
|
|
|
if t["due"] < now:
|
|
# Overdue — always include
|
|
t["_overdue"] = True
|
|
filtered.append(t)
|
|
elif mode == MODE_DUE_TODAY and t["due"] < today_end:
|
|
filtered.append(t)
|
|
elif mode == MODE_DUE_3DAYS and t["due"] < three_days:
|
|
filtered.append(t)
|
|
|
|
return filtered
|
|
|
|
|
|
def format_section(todos, mode):
|
|
"""Format reminders into a markdown section."""
|
|
now = datetime.now(MT)
|
|
today = now.date()
|
|
|
|
md = "## Reminders\n\n"
|
|
|
|
if not todos:
|
|
if mode == MODE_DUE_TODAY:
|
|
md += "*No reminders due today or overdue.*\n"
|
|
elif mode == MODE_DUE_3DAYS:
|
|
md += "*No reminders due in the next 3 days.*\n"
|
|
else:
|
|
md += "*No incomplete reminders.*\n"
|
|
return md, "No reminders"
|
|
|
|
# Separate overdue from upcoming
|
|
overdue = [t for t in todos if t.get("_overdue")]
|
|
upcoming = [t for t in todos if not t.get("_overdue")]
|
|
|
|
# Sort
|
|
overdue.sort(key=lambda t: t["due"] or datetime.min.replace(tzinfo=MT))
|
|
upcoming.sort(key=lambda t: t["due"] or datetime.max.replace(tzinfo=MT))
|
|
|
|
if overdue:
|
|
md += "### Overdue\n\n"
|
|
md += "| Reminder | List | Due | Priority |\n"
|
|
md += "|----------|------|-----|----------|\n"
|
|
for t in overdue:
|
|
due_str = t["due"].strftime("%b %d") if t["due"] else "-"
|
|
pri = ["", "High", "Medium", "", "", "", "", "", "", "Low"][min(t["priority"], 9)] if t["priority"] else ""
|
|
recurring = " (recurring)" if t["recurring"] else ""
|
|
md += f"| {t['summary']}{recurring} | {t['list']} | {due_str} | {pri} |\n"
|
|
md += "\n"
|
|
|
|
if upcoming:
|
|
md += "### Upcoming\n\n"
|
|
md += "| Reminder | List | Due | Priority |\n"
|
|
md += "|----------|------|-----|----------|\n"
|
|
for t in upcoming:
|
|
if t["due"]:
|
|
if t["due"].date() == today:
|
|
due_str = "Today" + ("" if t["all_day_due"] else f" {t['due'].strftime('%-I:%M %p')}")
|
|
else:
|
|
due_str = t["due"].strftime("%b %d")
|
|
if not t["all_day_due"]:
|
|
due_str += f" {t['due'].strftime('%-I:%M %p')}"
|
|
else:
|
|
due_str = "No date"
|
|
pri = ["", "High", "Medium", "", "", "", "", "", "", "Low"][min(t["priority"], 9)] if t["priority"] else ""
|
|
recurring = " (recurring)" if t["recurring"] else ""
|
|
md += f"| {t['summary']}{recurring} | {t['list']} | {due_str} | {pri} |\n"
|
|
md += "\n"
|
|
|
|
overdue_count = len(overdue)
|
|
upcoming_count = len(upcoming)
|
|
parts = []
|
|
if overdue_count:
|
|
parts.append(f"{overdue_count} overdue")
|
|
if upcoming_count:
|
|
parts.append(f"{upcoming_count} upcoming")
|
|
summary = ", ".join(parts) if parts else "No reminders"
|
|
|
|
return md, summary
|
|
|
|
|
|
def run(reminders_config=None, mode=MODE_DUE_3DAYS):
|
|
"""Run the reminders agent.
|
|
|
|
Args:
|
|
reminders_config: list of CalDAV source dicts [{name, url, username, password}]
|
|
mode: display mode (due_today_overdue, due_today_3days, all_incomplete)
|
|
"""
|
|
sources = reminders_config or []
|
|
|
|
if not sources:
|
|
return "## Reminders\n\n*No reminder sources configured. Add CalDAV credentials in Settings.*\n", "Not configured"
|
|
|
|
all_todos = []
|
|
for source in sources:
|
|
print(f" Fetching reminders: {source.get('name', '?')}...")
|
|
todos = fetch_reminders(source)
|
|
print(f" Got {len(todos)} incomplete reminders from {source.get('name', '?')}")
|
|
all_todos.extend(todos)
|
|
|
|
filtered = filter_reminders(all_todos, mode)
|
|
section, summary = format_section(filtered, mode)
|
|
|
|
log_run(AGENT_ID, "success", output=summary, metadata={
|
|
"sources": len(sources),
|
|
"total_incomplete": len(all_todos),
|
|
"filtered": len(filtered),
|
|
"mode": mode,
|
|
})
|
|
|
|
return section, summary
|
|
|
|
|
|
if __name__ == "__main__":
|
|
import argparse
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument("--url", required=True, help="CalDAV URL")
|
|
parser.add_argument("--user", required=True, help="Username")
|
|
parser.add_argument("--password", required=True, help="Password")
|
|
parser.add_argument("--name", default="Test", help="Source name")
|
|
parser.add_argument("--mode", default=MODE_DUE_3DAYS, choices=[MODE_DUE_TODAY, MODE_DUE_3DAYS, MODE_ALL_INCOMPLETE])
|
|
args = parser.parse_args()
|
|
|
|
section, summary = run(
|
|
[{"name": args.name, "url": args.url, "username": args.user, "password": args.password}],
|
|
mode=args.mode,
|
|
)
|
|
print(section)
|
|
print(f"\nSummary: {summary}")
|