refactor: -p now just sends a message with an exit code of 1 instead of a tkinter pop up

feat: added --working-dir/-w arg, that will store all the working files in
feat: added a check if the user tried to use both -c and -p it would error out
feat: if working directory isn't supplied it'll mke the working directly the file input
fix: grammar
feat: now writes json data for processed stuff, to prevent re processing if it's already been done before
feat: v0.1.1
This commit is contained in:
jlw4049 2024-07-22 15:36:40 -04:00
parent 99908e7349
commit a0014bfe7b

@ -4,18 +4,17 @@ import subprocess
import sys import sys
from pathlib import Path from pathlib import Path
from pymediainfo import MediaInfo from pymediainfo import MediaInfo
from tkinter import Tk, messagebox
from typing import Optional, Tuple from typing import Optional, Tuple
__version__ = "0.1.0" __version__ = "0.1.1"
def exit_print(message: Optional[str], exit_code: Optional[int]) -> None: def exit_print(message: Optional[str] = "", exit_code: Optional[int] = 0) -> None:
print(message if message else "") print(message)
sys.exit(exit_code if exit_code else 0) sys.exit(exit_code)
def cli() -> Tuple[Path, Path, Path, bool, bool]: def parse_arguments() -> Tuple[Path, Path, Path, bool, bool, Path]:
parser = argparse.ArgumentParser(prog="DV Check") parser = argparse.ArgumentParser(prog="DV Check")
parser.add_argument("-f", "--ffmpeg", type=str, help="Path to FFMPEG executable.") parser.add_argument("-f", "--ffmpeg", type=str, help="Path to FFMPEG executable.")
parser.add_argument( parser.add_argument(
@ -25,7 +24,7 @@ def cli() -> Tuple[Path, Path, Path, bool, bool]:
"-p", "-p",
"--prompt", "--prompt",
action="store_true", action="store_true",
help="Prompt user with a GUI prompt if FEL.", help="Prompt user with an exit code of 1 if FEL with a message.",
) )
parser.add_argument( parser.add_argument(
"-c", "-c",
@ -33,10 +32,16 @@ def cli() -> Tuple[Path, Path, Path, bool, bool]:
action="store_true", action="store_true",
help="Create FEL output mkv if FEL layer exists (input name_el.ext).", help="Create FEL output mkv if FEL layer exists (input name_el.ext).",
) )
parser.add_argument(
"-w", "--working-dir", type=str, help="Directory to store working files in."
)
parser.add_argument("input", type=str, help="Path to input file.") parser.add_argument("input", type=str, help="Path to input file.")
args = parser.parse_args() args = parser.parse_args()
if args.create_fel_mkv and args.prompt:
exit_print("You can only supply one of '-c' or '-p'.", 1)
if not args.input: if not args.input:
exit_print("Program requires an input file", 1) exit_print("Program requires an input file", 1)
@ -46,58 +51,67 @@ def cli() -> Tuple[Path, Path, Path, bool, bool]:
if not args.dovi_tool: if not args.dovi_tool:
exit_print("Program requires dovi_tool", 1) exit_print("Program requires dovi_tool", 1)
working_dir = (
Path(args.working_dir) if args.working_dir else Path(args.input).parent
)
return ( return (
Path(args.input), Path(args.input),
Path(args.ffmpeg), Path(args.ffmpeg),
Path(args.dovi_tool), Path(args.dovi_tool),
args.prompt, args.prompt,
args.create_fel_mkv, args.create_fel_mkv,
working_dir,
) )
def data_path(file_input: Path, working_dir: Path) -> Path:
return working_dir / f"{file_input.stem}_el_data.json"
def load_existing_data(file_input: Path, working_dir: Path) -> Optional[dict]:
json_path = data_path(file_input, working_dir)
if not json_path.exists():
return None
try:
with open(json_path, "r") as json_file:
data = json.load(json_file)
if (
data
and file_input.name == data.get("filename")
and data.get("rpu_status")
):
return data
except json.JSONDecodeError:
json_path.unlink()
return None
def detect_el_layer(file_input: Path) -> bool: def detect_el_layer(file_input: Path) -> bool:
if not file_input.exists() or not file_input.is_file(): if not file_input.exists() or not file_input.is_file():
exit_print("Input file not found", 1) exit_print("Input file not found", 1)
if file_input.suffix not in (".mp4", ".mkv"): if file_input.suffix not in {".mp4", ".mkv"}:
exit_print("Only MP4/MKV input's are supported", 1) exit_print("Only MP4/MKV inputs are supported", 1)
media_info = MediaInfo.parse(file_input) media_info = MediaInfo.parse(file_input)
try: try:
video_track = media_info.video_tracks[0] video_track = media_info.video_tracks[0]
if "Dolby Vision" in video_track.hdr_format:
hdr_format = video_track.hdr_format if "EL" in video_track.hdr_format_settings:
if not hdr_format:
exit_print("No HDR detected", 0)
if "Dolby Vision" in hdr_format:
hdr_format_settings = video_track.hdr_format_settings
if not hdr_format_settings:
exit_print("No HDR detected", 0)
if "EL" in hdr_format_settings:
return True return True
else: exit_print("EL layer not detected", 0)
exit_print("EL layer not detected", 0) exit_print("No Dolby Vision metadata detected", 0)
else:
exit_print("No dolby vision metadata detected", 0)
except IndexError: except IndexError:
exit_print("Input has no video track", 1) exit_print("Input has no video track", 1)
return False return False
def read_rpu( def read_rpu(
file_input: Path, ffmpeg_path: Path, dovi_tool_path: Path file_input: Path, ffmpeg_path: Path, dovi_tool_path: Path, working_dir: Path
) -> Optional[str]: ) -> Optional[str]:
# output_el_hevc = file_input.with_suffix(".hevc").with_name(f"{file_input.stem}_el")
output_bin = file_input.with_suffix(".bin") output_bin = file_input.with_suffix(".bin")
# Change the working directory to the directory containing the input file
file_input_parent = file_input.parent
# Define the ffmpeg command
ffmpeg_command = [ ffmpeg_command = [
str(ffmpeg_path), str(ffmpeg_path),
"-ss", "-ss",
@ -119,51 +133,54 @@ def read_rpu(
"-v", "-v",
"panic", "panic",
] ]
dovi_tool_command = [str(dovi_tool_path), "extract-rpu", "-", "-o", str(output_bin)]
# Define the dovi_tool command
dovi_tool_command = [dovi_tool_path, "extract-rpu", "-", "-o", str(output_bin)]
# Run the ffmpeg command and pipe its output to the dovi_tool command
with subprocess.Popen( with subprocess.Popen(
ffmpeg_command, cwd=file_input_parent, stdout=subprocess.PIPE ffmpeg_command, stdout=subprocess.PIPE, cwd=file_input.parent
) as ffmpeg_proc: ) as ffmpeg_proc:
with subprocess.Popen( with subprocess.Popen(
dovi_tool_command, dovi_tool_command, stdin=ffmpeg_proc.stdout, cwd=file_input.parent
cwd=file_input_parent,
stdin=ffmpeg_proc.stdout,
stdout=subprocess.DEVNULL,
) as dovi_proc: ) as dovi_proc:
ffmpeg_proc.stdout.close() # Close the ffmpeg stdout to allow it to receive a SIGPIPE if dovi_proc exits ffmpeg_proc.stdout.close()
dovi_proc.communicate() # Wait for the dovi_tool process to complete dovi_proc.communicate()
json_out = data_path(file_input, working_dir)
if output_bin.exists(): if output_bin.exists():
rpu_data_cmd = [dovi_tool_path, "info", "-f", "0", "-i", str(output_bin)] rpu_data_cmd = [str(dovi_tool_path), "info", "-f", "0", "-i", str(output_bin)]
rpu_data_job = subprocess.run(rpu_data_cmd, check=True, capture_output=True) rpu_data_job = subprocess.run(rpu_data_cmd, check=True, capture_output=True)
rpu_stdout_data = json.loads( rpu_stdout_data = json.loads(
rpu_data_job.stdout.decode().split("Parsing RPU file...")[1] rpu_data_job.stdout.decode().split("Parsing RPU file...")[1]
) )
el_type = rpu_stdout_data.get("el_type") el_type = rpu_stdout_data.get("el_type")
# clean up files json_data = {"filename": file_input.name, "rpu_status": True, "EL": el_type}
output_bin.unlink() with open(json_out, "w") as json_file:
json.dump(json_data, json_file)
output_bin.unlink()
return el_type return el_type
json_data = {"filename": file_input.name, "rpu_status": False, "EL": None}
with open(json_out, "w") as json_file:
json.dump(json_data, json_file)
return None
def prompt_user() -> None:
root = Tk() def create_fel(
root.withdraw() file_input: Path, working_dir: Path, ffmpeg_path: Path, dovi_tool_path: Path
root.attributes('-topmost', True) ) -> str:
messagebox.showinfo( data = load_existing_data(file_input, working_dir) or {}
"Alert",
"FEL source detected, please switch to a FEL enabled template to process this encode.", el_path = data.get("EL_path")
parent=root if el_path:
return el_path
el_output_path = (
working_dir / file_input.with_name(f"{file_input.stem}_el.hevc").name
) )
root.destroy() el_mkv_output_path = el_output_path.with_suffix(".mkv")
root.mainloop()
def create_fel(file_input: Path, ffmpeg_path: Path, dovi_tool_path: Path) -> None:
ffmpeg_command = [ ffmpeg_command = [
str(ffmpeg_path), str(ffmpeg_path),
"-i", "-i",
@ -182,11 +199,8 @@ def create_fel(file_input: Path, ffmpeg_path: Path, dovi_tool_path: Path) -> Non
"panic", "panic",
"-stats", "-stats",
] ]
el_output_path = file_input.with_name(f"{file_input.stem}_el.hevc")
dovi_tool_command = [ dovi_tool_command = [
dovi_tool_path, str(dovi_tool_path),
"demux", "demux",
"--el-only", "--el-only",
"-", "-",
@ -194,13 +208,14 @@ def create_fel(file_input: Path, ffmpeg_path: Path, dovi_tool_path: Path) -> Non
str(el_output_path), str(el_output_path),
] ]
with subprocess.Popen(ffmpeg_command, stdout=subprocess.PIPE) as ffmpeg_proc: with subprocess.Popen(
ffmpeg_command, stdout=subprocess.PIPE, cwd=file_input.parent
) as ffmpeg_proc:
with subprocess.Popen( with subprocess.Popen(
dovi_tool_command, dovi_tool_command, stdin=ffmpeg_proc.stdout, cwd=file_input.parent
stdin=ffmpeg_proc.stdout,
) as dovi_proc: ) as dovi_proc:
ffmpeg_proc.stdout.close() # Close the ffmpeg stdout to allow it to receive a SIGPIPE if dovi_proc exits ffmpeg_proc.stdout.close()
dovi_proc.communicate() # Wait for the dovi_tool process to complete dovi_proc.communicate()
ffmpeg_mux_command = [ ffmpeg_mux_command = [
str(ffmpeg_path), str(ffmpeg_path),
@ -211,24 +226,63 @@ def create_fel(file_input: Path, ffmpeg_path: Path, dovi_tool_path: Path) -> Non
"copy", "copy",
"-f", "-f",
"hevc", "hevc",
str(el_output_path.with_suffix(".mkv")), str(el_mkv_output_path),
] ]
subprocess.run(ffmpeg_mux_command) subprocess.run(ffmpeg_mux_command)
data["EL_path"] = str(el_mkv_output_path)
with open(data_path(file_input, working_dir), "w") as json_file:
json.dump(data, json_file)
return el_mkv_output_path
if __name__ == "__main__": if __name__ == "__main__":
file_input, ffmpeg, dovi_tool, prompt, create_fel_mkv = cli() file_input, ffmpeg, dovi_tool, prompt, create_fel_mkv, working_dir = (
detect_el = detect_el_layer(file_input) parse_arguments()
if detect_el: )
rpu = read_rpu(file_input, ffmpeg, dovi_tool)
if rpu:
rpu = rpu.strip()
print(rpu)
if prompt and rpu == "FEL":
prompt_user()
if create_fel_mkv:
create_fel(file_input, ffmpeg, dovi_tool)
else:
print("No detected EL layer")
exit_print("", 0) working_dir.mkdir(exist_ok=True, parents=True)
if detect_el_layer(file_input):
existing_data = load_existing_data(file_input, working_dir)
if existing_data:
el_type = existing_data.get("EL")
if el_type:
if el_type != "FEL" and create_fel:
exit_print("EL layer is not FEL, select a different template", 1)
print(f"Data loaded, EL type: {el_type}")
if el_type == "FEL":
if prompt:
exit_print(
"You are required to use a FEL template to process this file",
1,
)
if create_fel_mkv:
created_fel = create_fel(
file_input, working_dir, ffmpeg, dovi_tool
)
exit_print(f"FEL layer: {created_fel}", 0)
else:
rpu = read_rpu(file_input, ffmpeg, dovi_tool, working_dir)
if rpu:
if rpu != "FEL" and create_fel:
exit_print("EL layer is not FEL, select a different template", 1)
if rpu == "MEL":
exit_print("EL layer is MEL, no processing needed", 0)
elif rpu == "FEL":
if prompt:
exit_print(
"You are required to use a FEL template to process this file",
1,
)
if create_fel_mkv:
created_fel = create_fel(
file_input, working_dir, ffmpeg, dovi_tool
)
exit_print(f"FEL layer: {created_fel}", 0)
else:
exit_print("No RPU data extracted", 1)
else:
exit_print("No EL layer detected", 0)