Files
Eric Jungbauer 607168815b The Pirate Phase 1.b extension: Plex tools
Adds 4 read-only Plex tools using a new plex_get helper that sends
X-Plex-Token as a query param and Accept: application/json:
- plex_library_sections: per-section item counts
- plex_on_deck: in-progress items across all libraries
- plex_watch_history: recent watches with user-name resolution via /accounts
- plex_recently_added: latest additions across sections

Token sourced from Plex registry on Darrow (HKU per-user hive). Stored
in service_configs.plex. Plex paginated counts require BOTH
X-Plex-Container-Start and X-Plex-Container-Size in query string to
return totalSize — dropping Start makes Plex return the full library.

Pirate catalog: 20 → 24 tools.
2026-04-20 23:02:44 +00:00

153 lines
6.0 KiB
Python

"""Plex tools — watch history, on deck, recently added, library stats (read-only)."""
from ._common import plex_get
def _resolve_accounts():
"""Build an accountID -> display-name map. Plex's history endpoint uses numeric IDs."""
try:
data = plex_get("/accounts")
return {
a.get("id"): (a.get("name") or "").strip() or f"account_{a.get('id')}"
for a in data.get("MediaContainer", {}).get("Account", [])
}
except Exception:
return {}
def _title(m):
"""Human-readable title for a metadata item (handles episodes, movies, tracks)."""
grand = m.get("grandparentTitle") or ""
parent = m.get("parentTitle") or ""
title = m.get("title") or ""
if grand and m.get("type") == "episode":
season = m.get("parentIndex")
ep = m.get("index")
if season is not None and ep is not None:
return f"{grand} S{season:02d}E{ep:02d}{title}"
return f"{grand}{title}"
if grand and m.get("type") == "track":
return f"{grand}{title}"
return title
def plex_library_sections():
"""List Plex library sections with per-section item counts."""
data = plex_get("/library/sections")
sections = []
for s in data.get("MediaContainer", {}).get("Directory", []):
sid = s.get("key")
try:
count_data = plex_get(f"/library/sections/{sid}/all", {"X-Plex-Container-Start": 0, "X-Plex-Container-Size": 1})
total = count_data.get("MediaContainer", {}).get("totalSize", 0)
except Exception:
total = None
sections.append({
"id": sid,
"title": s.get("title"),
"type": s.get("type"),
"item_count": total,
})
return {"sections": sections}
def plex_on_deck(limit=10):
"""What's ready to resume — partially watched items across all libraries."""
data = plex_get("/library/onDeck", {"X-Plex-Container-Size": limit})
items = []
for m in data.get("MediaContainer", {}).get("Metadata", [])[:limit]:
duration_ms = m.get("duration") or 0
offset_ms = m.get("viewOffset") or 0
pct = round(100 * offset_ms / duration_ms, 1) if duration_ms else None
items.append({
"type": m.get("type"),
"title": _title(m),
"library": m.get("librarySectionTitle"),
"progress_pct": pct,
"duration_min": round(duration_ms / 60000, 1) if duration_ms else None,
"last_viewed_epoch": m.get("lastViewedAt"),
})
return {"count": len(items), "items": items}
def plex_watch_history(limit=20, user=None):
"""Recent Plex watch history. Optionally filter by user display name (case-insensitive)."""
accounts = _resolve_accounts()
params = {"X-Plex-Container-Size": limit, "sort": "viewedAt:desc"}
if user:
# Find matching account id(s)
wanted = user.strip().lower()
matching_ids = [aid for aid, name in accounts.items() if name.lower() == wanted]
if matching_ids:
params["accountID"] = matching_ids[0]
data = plex_get("/status/sessions/history/all", params)
items = []
for m in data.get("MediaContainer", {}).get("Metadata", [])[:limit]:
items.append({
"type": m.get("type"),
"title": _title(m),
"library": m.get("librarySectionTitle"),
"watched_epoch": m.get("viewedAt"),
"user": accounts.get(m.get("accountID"), f"account_{m.get('accountID')}"),
})
return {"count": len(items), "filter_user": user, "items": items}
def plex_recently_added(limit=15):
"""What was added to the Plex library most recently."""
data = plex_get("/library/recentlyAdded", {"X-Plex-Container-Size": limit})
items = []
for m in data.get("MediaContainer", {}).get("Metadata", [])[:limit]:
items.append({
"type": m.get("type"),
"title": _title(m),
"library": m.get("librarySectionTitle"),
"added_epoch": m.get("addedAt"),
"year": m.get("year"),
})
return {"count": len(items), "items": items}
TOOLS = [
{
"name": "plex_library_sections",
"description": "List Plex library sections (Movies, TV Shows, Music, etc.) with item counts per section. Use when the user asks 'how big is the Plex library', 'what libraries do we have', or 'how many movies are on Plex'.",
"input_schema": {"type": "object", "properties": {}},
"read_only": True,
"fn": plex_library_sections,
},
{
"name": "plex_on_deck",
"description": "List what the user (and household) is in the middle of — partially watched episodes and movies across all libraries. Use when the user asks 'what was I watching', 'what's on deck', 'where did I leave off'.",
"input_schema": {
"type": "object",
"properties": {"limit": {"type": "integer", "default": 10}},
},
"read_only": True,
"fn": plex_on_deck,
},
{
"name": "plex_watch_history",
"description": "Recent Plex watch history with user attribution. Optionally filter by user display name (Eric, Angela, Jaclyn, etc.). Use when the user asks 'what did we watch last night', 'what has Angela been watching', 'show me recent views'.",
"input_schema": {
"type": "object",
"properties": {
"limit": {"type": "integer", "default": 20},
"user": {"type": "string", "description": "Optional Plex user display name to filter by"},
},
},
"read_only": True,
"fn": plex_watch_history,
},
{
"name": "plex_recently_added",
"description": "Latest items added to the Plex library across all sections. Use when the user asks 'what's new on Plex', 'what got added this week', 'what arrived recently'.",
"input_schema": {
"type": "object",
"properties": {"limit": {"type": "integer", "default": 15}},
},
"read_only": True,
"fn": plex_recently_added,
},
]