FrameForge/frame_forge/__init__.py

607 lines
22 KiB
Python
Raw Permalink Normal View History

2024-01-24 12:11:06 -05:00
import re
import shutil
from random import choice
from pathlib import Path
from typing import Tuple
2024-01-24 12:11:06 -05:00
from numpy import linspace
from unidecode import unidecode
import awsmfunc
import vapoursynth as vs
2024-01-24 12:31:46 -05:00
from frame_forge.exceptions import FrameForgeError
from frame_forge.utils import get_working_dir, hex_to_bgr
2024-01-24 12:11:06 -05:00
class GenerateImages:
def __init__(
self,
source_file: Path,
encode_file: Path,
image_dir: Path,
indexer: str,
index_directory: None | str,
sub_size: int,
left_crop: int,
right_crop: int,
top_crop: int,
bottom_crop: int,
adv_resize_left: float,
adv_resize_right: float,
adv_resize_top: float,
adv_resize_bottom: float,
tone_map: bool,
re_sync: str,
comparison_count: int,
subtitle_color: str,
release_sub_title: str | None,
):
self.source_file = source_file
self.source_node = None
self.reference_source_file = None
self.encode_file = encode_file
self.encode_node = None
self.image_dir = image_dir
self.indexer = indexer
self.index_dir = index_directory
self.sub_size = sub_size
self.left_crop = left_crop
self.right_crop = right_crop
self.top_crop = top_crop
self.bottom_crop = bottom_crop
self.adv_resize_left = adv_resize_left
self.adv_resize_right = adv_resize_right
self.adv_resize_top = adv_resize_top
self.adv_resize_bottom = adv_resize_bottom
self.tone_map = tone_map
self.re_sync = re_sync
self.comparison_count = comparison_count
self.subtitle_color = subtitle_color
self.release_sub_title = release_sub_title
self.core = vs.core
self.load_plugins()
def process_images(self):
if self.indexer == "lsmash":
self.index_lsmash()
elif self.indexer == "ffms2":
self.index_ffms2()
num_source_frames = len(self.source_node)
num_encode_frames = len(self.encode_node)
# ASS subtitle styles
# Font Name, Font Size, Primary Color, Secondary Color, Outline Color, Back Color, Bold,
# Italic, Underline, Strikeout, Scale X, Scale Y, Spacing, Angle, Border Style, Outline Width,
# Shadow Depth, Alignment, Left Margin, Right Margin, Vertical Margin, Encoding
2024-01-24 12:11:06 -05:00
# bgr color
color = "&H000ac7f5"
if self.subtitle_color:
color = hex_to_bgr(self.subtitle_color)
selected_sub_style = (
f"Segoe UI,{self.sub_size},{color},&H00000000,&H00000000,&H00000000,"
"1,0,0,0,100,100,0,0,1,1,0,7,10,10,10,1"
2024-01-24 12:11:06 -05:00
)
sync_sub_base = (
"Segoe UI,{size},&H31FF31&,&H00000000,&H00000000,&H00000000,"
"1,0,0,0,100,100,0,0,1,1,0,{pos},10,10,10,1"
)
selected_sub_style_ref = sync_sub_base.format(
size=str(self.sub_size + 5), pos="7"
)
selected_sub_style_sync = sync_sub_base.format(
size=str(self.sub_size + 5), pos="9"
)
2024-01-24 12:11:06 -05:00
self.check_de_interlaced(num_source_frames, num_encode_frames)
b_frames = self.get_b_frames(num_source_frames)
screenshot_comparison_dir, screenshot_sync_dir = self.generate_folders()
self.handle_crop()
self.handle_resize()
self.handle_hdr()
vs_source_info, vs_encode_info = self.handle_subtitles(selected_sub_style)
img_job = self.generate_screens(
b_frames,
vs_source_info,
vs_encode_info,
screenshot_comparison_dir,
screenshot_sync_dir,
selected_sub_style_ref,
selected_sub_style_sync,
2024-01-24 12:11:06 -05:00
)
return img_job
@staticmethod
def screen_gen_callback(sg_call_back):
print(
str(sg_call_back).replace("ScreenGen: ", "").replace("\n", "").strip(),
flush=True,
)
def generate_ref_screens(
self, selected_sub_style_ref, frames: list, screenshot_sync_dir
):
"""Generates reference frames"""
for ref_frame in frames:
vs_encode_ref_info = self.core.sub.Subtitle(
clip=self.encode_node,
text=f"Reference\nFrame: {ref_frame}",
style=selected_sub_style_ref,
)
awsmfunc.ScreenGen(
vs_encode_ref_info,
frame_numbers=[ref_frame],
fpng_compression=1,
folder=screenshot_sync_dir,
suffix="b_encode__%d",
callback=self.screen_gen_callback,
)
def generate_sync_screens(
self, frame_list, selected_sub_style_sync, screenshot_sync_dir
):
"""Generates sync frames"""
for sync_frame in frame_list:
vs_sync_info = self.core.sub.Subtitle(
clip=self.source_node,
text=f"Sync\nFrame: {sync_frame}",
style=selected_sub_style_sync,
)
awsmfunc.ScreenGen(
vs_sync_info,
frame_numbers=[sync_frame],
fpng_compression=1,
folder=Path(screenshot_sync_dir),
suffix="a_source__%d",
callback=self.screen_gen_callback,
)
2024-01-24 12:11:06 -05:00
def generate_screens(
self,
b_frames,
vs_source_info,
vs_encode_info,
screenshot_comparison_dir,
screenshot_sync_dir,
selected_sub_style_ref,
selected_sub_style_sync,
) -> str:
2024-01-24 12:11:06 -05:00
print("\nGenerating screenshots, please wait", flush=True)
# handle re_sync if needed
sync_frames = []
if self.re_sync:
get_sync_digits = re.search(r"(\d+)", self.re_sync)
sync_digits = int(get_sync_digits.group(1)) if get_sync_digits else 0
for x_frames in b_frames:
if self.re_sync.startswith("-"):
sync_frames.append(int(x_frames) - sync_digits)
else:
sync_frames.append(int(x_frames) + sync_digits)
else:
sync_frames = b_frames
# generate source images
awsmfunc.ScreenGen(
vs_source_info,
frame_numbers=sync_frames,
fpng_compression=1,
folder=screenshot_comparison_dir,
suffix="a_source__%d",
callback=self.screen_gen_callback,
)
# generate encode images
awsmfunc.ScreenGen(
vs_encode_info,
frame_numbers=b_frames,
fpng_compression=1,
folder=screenshot_comparison_dir,
suffix="b_encode__%d",
callback=self.screen_gen_callback,
)
# generate some sync frames
print("\nGenerating a few sync frames", flush=True)
# select two frames randomly from list
get_sync_1 = choice(b_frames)
2024-01-24 12:11:06 -05:00
remove_sync1 = b_frames.copy()
remove_sync1.remove(get_sync_1)
get_sync_2 = choice(remove_sync1)
# sync list
ref_sync_list = sorted([get_sync_1, get_sync_2])
2024-01-24 12:11:06 -05:00
# reference subs
self.generate_ref_screens(
selected_sub_style_ref, ref_sync_list, screenshot_sync_dir
2024-01-24 12:11:06 -05:00
)
# sync subs 1
sync_subs_1 = [ref_sync_list[0] + i for i in range(-5, 6)]
2024-01-24 12:11:06 -05:00
self.generate_sync_screens(
sync_subs_1,
selected_sub_style_sync,
Path(Path(screenshot_sync_dir) / "sync1"),
2024-01-24 12:11:06 -05:00
)
# sync subs 2
sync_subs_2 = [ref_sync_list[1] + i for i in range(-5, 6)]
self.generate_sync_screens(
sync_subs_2,
selected_sub_style_sync,
Path(Path(screenshot_sync_dir) / "sync2"),
2024-01-24 12:11:06 -05:00
)
print("Screen generation completed", flush=True)
return str(screenshot_comparison_dir)
def handle_subtitles(self, selected_sub_style):
vs_source_info = self.core.sub.Subtitle(
clip=self.source_node, text="Source", style=selected_sub_style
)
vs_encode_info = awsmfunc.FrameInfo(
clip=self.encode_node,
title=self.release_sub_title if self.release_sub_title else "",
style=selected_sub_style,
)
return vs_source_info, vs_encode_info
def handle_hdr(self):
if self.tone_map:
self.source_node = awsmfunc.DynamicTonemap(
clip=self.source_node, libplacebo=False
)
self.encode_node = awsmfunc.DynamicTonemap(
clip=self.encode_node,
reference=self.reference_source_file,
libplacebo=False,
)
def handle_resize(self):
if (
self.source_node.width != self.encode_node.width
and self.source_node.height != self.encode_node.height
or any(
[
self.adv_resize_left,
self.adv_resize_right,
self.adv_resize_top,
self.adv_resize_bottom,
]
)
):
# advanced resize offset vars
advanced_resize_left = self.adv_resize_left if self.adv_resize_left else 0
advanced_resize_top = self.adv_resize_top if self.adv_resize_top else 0
advanced_resize_width = (
self.adv_resize_right if self.adv_resize_right else 0
)
advanced_resize_height = (
self.adv_resize_bottom if self.adv_resize_bottom else 0
)
# resize source to match encode for screenshots
self.source_node = self.core.resize.Spline36(
self.source_node,
width=int(self.encode_node.width),
height=int(self.encode_node.height),
src_left=advanced_resize_left,
src_top=advanced_resize_top,
src_width=float(
self.source_node.width
- (advanced_resize_left + advanced_resize_width)
),
src_height=float(
self.source_node.height
- (advanced_resize_top + advanced_resize_height)
),
dither_type="error_diffusion",
)
def handle_crop(self):
if any([self.left_crop, self.right_crop, self.top_crop, self.bottom_crop]):
self.source_node = self.core.std.Crop(
self.source_node,
left=self.left_crop if self.left_crop else 0,
right=self.right_crop if self.right_crop else 0,
top=self.top_crop if self.top_crop else 0,
bottom=self.bottom_crop if self.bottom_crop else 0,
)
def generate_folders(self):
print("\nCreating folders for images", flush=True)
if self.image_dir:
image_output_dir = Path(self.image_dir)
else:
image_output_dir = Path(
Path(self.encode_file).parent / f"{Path(self.encode_file).stem}_images"
)
# remove any accent characters from path
image_output_dir = Path(unidecode(str(image_output_dir)))
# check if temp image dir exists, if so delete it!
if image_output_dir.exists():
shutil.rmtree(image_output_dir, ignore_errors=True)
# create main image dir
image_output_dir.mkdir(exist_ok=True, parents=True)
# create comparison image directory and define it as variable
Path(Path(image_output_dir) / "img_comparison").mkdir(exist_ok=True)
screenshot_comparison_dir = str(Path(Path(image_output_dir) / "img_comparison"))
# create selected image directory and define it as variable
Path(Path(image_output_dir) / "img_selected").mkdir(exist_ok=True)
# create sync image directory and define it as variable
Path(Path(image_output_dir) / "img_sync").mkdir(exist_ok=True)
screenshot_sync_dir = str(Path(Path(image_output_dir) / "img_sync"))
# create sub directories
Path(Path(image_output_dir) / "img_sync/sync1").mkdir(exist_ok=True)
Path(Path(image_output_dir) / "img_sync/sync2").mkdir(exist_ok=True)
print("Folder creation completed", flush=True)
return screenshot_comparison_dir, screenshot_sync_dir
def get_b_frames(self, num_source_frames):
print(
f"\nGenerating {self.comparison_count} 'B' frames for " "comparison images",
flush=True,
)
b_frames = list(
linspace(
int(num_source_frames * 0.15),
int(num_source_frames * 0.75),
int(self.comparison_count),
).astype(int)
)
try:
for i, frame in enumerate(b_frames):
while (
self.encode_node.get_frame(frame).props["_PictType"].decode() != "B"
):
frame += 1
b_frames[i] = frame
except ValueError:
2024-01-24 12:31:46 -05:00
raise FrameForgeError(
2024-01-24 12:11:06 -05:00
"Error! Your encode file is likely an incomplete or corrupted encode"
)
print(f"Finished generating {self.comparison_count} 'B' frames", flush=True)
return b_frames
def check_de_interlaced(self, num_source_frames, num_encode_frames):
print("\nChecking if encode has been de-interlaced", flush=True)
try:
source_fps = float(self.source_node.fps)
encode_fps = float(self.encode_node.fps)
if source_fps != encode_fps:
if num_source_frames == num_encode_frames:
self.source_node = self.core.std.AssumeFPS(
self.source_node,
fpsnum=self.encode_node.fps.numerator,
fpsden=self.encode_node.fps.denominator,
)
print(
"Adjusting source fps to match the encode using AssumeFPS() on the source",
flush=True,
)
else:
even_frames_for = ""
if num_source_frames != num_encode_frames:
file_differences = float(num_source_frames / num_encode_frames)
if file_differences > 1.01:
even_frames_for = "source"
self.source_node = self.core.std.SelectEvery(
self.source_node, cycle=2, offsets=0
)
elif file_differences < 0.99:
even_frames_for = "encode"
self.encode_node = self.core.std.SelectEvery(
self.encode_node, cycle=2, offsets=0
)
print(
f"Source: FPS={source_fps} Frames={num_source_frames}\n"
f"Encode: FPS={encode_fps} Frames={num_encode_frames}\n"
"Source vs Encode appears to be different, we're going to assume encode has been"
f" de-interlaced, automatically generating even frames for {even_frames_for}",
flush=True,
)
else:
print("No de-interlacing detected", flush=True)
except ValueError:
print(
"There was an error while detecting source or encode fps, attempting to continue",
flush=True,
)
def index_lsmash(self):
print("Indexing source", flush=True)
# index source file
# if index is found in the StaxRip temp working directory, attempt to use it
if (
Path(str(Path(self.source_file).with_suffix("")) + "_temp/").is_dir()
and Path(
str(Path(self.source_file).with_suffix("")) + "_temp/temp.lwi"
).is_file()
):
print("Index found in StaxRip temp, attempting to use", flush=True)
# define cache path
lwi_cache_path = Path(
str(Path(self.source_file).with_suffix("")) + "_temp/temp.lwi"
)
# try to use index on source file with the cache path
try:
self.source_node = self.core.lsmas.LWLibavSource(
source=self.source_file, cachefile=lwi_cache_path
)
self.reference_source_file = self.core.lsmas.LWLibavSource(
source=self.source_file, cachefile=lwi_cache_path
)
print("Using existing index", flush=True)
# if index cannot be used
except vs.Error:
print("L-Smash version miss-match, indexing source again", flush=True)
# index source file
self.source_node = self.core.lsmas.LWLibavSource(self.source_file)
self.reference_source_file = self.core.lsmas.LWLibavSource(
self.source_file
)
# if no existing index is found index source file
else:
cache_path = Path(Path(self.source_file).with_suffix(".lwi"))
try:
# create index
self.source_node = self.core.lsmas.LWLibavSource(
self.source_file, cachefile=cache_path
)
self.reference_source_file = self.core.lsmas.LWLibavSource(
self.source_file, cachefile=cache_path
)
except vs.Error:
# delete index
Path(self.source_file).with_suffix(".lwi").unlink(missing_ok=True)
# create index
self.source_node = self.core.lsmas.LWLibavSource(
self.source_file, cachefile=cache_path
)
self.reference_source_file = self.core.lsmas.LWLibavSource(
self.source_file, cachefile=cache_path
)
print("Source index completed\n\nIndexing encode", flush=True)
# define a path for encode index to go
if self.index_dir:
index_base_path = Path(self.index_dir) / Path(self.encode_file).name
cache_path_enc = index_base_path.with_suffix(".lwi")
else:
cache_path_enc = Path(Path(self.encode_file).with_suffix(".lwi"))
try:
# create index
self.encode_node = self.core.lsmas.LWLibavSource(
self.encode_file, cachefile=cache_path_enc
)
except vs.Error:
# delete index
cache_path_enc.unlink(missing_ok=True)
# create index
self.encode_node = self.core.lsmas.LWLibavSource(
self.encode_file, cachefile=cache_path_enc
)
print("Encode index completed", flush=True)
def index_ffms2(self):
print("Indexing source", flush=True)
# index source file
# if index is found in the StaxRip temp working directory, attempt to use it
if (
Path(str(Path(self.source_file).with_suffix("")) + "_temp/").is_dir()
and Path(
str(Path(self.source_file).with_suffix("")) + "_temp/temp.ffindex"
).is_file()
):
print("Index found in StaxRip temp, attempting to use", flush=True)
# define cache path
ffindex_cache_path = Path(
str(Path(self.source_file).with_suffix("")) + "_temp/temp.ffindex"
)
# try to use index on source file with the cache path
try:
self.source_node = self.core.ffms2.Source(
source=self.source_file, cachefile=ffindex_cache_path
)
self.reference_source_file = self.core.ffms2.Source(
source=self.source_file, cachefile=ffindex_cache_path
)
print("Using existing index", flush=True)
# if index cannot be used
except vs.Error:
print("FFMS2 version miss-match, indexing source again", flush=True)
# index source file
self.source_node = self.core.ffms2.Source(self.source_file)
self.reference_source_file = self.core.ffms2.Source(self.source_file)
# if no existing index is found index source file
else:
try:
# create index
print(
"FFMS2 library doesn't allow progress, please wait while the index is completed",
flush=True,
)
self.source_node = self.core.ffms2.Source(self.source_file)
self.reference_source_file = self.core.ffms2.Source(self.source_file)
except vs.Error:
# delete index
Path(self.source_file).with_suffix(".ffindex").unlink(missing_ok=True)
# create index
print(
"FFMS2 library doesn't allow progress, please wait while the index is completed",
flush=True,
)
self.source_node = self.core.ffms2.Source(self.source_file)
self.reference_source_file = self.core.ffms2.Source(self.source_file)
print("Source index completed\n\nIndexing encode", flush=True)
# define a path for encode index to go
if self.index_dir:
index_base_path = Path(self.index_dir) / Path(self.encode_file).name
cache_path_enc = Path(str(index_base_path) + ".ffindex")
else:
cache_path_enc = Path(self.encode_file + ".ffindex")
try:
self.encode_node = self.core.ffms2.Source(
self.encode_file, cachefile=cache_path_enc
)
except vs.Error:
cache_path_enc.unlink(missing_ok=True)
self.encode_node = self.core.ffms2.Source(
self.encode_file, cachefile=cache_path_enc
)
print("Encode index completed", flush=True)
def load_plugins(self):
plugin_path = get_working_dir() / "img_plugins"
if not plugin_path.is_dir() and not plugin_path.exists():
2024-01-24 12:31:46 -05:00
raise FrameForgeError("Can not detect plugin directory")
2024-01-24 12:11:06 -05:00
else:
for plugin in plugin_path.glob("*.dll"):
self.core.std.LoadPlugin(Path(plugin).resolve())