From d77458d58bbbe77d14605b3dd838c0efc5ce88ad Mon Sep 17 00:00:00 2001
From: Brandon4466
Date: Wed, 27 Aug 2025 18:51:19 -0700
Subject: [PATCH] initial commit

---
 amr.py | 233 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 233 insertions(+)
 create mode 100644 amr.py

diff --git a/amr.py b/amr.py
new file mode 100644
index 0000000..7e35d28
--- /dev/null
+++ b/amr.py
@@ -0,0 +1,233 @@
+"""Auto Media Remover (listing mode)
+
+Script to query a Tautulli instance and list Movies and TV Shows whose last-watched
+(last_played) date is older than a specified number of days (default 365).
+
+It paginates through libraries, collecting movie and show items.
+
+Usage examples (PowerShell):
+    python amr.py --url http://localhost:8181 --api-key YOUR_KEY
+    python amr.py --url http://tautulli.local:8181 --api-key YOUR_KEY --days 400 --output old_media.csv
+    python amr.py --url http://localhost:8181 --api-key YOUR_KEY --include-never-watched
+    python amr.py --url http://localhost:8181 --api-key YOUR_KEY --sort size
+
+Environment variables (used as fallbacks if the CLI args are not provided):
+    TAUTULLI_URL, TAUTULLI_API_KEY
+
+Note: This only lists candidates. It does NOT delete anything.
+
+References: https://github.com/Tautulli/Tautulli/wiki/Tautulli-API-Reference
+"""
+from __future__ import annotations
+
+import argparse
+import csv
+import datetime as dt
+import os
+import sys
+from dataclasses import dataclass
+from typing import Any, Dict, Iterable, List, Optional
+
+try:
+    import requests  # type: ignore
+except ImportError:  # pragma: no cover
+    print("The 'requests' library is required. Install with: pip install requests", file=sys.stderr)
+    sys.exit(1)
+
+# ----------------------------- Data Models ----------------------------------
+
+@dataclass
+class MediaItem:
+    rating_key: str
+    title: str
+    media_type: str  # movie or show
+    library_name: str
+    last_played: Optional[dt.datetime]  # UTC naive
+    added_at: Optional[dt.datetime]
+    year: Optional[int]
+    size_bytes: Optional[int] = None  # file size in bytes
+
+    @property
+    def days_since_last_played(self) -> Optional[int]:
+        if not self.last_played:
+            return None
+        return (dt.datetime.utcnow() - self.last_played).days
+
+    @property
+    def size_gb(self) -> Optional[float]:
+        if self.size_bytes is None:
+            return None
+        return self.size_bytes / (1024 ** 3)
+
+    def to_row(self) -> List[str]:
+        return [
+            self.media_type,
+            self.library_name,
+            self.title,
+            str(self.year or ''),
+            self.last_played.isoformat() if self.last_played else '',
+            str(self.days_since_last_played) if self.days_since_last_played is not None else '',
+            self.added_at.isoformat() if self.added_at else '',
+            self.rating_key,
+            str(self.size_bytes or ''),
+        ]
+
+# ----------------------------- API Client -----------------------------------
+
+class TautulliClient:
+    def __init__(self, base_url: str, api_key: str, timeout: int = 30):
+        self.base_url = base_url.rstrip('/')
+        self.api_key = api_key
+        self.timeout = timeout
+
+    def _get(self, **params) -> Dict[str, Any]:
+        params = {"apikey": self.api_key, **params}
+        try:
+            r = requests.get(f"{self.base_url}/api/v2", params=params, timeout=self.timeout)
+            r.raise_for_status()
+            data = r.json()
+        except requests.RequestException as e:
+            raise RuntimeError(f"HTTP error calling Tautulli: {e}") from e
+        except ValueError as e:
+            raise RuntimeError("Invalid JSON response from Tautulli") from e
+        if data.get('response', {}).get('result') != 'success':
+            raise RuntimeError(f"Tautulli API error: {data.get('response', {}).get('message')}")
+        return data['response']['data']
+
+    def get_libraries(self) -> List[Dict[str, Any]]:
+        data = self._get(cmd='get_libraries')
+        return data  # list of library dicts
+
+    def iter_library_items(self, section_id: int, library_name: str, section_type: str) -> Iterable[MediaItem]:
+        # Pages through get_library_media_info (DataTables-style response).
+        length = 100
+        start = 0
+        total = None
+        while True:
+            data = self._get(cmd='get_library_media_info', section_id=section_id, start=start, length=length)
+            # Normally a dict with 'data'/'recordsTotal'; tolerate a direct list as well.
+            items = (data.get('data') or []) if isinstance(data, dict) else data
+            if total is None:
+                total = (data.get('recordsTotal') or data.get('total_count') or len(items)) if isinstance(data, dict) else len(items)
+            for it in items:
+                # Field names vary; attempt to normalize.
+                last_played_ts = it.get('last_played') or it.get('last_watched')
+                added_at_ts = it.get('added_at')
+                size_val = it.get('file_size') or it.get('size') or it.get('media_size')
+                try:
+                    size_bytes = int(size_val) if size_val is not None else None
+                except Exception:
+                    size_bytes = None
+                def conv(ts):
+                    if not ts:
+                        return None
+                    try:
+                        return dt.datetime.utcfromtimestamp(int(ts))
+                    except Exception:
+                        return None
+                last_played = conv(last_played_ts)
+                added_at = conv(added_at_ts)
+                yield MediaItem(
+                    rating_key=str(it.get('rating_key')),
+                    title=it.get('title') or it.get('name') or 'Unknown',
+                    media_type=section_type,
+                    library_name=library_name,
+                    last_played=last_played,
+                    added_at=added_at,
+                    year=it.get('year'),
+                    size_bytes=size_bytes,
+                )
+            start += length
+            if total is not None and start >= total:
+                break
+
+# ----------------------------- Core Logic ------------------------------------
+
+def find_old_media(client: TautulliClient, days: int, include_never_watched: bool) -> List[MediaItem]:
+    cutoff = dt.datetime.utcnow() - dt.timedelta(days=days)
+    results: List[MediaItem] = []
+    libraries = client.get_libraries()
+    for lib in libraries:
+        section_type = lib.get('section_type')  # movie, show, artist, photo, etc.
+        if section_type not in ('movie', 'show'):
+            continue
+        name = lib.get('section_name') or lib.get('name') or f"Library {lib.get('section_id')}"
+        section_id = lib.get('section_id') or lib.get('id')
+        if section_id is None:
+            continue
+        try:
+            for item in client.iter_library_items(section_id=int(section_id), library_name=name, section_type=section_type):
+                if item.last_played is None:
+                    if include_never_watched:
+                        results.append(item)
+                    continue
+                if item.last_played < cutoff:
+                    results.append(item)
+        except Exception as e:
+            print(f"Warning: failed to process library '{name}': {e}", file=sys.stderr)
+    return results
+
+# ----------------------------- CLI ------------------------------------------
+
+def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
+    p = argparse.ArgumentParser(description="List Tautulli media last watched over N days ago.")
+    p.add_argument('--url', dest='url', default=os.environ.get('TAUTULLI_URL'), help='Base URL to Tautulli (e.g. http://localhost:8181)')
+    p.add_argument('--api-key', dest='api_key', default=os.environ.get('TAUTULLI_API_KEY'), help='Tautulli API key')
+    p.add_argument('--days', type=int, default=365, help='Age in days since last watched (default: 365)')
+    p.add_argument('--include-never-watched', action='store_true', help='Include items never watched (no last_played)')
+    p.add_argument('--output', help='Optional CSV output file path')
+    p.add_argument('--sort', choices=['title', 'days', 'last_played', 'size'], default='days', help='Sort order for the output list')
+    return p.parse_args(argv)
+
+
+def main(argv: Optional[List[str]] = None) -> int:
+    args = parse_args(argv)
+    if not args.url or not args.api_key:
+        print('Error: --url and --api-key (or the equivalent environment variables) are required.', file=sys.stderr)
+        return 2
+
+    client = TautulliClient(args.url, args.api_key)
+    try:
+        old_media = find_old_media(client, days=args.days, include_never_watched=args.include_never_watched)
+    except Exception as e:
+        print(f"Failed to query Tautulli: {e}", file=sys.stderr)
+        return 1
+
+    if args.sort == 'title':
+        old_media.sort(key=lambda m: (m.title.lower(), m.media_type))
+    elif args.sort == 'last_played':
+        old_media.sort(key=lambda m: (m.last_played or dt.datetime.min))
+    elif args.sort == 'size':
+        old_media.sort(key=lambda m: (m.size_bytes or -1), reverse=True)
+    else:  # days
+        old_media.sort(key=lambda m: (m.days_since_last_played if m.days_since_last_played is not None else -1), reverse=True)
+
+    # Console output
+    print(f"Found {len(old_media)} media items last watched over {args.days} days ago" + (" (including never watched)" if args.include_never_watched else ''))
+    print('-' * 135)
+    print(f"{'Type':5} {'Library':20} {'Title':40} {'Year':4} {'Last Played (UTC)':20} {'Days':5} {'Size(GB)':8}")
+    print('-' * 135)
+    for m in old_media:
+        lp = m.last_played.isoformat(sep=' ')[:19] if m.last_played else 'Never'
+        days = m.days_since_last_played if m.days_since_last_played is not None else ''
+        size_gb = f"{m.size_gb:.2f}" if m.size_gb is not None else ''
+        print(f"{m.media_type[:5]:5} {m.library_name[:20]:20} {m.title[:40]:40} {str(m.year or ''):4} {lp:20} {str(days):5} {size_gb:8}")
+
+    # CSV output
+    if args.output:
+        try:
+            with open(args.output, 'w', newline='', encoding='utf-8') as f:
+                writer = csv.writer(f)
+                writer.writerow(['media_type', 'library', 'title', 'year', 'last_played_utc', 'days_since_last_played', 'added_at_utc', 'rating_key', 'size_bytes'])
+                for m in old_media:
+                    writer.writerow(m.to_row())
+            print(f"\nCSV written to {args.output}")
+        except OSError as e:
+            print(f"Failed to write CSV: {e}", file=sys.stderr)
+            return 1
+
+    return 0
+
+
+if __name__ == '__main__':  # pragma: no cover
+    raise SystemExit(main())
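
Testing note (not part of the applied file): for a quick manual check that a Tautulli instance is reachable before running the full script, a minimal sketch along these lines could be used. It reuses the same /api/v2 endpoint and the get_libraries command the script already depends on; the URL and key below are placeholders, not real values.

    import requests

    TAUTULLI_URL = "http://localhost:8181"  # placeholder
    API_KEY = "YOUR_KEY"                    # placeholder

    resp = requests.get(
        f"{TAUTULLI_URL}/api/v2",
        params={"apikey": API_KEY, "cmd": "get_libraries"},
        timeout=30,
    )
    resp.raise_for_status()
    payload = resp.json()["response"]

    # A healthy instance reports result == "success" and lists its library sections,
    # the same fields (section_id, section_type, section_name) that amr.py reads.
    print(payload["result"])
    for lib in payload["data"]:
        print(lib["section_id"], lib["section_type"], lib["section_name"])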