The Pirate Phase 1.b extension: Plex tools

Adds 4 read-only Plex tools using a new plex_get helper that sends
X-Plex-Token as a query param and Accept: application/json:
- plex_library_sections: per-section item counts
- plex_on_deck: in-progress items across all libraries
- plex_watch_history: recent watches with user-name resolution via /accounts
- plex_recently_added: latest additions across sections

Token sourced from Plex registry on Darrow (HKU per-user hive). Stored
in service_configs.plex. Plex paginated counts require BOTH
X-Plex-Container-Start and X-Plex-Container-Size in query string to
return totalSize — dropping Start makes Plex return the full library.

Pirate catalog: 20 → 24 tools.
This commit is contained in:
Eric Jungbauer
2026-04-20 23:02:44 +00:00
parent 20a8987dd9
commit 607168815b
3 changed files with 170 additions and 2 deletions
+2 -2
View File
@@ -7,13 +7,13 @@ The runtime loads all modules here and builds a combined catalog. The LLM picks
which tool to call; the runtime executes it and returns the result as a tool message.
"""
from . import sonarr, radarr, lidarr, whisparr, qbittorrent, storage, overseerr
from . import sonarr, radarr, lidarr, whisparr, qbittorrent, storage, overseerr, plex
def build_catalog():
"""Return a list of all available tools across all modules."""
catalog = []
for mod in (sonarr, radarr, lidarr, whisparr, qbittorrent, storage, overseerr):
for mod in (sonarr, radarr, lidarr, whisparr, qbittorrent, storage, overseerr, plex):
catalog.extend(getattr(mod, "TOOLS", []))
return catalog
+16
View File
@@ -33,6 +33,22 @@ def arr_get(service_name, path, params=None):
return json.loads(resp.read().decode())
def plex_get(path, params=None):
"""GET from the Plex API. Auth is via X-Plex-Token query param; returns JSON."""
cfg = get_service_config("plex")
base = cfg["base_url"].rstrip("/")
key = cfg["api_key"]
if not base or not key:
raise RuntimeError("plex not configured (base_url + api_key required)")
merged = {"X-Plex-Token": key}
if params:
merged.update(params)
url = f"{base}{path}?" + urlparse.urlencode(merged, quote_via=urlparse.quote)
req = urlreq.Request(url, headers={"Accept": "application/json"})
with urlreq.urlopen(req, timeout=15) as resp:
return json.loads(resp.read().decode())
def qbit_request(path, method="GET", params=None, data=None, cookies=None):
"""qBittorrent Web API helper. Login (if password set) returns SID cookie string."""
cfg = get_service_config("qbittorrent")
+152
View File
@@ -0,0 +1,152 @@
"""Plex tools — watch history, on deck, recently added, library stats (read-only)."""
from ._common import plex_get
def _resolve_accounts():
"""Build an accountID -> display-name map. Plex's history endpoint uses numeric IDs."""
try:
data = plex_get("/accounts")
return {
a.get("id"): (a.get("name") or "").strip() or f"account_{a.get('id')}"
for a in data.get("MediaContainer", {}).get("Account", [])
}
except Exception:
return {}
def _title(m):
"""Human-readable title for a metadata item (handles episodes, movies, tracks)."""
grand = m.get("grandparentTitle") or ""
parent = m.get("parentTitle") or ""
title = m.get("title") or ""
if grand and m.get("type") == "episode":
season = m.get("parentIndex")
ep = m.get("index")
if season is not None and ep is not None:
return f"{grand} S{season:02d}E{ep:02d}{title}"
return f"{grand}{title}"
if grand and m.get("type") == "track":
return f"{grand}{title}"
return title
def plex_library_sections():
"""List Plex library sections with per-section item counts."""
data = plex_get("/library/sections")
sections = []
for s in data.get("MediaContainer", {}).get("Directory", []):
sid = s.get("key")
try:
count_data = plex_get(f"/library/sections/{sid}/all", {"X-Plex-Container-Start": 0, "X-Plex-Container-Size": 1})
total = count_data.get("MediaContainer", {}).get("totalSize", 0)
except Exception:
total = None
sections.append({
"id": sid,
"title": s.get("title"),
"type": s.get("type"),
"item_count": total,
})
return {"sections": sections}
def plex_on_deck(limit=10):
"""What's ready to resume — partially watched items across all libraries."""
data = plex_get("/library/onDeck", {"X-Plex-Container-Size": limit})
items = []
for m in data.get("MediaContainer", {}).get("Metadata", [])[:limit]:
duration_ms = m.get("duration") or 0
offset_ms = m.get("viewOffset") or 0
pct = round(100 * offset_ms / duration_ms, 1) if duration_ms else None
items.append({
"type": m.get("type"),
"title": _title(m),
"library": m.get("librarySectionTitle"),
"progress_pct": pct,
"duration_min": round(duration_ms / 60000, 1) if duration_ms else None,
"last_viewed_epoch": m.get("lastViewedAt"),
})
return {"count": len(items), "items": items}
def plex_watch_history(limit=20, user=None):
"""Recent Plex watch history. Optionally filter by user display name (case-insensitive)."""
accounts = _resolve_accounts()
params = {"X-Plex-Container-Size": limit, "sort": "viewedAt:desc"}
if user:
# Find matching account id(s)
wanted = user.strip().lower()
matching_ids = [aid for aid, name in accounts.items() if name.lower() == wanted]
if matching_ids:
params["accountID"] = matching_ids[0]
data = plex_get("/status/sessions/history/all", params)
items = []
for m in data.get("MediaContainer", {}).get("Metadata", [])[:limit]:
items.append({
"type": m.get("type"),
"title": _title(m),
"library": m.get("librarySectionTitle"),
"watched_epoch": m.get("viewedAt"),
"user": accounts.get(m.get("accountID"), f"account_{m.get('accountID')}"),
})
return {"count": len(items), "filter_user": user, "items": items}
def plex_recently_added(limit=15):
"""What was added to the Plex library most recently."""
data = plex_get("/library/recentlyAdded", {"X-Plex-Container-Size": limit})
items = []
for m in data.get("MediaContainer", {}).get("Metadata", [])[:limit]:
items.append({
"type": m.get("type"),
"title": _title(m),
"library": m.get("librarySectionTitle"),
"added_epoch": m.get("addedAt"),
"year": m.get("year"),
})
return {"count": len(items), "items": items}
TOOLS = [
{
"name": "plex_library_sections",
"description": "List Plex library sections (Movies, TV Shows, Music, etc.) with item counts per section. Use when the user asks 'how big is the Plex library', 'what libraries do we have', or 'how many movies are on Plex'.",
"input_schema": {"type": "object", "properties": {}},
"read_only": True,
"fn": plex_library_sections,
},
{
"name": "plex_on_deck",
"description": "List what the user (and household) is in the middle of — partially watched episodes and movies across all libraries. Use when the user asks 'what was I watching', 'what's on deck', 'where did I leave off'.",
"input_schema": {
"type": "object",
"properties": {"limit": {"type": "integer", "default": 10}},
},
"read_only": True,
"fn": plex_on_deck,
},
{
"name": "plex_watch_history",
"description": "Recent Plex watch history with user attribution. Optionally filter by user display name (Eric, Angela, Jaclyn, etc.). Use when the user asks 'what did we watch last night', 'what has Angela been watching', 'show me recent views'.",
"input_schema": {
"type": "object",
"properties": {
"limit": {"type": "integer", "default": 20},
"user": {"type": "string", "description": "Optional Plex user display name to filter by"},
},
},
"read_only": True,
"fn": plex_watch_history,
},
{
"name": "plex_recently_added",
"description": "Latest items added to the Plex library across all sections. Use when the user asks 'what's new on Plex', 'what got added this week', 'what arrived recently'.",
"input_schema": {
"type": "object",
"properties": {"limit": {"type": "integer", "default": 15}},
},
"read_only": True,
"fn": plex_recently_added,
},
]