607168815b
Adds 4 read-only Plex tools using a new plex_get helper that sends X-Plex-Token as a query param and an "Accept: application/json" header:

- plex_library_sections: per-section item counts
- plex_on_deck: in-progress items across all libraries
- plex_watch_history: recent watches with user-name resolution via /accounts
- plex_recently_added: latest additions across sections

The token is sourced from the Plex registry on Darrow (HKU per-user hive) and stored in service_configs.plex.

Plex paginated counts require BOTH X-Plex-Container-Start and X-Plex-Container-Size in the query string to return totalSize — dropping Start makes Plex return the full library.

Pirate catalog: 20 → 24 tools.
89 lines
3.5 KiB
Python
89 lines
3.5 KiB
Python
"""Shared helpers for pirate tools."""
|
|
|
|
import sys
|
|
import os
|
|
from urllib import request as urlreq, error as urlerror, parse as urlparse
|
|
import json
|
|
|
|
# Add parent agents/ to path so we can import shared.py
|
|
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
|
|
from shared import DASHBOARD_API, api_request
|
|
|
|
|
|
def get_service_config(service_name):
    """Fetch admin-configured creds for a service (sonarr, radarr, qbittorrent, etc.).

    Args:
        service_name: Name of the service; interpolated into the dashboard's
            ``/api/internal/services/<service_name>`` path.

    Returns:
        Whatever ``api_request`` returns for that endpoint. The helpers in
        this module read it like a dict (``base_url``, ``api_key``,
        ``username``, ``password``).

    Raises:
        RuntimeError: If the dashboard request raises anything. The original
            exception is attached as ``__cause__``.
    """
    url = f"{DASHBOARD_API}/api/internal/services/{service_name}"
    try:
        return api_request(url, retries=1)
    except Exception as e:
        # Chain explicitly so the traceback reads "caused by" the real
        # failure instead of "during handling of ... another exception".
        raise RuntimeError(
            f"Service '{service_name}' is not configured in admin panel: {e}"
        ) from e
|
|
|
|
|
|
def arr_get(service_name, path, params=None):
    """GET from an *arr API (Sonarr/Radarr/Lidarr/Whisparr — all use the same v3 API shape).

    Args:
        service_name: Service whose config is looked up via
            ``get_service_config``.
        path: Request path appended verbatim to the configured base URL
            (expected to start with ``/``).
        params: Optional mapping of query parameters; percent-encoded with
            ``urllib.parse.quote``.

    Returns:
        The response body decoded and parsed as JSON.

    Raises:
        RuntimeError: If ``base_url`` or ``api_key`` is missing or empty in
            the service config (or the config itself cannot be fetched).
        urllib.error.URLError: On connection or HTTP errors from the request.
    """
    cfg = get_service_config(service_name) or {}
    # Read with .get(...) or "" so a missing or null field reaches the
    # "not configured" error below instead of dying earlier with a
    # KeyError / AttributeError.
    base = (cfg.get("base_url") or "").rstrip("/")
    key = cfg.get("api_key") or ""
    if not base or not key:
        raise RuntimeError(f"{service_name} not configured (base_url + api_key required)")

    url = f"{base}{path}"
    if params:
        url += "?" + urlparse.urlencode(params, quote_via=urlparse.quote)

    # The API key travels in the X-Api-Key header, not in the URL.
    req = urlreq.Request(url, headers={"X-Api-Key": key})
    with urlreq.urlopen(req, timeout=15) as resp:
        return json.loads(resp.read().decode())
|
|
|
|
|
|
def plex_get(path, params=None):
    """GET from the Plex API. Auth is via X-Plex-Token query param; returns JSON.

    Args:
        path: Request path appended verbatim to the configured base URL
            (expected to start with ``/``).
        params: Optional mapping of extra query parameters. They are merged
            on top of the token entry, so a caller-supplied ``X-Plex-Token``
            key would replace the configured one.

    Returns:
        The response body decoded and parsed as JSON.

    Raises:
        RuntimeError: If ``base_url`` or ``api_key`` is missing or empty in
            the ``plex`` service config (or the config cannot be fetched).
        urllib.error.URLError: On connection or HTTP errors from the request.
    """
    cfg = get_service_config("plex") or {}
    # Read with .get(...) or "" so a missing or null field reaches the
    # "not configured" error below instead of dying earlier with a
    # KeyError / AttributeError.
    base = (cfg.get("base_url") or "").rstrip("/")
    key = cfg.get("api_key") or ""
    if not base or not key:
        raise RuntimeError("plex not configured (base_url + api_key required)")

    # The token is always present, so the URL always carries a query string.
    merged = {"X-Plex-Token": key}
    if params:
        merged.update(params)
    url = f"{base}{path}?" + urlparse.urlencode(merged, quote_via=urlparse.quote)

    # Ask for JSON explicitly via the Accept header.
    req = urlreq.Request(url, headers={"Accept": "application/json"})
    with urlreq.urlopen(req, timeout=15) as resp:
        return json.loads(resp.read().decode())
|
|
|
|
|
|
def qbit_request(path, method="GET", params=None, data=None, cookies=None):
    """qBittorrent Web API helper.

    Args:
        path: Request path appended verbatim to the configured base URL
            (expected to start with ``/``).
        method: HTTP method passed to ``urllib.request.Request``.
        params: Optional mapping of query parameters; percent-encoded with
            ``urllib.parse.quote``.
        data: Optional mapping sent as an
            ``application/x-www-form-urlencoded`` request body.
        cookies: Optional value for the ``Cookie`` request header (for
            example the string returned by ``qbit_login_if_needed``).

    Returns:
        A ``(payload, headers)`` tuple. ``payload`` is the parsed JSON body
        when the body is valid JSON, otherwise the raw decoded text.
        ``headers`` is the response's header object.

    Raises:
        RuntimeError: If ``base_url`` is missing or empty in the
            ``qbittorrent`` service config (or the config cannot be fetched).
        urllib.error.URLError: On connection or HTTP errors from the request.
    """
    cfg = get_service_config("qbittorrent") or {}
    # Read with .get(...) or "" so a missing or null base_url reaches the
    # "not configured" error below instead of a KeyError / AttributeError.
    base = (cfg.get("base_url") or "").rstrip("/")
    if not base:
        raise RuntimeError("qbittorrent not configured")

    url = f"{base}{path}"
    if params:
        url += "?" + urlparse.urlencode(params, quote_via=urlparse.quote)

    body = None
    headers = {}
    if cookies:
        headers["Cookie"] = cookies
    if data is not None:
        body = urlparse.urlencode(data).encode()
        headers["Content-Type"] = "application/x-www-form-urlencoded"

    req = urlreq.Request(url, data=body, headers=headers, method=method)
    with urlreq.urlopen(req, timeout=15) as resp:
        raw = resp.read().decode()
        resp_headers = resp.headers

    # Not every response body is JSON, so fall back to the raw text.
    try:
        return json.loads(raw), resp_headers
    except json.JSONDecodeError:
        return raw, resp_headers
|
|
|
|
|
|
def qbit_login_if_needed():
    """Log in to qBittorrent when credentials are configured.

    Returns:
        The first ``name=value`` pair of the login response's ``Set-Cookie``
        header, suitable for the ``cookies`` argument of ``qbit_request``.
        ``None`` when no username/password pair is configured, or when the
        login response carries no ``Set-Cookie`` header.
    """
    settings = get_service_config("qbittorrent")
    username = settings.get("username", "")
    password = settings.get("password", "")
    if not (username and password):
        return None  # LAN no-auth mode

    credentials = {"username": username, "password": password}
    _payload, response_headers = qbit_request(
        "/api/v2/auth/login", method="POST", data=credentials
    )

    set_cookie = response_headers.get("Set-Cookie", "")
    if not set_cookie:
        # NOTE(review): a login that yields no cookie is indistinguishable
        # from no-auth mode here — confirm callers cope with that.
        return None

    # Keep only the leading "name=value" pair; drop attributes such as
    # path/expiry that follow the first semicolon.
    session_pair, _, _ = set_cookie.partition(";")
    return session_pair
|