#!/usr/bin/env python3
"""Interactive batch transcoder to AV1 (NVENC).

SDR sources are encoded with HandBrakeCLI, HDR10/HLG sources are tone-mapped
to SDR and encoded with ffmpeg, Dolby Vision sources are skipped. `mediainfo`
is used to pick a per-file quality value and to compute a size target derived
from ``TARGET_GIB_PER_HOUR``. Progress and failures can be reported to a
Telegram chat.

Configuration through environment variables:
    TELEGRAM_BOT_TOKEN / TELEGRAM_CHAT_ID  credentials (no built-in default).
    TELEGRAM_INSECURE_SSL=1                disable TLS verification (opt-in).
    TRANSCODE_BROWSE_ROOT                  start folder of the directory browser.
"""
import functools
import json
import os
import re
import shutil
import ssl
import subprocess
import urllib.error
import urllib.parse
import urllib.request
from pathlib import Path
from typing import Callable, List, Optional

try:
    import inquirer
except ImportError:
    # Only the interactive prompts need it; the pure helpers stay importable.
    inquirer = None

# ---------- Global target ----------
# Goal: ~1.7 GiB per hour (total file size), i.e. roughly 4 Mb/s overall.
TARGET_GIB_PER_HOUR = 1.7
TARGET_BYTES_PER_SECOND = TARGET_GIB_PER_HOUR * (1024**3) / 3600.0

# ---------- HandBrake settings (SDR only) ----------
HB_VIDEO_ENCODER = "nvenc_av1"
HB_QUALITY = "30"  # default quality; main() passes a per-file value instead
HB_PRESET = "slow"  # favour quality over speed
HB_AUDIO_MODE = "av_aac"  # transcode every audio track to AAC
# NOTE(review): not referenced by the HandBrake command built in this file
# (which uses --all-audio / --all-subtitles); kept for compatibility.
HB_LANGS = "fra,eng"
HB_CONTAINER_EXT = ".mkv"  # output container

# ---------- Retry / Telegram settings ----------
MAX_RETRIES = 10  # number of retries AFTER the first failure
TELEGRAM_ENABLED = True
# Credentials come from the environment only: a token committed in source is
# leaked to anyone who can read the file and must be revoked.
TELEGRAM_BOT_TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN", "")
TELEGRAM_CHAT_ID = os.environ.get("TELEGRAM_CHAT_ID", "")
TELEGRAM_PROGRESS_UPDATES = True
# Kept below the Bot API message-text cap (4096) with some headroom.
TELEGRAM_MAX_TEXT_LEN = 4000

# TLS certificates are verified by default, since the bot token travels in the
# request URL. Set TELEGRAM_INSECURE_SSL=1 to restore the unverified behaviour
# (e.g. behind an intercepting proxy).
TELEGRAM_INSECURE_SSL = os.environ.get("TELEGRAM_INSECURE_SSL", "") == "1"
ssl_context = ssl.create_default_context()
if TELEGRAM_INSECURE_SSL:
    ssl_context.check_hostname = False
    ssl_context.verify_mode = ssl.CERT_NONE

# Start folder used by the directory browser prompts.
DEFAULT_BROWSE_ROOT = os.environ.get("TRANSCODE_BROWSE_ROOT", "Z:\\")

# Supported video extensions
VIDEO_EXTS = {
    ".mp4", ".mkv", ".mov", ".avi", ".wmv", ".m4v", ".ts", ".m2ts",
    ".flv", ".webm", ".mpg", ".mpeg", ".vob",
}

# Episode markers in file names: "S01E02" style and "1x05" style.
_SXXEXX_RE = re.compile(r"[Ss](\d{1,2})[ ._-]?[Ee](\d{1,2})")
_NXNN_RE = re.compile(r"\b\d{1,2}x\d{1,2}\b")


# ---------- Required tools ----------
def _require_inquirer() -> None:
    """Exit with a clear message when the `inquirer` package is missing."""
    if inquirer is None:
        raise SystemExit(
            "Erreur: le module Python 'inquirer' est requis (pip install inquirer)."
        )


def check_handbrake():
    """Exit the program if HandBrakeCLI cannot be found on the PATH."""
    candidates = ["HandBrakeCLI", "HandBrakeCLI.exe"]
    if not any(shutil.which(c) is not None for c in candidates):
        raise SystemExit(
            "Erreur: HandBrakeCLI introuvable dans le PATH.\n"
            "- Vérifie que HandBrakeCLI (version CLI) est installé.\n"
            "- Ajoute son dossier au PATH, ou lance ce script depuis un terminal où "
            "'HandBrakeCLI --version' fonctionne."
        )


def check_mediainfo():
    """Return True if `mediainfo` is on the PATH, else warn and return False."""
    if shutil.which("mediainfo") is None:
        print("⚠️ Attention: 'mediainfo' introuvable dans le PATH.")
        print(" Le script fonctionnera mais sans cible 1,7 Gio/h ni CRF intelligent.")
        return False
    return True


def check_ffmpeg():
    """Return True if `ffmpeg` is on the PATH, else warn and return False."""
    if shutil.which("ffmpeg") is None:
        print("⚠️ Attention: 'ffmpeg' introuvable dans le PATH.")
        print(" Les fichiers HDR ne pourront pas être convertis (ils seront skip).")
        return False
    return True


# ---------- File utilities ----------
def build_output_path(src: Path) -> Path:
    """Return the sibling output path `<stem>_converted<HB_CONTAINER_EXT>`."""
    stem = src.stem
    out_stem = stem if stem.endswith("_converted") else f"{stem}_converted"
    return src.with_name(out_stem + HB_CONTAINER_EXT)


def is_video_file(p: Path) -> bool:
    """Return True for an existing file whose extension is in VIDEO_EXTS."""
    return p.is_file() and p.suffix.lower() in VIDEO_EXTS


def is_original_video_file(p: Path) -> bool:
    """Return True for a video file that is not itself a `_converted` output."""
    return is_video_file(p) and not p.stem.endswith("_converted")


def list_videos_in_dir(directory: Path, recursive: bool) -> list[Path]:
    """List original (non-converted) videos in `directory`, optionally recursing."""
    entries = directory.rglob("*") if recursive else directory.iterdir()
    return [p for p in entries if is_original_video_file(p)]


def _file_size(path: Optional[Path]) -> Optional[int]:
    """Return the size of `path` in bytes, or None if it is missing/unreadable."""
    if path is None:
        return None
    try:
        return path.stat().st_size
    except OSError:
        return None


# ---------- Movie / series detection ----------
def is_series(src: Path, duration_s: Optional[float] = None) -> bool:
    """Guess whether `src` is a series episode.

    True when the file name contains an SxxExx or NxNN marker, or when the
    known duration is under 40 minutes.
    """
    name = src.stem
    if _SXXEXX_RE.search(name) or _NXNN_RE.search(name):
        return True
    # Short runtime -> most likely an episode.
    return duration_s is not None and duration_s < 40 * 60


def is_movie(src: Path, duration_s: Optional[float] = None) -> bool:
    """Return the negation of `is_series`."""
    return not is_series(src, duration_s=duration_s)


def classify_media(src: Path) -> str:
    """Return a human-readable label ("Série: ..." or "Film: ...") for messages."""
    name = src.stem
    m = _SXXEXX_RE.search(name)
    if m:
        season = int(m.group(1))
        episode = int(m.group(2))
        return f"Série: {name} (S{season:02d}E{episode:02d})"
    return f"Film: {name}"


# ---------- Mediainfo / HDR / CRF ----------
def run_mediainfo_json(path: Path) -> Optional[dict]:
    """Run `mediainfo --Output=JSON` on `path`; return the parsed dict or None."""
    try:
        result = subprocess.run(
            ["mediainfo", "--Output=JSON", str(path)],
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
            text=True,
            encoding="utf-8",
            errors="replace",
            check=False,
        )
        if result.returncode != 0:
            print(f"⚠️ mediainfo a retourné un code {result.returncode} pour {path}")
            return None
        return json.loads(result.stdout)
    except FileNotFoundError:
        print("⚠️ mediainfo introuvable, CRF par défaut sera utilisé.")
        return None
    except Exception as e:  # best effort: any failure falls back to defaults
        print(f"⚠️ Erreur mediainfo sur {path}: {e}")
        return None


def extract_video_params(
    mi: dict,
) -> tuple[Optional[int], Optional[int], Optional[int], Optional[str]]:
    """Return (width, height, bitrate_bits, codec) of the first video track.

    Every element is None when there is no video track or parsing fails.
    """
    def _parse_dim(val) -> Optional[int]:
        if not val:
            return None
        # Keep the first whitespace-separated token (e.g. "1920 pixels").
        return int(str(val).split()[0])

    try:
        tracks = mi.get("media", {}).get("track", [])
        video_track = next((t for t in tracks if t.get("@type") == "Video"), None)
        if not video_track:
            return None, None, None, None

        width = _parse_dim(video_track.get("Width", ""))
        height = _parse_dim(video_track.get("Height", ""))
        br_str = video_track.get("BitRate") or video_track.get("Bit rate")
        bitrate_bits = int(br_str) if br_str and str(br_str).isdigit() else None
        codec = video_track.get("Format")
        return width, height, bitrate_bits, codec
    except Exception:
        return None, None, None, None


def extract_duration_seconds(mi: dict) -> Optional[float]:
    """Return the media duration in seconds, or None when unavailable.

    Reads "Duration" from the General track (or the first track having one).
    A purely integer string is treated as milliseconds, anything else as
    seconds -- heuristic inherited from the original script.
    """
    try:
        tracks = mi.get("media", {}).get("track", [])
        general = next((t for t in tracks if t.get("@type") == "General"), None)
        if not general:
            general = next((t for t in tracks if "Duration" in t), None)
        if not general:
            return None
        dur = general.get("Duration")
        if not dur:
            return None
        dur_str = str(dur).strip()
        if dur_str.isdigit():
            return float(dur_str) / 1000.0
        return float(dur_str)
    except Exception:
        return None


def detect_hdr_type(mi: dict) -> str:
    """Classify the dynamic range of the first video track.

    Returns:
        "DV"    Dolby Vision
        "HDR10" BT.2020 primaries with PQ transfer (HDR10 / HDR10+)
        "HLG"   BT.2020 primaries with HLG transfer
        "SDR"   anything else, including parse errors
    """
    try:
        tracks = mi.get("media", {}).get("track", [])
        video_track = next((t for t in tracks if t.get("@type") == "Video"), None)
        if not video_track:
            return "SDR"

        primaries = (
            video_track.get("Color_primaries")
            or video_track.get("Color primaries")
            or ""
        ).upper()
        transfer = (
            video_track.get("Transfer_characteristics")
            or video_track.get("Transfer characteristics")
            or ""
        ).upper()
        hdr_format = (
            video_track.get("HDR_Format")
            or video_track.get("HDR format")
            or ""
        ).upper()

        has_bt2020 = "BT.2020" in primaries
        has_pq = "PQ" in transfer or "SMPTE ST 2084" in transfer
        has_hlg = "HLG" in transfer

        if "DOLBY VISION" in hdr_format:
            return "DV"
        if has_bt2020 and has_hlg:
            return "HLG"
        if has_bt2020 and has_pq:
            return "HDR10"
        return "SDR"
    except Exception:
        return "SDR"


def decide_crf_from_height(height: Optional[int], high_quality: bool) -> int:
    """Pick a base quality value from the frame height.

    high_quality=True is the movie / high-end profile, False the (more
    compressed) series profile.
    """
    if height is None:
        return 26 if high_quality else 30
    if height >= 2000:  # ~4K
        return 30 if high_quality else 34
    if height >= 1000:  # ~1080p
        return 24 if high_quality else 30
    if height >= 700:  # ~720p
        return 22 if high_quality else 28
    return 20 if high_quality else 26  # SD


def choose_crf_and_duration(src: Path) -> tuple[int, Optional[float], bool, str]:
    """Use mediainfo to choose a base quality value and read the duration.

    Returns:
        (crf, duration_s, is_movie_flag, hdr_type). When mediainfo is not
        usable the fallback is (30, None, True, "SDR").
    """
    mi = run_mediainfo_json(src)
    if not mi:
        print(" ⚠️ mediainfo indisponible, CRF de base = 30, durée inconnue.")
        print(" [DEBUG] hdr_type=SDR (fallback, mediainfo KO)")
        return 30, None, True, "SDR"

    width, height, bitrate_bits, codec = extract_video_params(mi)
    vbr_kbps = int(bitrate_bits / 1000) if bitrate_bits else None
    duration_s = extract_duration_seconds(mi)
    hdr_type = detect_hdr_type(mi)
    print(f" [DEBUG] hdr_type={hdr_type}")

    movie_flag = not is_series(src, duration_s=duration_s)
    kind_base = "Film" if movie_flag else "Série"
    hdr_suffix = {"DV": " Dolby Vision", "HDR10": " HDR10", "HLG": " HLG"}
    kind = kind_base + hdr_suffix.get(hdr_type, "")

    if duration_s:
        print(
            f" [{kind}] mediainfo: {width}x{height}, "
            f"~{vbr_kbps or '?'} kb/s, codec={codec}, "
            f"durée ≈ {duration_s/60.0:.1f} min"
        )
    else:
        print(
            f" [{kind}] mediainfo: {width}x{height}, "
            f"~{vbr_kbps or '?'} kb/s, codec={codec}, durée inconnue"
        )

    # Every movie and every HDR source gets the "high quality" profile.
    high_quality = movie_flag or hdr_type != "SDR"
    crf = decide_crf_from_height(height, high_quality=high_quality)
    print(f" → CRF de base choisi ({kind}): {crf}")
    return crf, duration_s, movie_flag, hdr_type


# ---------- Telegram helpers ----------
def _telegram_base_url(method: str) -> str:
    """Return the Bot API URL for `method`."""
    return f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/{method}"


def _clip_telegram_text(text: str) -> str:
    """Shorten `text` so an oversized message is not rejected by the API."""
    if len(text) <= TELEGRAM_MAX_TEXT_LEN:
        return text
    return text[: TELEGRAM_MAX_TEXT_LEN - 1] + "…"


def _telegram_post(method: str, fields: dict) -> tuple[int, bytes]:
    """POST url-encoded `fields` to `method`; return (HTTP status, raw body)."""
    data = urllib.parse.urlencode(fields).encode("utf-8")
    with urllib.request.urlopen(
        _telegram_base_url(method), data=data, timeout=10, context=ssl_context
    ) as resp:
        return resp.status, resp.read()


def _http_error_detail(err: urllib.error.HTTPError) -> str:
    """Return the decoded error response body, or "" if it cannot be read."""
    try:
        return err.read().decode("utf-8", errors="ignore")
    except Exception:
        return ""


def send_telegram_message(message: str):
    """Send `message` to the configured chat; failures are only printed."""
    if not TELEGRAM_ENABLED:
        return
    if not TELEGRAM_BOT_TOKEN or not TELEGRAM_CHAT_ID:
        print("⚠️ Telegram non configuré (TOKEN/CHAT_ID manquant).")
        return

    fields = {"chat_id": TELEGRAM_CHAT_ID, "text": _clip_telegram_text(message)}
    try:
        status, body = _telegram_post("sendMessage", fields)
        if status != 200:
            print(f"⚠️ Échec envoi Telegram: HTTP {status} — {body!r}")
    except urllib.error.HTTPError as e:
        print(f"⚠️ Erreur envoi Telegram: {e} — réponse: {_http_error_detail(e)}")
    except Exception as e:
        print(f"⚠️ Erreur envoi Telegram: {e}")


def send_telegram_message_with_result(message: str) -> Optional[int]:
    """Send `message` and return the new message id, or None on any failure."""
    if not TELEGRAM_ENABLED:
        return None
    if not TELEGRAM_BOT_TOKEN or not TELEGRAM_CHAT_ID:
        print("⚠️ Telegram non configuré (TOKEN/CHAT_ID manquant).")
        return None

    fields = {"chat_id": TELEGRAM_CHAT_ID, "text": _clip_telegram_text(message)}
    try:
        _status, body = _telegram_post("sendMessage", fields)
        payload = json.loads(body.decode("utf-8"))
        if not payload.get("ok"):
            print(f"⚠️ Réponse Telegram non OK: {payload}")
            return None
        return payload["result"]["message_id"]
    except urllib.error.HTTPError as e:
        print(
            f"⚠️ Erreur envoi Telegram (with_result): {e} — "
            f"réponse: {_http_error_detail(e)}"
        )
        return None
    except Exception as e:
        print(f"⚠️ Erreur envoi Telegram (with_result): {e}")
        return None


def edit_telegram_message(message_id: Optional[int], text: str):
    """Replace the text of message `message_id`; no-op when the id is None."""
    if not TELEGRAM_ENABLED:
        return
    if not TELEGRAM_BOT_TOKEN or not TELEGRAM_CHAT_ID:
        return
    if message_id is None:
        return

    fields = {
        "chat_id": TELEGRAM_CHAT_ID,
        "message_id": message_id,
        "text": _clip_telegram_text(text),
    }
    try:
        _telegram_post("editMessageText", fields)
    except urllib.error.HTTPError as e:
        detail = _http_error_detail(e)
        # Editing with identical text is reported as a 400; not a real error.
        if e.code == 400 and "message is not modified" in detail:
            return
        print(f"⚠️ Erreur edit Telegram: {e} — réponse: {detail}")
    except Exception as e:
        print(f"⚠️ Erreur edit Telegram: {e}")


# ---------- Telegram progress ----------
def build_progress_bar(done: int, total: int, length: int = 20) -> tuple[float, str]:
    """Return (percent, bar) for `done` out of `total`, bar being `length` wide."""
    if total <= 0:
        return 0.0, "░" * length
    ratio = max(0.0, min(1.0, done / total))
    filled = int(round(ratio * length))
    bar = "█" * filled + "░" * (length - filled)
    return ratio * 100.0, bar


def send_progress_start(total_files: int) -> Optional[int]:
    """Post the initial progress message and return its id (None if disabled)."""
    if not (TELEGRAM_ENABLED and TELEGRAM_PROGRESS_UPDATES):
        return None
    _percent, bar = build_progress_bar(0, total_files)
    msg = (
        "Transcodage en cours\n"
        f"0.00% (0/{total_files} fichiers)\n"
        f"[{bar}]"
    )
    return send_telegram_message_with_result(msg)


def update_progress_message(
    message_id: Optional[int],
    done: int,
    total: int,
    success_no_retry: int,
    success_with_retry: int,
    errors: int,
    current_media: Optional[str] = None,
    last_size_info: Optional[str] = None,
):
    """Rewrite the progress message with the current counters."""
    if not (TELEGRAM_ENABLED and TELEGRAM_PROGRESS_UPDATES):
        return

    percent, bar = build_progress_bar(done, total)
    text = (
        "Transcodage en cours\n"
        f"{percent:.2f}% ({done}/{total} fichiers)\n"
        f"[{bar}]"
    )
    if current_media:
        text += f"\n{current_media}"
    if last_size_info:
        text += f"\n{last_size_info}"
    text += (
        "\n\n"
        f"✓ Sans retry : {success_no_retry}\n"
        f"✓ Avec retry : {success_with_retry}\n"
        f"✗ Échecs définitifs : {errors}"
    )
    edit_telegram_message(message_id, text)


# ---------- Directory browser ----------
def list_subdirs(path: Path) -> List[str]:
    """Return the sub-directory names of `path` (with a trailing "/"), sorted."""
    try:
        dirs = [entry for entry in path.iterdir() if entry.is_dir()]
    except PermissionError:
        print(f"⚠️ Permission refusée: {path}")
        return []
    return [d.name + "/" for d in sorted(dirs, key=lambda p: p.name.lower())]


def browse_dir(start: str = "/") -> Path:
    """Let the user walk the directory tree from `start` and pick a folder.

    Raises:
        SystemExit: when the prompt is cancelled.
    """
    _require_inquirer()
    current = Path(start).resolve()
    if not current.exists():
        current = Path("/")

    while True:
        header = f"[ {str(current)} ]"
        choices = [header]
        # Offer "up" only when there is a parent to go to.
        if current.parent != current:
            choices.append("⬆️ ..")
        subdirs = list_subdirs(current)
        if subdirs:
            choices.extend(subdirs)
        else:
            choices.append("(aucun sous-dossier)")
        choices.append("✅ Valider ici")

        ans = inquirer.prompt(
            [inquirer.List("sel", message="Navigue puis valide", choices=choices)]
        )
        if not ans:
            raise SystemExit("Annulé.")
        sel = ans["sel"]

        if sel == "✅ Valider ici":
            return current
        if sel == "⬆️ ..":
            parent = current.parent
            if parent != current:
                current = parent
            continue
        if sel == "(aucun sous-dossier)" or sel == header:
            continue

        next_dir = current / sel.rstrip("/")
        if next_dir.exists() and next_dir.is_dir():
            current = next_dir
        else:
            print(f"⚠️ Inaccessible: {next_dir}")


def prompt_path_dir() -> Path:
    """Ask the user for a directory, starting at the default browse root."""
    return browse_dir(DEFAULT_BROWSE_ROOT)


def prompt_path_file() -> Path:
    """Ask the user for a directory, then for one original video inside it."""
    d = browse_dir(DEFAULT_BROWSE_ROOT)
    files = [p for p in d.iterdir() if is_original_video_file(p)]
    if not files:
        raise SystemExit("Aucune vidéo originale (sans _converted) dans ce dossier.")

    choices = [f.name for f in sorted(files, key=lambda p: p.name.lower())]
    ans = inquirer.prompt(
        [inquirer.List("file", message=f"Fichier dans {d}", choices=choices)]
    )
    if not ans:
        raise SystemExit("Annulé.")
    return d / ans["file"]


def browse_dirs_multi(start: str = DEFAULT_BROWSE_ROOT) -> List[Path]:
    """Pick a parent folder, then one or more of its sub-folders (or itself)."""
    parent = browse_dir(start)
    subdirs = [p for p in parent.iterdir() if p.is_dir()]
    if not subdirs:
        raise SystemExit(f"Aucun sous-dossier dans {parent}")

    choice_parent = "📁 [Inclure aussi ce dossier]"
    choices = [choice_parent]
    choices.extend(d.name + "/" for d in sorted(subdirs, key=lambda p: p.name.lower()))

    ans = inquirer.prompt([
        inquirer.Checkbox(
            "dirs",
            message=f"Sélectionne un ou plusieurs dossiers dans {parent}",
            choices=choices,
        )
    ])
    if not ans or not ans["dirs"]:
        raise SystemExit("Aucun dossier sélectionné.")

    return [
        parent if item == choice_parent else parent / item.rstrip("/")
        for item in ans["dirs"]
    ]


# ---------- Encoder invocation ----------
def _run_encoder(
    cmd: list, tool: str, src: Path, out_path: Path
) -> tuple[int, Path, Optional[str]]:
    """Run an encoder command and return (returncode, out_path, error message).

    The error message is the last non-empty stderr line, or a generic
    "<tool> exit code N". A missing executable yields return code 127.
    """
    try:
        result = subprocess.run(
            cmd,
            check=False,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.PIPE,
            text=True,
            encoding="utf-8",
            errors="replace",
        )
    except FileNotFoundError:
        msg = f"Erreur: {tool} introuvable."
        print(msg)
        return 127, out_path, msg

    if result.returncode == 0:
        print(f"[OK ] {out_path}")
        return 0, out_path, None

    stderr_lines = [ln.strip() for ln in result.stderr.splitlines() if ln.strip()]
    error_msg = (
        stderr_lines[-1] if stderr_lines else f"{tool} exit code {result.returncode}"
    )
    print(f"[ERR] Code {result.returncode} pour {src} — {error_msg}")
    return result.returncode, out_path, error_msg


# ---------- SDR transcoding through HandBrake ----------
def transcode_file(
    src: Path, overwrite: bool = False, quality: Optional[str] = None
) -> tuple[int, Path, Optional[str]]:
    """Encode an SDR source to AV1 with HandBrakeCLI.

    Args:
        src: source video.
        overwrite: when False and the output already exists, skip (returns 0).
        quality: value passed to `-q`; defaults to the module-level HB_QUALITY.

    Returns:
        (returncode, output path, error message or None).
    """
    out_path = build_output_path(src)
    if out_path.exists() and not overwrite:
        print(f"[SKIP] Sortie existe déjà: {out_path}")
        return 0, out_path, None

    q_value = HB_QUALITY if quality is None else str(quality)
    cmd = [
        "HandBrakeCLI",
        "-i", str(src),
        "-o", str(out_path),
        "-e", HB_VIDEO_ENCODER,
        "-q", q_value,
        "--encoder-preset", HB_PRESET,
        "-E", HB_AUDIO_MODE,
        "--all-audio",
        "--all-subtitles",
        "--subtitle-burned=none",
        "--optimize",
    ]
    print(f"[RUN] (SDR/AV1) {src.name} -> {out_path.name} (CRF={q_value})")
    return _run_encoder(cmd, "HandBrakeCLI", src, out_path)


# ---------- HDR -> SDR transcoding through ffmpeg + AV1 NVENC ----------
def transcode_hdr_file_ffmpeg(
    src: Path,
    crf: int,
    overwrite: bool,
    hdr_type: str,
) -> tuple[int, Path, Optional[str]]:
    """Tone-map an HDR10/HLG source to SDR and encode it with av1_nvenc.

    - Filter chain: zscale to linear light -> tonemap=hable -> zscale to
      BT.709 -> format=p010le (10-bit).
    - Video: -c:v av1_nvenc -rc vbr -cq <crf> (higher = more compressed).
    - Audio: AAC 384 kb/s. Subtitles: copied.

    Returns:
        (returncode, output path, error message or None).
    """
    out_path = build_output_path(src)
    if out_path.exists() and not overwrite:
        print(f"[SKIP] Sortie existe déjà: {out_path}")
        return 0, out_path, None

    # Generic HDR -> SDR tone-mapping filter.
    vf = (
        "zscale=transfer=linear:npl=100,"
        "tonemap=hable,"
        "zscale=primaries=bt709:transfer=bt709:matrix=bt709,"
        "format=p010le"
    )
    cmd = [
        "ffmpeg",
        "-y" if overwrite else "-n",
        "-i", str(src),
        "-map", "0:v:0",
        "-map", "0:a?",
        "-map", "0:s?",
        "-vf", vf,
        "-c:v", "av1_nvenc",
        "-rc", "vbr",
        "-cq", str(crf),
        "-pix_fmt", "p010le",
        "-c:a", "aac",
        "-b:a", "384k",
        "-c:s", "copy",
        str(out_path),
    ]
    print(
        f"[RUN] (HDR->{hdr_type}->SDR/AV1_NVENC) {src.name} -> {out_path.name} (CQ={crf})"
    )
    return _run_encoder(cmd, "ffmpeg", src, out_path)


def _encode_once(
    src: Path, crf: int, overwrite: bool, hdr_type: str
) -> tuple[int, Path, Optional[str]]:
    """Run one encode with the tool matching `hdr_type` (ffmpeg for HDR10/HLG)."""
    if hdr_type in ("HDR10", "HLG"):
        return transcode_hdr_file_ffmpeg(src, crf, overwrite, hdr_type)
    return transcode_file(src, overwrite, quality=str(crf))


def reencode_keeping_best(
    out_path: Path,
    first_size: int,
    encode: Callable[[], tuple[int, Path, Optional[str]]],
) -> int:
    """Re-encode to `out_path` without ever losing the first conversion.

    The existing output is moved to `<name>.bak` before `encode()` rewrites
    `out_path`. The new file is kept only if the encode succeeded and it is
    non-empty and smaller than `first_size`; otherwise the backup is restored.

    Args:
        out_path: path of the first (valid) conversion.
        first_size: its size in bytes.
        encode: zero-argument callable performing the second encode and
            returning (returncode, output path, error message).

    Returns:
        Size in bytes of the file finally left at `out_path`.
    """
    backup = out_path.with_name(out_path.name + ".bak")
    try:
        os.replace(out_path, backup)
    except OSError as e:
        print(f" ⚠️ Sauvegarde de la première conversion impossible, re-encodage annulé: {e}")
        return first_size

    rc2, out_path2, err_msg2 = encode()
    new_size = _file_size(out_path2) if rc2 == 0 else None

    if new_size:  # encode succeeded and produced a non-empty file
        if new_size < first_size:
            print(
                f" ✅ Nouvelle version plus petite "
                f"({new_size / (1024**3):.2f} Gio, vs {first_size / (1024**3):.2f} Gio)"
            )
            try:
                backup.unlink()
            except OSError as e:
                print(f" ⚠️ Suppression de la sauvegarde impossible {backup}: {e}")
            return new_size
        print(
            " ⚠️ Nouvelle version pas plus petite, "
            "on garde la première conversion."
        )
    else:
        print(
            f" ⚠️ Re-encodage pour réduction de taille a échoué: "
            f"{err_msg2 or 'erreur inconnue'}"
        )

    # Put the first conversion back, replacing any partial/larger new file.
    try:
        os.replace(backup, out_path)
    except OSError as e:
        print(f" ⚠️ Restauration de la première conversion impossible ({backup}): {e}")
    return first_size


# ---------- Prompts ----------
def ask_transcode_options() -> tuple[bool, bool]:
    """Ask for (overwrite existing outputs, delete source after success)."""
    _require_inquirer()
    questions = [
        inquirer.List(
            "overwrite",
            message="Un fichier _converted existe déjà — que faire ?",
            choices=["Ne pas écraser (skip)", "Écraser"],
            default="Ne pas écraser (skip)",
        ),
        inquirer.Confirm(
            "del_src",
            message="Supprimer le fichier source après transcodage réussi ?",
            default=True,
        ),
    ]
    ans = inquirer.prompt(questions)
    if not ans:
        raise SystemExit("Annulé.")
    overwrite = ans["overwrite"] == "Écraser"
    delete_source_on_success = bool(ans["del_src"])
    return overwrite, delete_source_on_success


def select_mode() -> str:
    """Ask which operation to run and return the selected menu label."""
    _require_inquirer()
    q = [inquirer.List(
        "mode",
        message="Que veux-tu faire ?",
        choices=[
            "Convert : Fichier",
            "Convert : Répertoire",
            "Convert : Répertoire Récursif",
            "Delete : Delete Origine File",
        ],
    )]
    ans = inquirer.prompt(q)
    if not ans:
        raise SystemExit("Annulé.")
    return ans["mode"]


# ---------- Stand-alone source deletion ----------
def delete_sources_if_converted_exists():
    """Delete original videos that already have a non-empty converted sibling.

    Scans the chosen directory recursively, prints a dry-run summary and asks
    for confirmation before removing anything.
    """
    d = prompt_path_dir()
    originals = [p for p in d.rglob("*") if is_original_video_file(p)]
    if not originals:
        print("Aucun fichier source trouvé.")
        return

    deletable, missing = [], []
    print(f"\nAnalyse de {len(originals)} fichiers source… (vérif existence du converted)")
    for idx, src in enumerate(originals, 1):
        converted = build_output_path(src)
        print(f"[{idx}/{len(originals)}] {src.name}")
        # An empty output is the trace of a failed encode, not a conversion.
        if converted.is_file() and _file_size(converted):
            deletable.append((src, converted))
            print(" ✅ Converted correspondant trouvé")
        else:
            missing.append(src)
            print(" ⏭️ Pas de converted -> SKIP")

    print("\n--- Aperçu (dry-run) ---")
    print(f"Originaux total : {len(originals)}")
    print(f"Supprimables (existe converted) : {len(deletable)}")
    print(f"Sans converted : {len(missing)}")
    if not deletable:
        print("\nRien à supprimer en toute sécurité.")
        return

    q = [inquirer.Confirm(
        "proceed",
        message=f"Supprimer définitivement {len(deletable)} fichier(s) source ?",
        default=False,
    )]
    ans = inquirer.prompt(q)
    if not ans or not ans["proceed"]:
        print("Annulé.")
        return

    deleted = failed = 0
    for src, _converted in deletable:
        try:
            os.remove(src)
            deleted += 1
            print(f"🗑️ Supprimé: {src}")
        except Exception as e:
            failed += 1
            print(f"❌ Échec suppression {src}: {e}")

    print("\n--- Résumé suppression ---")
    print(f"Supprimés: {deleted} | Échecs: {failed} | Conservés: {len(originals) - deleted}")


# ---------- Main program ----------
def main():
    """Interactive entry point: select a mode, then transcode or delete."""
    _require_inquirer()
    check_handbrake()
    check_mediainfo()
    ffmpeg_available = check_ffmpeg()

    mode = select_mode()
    if mode == "Delete : Delete Origine File":
        delete_sources_if_converted_exists()
        return

    overwrite, delete_source_on_success = ask_transcode_options()

    if mode == "Convert : Fichier":
        files = [prompt_path_file()]
    elif mode in ("Convert : Répertoire", "Convert : Répertoire Récursif"):
        recursive = mode == "Convert : Répertoire Récursif"
        files: list[Path] = []
        for d in browse_dirs_multi(DEFAULT_BROWSE_ROOT):
            files.extend(list_videos_in_dir(d, recursive=recursive))
        # A folder selected together with one of its sub-folders would
        # otherwise queue the same file twice.
        files = list(dict.fromkeys(files))
        if not files:
            if recursive:
                print("Aucun fichier vidéo original trouvé dans les dossiers sélectionnés (récursif).")
            else:
                print("Aucun fichier vidéo original trouvé dans les dossiers sélectionnés.")
            return
    else:
        raise SystemExit("Mode inconnu.")

    total_files = len(files)
    print(f"\nFichiers à traiter (originaux seulement) : {total_files}")

    progress_msg_id: Optional[int] = send_progress_start(total_files)

    errors = 0
    success_no_retry = 0
    success_with_retry = 0
    total_retries = 0
    deleted_sources = 0
    failed_files: list[tuple[str, str]] = []

    def report(done: int, media_name: str, size_info: Optional[str]) -> None:
        # Reads the counters above at call time, so it always sends the
        # current totals.
        update_progress_message(
            progress_msg_id,
            done,
            total_files,
            success_no_retry,
            success_with_retry,
            errors,
            current_media=media_name,
            last_size_info=size_info,
        )

    for idx, f in enumerate(files, 1):
        print(f"\n[{idx}/{total_files}] {f}")

        try:
            original_size = f.stat().st_size
        except FileNotFoundError:
            print(" ⚠️ Fichier introuvable, skip.")
            errors += 1
            report(idx, f.name, None)
            continue

        # --- Quality value, duration and HDR type ---
        crf_for_file, duration_s, is_movie_flag, hdr_type = choose_crf_and_duration(f)

        # --- Dolby Vision: skip and notify ---
        if hdr_type == "DV":
            print(" ⚠️ Dolby Vision détecté. SKIP pour éviter une conversion foireuse.")
            media_label = classify_media(f)
            send_telegram_message(
                "Média Dolby Vision détecté (non converti)\n"
                f"{media_label}\n"
                f"Fichier : {f}"
            )
            report(idx, f.name, "Dolby Vision : SKIP (original conservé)")
            continue

        # --- Size target (shared by HDR and SDR paths) ---
        target_size: Optional[int] = None
        if duration_s and duration_s > 0:
            target_size = int(duration_s * TARGET_BYTES_PER_SECOND)
            print(
                f" Cible taille ~{TARGET_GIB_PER_HOUR:.2f} Gio/h -> "
                f"{target_size / (1024**3):.2f} Gio pour ce média"
            )
        else:
            print(" Durée inconnue, aucune cible taille ne sera appliquée.")

        use_ffmpeg_hdr = hdr_type in ("HDR10", "HLG")
        if use_ffmpeg_hdr and not ffmpeg_available:
            print(" ⚠️ HDR détecté mais ffmpeg indisponible. SKIP.")
            media_label = classify_media(f)
            send_telegram_message(
                "Média HDR détecté mais ffmpeg introuvable (non converti)\n"
                f"{media_label}\n"
                f"Fichier : {f}"
            )
            report(idx, f.name, f"{hdr_type} : SKIP (ffmpeg manquant)")
            continue

        # --- Encode with retries (HandBrake for SDR, ffmpeg for HDR) ---
        last_error_msg: Optional[str] = None
        last_out_path: Optional[Path] = None
        rc = 0
        attempts = 0
        for attempt in range(MAX_RETRIES + 1):
            attempts = attempt + 1
            if attempt > 0:
                print(f" ↻ Retentative {attempt}/{MAX_RETRIES}")
            # A retry must overwrite the partial output of the failed attempt.
            rc, last_out_path, last_error_msg = _encode_once(
                f, crf_for_file, overwrite or attempt > 0, hdr_type
            )
            if rc == 0:
                break

        if rc != 0:
            errors += 1
            tool = "ffmpeg" if use_ffmpeg_hdr else "HandBrakeCLI"
            short_reason = last_error_msg or f"{tool} exit code {rc}"
            media_label = classify_media(f)
            failed_files.append((media_label, short_reason))
            msg = (
                "Échec transcodage après plusieurs essais\n"
                f"{media_label}\n"
                f"Tentatives: {attempts} (retries: {max(0, attempts - 1)})\n"
                f"Raison: {short_reason}"
            )
            print(msg)
            send_telegram_message(msg)
            report(idx, f.name, None)
            continue

        retries_for_file = max(0, attempts - 1)
        total_retries += retries_for_file
        if retries_for_file == 0:
            success_no_retry += 1
        else:
            success_with_retry += 1
            print(f" ✅ Succès après {retries_for_file} retry(s)")

        final_out_path = last_out_path
        converted_size = _file_size(final_out_path)

        # --- Compare with the size target, re-encode harder if needed ---
        if target_size is not None and converted_size is not None:
            print(
                f" Taille originale : {original_size / (1024**3):.2f} Gio\n"
                f" Taille convertie : {converted_size / (1024**3):.2f} Gio\n"
                f" Taille cible : {target_size / (1024**3):.2f} Gio"
            )
            threshold_factor = 1.20 if is_movie_flag else 1.05
            if converted_size > target_size * threshold_factor:
                print(
                    f" ⚠️ Fichier converti au-dessus de la cible "
                    f"(>{threshold_factor*100:.0f}% de la taille visée), "
                    "tentative de re-encodage plus compressé."
                )
                higher_crf = min(crf_for_file + 4, 40)
                print(f" → Re-encodage avec CRF/CQ plus élevé: {higher_crf}")
                converted_size = reencode_keeping_best(
                    final_out_path,
                    converted_size,
                    functools.partial(_encode_once, f, higher_crf, True, hdr_type),
                )
            else:
                print(" ✅ Fichier dans la cible de taille pour ce média, pas de re-encode supplémentaire.")
        else:
            print(" (Pas de comparaison à la cible de taille : durée ou taille convertie inconnue.)")

        # --- Delete the source if requested ---
        if delete_source_on_success:
            # Never delete the source unless a non-empty output really
            # exists: a zero exit code alone does not prove that.
            if converted_size:
                try:
                    os.remove(f)
                    deleted_sources += 1
                    print(f" 🗑️ Source supprimée: {f}")
                except Exception as e:
                    print(f" ⚠️ Échec suppression source {f}: {e}")
            else:
                print(f" ⚠️ Sortie absente ou vide, source conservée: {f}")

        # --- Size info for Telegram ---
        last_size_info = None
        if final_out_path and converted_size is not None:
            orig_gib = original_size / (1024**3)
            conv_gib = converted_size / (1024**3)
            last_size_info = f"{orig_gib:.2f} Go -> {conv_gib:.2f} Go".replace(".", ",")

        report(idx, f.name, last_size_info)

    # --- Console summary ---
    summary_lines = [
        f"Total fichiers : {total_files}",
        f"Succès sans retry : {success_no_retry}",
        f"Succès avec retry : {success_with_retry}",
        f"Échecs définitifs : {errors}",
        f"Total retries effectués : {total_retries}",
        f"Sources supprimées : {deleted_sources}",
    ]
    print("\n--- Résumé ---")
    for line in summary_lines:
        print(line)
    if failed_files:
        print("\nFichiers en échec définitif :")
        for label, reason in failed_files:
            print(f" - {label} — {reason}")

    # --- Telegram summary ---
    summary_msg = "Résumé transcodage\n" + "\n".join(summary_lines)
    if failed_files:
        summary_msg += "\n\nFichiers en échec définitif :\n"
        summary_msg += "".join(
            f"• {label} — {reason}\n" for label, reason in failed_files
        )
    send_telegram_message(summary_msg)


if __name__ == "__main__":
    main()