Files
AutoMediaRemover.py/amr.py
2025-08-27 18:51:19 -07:00

234 lines
9.9 KiB
Python

"""Auto Media Remover (listing mode)
Script to query a Tautulli instance and list Movies and TV Shows whose last-watched
(`last_played`) date is older than a specified number of days (default: 365 days).
It paginates through libraries, collecting movie and show items.
Usage examples (PowerShell):
python amr.py --url http://localhost:8181 --api-key YOUR_KEY
python amr.py --url http://tautulli.local:8181 --api-key YOUR_KEY --days 400 --output old_media.csv
python amr.py --url http://localhost:8181 --api-key YOUR_KEY --include-never-watched
python amr.py --url http://localhost:8181 --api-key YOUR_KEY --sort size
Environment variables (fallback if CLI args not provided):
TAUTULLI_URL, TAUTULLI_API_KEY
Note: This only lists candidates. It does NOT delete anything.
References: https://github.com/Tautulli/Tautulli/wiki/Tautulli-API-Reference
"""
from __future__ import annotations
import argparse
import csv
import datetime as dt
import os
import sys
from dataclasses import dataclass
from typing import Any, Dict, Iterable, List, Optional
try:
import requests # type: ignore
except ImportError: # pragma: no cover
print("The 'requests' library is required. Install with: pip install requests", file=sys.stderr)
sys.exit(1)
# ----------------------------- Data Models ----------------------------------
@dataclass
class MediaItem:
    """One movie or show entry collected from a Tautulli library listing.

    Timestamps are naive ``datetime`` objects that are interpreted as UTC;
    ``None`` means the corresponding value was not available.
    """

    rating_key: str
    title: str
    media_type: str  # movie or show
    library_name: str
    last_played: Optional[dt.datetime]  # UTC naive; None when never played
    added_at: Optional[dt.datetime]  # UTC naive
    year: Optional[int]
    size_bytes: Optional[int] = None  # file size in bytes, None when unknown

    @property
    def days_since_last_played(self) -> Optional[int]:
        """Whole days elapsed since ``last_played``, or ``None`` if never played."""
        if self.last_played is None:
            return None
        # ``utcnow()`` is deprecated; take an aware "now" and drop the tzinfo so
        # it stays comparable with the naive UTC timestamps stored on the item.
        now_utc = dt.datetime.now(dt.timezone.utc).replace(tzinfo=None)
        return (now_utc - self.last_played).days

    @property
    def size_gb(self) -> Optional[float]:
        """Size in GiB (bytes / 1024**3), or ``None`` when the size is unknown."""
        if self.size_bytes is None:
            return None
        return self.size_bytes / (1024 ** 3)

    def to_row(self) -> List[str]:
        """Return the item as a list of strings in CSV column order.

        Order: media_type, library, title, year, last_played, days since last
        played, added_at, rating_key, size_bytes. Missing values become ''.
        """
        # Read the property once so the row is internally consistent.
        days = self.days_since_last_played
        return [
            self.media_type,
            self.library_name,
            self.title,
            str(self.year or ''),
            self.last_played.isoformat() if self.last_played else '',
            '' if days is None else str(days),
            self.added_at.isoformat() if self.added_at else '',
            self.rating_key,
            # Only an unknown size is blank; a real size of 0 bytes is written as "0".
            '' if self.size_bytes is None else str(self.size_bytes),
        ]
# ----------------------------- API Client -----------------------------------
class TautulliClient:
    """Small wrapper around the Tautulli HTTP API endpoint ``<base_url>/api/v2``."""

    def __init__(self, base_url: str, api_key: str, timeout: int = 30):
        """Store connection settings; a trailing '/' on ``base_url`` is removed."""
        self.base_url = base_url.rstrip('/')
        self.api_key = api_key
        self.timeout = timeout

    def _get(self, **params) -> Dict[str, Any]:
        """Call the API with ``params`` and return the ``response.data`` payload.

        The API key is sent as the ``apikey`` query parameter.

        Raises:
            RuntimeError: on transport/HTTP errors, on a body that is not valid
                JSON, or when the API reports a result other than ``success``.
        """
        params = {"apikey": self.api_key, **params}
        try:
            r = requests.get(f"{self.base_url}/api/v2", params=params, timeout=self.timeout)
            r.raise_for_status()
        except requests.RequestException as e:
            raise RuntimeError(f"HTTP error calling Tautulli: {e}") from e
        # Parse separately: the JSON decode error raised by recent ``requests``
        # versions is also a RequestException, so handling both in one ``try``
        # would report bad JSON as an HTTP error.
        try:
            data = r.json()
        except ValueError as e:
            raise RuntimeError("Invalid JSON response from Tautulli") from e
        response = data.get('response', {}) if isinstance(data, dict) else {}
        if response.get('result') != 'success':
            raise RuntimeError(f"Tautulli API error: {response.get('message')}")
        return data['response']['data']

    def get_libraries(self) -> List[Dict[str, Any]]:
        """Return the payload of the ``get_libraries`` command (list of library dicts)."""
        return self._get(cmd='get_libraries')

    @staticmethod
    def _to_int(value: Any) -> Optional[int]:
        """Convert ``value`` to ``int``; return ``None`` when it is missing or invalid."""
        try:
            return int(value)
        except (TypeError, ValueError, OverflowError):
            return None

    @staticmethod
    def _to_utc_naive(ts: Any) -> Optional[dt.datetime]:
        """Convert a Unix timestamp to a naive UTC datetime; ``None`` if falsy/invalid."""
        if not ts:
            return None
        try:
            # Non-deprecated equivalent of ``datetime.utcfromtimestamp``.
            return dt.datetime.fromtimestamp(int(ts), dt.timezone.utc).replace(tzinfo=None)
        except (TypeError, ValueError, OverflowError, OSError):
            return None

    def iter_library_items(self, section_id: int, library_name: str, section_type: str) -> Iterable[MediaItem]:
        """Yield a ``MediaItem`` for every row of one library section.

        Pages through ``get_library_media_info`` 100 rows at a time. The payload
        may be a dict with a ``data`` list (plus ``recordsTotal``/``total_count``)
        or a bare list of rows; both shapes are accepted.

        Args:
            section_id: Library section to list.
            library_name: Stored on each yielded item as ``library_name``.
            section_type: Stored on each yielded item as ``media_type``.
        """
        length = 100
        start = 0
        total = None
        while True:
            data = self._get(cmd='get_library_media_info', section_id=section_id, start=start, length=length)
            if isinstance(data, dict):
                # An empty ``data`` list means "no rows"; it must not fall back
                # to iterating the dict's own keys.
                items = data.get('data') or []
                reported = data.get('recordsTotal') or data.get('total_count')
            else:
                items = data or []  # bare list of rows
                reported = None
            if total is None:
                total = self._to_int(reported) or len(items)
            if not items:
                # Nothing on this page: stop even if the reported total is larger.
                break
            for it in items:
                if not isinstance(it, dict):
                    continue
                # Field names vary between responses; try the known alternatives.
                size_val = it.get('file_size') or it.get('size') or it.get('media_size')
                yield MediaItem(
                    rating_key=str(it.get('rating_key')),
                    title=it.get('title') or it.get('name') or 'Unknown',
                    media_type=section_type,
                    library_name=library_name,
                    last_played=self._to_utc_naive(it.get('last_played') or it.get('last_watched')),
                    added_at=self._to_utc_naive(it.get('added_at')),
                    year=it.get('year'),
                    size_bytes=self._to_int(size_val),
                )
            start += length
            if start >= total:
                break
# ----------------------------- Core Logic ------------------------------------
def find_old_media(client: TautulliClient, days: int, include_never_watched: bool) -> List[MediaItem]:
    """Collect movie/show items whose last play is older than ``days`` days.

    Args:
        client: Object providing ``get_libraries()`` and ``iter_library_items()``.
        days: Age threshold; items last played before "now (UTC) - days" match.
        include_never_watched: When true, items without a ``last_played`` value
            are included as well.

    Returns:
        Matching items in the order the libraries and their rows were listed.
        A library whose listing fails is reported on stderr and skipped; items
        gathered from it before the failure are kept.
    """
    # ``utcnow()`` is deprecated; build the same naive-UTC value from an aware
    # "now" so it stays comparable with the naive ``last_played`` timestamps.
    now_utc = dt.datetime.now(dt.timezone.utc).replace(tzinfo=None)
    cutoff = now_utc - dt.timedelta(days=days)
    results: List[MediaItem] = []
    for lib in client.get_libraries():
        section_type = lib.get('section_type')  # movie, show, artist, photo, etc.
        if section_type not in ('movie', 'show'):
            continue
        name = lib.get('section_name') or lib.get('name') or f"Library {lib.get('section_id')}"
        section_id = lib.get('section_id') or lib.get('id')
        if section_id is None:
            continue
        try:
            for item in client.iter_library_items(section_id=int(section_id), library_name=name, section_type=section_type):
                if item.last_played is None:
                    keep = include_never_watched
                else:
                    keep = item.last_played < cutoff
                if keep:
                    results.append(item)
        except Exception as e:
            # Best effort: one broken library must not abort the whole listing.
            print(f"Warning: failed to process library '{name}': {e}", file=sys.stderr)
    return results
# ----------------------------- CLI ------------------------------------------
def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    """Parse command-line options for the listing script.

    ``--url`` and ``--api-key`` default to the ``TAUTULLI_URL`` and
    ``TAUTULLI_API_KEY`` environment variables. ``argv`` is handed to argparse
    unchanged, so ``None`` means "use ``sys.argv``".
    """
    environ = os.environ
    # (flags, add_argument keyword settings) for every supported option.
    option_table = (
        (('--url',), {
            'dest': 'url',
            'default': environ.get('TAUTULLI_URL'),
            'help': 'Base URL to Tautulli (e.g. http://localhost:8181)',
        }),
        (('--api-key',), {
            'dest': 'api_key',
            'default': environ.get('TAUTULLI_API_KEY'),
            'help': 'Tautulli API key',
        }),
        (('--days',), {
            'type': int,
            'default': 365,
            'help': 'Age in days since last watched (default: 365)',
        }),
        (('--include-never-watched',), {
            'action': 'store_true',
            'help': 'Include items never watched (no last_played)',
        }),
        (('--output',), {
            'help': 'Optional CSV output file path',
        }),
        (('--sort',), {
            'choices': ['title', 'days', 'last_played', 'size'],
            'default': 'days',
            'help': 'Sort output list',
        }),
    )
    parser = argparse.ArgumentParser(description="List Tautulli media last watched over N days ago.")
    for flags, settings in option_table:
        parser.add_argument(*flags, **settings)
    return parser.parse_args(argv)
def main(argv: Optional[List[str]] = None) -> int:
    """Run the listing and return a process exit status.

    Parses options, queries Tautulli for old media, prints a fixed-width table
    to stdout and, when ``--output`` is given, writes the same items as CSV.

    Returns:
        0 on success, 2 when the URL or API key is missing, 1 when the query
        or the CSV write fails.
    """
    opts = parse_args(argv)
    if not (opts.url and opts.api_key):
        print('Error: --url and --api-key (or environment variables) are required.', file=sys.stderr)
        return 2

    client = TautulliClient(opts.url, opts.api_key)
    try:
        old_media = find_old_media(client, days=opts.days, include_never_watched=opts.include_never_watched)
    except Exception as e:
        print(f"Failed to query Tautulli: {e}", file=sys.stderr)
        return 1

    def by_title(m):
        return (m.title.lower(), m.media_type)

    def by_last_played(m):
        return m.last_played or dt.datetime.min

    def by_size(m):
        return m.size_bytes or -1

    def by_days(m):
        elapsed = m.days_since_last_played
        return -1 if elapsed is None else elapsed

    # sort choice -> (key function, descending?); anything else sorts by days.
    orderings = {
        'title': (by_title, False),
        'last_played': (by_last_played, False),
        'size': (by_size, True),
    }
    key_fn, descending = orderings.get(opts.sort, (by_days, True))
    old_media.sort(key=key_fn, reverse=descending)

    # Console output
    suffix = " (including never watched)" if opts.include_never_watched else ''
    rule = '-' * 135
    print(f"Found {len(old_media)} media items last watched over {opts.days} days ago" + suffix)
    print(rule)
    print(f"{'Type':5} {'Library':20} {'Title':40} {'Year':4} {'Last Played (UTC)':20} {'Days':5} {'Size(GB)':8}")
    print(rule)
    for m in old_media:
        if m.last_played:
            lp = m.last_played.isoformat(sep=' ')[:19]
        else:
            lp = 'Never'
        elapsed = m.days_since_last_played
        days = '' if elapsed is None else elapsed
        gigabytes = m.size_gb
        size_gb = '' if gigabytes is None else f"{gigabytes:.2f}"
        print(f"{m.media_type[:5]:5} {m.library_name[:20]:20} {m.title[:40]:40} {str(m.year or ''):4} {lp:20} {str(days):5} {size_gb:8}")

    # CSV output
    if not opts.output:
        return 0
    header = ['media_type', 'library', 'title', 'year', 'last_played_utc', 'days_since_last_played', 'added_at_utc', 'rating_key', 'size_bytes']
    try:
        with open(opts.output, 'w', newline='', encoding='utf-8') as f:
            writer = csv.writer(f)
            writer.writerow(header)
            writer.writerows(m.to_row() for m in old_media)
        print(f"\nCSV written to {opts.output}")
    except OSError as e:
        print(f"Failed to write CSV: {e}", file=sys.stderr)
        return 1
    return 0
# Script entry point: hand main()'s return value to the interpreter as the exit status.
if __name__ == '__main__':  # pragma: no cover
    sys.exit(main())