#!/usr/bin/env python3 import os import json import ssl import shutil import subprocess from pathlib import Path from typing import Optional, List import re import inquirer import urllib.parse import urllib.request import urllib.error # ============================================================ # CONFIGURATION GLOBALE # ============================================================ TARGET_GIB_PER_HOUR = 1.7 TARGET_BYTES_PER_SECOND = TARGET_GIB_PER_HOUR * (1024**3) / 3600.0 HB_ENCODER = "nvenc_av1" # AV1 via NVENC HB_PRESET = "slow" HB_AUDIO = "av_aac" HB_CONTAINER = ".mkv" DEFAULT_CRF = 30 MAX_RETRIES = 10 TELEGRAM_ENABLED = True TELEGRAM_BOT_TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN", "") TELEGRAM_CHAT_ID = os.environ.get("TELEGRAM_CHAT_ID", "") TELEGRAM_PROGRESS_UPDATES = True ssl_ctx = ssl.create_default_context() ssl_ctx.check_hostname = False ssl_ctx.verify_mode = ssl.CERT_NONE VIDEO_EXT = { ".mp4", ".mkv", ".mov", ".avi", ".wmv", ".m4v", ".ts", ".m2ts", ".flv", ".webm", ".mpg", ".mpeg", ".vob" } # ============================================================ # FICHIERS & OUTILS GENERAUX # ============================================================ def is_video_file(p: Path) -> bool: return p.is_file() and p.suffix.lower() in VIDEO_EXT def is_original_file(p: Path) -> bool: return is_video_file(p) and not p.stem.endswith("_converted") def build_output(src: Path) -> Path: stem = src.stem if not stem.endswith("_converted"): stem = f"{stem}_converted" return src.with_name(stem + HB_CONTAINER) def list_videos(directory: Path, recursive: bool) -> List[Path]: if recursive: return [p for p in directory.rglob("*") if is_original_file(p)] else: return [p for p in directory.iterdir() if is_original_file(p)] def check_handbrake(): for b in ("HandBrakeCLI", "HandBrakeCLI.exe"): if shutil.which(b): return raise SystemExit("❌ HandBrakeCLI introuvable dans le PATH.") def check_mediainfo() -> bool: if not shutil.which("mediainfo"): print("⚠️ mediainfo introuvable → CRF intelligent désactivé.") return False return True # ============================================================ # TELEGRAM # ============================================================ def telegram_send(msg: str): if not (TELEGRAM_ENABLED and TELEGRAM_BOT_TOKEN and TELEGRAM_CHAT_ID): return data = { "chat_id": TELEGRAM_CHAT_ID, "text": msg, } try: req = urllib.request.Request( f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage", data=urllib.parse.urlencode(data).encode("utf-8"), ) urllib.request.urlopen(req, context=ssl_ctx, timeout=10) except Exception: pass def telegram_send_with_result(msg: str) -> Optional[int]: if not (TELEGRAM_ENABLED and TELEGRAM_BOT_TOKEN and TELEGRAM_CHAT_ID): return None data = { "chat_id": TELEGRAM_CHAT_ID, "text": msg, } try: req = urllib.request.Request( f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage", data=urllib.parse.urlencode(data).encode("utf-8"), ) resp = urllib.request.urlopen(req, context=ssl_ctx, timeout=10) payload = json.loads(resp.read().decode("utf-8", errors="ignore")) if not payload.get("ok"): return None return payload["result"]["message_id"] except Exception: return None def telegram_edit(msg_id: Optional[int], text: str): if not msg_id: return if not (TELEGRAM_ENABLED and TELEGRAM_BOT_TOKEN and TELEGRAM_CHAT_ID): return data = { "chat_id": TELEGRAM_CHAT_ID, "message_id": msg_id, "text": text, } try: req = urllib.request.Request( f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/editMessageText", data=urllib.parse.urlencode(data).encode("utf-8"), ) urllib.request.urlopen(req, context=ssl_ctx, timeout=10) except urllib.error.HTTPError as e: try: body = e.read().decode("utf-8", errors="ignore") if "message is not modified" in body: return except Exception: pass except Exception: pass def build_progress(done: int, total: int, length: int = 20): if total <= 0: return 0.0, "░" * length ratio = max(0.0, min(1.0, done / total)) fill = int(round(ratio * length)) bar = "█" * fill + "░" * (length - fill) return ratio * 100.0, bar def telegram_start_progress(total_files: int) -> Optional[int]: if not TELEGRAM_PROGRESS_UPDATES: return None msg = ( "Transcodage en cours…\n" f"0.00% (0/{total_files})\n" "[░░░░░░░░░░░░░░░░░░░░]" ) return telegram_send_with_result(msg) def telegram_update_progress( msg_id: Optional[int], done: int, total: int, succ_no_retry: int, succ_with_retry: int, errors: int, current_media: Optional[str] = None, size_info: Optional[str] = None, ): if not TELEGRAM_PROGRESS_UPDATES or not msg_id: return pct, bar = build_progress(done, total) text = ( "Transcodage en cours…\n" f"{pct:.2f}% ({done}/{total})\n" f"[{bar}]" ) if current_media: text += f"\n{current_media}" if size_info: text += f"\n{size_info}" text += ( "\n\n" f"✓ Sans retry : {succ_no_retry}\n" f"✓ Avec retry : {succ_with_retry}\n" f"✗ Erreurs : {errors}" ) telegram_edit(msg_id, text) # ============================================================ # MEDIINFO & HDR # ============================================================ def mediainfo_json(path: Path) -> Optional[dict]: try: p = subprocess.run( ["mediainfo", "--Output=JSON", str(path)], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, text=True, encoding="utf-8", errors="replace", check=False, ) if p.returncode != 0: return None return json.loads(p.stdout) except Exception: return None def extract_duration_seconds(mi: dict) -> Optional[float]: try: tracks = mi.get("media", {}).get("track", []) general = next((t for t in tracks if t.get("@type", "").lower() == "general"), None) if not general: general = next((t for t in tracks if "Duration" in t), None) if not general: return None dur = general.get("Duration") if not dur: return None s = str(dur).strip() if s.isdigit(): return float(s) / 1000.0 return float(s) except Exception: return None def extract_video_params(mi: dict): try: tracks = mi.get("media", {}).get("track", []) vtracks = [t for t in tracks if t.get("@type", "").lower() == "video"] if not vtracks: vtracks = [t for t in tracks if "Width" in t or "Height" in t] if not vtracks: return None, None, None, None def _w(t): w = t.get("Width") or t.get("width") if not w: return 0 try: return int(str(w).split()[0]) except Exception: return 0 v = max(vtracks, key=_w) def _dim(val): if not val: return None try: return int(str(val).split()[0]) except Exception: return None width = _dim(v.get("Width") or v.get("width")) height = _dim(v.get("Height") or v.get("height")) br = v.get("BitRate") or v.get("Bit rate") bitrate_bits = None if br: s = str(br).replace(" ", "") if s.isdigit(): bitrate_bits = int(s) codec = v.get("Format") or v.get("CodecID") or v.get("Codec ID") return width, height, bitrate_bits, codec except Exception: return None, None, None, None def detect_hdr_type(mi: dict) -> str: try: tracks = mi.get("media", {}).get("track", []) if not tracks: return "SDR" vtracks = [t for t in tracks if t.get("@type", "").lower() == "video"] if not vtracks: vtracks = [t for t in tracks if "Width" in t or "Height" in t] or [tracks[0]] def _w(t): for k, v in t.items(): if "WIDTH" in k.upper(): try: return int(str(v).split()[0]) except Exception: pass return 0 v = max(vtracks, key=_w) primaries_parts = [] transfer_parts = [] hdr_parts = [] for k, val in v.items(): ku = k.upper() s = str(val) if "PRIMARIES" in ku: primaries_parts.append(s) if "TRANSFER" in ku: transfer_parts.append(s) if "HDR" in ku or "MASTERING" in ku or "MAXCLL" in ku or "MAXFALL" in ku: hdr_parts.append(s) primaries = " ".join(primaries_parts).upper() transfer = " ".join(transfer_parts).upper() hdr_fields = " ".join(hdr_parts).upper() print( " [DEBUG] video_track HDR scan: " f"primaries={primaries or '?'} | " f"transfer={transfer or '?'} | " f"hdr_fields={hdr_fields or '?'}" ) combined = f"{primaries} {transfer} {hdr_fields}" if "DOLBY VISION" in combined or "DV PROFILE" in combined: return "DV" if "HDR10+" in combined or "HDR10" in combined or "HDR 10" in combined: return "HDR10" if "HLG" in combined: return "HLG" has_bt2020 = "BT.2020" in primaries has_pq = "PQ" in transfer or "SMPTE ST 2084" in transfer or "ST2084" in transfer if has_bt2020 and has_pq: return "HDR10" if ("MASTERING DISPLAY" in hdr_fields or "MAXCLL" in hdr_fields or "MAXFALL" in hdr_fields) and has_bt2020: return "HDR10" return "SDR" except Exception as e: print(f" [DEBUG] detect_hdr_type exception: {e}") return "SDR" # ============================================================ # DETECTION FILM / SERIE & CRF INTELLIGENT # ============================================================ def is_series_name(name: str) -> bool: if re.search(r"[Ss](\d{1,2})[ ._-]?[Ee](\d{1,2})", name): return True if re.search(r"\b\d{1,2}x\d{1,2}\b", name): return True return False def is_series_file(path: Path, duration_s: Optional[float]) -> bool: if is_series_name(path.stem): return True if duration_s is not None and duration_s < 40 * 60: return True return False def classify_media(path: Path, duration_s: Optional[float]) -> str: name = path.stem if is_series_file(path, duration_s): m = re.search(r"[Ss](\d{1,2})[ ._-]?[Ee](\d{1,2})", name) if m: s = int(m.group(1)) e = int(m.group(2)) return f"Série: {name} (S{s:02d}E{e:02d})" return f"Série: {name}" return f"Film: {name}" def choose_crf(height: Optional[int], is_movie: bool, hdr_type: str) -> int: high_quality = is_movie or hdr_type in ("HDR10", "HLG", "DV") if height is None: return 26 if high_quality else 30 if height >= 2000: return 28 if high_quality else 32 if height >= 1000: return 24 if high_quality else 28 if height >= 700: return 22 if high_quality else 26 return 20 if high_quality else 24 def analyze_file(path: Path): mi = mediainfo_json(path) if not mi: print(" ⚠️ mediainfo indisponible → CRF par défaut.") return ( DEFAULT_CRF, None, "SDR", "Media SDR", None, None, None, None, ) duration_s = extract_duration_seconds(mi) width, height, bitrate_bits, codec = extract_video_params(mi) hdr_type = detect_hdr_type(mi) is_movie = not is_series_file(path, duration_s) kind = "Film" if is_movie else "Série" if hdr_type == "SDR": kind_text = f"{kind} SDR" else: kind_text = f"{kind} {hdr_type}" if duration_s: print( f" [{kind_text}] mediainfo: " f"{width}x{height}, ~{(bitrate_bits/1000) if bitrate_bits else '?'} kb/s, " f"codec={codec}, durée ≈ {duration_s/60:.1f} min" ) else: print( f" [{kind_text}] mediainfo: " f"{width}x{height}, ~{(bitrate_bits/1000) if bitrate_bits else '?'} kb/s, " f"codec={codec}, durée inconnue" ) crf = choose_crf(height, is_movie, hdr_type) print(f" → CRF choisi: {crf}") return crf, duration_s, hdr_type, kind_text, width, height, bitrate_bits, codec # ============================================================ # CONSTRUCTION COMMANDE HandBrake AV1 NVENC # ============================================================ def build_nvenc_av1_command(src: Path, out: Path, crf: int, hdr_type: str, width: Optional[int], height: Optional[int]) -> list: cmd = [ "HandBrakeCLI", "-i", str(src), "-o", str(out), "-e", HB_ENCODER, "-q", str(crf), "--encoder-preset", HB_PRESET, "-E", HB_AUDIO, "--all-audio", "--all-subtitles", "--subtitle-burned=none", "--optimize", ] if hdr_type == "SDR": print(" • Encodage SDR → AV1 SDR") return cmd if hdr_type in ("HDR10", "DV"): print(" • Encodage HDR10/DV → AV1 HDR10 (BT.2020 + PQ)") cmd += [ "--color-matrix", "bt2020nc", "--color-transfer", "smpte2084", "--color-primaries", "bt2020", ] return cmd if hdr_type == "HLG": print(" • Encodage HLG → AV1 HLG") cmd += [ "--color-matrix", "bt2020nc", "--color-transfer", "arib-std-b67", "--color-primaries", "bt2020", ] return cmd print(" ⚠️ HDR inconnu → fallback SDR") return cmd def transcode_once(src: Path, out: Path, crf: int, hdr_type: str, width: Optional[int], height: Optional[int], overwrite: bool) -> tuple[int, Optional[str]]: if out.exists() and not overwrite: print(f"[SKIP] {out} existe déjà.") return 0, None cmd = build_nvenc_av1_command(src, out, crf, hdr_type, width, height) print(f"[RUN] {src.name} -> {out.name} (CRF={crf}, HDR={hdr_type})") try: p = subprocess.run( cmd, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE, text=True, encoding="utf-8", errors="replace", check=False, ) if p.returncode == 0: print(f"[OK ] {out}") return 0, None lines = [l.strip() for l in p.stderr.splitlines() if l.strip()] err = lines[-1] if lines else f"Exit {p.returncode}" print(f"[ERR] {err}") return p.returncode, err except Exception as e: print(f"[ERR] Exception HandBrakeCLI: {e}") return 1, str(e) def transcode_file(src: Path, crf: int, hdr_type: str, width: Optional[int], height: Optional[int], overwrite: bool): out = build_output(src) last_err = None for att in range(MAX_RETRIES + 1): if att > 0: print(f" ↻ Retentative {att}/{MAX_RETRIES}") rc, err = transcode_once(src, out, crf, hdr_type, width, height, overwrite or att > 0) if rc == 0: return True, out, att, None last_err = err return False, out, MAX_RETRIES, last_err # ============================================================ # TAILLE CIBLE (1.7 Gio/h) & RE-ENCODE # ============================================================ def check_target_size(src_size: int, out_size: int, duration_s: Optional[float], is_movie: bool, crf_used: int): if not duration_s or duration_s <= 0: return False, crf_used target_bytes = duration_s * TARGET_BYTES_PER_SECOND threshold = 1.15 if is_movie else 1.05 if out_size > target_bytes * threshold: print(" ⚠️ Au-dessus de la cible, on augmente le CRF.") new_crf = min(crf_used + 4, 40) return True, new_crf print(" ✓ Taille conforme, pas de re-encode.") return False, crf_used # ============================================================ # NAVIGATION DOSSIERS (inquirer) # ============================================================ def list_subdirs(path: Path) -> List[str]: out = [] try: for p in sorted(path.iterdir(), key=lambda x: (not x.is_dir(), x.name.lower())): if p.is_dir(): out.append(p.name + "/") except PermissionError: print("⚠️ Permission refusée :", path) return out def browse_dir(start: str = "Z:\\") -> Path: cur = Path(start).resolve() if not cur.exists(): cur = Path("Z:\\") while True: header = f"[ {cur} ]" choices = [header] if cur != cur.anchor: choices.append("⬆️ ..") subs = list_subdirs(cur) if subs: choices.extend(subs) else: choices.append("(aucun sous-dossier)") choices.append("✅ Valider ici") ans = inquirer.prompt([ inquirer.List("sel", message="Navigue puis valide", choices=choices) ]) if not ans: raise SystemExit("Annulé.") sel = ans["sel"] if sel == "⬆️ ..": cur = cur.parent continue if sel == "✅ Valider ici": return cur if sel in ("(aucun sous-dossier)", header): continue nxt = cur / sel.rstrip("/") if nxt.exists() and nxt.is_dir(): cur = nxt else: print("⚠️ Inaccessible :", nxt) def browse_dirs_multi(start: str = "Z:\\") -> List[Path]: parent = browse_dir(start) subs = [p for p in parent.iterdir() if p.is_dir()] if not subs: raise SystemExit("Aucun sous-dossier dans ce dossier.") CHOICE_PARENT = "📁 [Inclure aussi ce dossier]" choices = [CHOICE_PARENT] + [p.name + "/" for p in sorted(subs, key=lambda x: x.name.lower())] ans = inquirer.prompt([ inquirer.Checkbox("dirs", message=f"Sélection dans {parent}", choices=choices) ]) if not ans or not ans["dirs"]: raise SystemExit("Annulé.") out = [] for c in ans["dirs"]: if c == CHOICE_PARENT: out.append(parent) else: out.append(parent / c.rstrip("/")) return out # ============================================================ # PROMPTS UTILISATEUR & DELETE # ============================================================ def ask_transcode_options(): q = [ inquirer.List( "overwrite", message="Un fichier _converted existe déjà, que faire ?", choices=["Ne pas écraser (skip)", "Écraser"], default="Ne pas écraser (skip)", ), inquirer.Confirm( "del_src", message="Supprimer le fichier source après conversion réussie ?", default=True, ), ] ans = inquirer.prompt(q) if not ans: raise SystemExit("Annulé.") overwrite = (ans["overwrite"] == "Écraser") del_src = bool(ans["del_src"]) return overwrite, del_src def ask_mode(): q = [inquirer.List( "mode", message="Que veux-tu faire ?", choices=[ "Convert : Fichier", "Convert : Répertoire", "Convert : Répertoire Récursif", "Delete : Delete Origine File", ], )] ans = inquirer.prompt(q) if not ans: raise SystemExit("Annulé.") return ans["mode"] def delete_sources_if_converted(): root = browse_dir("Z:\\") originals = [p for p in root.rglob("*") if is_original_file(p)] if not originals: print("Aucun fichier source trouvé.") return deletable = [] missing = [] for p in originals: conv = build_output(p) if conv.exists(): deletable.append(p) else: missing.append(p) print("\n--- DRY RUN ---") print("Total originaux :", len(originals)) print("Supprimables (converted présent) :", len(deletable)) print("Sans converted :", len(missing)) q = [inquirer.Confirm( "go", message=f"Supprimer {len(deletable)} fichiers ?", default=False, )] ans = inquirer.prompt(q) if not ans or not ans["go"]: print("Annulé.") return deleted = 0 for p in deletable: try: os.remove(p) deleted += 1 print("🗑️ Supprimé :", p) except Exception as e: print("❌ Erreur suppression :", e) print("\n--- Résultat ---") print("Supprimés :", deleted) print("Échecs :", len(deletable) - deleted) # ============================================================ # MAIN # ============================================================ def main(): check_handbrake() check_mediainfo() mode = ask_mode() if mode == "Delete : Delete Origine File": delete_sources_if_converted() return overwrite, delete_source = ask_transcode_options() if mode == "Convert : Fichier": root = browse_dir("Z:\\") cand = [p for p in root.iterdir() if is_original_file(p)] if not cand: print("Aucun fichier original ici.") return ans = inquirer.prompt([ inquirer.List("file", message="Choisir fichier", choices=[p.name for p in cand]) ]) if not ans: print("Annulé.") return files = [root / ans["file"]] elif mode == "Convert : Répertoire": dirs = browse_dirs_multi("Z:\\") files = [] for d in dirs: files.extend(list_videos(d, recursive=False)) elif mode == "Convert : Répertoire Récursif": dirs = browse_dirs_multi("Z:\\") files = [] for d in dirs: files.extend(list_videos(d, recursive=True)) else: print("Mode inconnu.") return if not files: print("Aucun fichier à convertir.") return total = len(files) print(f"\nFichiers à traiter : {total}") msg_id = telegram_start_progress(total) succ_no_retry = 0 succ_with_retry = 0 errors = 0 total_retries = 0 deleted_count = 0 failed = [] for idx, f in enumerate(files, 1): print(f"\n[{idx}/{total}] {f}") try: orig_size = f.stat().st_size except Exception: print("⚠️ Impossible de lire la taille du fichier.") errors += 1 telegram_update_progress(msg_id, idx, total, succ_no_retry, succ_with_retry, errors, f.name, None) continue crf, duration_s, hdr_type, kind_text, width, height, br, codec = analyze_file(f) ok, out, retries, err_msg = transcode_file( f, crf, hdr_type, width, height, overwrite ) if not ok: errors += 1 reason = err_msg or "Erreur inconnue" failed.append((kind_text, reason)) telegram_send(f"⚠️ Échec transcoding\n{kind_text}\n{reason}") telegram_update_progress(msg_id, idx, total, succ_no_retry, succ_with_retry, errors, f.name, None) continue if retries == 0: succ_no_retry += 1 else: succ_with_retry += 1 total_retries += retries try: conv_size = out.stat().st_size except Exception: conv_size = None if conv_size is not None and duration_s is not None: is_movie = ("Film" in kind_text) needs_re, new_crf = check_target_size( orig_size, conv_size, duration_s, is_movie, crf ) if needs_re: print(f" → Re-encode avec CRF={new_crf}") ok2, out2, _, err2 = transcode_file( f, new_crf, hdr_type, width, height, overwrite=True ) if ok2 and out2.exists(): new_size = out2.stat().st_size if new_size < conv_size: print(" ✓ Nouvelle version plus petite.") out = out2 conv_size = new_size else: print(" ❌ Nouvelle version plus grande, on garde l'ancienne.") else: print(" ❌ Re-encoding échoué :", err2 or "?") size_info = None if conv_size is not None: from_gib = orig_size / (1024**3) to_gib = conv_size / (1024**3) size_info = f"{from_gib:.2f} Go → {to_gib:.2f} Go".replace(".", ",") if delete_source: try: os.remove(f) deleted_count += 1 print(" 🗑️ Source supprimée.") except Exception as e: print(" ⚠️ Suppression source impossible :", e) telegram_update_progress( msg_id, idx, total, succ_no_retry, succ_with_retry, errors, f.name, size_info, ) print("\n--- Résumé final ---") print("Total fichiers :", total) print("Succès sans retry :", succ_no_retry) print("Succès avec retry :", succ_with_retry) print("Erreurs :", errors) print("Total retries :", total_retries) print("Sources supprimées :", deleted_count) if failed: print("\nFichiers en échec :") for label, reason in failed: print(" -", label, ":", reason) summary = ( "Résumé transcodage\n" f"Total fichiers : {total}\n" f"Succès sans retry : {succ_no_retry}\n" f"Succès avec retry : {succ_with_retry}\n" f"Erreurs : {errors}\n" f"Total retries : {total_retries}\n" f"Sources supprimées : {deleted_count}" ) if failed: summary += "\n\nÉchecs :\n" for label, reason in failed: summary += f" • {label} — {reason}\n" telegram_send(summary) if __name__ == "__main__": main()