DVCheck/dv_check.py
2024-07-23 16:36:53 -04:00

314 lines
8.8 KiB
Python

import argparse
import json
import subprocess
import sys
from pathlib import Path
from pymediainfo import MediaInfo
from typing import Optional, Tuple
# Tool version; reported by the -v/--version command-line flag.
__version__ = "0.1.3"
def exit_print(message: Optional[str] = "", exit_code: Optional[int] = 0) -> None:
    """Write ``message`` to standard output, then terminate the process.

    Args:
        message: Text handed to ``print`` before exiting.
        exit_code: Process exit status.

    Raises:
        SystemExit: Always, carrying ``exit_code``.
    """
    print(message)
    # sys.exit() is a thin wrapper that raises exactly this exception.
    raise SystemExit(exit_code)
def parse_arguments(
    argv: Optional[list] = None,
) -> Tuple[Path, Path, Path, bool, bool, Path]:
    """Parse and validate the command line.

    Args:
        argv: Argument strings to parse. ``None`` (the default) makes
            argparse read ``sys.argv[1:]``, which is the behaviour existing
            callers rely on.

    Returns:
        A tuple ``(input, ffmpeg, dovi_tool, prompt, create_fel_mkv,
        working_dir)``. ``working_dir`` falls back to the directory that
        contains the input file when ``-w`` is not supplied.

    Raises:
        SystemExit: Through ``exit_print`` (status 1) when ``-c`` and ``-p``
            are combined or a required value is missing, and through
            argparse itself on malformed arguments.
    """
    parser = argparse.ArgumentParser(prog="DV Check")
    parser.add_argument("-f", "--ffmpeg", type=str, help="Path to FFMPEG executable.")
    parser.add_argument(
        "-d", "--dovi-tool", type=str, help="Path to dovi_tool executable."
    )
    parser.add_argument(
        "-p",
        "--prompt",
        action="store_true",
        help="Prompt user with an exit code of 1 if FEL with a message.",
    )
    parser.add_argument(
        "-c",
        "--create-fel-mkv",
        action="store_true",
        help="Create FEL output mkv if FEL layer exists (input name_el.ext).",
    )
    parser.add_argument(
        "-w", "--working-dir", type=str, help="Directory to store working files in."
    )
    parser.add_argument(
        "-v", "--version", action="version", version=f"%(prog)s {__version__}"
    )
    parser.add_argument("input", type=str, help="Path to input file.")
    args = parser.parse_args(argv)

    # Validation is done by hand (rather than argparse's required/mutually
    # exclusive features) so failures keep exit status 1 and these messages.
    if args.create_fel_mkv and args.prompt:
        exit_print("You can only supply one of '-c' or '-p'.", 1)
    # argparse guarantees the positional is present, but it may be "".
    if not args.input:
        exit_print("Program requires an input file", 1)
    if not args.ffmpeg:
        exit_print("Program requires FFMPEG", 1)
    if not args.dovi_tool:
        exit_print("Program requires dovi_tool", 1)

    input_path = Path(args.input)
    working_dir = Path(args.working_dir) if args.working_dir else input_path.parent
    return (
        input_path,
        Path(args.ffmpeg),
        Path(args.dovi_tool),
        args.prompt,
        args.create_fel_mkv,
        working_dir,
    )
def data_path(file_input: Path, working_dir: Path) -> Path:
    """Return the JSON cache location for ``file_input``, creating its folder.

    The cache file is ``<working_dir>/dv_check/<input stem>_el_data.json``.
    The ``dv_check`` directory (and any missing parents) is created as a side
    effect; the JSON file itself is not.

    Args:
        file_input: Media file the cache entry belongs to.
        working_dir: Directory under which the ``dv_check`` folder lives.

    Returns:
        Path of the JSON cache file.
    """
    cache_dir = working_dir / "dv_check"
    cache_dir.mkdir(parents=True, exist_ok=True)
    return cache_dir / f"{file_input.stem}_el_data.json"
def load_existing_data(file_input: Path, working_dir: Path) -> Optional[dict]:
    """Load the cached analysis record for ``file_input``, if it is usable.

    A record is usable when it is a JSON object whose ``"filename"`` equals
    the input's file name and whose ``"rpu_status"`` is truthy.

    Args:
        file_input: Media file whose cache entry is wanted.
        working_dir: Directory under which the cache folder lives.

    Returns:
        The cached dictionary, or ``None`` when there is no cache file, the
        file is corrupt (it is deleted in that case), or the record does not
        describe this input.
    """
    json_path = data_path(file_input, working_dir)
    if not json_path.exists():
        return None

    try:
        with open(json_path, "r", encoding="utf-8") as json_file:
            data = json.load(json_file)
    except (json.JSONDecodeError, UnicodeDecodeError):
        # Unreadable cache: drop it so the next run rebuilds it from scratch.
        json_path.unlink()
        return None

    # Valid JSON that is not an object (e.g. a list) has no .get(); treat it
    # as a cache miss instead of crashing with AttributeError.
    if not isinstance(data, dict):
        return None
    if file_input.name == data.get("filename") and data.get("rpu_status"):
        return data
    return None
def detect_el_layer(file_input: Path) -> bool:
    """Check via MediaInfo whether the input advertises a Dolby Vision EL.

    Every negative outcome terminates the program through ``exit_print``:
    status 1 for an unusable input (missing file, unsupported extension, no
    video track) and status 0 when the file is valid but has no Dolby Vision
    metadata or no enhancement layer.

    Args:
        file_input: Path to an ``.mp4`` or ``.mkv`` file (extension matched
            case-insensitively).

    Returns:
        ``True`` when the first video track reports Dolby Vision with "EL" in
        its HDR format settings. ``False`` is only reachable if
        ``exit_print`` ever returns.
    """
    if not file_input.is_file():
        exit_print("Input file not found", 1)
    if file_input.suffix.lower() not in {".mp4", ".mkv"}:
        exit_print("Only MP4/MKV inputs are supported", 1)

    media_info = MediaInfo.parse(file_input)
    try:
        video_track = media_info.video_tracks[0]
    except IndexError:
        exit_print("Input has no video track", 1)
        return False

    # pymediainfo yields None for attributes a track does not carry (e.g. an
    # SDR stream has no HDR format); normalise so the `in` tests cannot raise
    # TypeError.
    hdr_format = video_track.hdr_format or ""
    hdr_format_settings = video_track.hdr_format_settings or ""

    if "Dolby Vision" not in hdr_format:
        exit_print("No Dolby Vision metadata detected", 0)
        return False
    if "EL" not in hdr_format_settings:
        exit_print("EL layer not detected", 0)
        return False
    return True
def read_rpu(
    file_input: Path, ffmpeg_path: Path, dovi_tool_path: Path, working_dir: Path
) -> Optional[str]:
    """Extract a short RPU sample and record the enhancement-layer type.

    ffmpeg copies one second of the first video stream (starting ten seconds
    in) as Annex B HEVC to stdout, which is piped into
    ``dovi_tool extract-rpu``. The resulting RPU file is inspected with
    ``dovi_tool info`` and the ``el_type`` field of its JSON report is stored
    in the cache file given by ``data_path``.

    Args:
        file_input: Source media file.
        ffmpeg_path: ffmpeg executable.
        dovi_tool_path: dovi_tool executable.
        working_dir: Directory under which the cache folder and the temporary
            RPU file live.

    Returns:
        The ``el_type`` value reported by dovi_tool, or ``None`` when no RPU
        file was produced (or the report has no ``el_type``).

    Raises:
        subprocess.CalledProcessError: If ``dovi_tool info`` fails.
        json.JSONDecodeError: If its report cannot be parsed.
    """
    # The child processes run with cwd set to the input's directory, so every
    # file path handed to them must be absolute or it would be resolved
    # against that directory instead of ours.
    source = file_input.absolute()
    json_out = data_path(file_input, working_dir).absolute()
    # Keep the temporary RPU beside the cache file, not beside the input.
    output_bin = json_out.with_name(f"{file_input.stem}_rpu.bin")
    if output_bin.exists():
        # A leftover from an aborted run must not pass for a fresh result.
        output_bin.unlink()

    ffmpeg_command = [
        str(ffmpeg_path),
        "-ss",
        "00:00:10",
        "-i",
        str(source),
        "-t",
        "00:00:01",
        "-map",
        "0:v:0",
        "-c:v:0",
        "copy",
        "-bsf:v",
        "hevc_mp4toannexb",
        "-f",
        "hevc",
        "-",
        "-hide_banner",
        "-v",
        "panic",
    ]
    dovi_tool_command = [str(dovi_tool_path), "extract-rpu", "-", "-o", str(output_bin)]
    with subprocess.Popen(
        ffmpeg_command, stdout=subprocess.PIPE, cwd=source.parent
    ) as ffmpeg_proc:
        with subprocess.Popen(
            dovi_tool_command, stdin=ffmpeg_proc.stdout, cwd=source.parent
        ) as dovi_proc:
            # Drop our copy of the pipe so ffmpeg sees a closed pipe if
            # dovi_tool exits early.
            ffmpeg_proc.stdout.close()
            dovi_proc.communicate()

    rpu_found = output_bin.exists()
    el_type = None
    if rpu_found:
        try:
            rpu_data_cmd = [
                str(dovi_tool_path),
                "info",
                "-f",
                "0",
                "-i",
                str(output_bin),
            ]
            rpu_data_job = subprocess.run(
                rpu_data_cmd, check=True, capture_output=True
            )
            report = rpu_data_job.stdout.decode()
            # The JSON follows a progress line; if that line is ever absent,
            # fall back to parsing the whole output instead of IndexError.
            _, marker, payload = report.partition("Parsing RPU file...")
            rpu_stdout_data = json.loads(payload if marker else report)
            el_type = rpu_stdout_data.get("el_type")
        finally:
            # Never leave the temporary RPU behind, even when parsing fails.
            output_bin.unlink()

    json_data = {"filename": file_input.name, "rpu_status": rpu_found, "EL": el_type}
    with open(json_out, "w") as json_file:
        json.dump(json_data, json_file)
    return el_type
def create_fel(
    file_input: Path, working_dir: Path, ffmpeg_path: Path, dovi_tool_path: Path
) -> str:
    """Demux the enhancement layer of ``file_input`` into its own file.

    ffmpeg streams the first video track as Annex B HEVC into
    ``dovi_tool demux --el-only``, which writes ``<stem>_el.hevc`` in the
    ``dv_check`` folder; ffmpeg then copies that stream to ``<stem>_el.mkv``.
    The output path is stored under ``"EL_path"`` in the cache file so later
    calls can return it without redoing the work.

    Args:
        file_input: Source media file.
        working_dir: Directory under which the ``dv_check`` folder lives.
        ffmpeg_path: ffmpeg executable.
        dovi_tool_path: dovi_tool executable.

    Returns:
        Path of the enhancement-layer output file, as a string.

    Raises:
        RuntimeError: If dovi_tool produced no enhancement-layer stream.
        subprocess.CalledProcessError: If the final ffmpeg copy fails.
    """
    data = load_existing_data(file_input, working_dir) or {}
    el_path = data.get("EL_path")
    # Trust the cached path only while the file it names is still on disk.
    if el_path and Path(el_path).exists():
        return el_path

    # The child processes run with cwd set to the input's directory, so every
    # file path handed to them must be absolute or it would be resolved
    # against that directory instead of ours.
    source = file_input.absolute()
    el_output_path = (
        working_dir / "dv_check" / f"{file_input.stem}_el.hevc"
    ).absolute()
    el_output_path.parent.mkdir(parents=True, exist_ok=True)
    el_mkv_output_path = el_output_path.with_suffix(".mkv")

    ffmpeg_command = [
        str(ffmpeg_path),
        "-i",
        str(source),
        "-map",
        "0:v:0",
        "-c:v:0",
        "copy",
        "-bsf:v",
        "hevc_mp4toannexb",
        "-f",
        "hevc",
        "-",
        "-hide_banner",
        "-v",
        "panic",
        "-stats",
    ]
    dovi_tool_command = [
        str(dovi_tool_path),
        "demux",
        "--el-only",
        "-",
        "-e",
        str(el_output_path),
    ]
    with subprocess.Popen(
        ffmpeg_command, stdout=subprocess.PIPE, cwd=source.parent
    ) as ffmpeg_proc:
        with subprocess.Popen(
            dovi_tool_command, stdin=ffmpeg_proc.stdout, cwd=source.parent
        ) as dovi_proc:
            # Drop our copy of the pipe so ffmpeg sees a closed pipe if
            # dovi_tool exits early.
            ffmpeg_proc.stdout.close()
            dovi_proc.communicate()

    if not el_output_path.exists():
        raise RuntimeError(
            f"dovi_tool did not produce an enhancement layer at {el_output_path}"
        )

    # NOTE(review): "-f hevc" sits after "-i", so it selects the OUTPUT
    # format; the ".mkv" file is therefore written as a raw HEVC stream.
    # Left unchanged - confirm whether a real Matroska container is wanted.
    ffmpeg_mux_command = [
        str(ffmpeg_path),
        "-y",
        "-i",
        str(el_output_path),
        "-c",
        "copy",
        "-f",
        "hevc",
        str(el_mkv_output_path),
    ]
    # check=True: a failed copy must not be cached and reported as success.
    subprocess.run(ffmpeg_mux_command, check=True)

    data["EL_path"] = str(el_mkv_output_path)
    with open(data_path(file_input, working_dir), "w") as json_file:
        json.dump(data, json_file)
    # Return a str on both paths, matching the annotation and the cached case.
    return data["EL_path"]
def handle_fel_detection(
    el_type: str,
    prompt: bool,
    create_fel_mkv: bool,
    file_input: Path,
    working_dir: Path,
    ffmpeg: Path,
    dovi_tool: Path,
) -> None:
    """Act on the detected enhancement-layer type according to the CLI flags.

    * Not FEL and ``create_fel_mkv`` set: exit with status 1 and a message.
    * FEL and ``prompt`` set: exit with status 1 and a message.
    * FEL and ``create_fel_mkv`` set: build the EL file via ``create_fel``,
      print its path and exit with status 0.
    * Any other combination: return without doing anything.

    Args:
        el_type: Enhancement-layer type string; only ``"FEL"`` is special.
        prompt: Whether ``-p`` was given.
        create_fel_mkv: Whether ``-c`` was given.
        file_input: Source media file.
        working_dir: Directory for working files.
        ffmpeg: ffmpeg executable.
        dovi_tool: dovi_tool executable.
    """
    has_fel = el_type == "FEL"

    if not has_fel:
        if create_fel_mkv:
            exit_print(
                "You have loaded a Dolby Vision source file that does NOT have a Full "
                "Enhancement Layer (FEL). You must load a standard template to continue "
                "processing this file.",
                1,
            )
        return

    if prompt:
        exit_print(
            "You have loaded a Dolby Vision source file with a Full Enhancement "
            "Layer (FEL). You must load a FEL-enabled template to continue "
            "processing this file.",
            1,
        )

    if create_fel_mkv:
        fel_output = create_fel(file_input, working_dir, ffmpeg, dovi_tool)
        exit_print(f"FEL layer: {fel_output}", 0)
if __name__ == "__main__":
    # Script entry point: parse the CLI, confirm the input advertises an
    # enhancement layer, find its type (cache first, RPU probe otherwise),
    # then act on it.
    file_input, ffmpeg, dovi_tool, prompt, create_fel_mkv, working_dir = (
        parse_arguments()
    )
    working_dir.mkdir(exist_ok=True, parents=True)

    if not detect_el_layer(file_input):
        exit_print("No EL layer detected", 0)

    existing_data = load_existing_data(file_input, working_dir)
    el_type = existing_data.get("EL") if existing_data else None
    if el_type:
        print(f"Data loaded, EL type: {el_type}")
    else:
        # No cache, or a cached record without an EL type: probe the file
        # again so both cases end the same way instead of the cached one
        # exiting silently with status 0.
        el_type = read_rpu(file_input, ffmpeg, dovi_tool, working_dir)
        if not el_type:
            exit_print("No RPU data extracted", 1)

    handle_fel_detection(
        el_type,
        prompt,
        create_fel_mkv,
        file_input,
        working_dir,
        ffmpeg,
        dovi_tool,
    )