import re import shutil from random import choice from pathlib import Path from typing import Tuple from numpy import linspace from unidecode import unidecode import awsmfunc import vapoursynth as vs from frame_forge.exceptions import FrameForgeError from frame_forge.utils import get_working_dir, hex_to_bgr class GenerateImages: def __init__( self, source_file: Path, encode_file: Path, image_dir: Path, indexer: str, index_directory: None | str, sub_size: int, left_crop: int, right_crop: int, top_crop: int, bottom_crop: int, adv_resize_left: float, adv_resize_right: float, adv_resize_top: float, adv_resize_bottom: float, tone_map: bool, re_sync: str, comparison_count: int, subtitle_color: str, release_sub_title: str | None, ): self.source_file = source_file self.source_node = None self.reference_source_file = None self.encode_file = encode_file self.encode_node = None self.image_dir = image_dir self.indexer = indexer self.index_dir = index_directory self.sub_size = sub_size self.left_crop = left_crop self.right_crop = right_crop self.top_crop = top_crop self.bottom_crop = bottom_crop self.adv_resize_left = adv_resize_left self.adv_resize_right = adv_resize_right self.adv_resize_top = adv_resize_top self.adv_resize_bottom = adv_resize_bottom self.tone_map = tone_map self.re_sync = re_sync self.comparison_count = comparison_count self.subtitle_color = subtitle_color self.release_sub_title = release_sub_title self.core = vs.core self.load_plugins() def process_images(self): if self.indexer == "lsmash": self.index_lsmash() elif self.indexer == "ffms2": self.index_ffms2() num_source_frames = len(self.source_node) num_encode_frames = len(self.encode_node) # ASS subtitle styles # Font Name, Font Size, Primary Color, Secondary Color, Outline Color, Back Color, Bold, # Italic, Underline, Strikeout, Scale X, Scale Y, Spacing, Angle, Border Style, Outline Width, # Shadow Depth, Alignment, Left Margin, Right Margin, Vertical Margin, Encoding # bgr color color = "&H000ac7f5" if self.subtitle_color: color = hex_to_bgr(self.subtitle_color) selected_sub_style = ( f"Segoe UI,{self.sub_size},{color},&H00000000,&H00000000,&H00000000," "1,0,0,0,100,100,0,0,1,1,0,7,10,10,10,1" ) sync_sub_base = ( "Segoe UI,{size},&H31FF31&,&H00000000,&H00000000,&H00000000," "1,0,0,0,100,100,0,0,1,1,0,{pos},10,10,10,1" ) selected_sub_style_ref = sync_sub_base.format( size=str(self.sub_size + 5), pos="7" ) selected_sub_style_sync = sync_sub_base.format( size=str(self.sub_size + 5), pos="9" ) self.check_de_interlaced(num_source_frames, num_encode_frames) b_frames = self.get_b_frames(num_source_frames) screenshot_comparison_dir, screenshot_sync_dir = self.generate_folders() self.handle_crop() self.handle_resize() self.handle_hdr() vs_source_info, vs_encode_info = self.handle_subtitles(selected_sub_style) img_job = self.generate_screens( b_frames, vs_source_info, vs_encode_info, screenshot_comparison_dir, screenshot_sync_dir, selected_sub_style_ref, selected_sub_style_sync, ) return img_job @staticmethod def screen_gen_callback(sg_call_back): print( str(sg_call_back).replace("ScreenGen: ", "").replace("\n", "").strip(), flush=True, ) def generate_ref_screens( self, selected_sub_style_ref, frames: list, screenshot_sync_dir ): """Generates reference frames""" for ref_frame in frames: vs_encode_ref_info = self.core.sub.Subtitle( clip=self.encode_node, text=f"Reference\nFrame: {ref_frame}", style=selected_sub_style_ref, ) awsmfunc.ScreenGen( vs_encode_ref_info, frame_numbers=[ref_frame], fpng_compression=1, folder=screenshot_sync_dir, suffix="b_encode__%d", callback=self.screen_gen_callback, ) def generate_sync_screens( self, frame_list, selected_sub_style_sync, screenshot_sync_dir ): """Generates sync frames""" for sync_frame in frame_list: vs_sync_info = self.core.sub.Subtitle( clip=self.source_node, text=f"Sync\nFrame: {sync_frame}", style=selected_sub_style_sync, ) awsmfunc.ScreenGen( vs_sync_info, frame_numbers=[sync_frame], fpng_compression=1, folder=Path(screenshot_sync_dir), suffix="a_source__%d", callback=self.screen_gen_callback, ) def generate_screens( self, b_frames, vs_source_info, vs_encode_info, screenshot_comparison_dir, screenshot_sync_dir, selected_sub_style_ref, selected_sub_style_sync, ) -> str: print("\nGenerating screenshots, please wait", flush=True) # handle re_sync if needed sync_frames = [] if self.re_sync: get_sync_digits = re.search(r"(\d+)", self.re_sync) sync_digits = int(get_sync_digits.group(1)) if get_sync_digits else 0 for x_frames in b_frames: if self.re_sync.startswith("-"): sync_frames.append(int(x_frames) - sync_digits) else: sync_frames.append(int(x_frames) + sync_digits) else: sync_frames = b_frames # generate source images awsmfunc.ScreenGen( vs_source_info, frame_numbers=sync_frames, fpng_compression=1, folder=screenshot_comparison_dir, suffix="a_source__%d", callback=self.screen_gen_callback, ) # generate encode images awsmfunc.ScreenGen( vs_encode_info, frame_numbers=b_frames, fpng_compression=1, folder=screenshot_comparison_dir, suffix="b_encode__%d", callback=self.screen_gen_callback, ) # generate some sync frames print("\nGenerating a few sync frames", flush=True) # select two frames randomly from list get_sync_1 = choice(b_frames) remove_sync1 = b_frames.copy() remove_sync1.remove(get_sync_1) get_sync_2 = choice(remove_sync1) # sync list ref_sync_list = sorted([get_sync_1, get_sync_2]) # reference subs self.generate_ref_screens( selected_sub_style_ref, ref_sync_list, screenshot_sync_dir ) # sync subs 1 sync_subs_1 = [ref_sync_list[0] + i for i in range(-5, 6)] self.generate_sync_screens( sync_subs_1, selected_sub_style_sync, Path(Path(screenshot_sync_dir) / "sync1"), ) # sync subs 2 sync_subs_2 = [ref_sync_list[1] + i for i in range(-5, 6)] self.generate_sync_screens( sync_subs_2, selected_sub_style_sync, Path(Path(screenshot_sync_dir) / "sync2"), ) print("Screen generation completed", flush=True) return str(screenshot_comparison_dir) def handle_subtitles(self, selected_sub_style): vs_source_info = self.core.sub.Subtitle( clip=self.source_node, text="Source", style=selected_sub_style ) vs_encode_info = awsmfunc.FrameInfo( clip=self.encode_node, title=self.release_sub_title if self.release_sub_title else "", style=selected_sub_style, ) return vs_source_info, vs_encode_info def handle_hdr(self): if self.tone_map: self.source_node = awsmfunc.DynamicTonemap( clip=self.source_node, libplacebo=False ) self.encode_node = awsmfunc.DynamicTonemap( clip=self.encode_node, reference=self.reference_source_file, libplacebo=False, ) def handle_resize(self): if ( self.source_node.width != self.encode_node.width and self.source_node.height != self.encode_node.height or any( [ self.adv_resize_left, self.adv_resize_right, self.adv_resize_top, self.adv_resize_bottom, ] ) ): # advanced resize offset vars advanced_resize_left = self.adv_resize_left if self.adv_resize_left else 0 advanced_resize_top = self.adv_resize_top if self.adv_resize_top else 0 advanced_resize_width = ( self.adv_resize_right if self.adv_resize_right else 0 ) advanced_resize_height = ( self.adv_resize_bottom if self.adv_resize_bottom else 0 ) # resize source to match encode for screenshots self.source_node = self.core.resize.Spline36( self.source_node, width=int(self.encode_node.width), height=int(self.encode_node.height), src_left=advanced_resize_left, src_top=advanced_resize_top, src_width=float( self.source_node.width - (advanced_resize_left + advanced_resize_width) ), src_height=float( self.source_node.height - (advanced_resize_top + advanced_resize_height) ), dither_type="error_diffusion", ) def handle_crop(self): if any([self.left_crop, self.right_crop, self.top_crop, self.bottom_crop]): self.source_node = self.core.std.Crop( self.source_node, left=self.left_crop if self.left_crop else 0, right=self.right_crop if self.right_crop else 0, top=self.top_crop if self.top_crop else 0, bottom=self.bottom_crop if self.bottom_crop else 0, ) def generate_folders(self): print("\nCreating folders for images", flush=True) if self.image_dir: image_output_dir = Path(self.image_dir) else: image_output_dir = Path( Path(self.encode_file).parent / f"{Path(self.encode_file).stem}_images" ) # remove any accent characters from path image_output_dir = Path(unidecode(str(image_output_dir))) # check if temp image dir exists, if so delete it! if image_output_dir.exists(): shutil.rmtree(image_output_dir, ignore_errors=True) # create main image dir image_output_dir.mkdir(exist_ok=True, parents=True) # create comparison image directory and define it as variable Path(Path(image_output_dir) / "img_comparison").mkdir(exist_ok=True) screenshot_comparison_dir = str(Path(Path(image_output_dir) / "img_comparison")) # create selected image directory and define it as variable Path(Path(image_output_dir) / "img_selected").mkdir(exist_ok=True) # create sync image directory and define it as variable Path(Path(image_output_dir) / "img_sync").mkdir(exist_ok=True) screenshot_sync_dir = str(Path(Path(image_output_dir) / "img_sync")) # create sub directories Path(Path(image_output_dir) / "img_sync/sync1").mkdir(exist_ok=True) Path(Path(image_output_dir) / "img_sync/sync2").mkdir(exist_ok=True) print("Folder creation completed", flush=True) return screenshot_comparison_dir, screenshot_sync_dir def get_b_frames(self, num_source_frames): print( f"\nGenerating {self.comparison_count} 'B' frames for " "comparison images", flush=True, ) b_frames = list( linspace( int(num_source_frames * 0.15), int(num_source_frames * 0.75), int(self.comparison_count), ).astype(int) ) try: for i, frame in enumerate(b_frames): while ( self.encode_node.get_frame(frame).props["_PictType"].decode() != "B" ): frame += 1 b_frames[i] = frame except ValueError: raise FrameForgeError( "Error! Your encode file is likely an incomplete or corrupted encode" ) print(f"Finished generating {self.comparison_count} 'B' frames", flush=True) return b_frames def check_de_interlaced(self, num_source_frames, num_encode_frames): print("\nChecking if encode has been de-interlaced", flush=True) try: source_fps = float(self.source_node.fps) encode_fps = float(self.encode_node.fps) if source_fps != encode_fps: if num_source_frames == num_encode_frames: self.source_node = self.core.std.AssumeFPS( self.source_node, fpsnum=self.encode_node.fps.numerator, fpsden=self.encode_node.fps.denominator, ) print( "Adjusting source fps to match the encode using AssumeFPS() on the source", flush=True, ) else: even_frames_for = "" if num_source_frames != num_encode_frames: file_differences = float(num_source_frames / num_encode_frames) if file_differences > 1.01: even_frames_for = "source" self.source_node = self.core.std.SelectEvery( self.source_node, cycle=2, offsets=0 ) elif file_differences < 0.99: even_frames_for = "encode" self.encode_node = self.core.std.SelectEvery( self.encode_node, cycle=2, offsets=0 ) print( f"Source: FPS={source_fps} Frames={num_source_frames}\n" f"Encode: FPS={encode_fps} Frames={num_encode_frames}\n" "Source vs Encode appears to be different, we're going to assume encode has been" f" de-interlaced, automatically generating even frames for {even_frames_for}", flush=True, ) else: print("No de-interlacing detected", flush=True) except ValueError: print( "There was an error while detecting source or encode fps, attempting to continue", flush=True, ) def index_lsmash(self): print("Indexing source", flush=True) # index source file # if index is found in the StaxRip temp working directory, attempt to use it if ( Path(str(Path(self.source_file).with_suffix("")) + "_temp/").is_dir() and Path( str(Path(self.source_file).with_suffix("")) + "_temp/temp.lwi" ).is_file() ): print("Index found in StaxRip temp, attempting to use", flush=True) # define cache path lwi_cache_path = Path( str(Path(self.source_file).with_suffix("")) + "_temp/temp.lwi" ) # try to use index on source file with the cache path try: self.source_node = self.core.lsmas.LWLibavSource( source=self.source_file, cachefile=lwi_cache_path ) self.reference_source_file = self.core.lsmas.LWLibavSource( source=self.source_file, cachefile=lwi_cache_path ) print("Using existing index", flush=True) # if index cannot be used except vs.Error: print("L-Smash version miss-match, indexing source again", flush=True) # index source file self.source_node = self.core.lsmas.LWLibavSource(self.source_file) self.reference_source_file = self.core.lsmas.LWLibavSource( self.source_file ) # if no existing index is found index source file else: cache_path = Path(Path(self.source_file).with_suffix(".lwi")) try: # create index self.source_node = self.core.lsmas.LWLibavSource( self.source_file, cachefile=cache_path ) self.reference_source_file = self.core.lsmas.LWLibavSource( self.source_file, cachefile=cache_path ) except vs.Error: # delete index Path(self.source_file).with_suffix(".lwi").unlink(missing_ok=True) # create index self.source_node = self.core.lsmas.LWLibavSource( self.source_file, cachefile=cache_path ) self.reference_source_file = self.core.lsmas.LWLibavSource( self.source_file, cachefile=cache_path ) print("Source index completed\n\nIndexing encode", flush=True) # define a path for encode index to go if self.index_dir: index_base_path = Path(self.index_dir) / Path(self.encode_file).name cache_path_enc = index_base_path.with_suffix(".lwi") else: cache_path_enc = Path(Path(self.encode_file).with_suffix(".lwi")) try: # create index self.encode_node = self.core.lsmas.LWLibavSource( self.encode_file, cachefile=cache_path_enc ) except vs.Error: # delete index cache_path_enc.unlink(missing_ok=True) # create index self.encode_node = self.core.lsmas.LWLibavSource( self.encode_file, cachefile=cache_path_enc ) print("Encode index completed", flush=True) def index_ffms2(self): print("Indexing source", flush=True) # index source file # if index is found in the StaxRip temp working directory, attempt to use it if ( Path(str(Path(self.source_file).with_suffix("")) + "_temp/").is_dir() and Path( str(Path(self.source_file).with_suffix("")) + "_temp/temp.ffindex" ).is_file() ): print("Index found in StaxRip temp, attempting to use", flush=True) # define cache path ffindex_cache_path = Path( str(Path(self.source_file).with_suffix("")) + "_temp/temp.ffindex" ) # try to use index on source file with the cache path try: self.source_node = self.core.ffms2.Source( source=self.source_file, cachefile=ffindex_cache_path ) self.reference_source_file = self.core.ffms2.Source( source=self.source_file, cachefile=ffindex_cache_path ) print("Using existing index", flush=True) # if index cannot be used except vs.Error: print("FFMS2 version miss-match, indexing source again", flush=True) # index source file self.source_node = self.core.ffms2.Source(self.source_file) self.reference_source_file = self.core.ffms2.Source(self.source_file) # if no existing index is found index source file else: try: # create index print( "FFMS2 library doesn't allow progress, please wait while the index is completed", flush=True, ) self.source_node = self.core.ffms2.Source(self.source_file) self.reference_source_file = self.core.ffms2.Source(self.source_file) except vs.Error: # delete index Path(self.source_file).with_suffix(".ffindex").unlink(missing_ok=True) # create index print( "FFMS2 library doesn't allow progress, please wait while the index is completed", flush=True, ) self.source_node = self.core.ffms2.Source(self.source_file) self.reference_source_file = self.core.ffms2.Source(self.source_file) print("Source index completed\n\nIndexing encode", flush=True) # define a path for encode index to go if self.index_dir: index_base_path = Path(self.index_dir) / Path(self.encode_file).name cache_path_enc = Path(str(index_base_path) + ".ffindex") else: cache_path_enc = Path(self.encode_file + ".ffindex") try: self.encode_node = self.core.ffms2.Source( self.encode_file, cachefile=cache_path_enc ) except vs.Error: cache_path_enc.unlink(missing_ok=True) self.encode_node = self.core.ffms2.Source( self.encode_file, cachefile=cache_path_enc ) print("Encode index completed", flush=True) def load_plugins(self): plugin_path = get_working_dir() / "img_plugins" if not plugin_path.is_dir() and not plugin_path.exists(): raise FrameForgeError("Can not detect plugin directory") else: for plugin in plugin_path.glob("*.dll"): self.core.std.LoadPlugin(Path(plugin).resolve())