"""Shared helpers for pirate tools."""
import sys
import os
from urllib import request as urlreq, error as urlerror, parse as urlparse
import json

# Add parent agents/ to path so we can import shared.py
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
from shared import DASHBOARD_API, api_request

# Seconds to wait on any single upstream HTTP call before giving up.
_HTTP_TIMEOUT = 15


def _setting(cfg, key):
    """Return the value stored under *key*, or "" when it is missing or falsy.

    Normalising here lets callers run their "not configured" check without
    first tripping a KeyError (absent key) or AttributeError (None value).
    """
    return cfg.get(key) or ""


def _with_query(url, params):
    """Append *params* to *url* as a percent-encoded query string, if any."""
    if not params:
        return url
    return url + "?" + urlparse.urlencode(params, quote_via=urlparse.quote)


def _fetch_json(req):
    """Send *req* and return the response body decoded as JSON."""
    with urlreq.urlopen(req, timeout=_HTTP_TIMEOUT) as resp:
        return json.loads(resp.read().decode())


def get_service_config(service_name):
    """Fetch admin-configured creds for a service (sonarr, radarr, qbittorrent, etc.).

    Args:
        service_name: Name used in the dashboard's internal services endpoint.

    Returns:
        Whatever ``api_request`` returns for that endpoint.

    Raises:
        RuntimeError: If the dashboard request fails for any reason; the
            original exception is attached as ``__cause__``.
    """
    try:
        return api_request(f"{DASHBOARD_API}/api/internal/services/{service_name}", retries=1)
    except Exception as e:
        raise RuntimeError(f"Service '{service_name}' is not configured in admin panel: {e}") from e


def arr_get(service_name, path, params=None):
    """GET from an *arr API (Sonarr/Radarr/Lidarr/Whisparr — all use the same v3 API shape).

    Args:
        service_name: Which configured service to query.
        path: Request path appended directly to the service's base URL.
        params: Optional mapping of query parameters.

    Returns:
        The JSON-decoded response body.

    Raises:
        RuntimeError: If the service has no base_url or api_key configured.
    """
    cfg = get_service_config(service_name)
    base = _setting(cfg, "base_url").rstrip("/")
    key = _setting(cfg, "api_key")
    if not base or not key:
        raise RuntimeError(f"{service_name} not configured (base_url + api_key required)")
    url = _with_query(f"{base}{path}", params)
    req = urlreq.Request(url, headers={"X-Api-Key": key})
    return _fetch_json(req)


def plex_get(path, params=None):
    """GET from the Plex API. Auth is via X-Plex-Token query param; returns JSON.

    Args:
        path: Request path appended directly to the Plex base URL.
        params: Optional mapping of extra query parameters; merged over the
            token entry, so a caller-supplied ``X-Plex-Token`` wins.

    Returns:
        The JSON-decoded response body.

    Raises:
        RuntimeError: If plex has no base_url or api_key configured.
    """
    cfg = get_service_config("plex")
    base = _setting(cfg, "base_url").rstrip("/")
    key = _setting(cfg, "api_key")
    if not base or not key:
        raise RuntimeError("plex not configured (base_url + api_key required)")
    merged = {"X-Plex-Token": key}
    if params:
        merged.update(params)
    # The token is always present, so the query string is never empty here.
    url = f"{base}{path}?" + urlparse.urlencode(merged, quote_via=urlparse.quote)
    req = urlreq.Request(url, headers={"Accept": "application/json"})
    return _fetch_json(req)


def qbit_request(path, method="GET", params=None, data=None, cookies=None):
    """qBittorrent Web API helper.

    Args:
        path: Request path appended directly to the qBittorrent base URL.
        method: HTTP method to use.
        params: Optional mapping of query parameters.
        data: Optional mapping sent as a form-urlencoded request body.
        cookies: Optional ``Cookie`` header value (e.g. the SID string
            returned by ``qbit_login_if_needed``).

    Returns:
        A ``(payload, headers)`` tuple. ``payload`` is the JSON-decoded body,
        or the raw text when the body is not valid JSON; ``headers`` is the
        response header object.

    Raises:
        RuntimeError: If qbittorrent has no base_url configured.
    """
    cfg = get_service_config("qbittorrent")
    base = _setting(cfg, "base_url").rstrip("/")
    if not base:
        raise RuntimeError("qbittorrent not configured")
    url = _with_query(f"{base}{path}", params)
    body = None
    headers = {}
    if cookies:
        headers["Cookie"] = cookies
    if data is not None:
        body = urlparse.urlencode(data).encode()
        headers["Content-Type"] = "application/x-www-form-urlencoded"
    req = urlreq.Request(url, data=body, headers=headers, method=method)
    with urlreq.urlopen(req, timeout=_HTTP_TIMEOUT) as resp:
        raw = resp.read().decode()
        resp_headers = resp.headers
    # Not every endpoint answers with JSON, so fall back to the raw text.
    try:
        payload = json.loads(raw)
    except json.JSONDecodeError:
        payload = raw
    return payload, resp_headers


def qbit_login_if_needed():
    """If qBit has auth enabled, login and return the SID cookie string. Else return None.

    Returns:
        The first ``name=value`` segment of the login response's
        ``Set-Cookie`` header, or None when no username/password is
        configured or the response carried no cookie.
    """
    cfg = get_service_config("qbittorrent")
    user = cfg.get("username", "")
    pw = cfg.get("password", "")
    if not user or not pw:
        return None  # LAN no-auth mode
    _, hdrs = qbit_request("/api/v2/auth/login", method="POST", data={"username": user, "password": pw})
    cookies = hdrs.get("Set-Cookie", "")
    # NOTE(review): a login response without a cookie yields None, which the
    # caller cannot tell apart from no-auth mode — presumably rejected
    # credentials land here too; confirm against the qBittorrent API.
    # Strip everything after the first semicolon (path/expiry)
    return cookies.split(";")[0] if cookies else None