import re import shutil from random import choice from pathlib import Path from numpy import linspace from unidecode import unidecode import awsmfunc import vapoursynth as vs from frame_forge.exceptions import FrameForgeError from frame_forge.utils import get_working_dir, hex_to_bgr class GenerateImages: def __init__( self, source_file: Path, encode_file: Path, frames: str, image_dir: Path, indexer: str, index_directory: None | str, sub_size: int, left_crop: int, right_crop: int, top_crop: int, bottom_crop: int, adv_resize_left: float, adv_resize_right: float, adv_resize_top: float, adv_resize_bottom: float, tone_map: bool, re_sync: str, comparison_count: int, subtitle_color: str, release_sub_title: str | None, ): self.source_file = source_file self.source_node = None self.reference_source_file = None self.encode_file = encode_file self.frames = frames self.encode_node = None self.image_dir = image_dir self.indexer = indexer self.index_dir = index_directory self.sub_size = sub_size self.left_crop = left_crop self.right_crop = right_crop self.top_crop = top_crop self.bottom_crop = bottom_crop self.adv_resize_left = adv_resize_left self.adv_resize_right = adv_resize_right self.adv_resize_top = adv_resize_top self.adv_resize_bottom = adv_resize_bottom self.tone_map = tone_map self.re_sync = re_sync self.comparison_count = comparison_count self.subtitle_color = subtitle_color self.release_sub_title = release_sub_title self.core = vs.core self.load_plugins() def process_images(self): if self.indexer == "lsmash": self.index_lsmash() elif self.indexer == "ffms2": self.index_ffms2() num_source_frames = len(self.source_node) num_encode_frames = len(self.encode_node) # ASS subtitle styles # Font Name, Font Size, Primary Color, Secondary Color, Outline Color, Back Color, Bold, # Italic, Underline, Strikeout, Scale X, Scale Y, Spacing, Angle, Border Style, Outline Width, # Shadow Depth, Alignment, Left Margin, Right Margin, Vertical Margin, Encoding # bgr color color = "&H000ac7f5" if self.subtitle_color: color = hex_to_bgr(self.subtitle_color) selected_sub_style = ( f"Segoe UI,{self.sub_size},{color},&H00000000,&H00000000,&H00000000," "1,0,0,0,100,100,0,0,1,1,0,7,10,10,10,1" ) sync_sub_base = ( "Segoe UI,{size},&H31FF31&,&H00000000,&H00000000,&H00000000," "1,0,0,0,100,100,0,0,1,1,0,{pos},10,10,10,1" ) selected_sub_style_ref = sync_sub_base.format( size=str(self.sub_size + 5), pos="7" ) selected_sub_style_sync = sync_sub_base.format( size=str(self.sub_size + 5), pos="9" ) self.check_de_interlaced(num_source_frames, num_encode_frames) b_frames = None if not self.frames: b_frames = self.get_b_frames(num_source_frames) screenshot_comparison_dir, screenshot_sync_dir = self.generate_folders() self.handle_crop() self.handle_resize() self.handle_hdr() vs_source_info, vs_encode_info = self.handle_subtitles(selected_sub_style) if not self.frames: img_job = self.generate_screens( b_frames, vs_source_info, vs_encode_info, screenshot_comparison_dir, screenshot_sync_dir, selected_sub_style_ref, selected_sub_style_sync, ) else: img_job = self.generate_exact_screens( vs_source_info, vs_encode_info, screenshot_comparison_dir, ) return img_job @staticmethod def screen_gen_callback(sg_call_back): print( str(sg_call_back).replace("ScreenGen: ", "").replace("\n", "").strip(), flush=True, ) def generate_ref_screens( self, selected_sub_style_ref, frames: list, screenshot_sync_dir ): """Generates reference frames""" for ref_frame in frames: vs_encode_ref_info = self.core.sub.Subtitle( clip=self.encode_node, text=f"Reference\nFrame: {ref_frame}", style=selected_sub_style_ref, ) awsmfunc.ScreenGen( vs_encode_ref_info, frame_numbers=[ref_frame], fpng_compression=1, folder=screenshot_sync_dir, suffix="b_encode__%d", callback=self.screen_gen_callback, ) def generate_sync_screens( self, frame_list, selected_sub_style_sync, screenshot_sync_dir ): """Generates sync frames""" for sync_frame in frame_list: vs_sync_info = self.core.sub.Subtitle( clip=self.source_node, text=f"Sync\nFrame: {sync_frame}", style=selected_sub_style_sync, ) awsmfunc.ScreenGen( vs_sync_info, frame_numbers=[sync_frame], fpng_compression=1, folder=Path(screenshot_sync_dir), suffix="a_source__%d", callback=self.screen_gen_callback, ) def generate_exact_screens( self, vs_source_info, vs_encode_info, screenshot_comparison_dir, ) -> str: print("\nGenerating screenshots, please wait", flush=True) # generate source images awsmfunc.ScreenGen( vs_source_info, frame_numbers=self.frames, fpng_compression=1, folder=screenshot_comparison_dir, suffix="a_source__%d", callback=self.screen_gen_callback, ) # generate encode images awsmfunc.ScreenGen( vs_encode_info, frame_numbers=self.frames, fpng_compression=1, folder=screenshot_comparison_dir, suffix="b_encode__%d", callback=self.screen_gen_callback, ) print("Screen generation completed", flush=True) return str(screenshot_comparison_dir) def generate_screens( self, b_frames, vs_source_info, vs_encode_info, screenshot_comparison_dir, screenshot_sync_dir, selected_sub_style_ref, selected_sub_style_sync, ) -> str: print("\nGenerating screenshots, please wait", flush=True) # handle re_sync if needed sync_frames = [] if self.re_sync: get_sync_digits = re.search(r"(\d+)", self.re_sync) sync_digits = int(get_sync_digits.group(1)) if get_sync_digits else 0 for x_frames in b_frames: if self.re_sync.startswith("-"): sync_frames.append(int(x_frames) - sync_digits) else: sync_frames.append(int(x_frames) + sync_digits) else: sync_frames = b_frames # generate source images awsmfunc.ScreenGen( vs_source_info, frame_numbers=sync_frames, fpng_compression=1, folder=screenshot_comparison_dir, suffix="a_source__%d", callback=self.screen_gen_callback, ) # generate encode images awsmfunc.ScreenGen( vs_encode_info, frame_numbers=b_frames, fpng_compression=1, folder=screenshot_comparison_dir, suffix="b_encode__%d", callback=self.screen_gen_callback, ) # generate some sync frames print("\nGenerating a few sync frames", flush=True) # select two frames randomly from list get_sync_1 = choice(b_frames) remove_sync1 = b_frames.copy() remove_sync1.remove(get_sync_1) get_sync_2 = choice(remove_sync1) # sync list ref_sync_list = sorted([get_sync_1, get_sync_2]) # reference subs self.generate_ref_screens( selected_sub_style_ref, ref_sync_list, screenshot_sync_dir ) # sync subs 1 sync_subs_1 = [ref_sync_list[0] + i for i in range(-5, 6)] self.generate_sync_screens( sync_subs_1, selected_sub_style_sync, Path(Path(screenshot_sync_dir) / "sync1"), ) # sync subs 2 sync_subs_2 = [ref_sync_list[1] + i for i in range(-5, 6)] self.generate_sync_screens( sync_subs_2, selected_sub_style_sync, Path(Path(screenshot_sync_dir) / "sync2"), ) print("Screen generation completed", flush=True) return str(screenshot_comparison_dir) def handle_subtitles(self, selected_sub_style): vs_source_info = self.core.sub.Subtitle( clip=self.source_node, text="Source", style=selected_sub_style ) vs_encode_info = awsmfunc.FrameInfo( clip=self.encode_node, title=self.release_sub_title if self.release_sub_title else "", style=selected_sub_style, ) return vs_source_info, vs_encode_info def handle_hdr(self): if self.tone_map: self.source_node = awsmfunc.DynamicTonemap( clip=self.source_node, libplacebo=False ) self.encode_node = awsmfunc.DynamicTonemap( clip=self.encode_node, reference=self.reference_source_file, libplacebo=False, ) def handle_resize(self): if ( self.source_node.width != self.encode_node.width and self.source_node.height != self.encode_node.height or any( [ self.adv_resize_left, self.adv_resize_right, self.adv_resize_top, self.adv_resize_bottom, ] ) ): # advanced resize offset vars advanced_resize_left = self.adv_resize_left if self.adv_resize_left else 0 advanced_resize_top = self.adv_resize_top if self.adv_resize_top else 0 advanced_resize_width = ( self.adv_resize_right if self.adv_resize_right else 0 ) advanced_resize_height = ( self.adv_resize_bottom if self.adv_resize_bottom else 0 ) # resize source to match encode for screenshots self.source_node = self.core.resize.Spline36( self.source_node, width=int(self.encode_node.width), height=int(self.encode_node.height), src_left=advanced_resize_left, src_top=advanced_resize_top, src_width=float( self.source_node.width - (advanced_resize_left + advanced_resize_width) ), src_height=float( self.source_node.height - (advanced_resize_top + advanced_resize_height) ), dither_type="error_diffusion", ) def handle_crop(self): if any([self.left_crop, self.right_crop, self.top_crop, self.bottom_crop]): self.source_node = self.core.std.Crop( self.source_node, left=self.left_crop if self.left_crop else 0, right=self.right_crop if self.right_crop else 0, top=self.top_crop if self.top_crop else 0, bottom=self.bottom_crop if self.bottom_crop else 0, ) def generate_folders(self): print("\nCreating folders for images", flush=True) if self.image_dir: image_output_dir = Path(self.image_dir) else: image_output_dir = Path( Path(self.encode_file).parent / f"{Path(self.encode_file).stem}_images" ) # remove any accent characters from path image_output_dir = Path(unidecode(str(image_output_dir))) # check if temp image dir exists, if so delete it! if image_output_dir.exists(): shutil.rmtree(image_output_dir, ignore_errors=True) # create main image dir image_output_dir.mkdir(exist_ok=True, parents=True) # create comparison image directory and define it as variable Path(Path(image_output_dir) / "img_comparison").mkdir(exist_ok=True) screenshot_comparison_dir = str(Path(Path(image_output_dir) / "img_comparison")) # create selected image directory and define it as variable Path(Path(image_output_dir) / "img_selected").mkdir(exist_ok=True) # create sync image directory and define it as variable Path(Path(image_output_dir) / "img_sync").mkdir(exist_ok=True) screenshot_sync_dir = str(Path(Path(image_output_dir) / "img_sync")) # create sub directories Path(Path(image_output_dir) / "img_sync/sync1").mkdir(exist_ok=True) Path(Path(image_output_dir) / "img_sync/sync2").mkdir(exist_ok=True) print("Folder creation completed", flush=True) return screenshot_comparison_dir, screenshot_sync_dir def get_b_frames(self, num_source_frames): print( f"\nGenerating {self.comparison_count} 'B' frames for " "comparison images", flush=True, ) b_frames = list( linspace( int(num_source_frames * 0.15), int(num_source_frames * 0.75), int(self.comparison_count), ).astype(int) ) try: for i, frame in enumerate(b_frames): while ( self.encode_node.get_frame(frame).props["_PictType"].decode() != "B" ): frame += 1 b_frames[i] = frame except ValueError: raise FrameForgeError( "Error! Your encode file is likely an incomplete or corrupted encode" ) print(f"Finished generating {self.comparison_count} 'B' frames", flush=True) return b_frames def check_de_interlaced(self, num_source_frames, num_encode_frames): print("\nChecking if encode has been de-interlaced", flush=True) try: source_fps = float(self.source_node.fps) encode_fps = float(self.encode_node.fps) if source_fps != encode_fps: if num_source_frames == num_encode_frames: self.source_node = self.core.std.AssumeFPS( self.source_node, fpsnum=self.encode_node.fps.numerator, fpsden=self.encode_node.fps.denominator, ) print( "Adjusting source fps to match the encode using AssumeFPS() on the source", flush=True, ) else: even_frames_for = "" if num_source_frames != num_encode_frames: file_differences = float(num_source_frames / num_encode_frames) if file_differences > 1.01: even_frames_for = "source" self.source_node = self.core.std.SelectEvery( self.source_node, cycle=2, offsets=0 ) elif file_differences < 0.99: even_frames_for = "encode" self.encode_node = self.core.std.SelectEvery( self.encode_node, cycle=2, offsets=0 ) print( f"Source: FPS={source_fps} Frames={num_source_frames}\n" f"Encode: FPS={encode_fps} Frames={num_encode_frames}\n" "Source vs Encode appears to be different, we're going to assume encode has been" f" de-interlaced, automatically generating even frames for {even_frames_for}", flush=True, ) else: print("No de-interlacing detected", flush=True) except ValueError: print( "There was an error while detecting source or encode fps, attempting to continue", flush=True, ) def index_lsmash(self): print("Indexing source", flush=True) # index source file # if index is found in the StaxRip temp working directory, attempt to use it if ( Path(str(Path(self.source_file).with_suffix("")) + "_temp/").is_dir() and Path( str(Path(self.source_file).with_suffix("")) + "_temp/temp.lwi" ).is_file() ): print("Index found in StaxRip temp, attempting to use", flush=True) # define cache path lwi_cache_path = Path( str(Path(self.source_file).with_suffix("")) + "_temp/temp.lwi" ) # try to use index on source file with the cache path try: self.source_node = self.core.lsmas.LWLibavSource( source=self.source_file, cachefile=lwi_cache_path ) self.reference_source_file = self.core.lsmas.LWLibavSource( source=self.source_file, cachefile=lwi_cache_path ) print("Using existing index", flush=True) # if index cannot be used except vs.Error: print("L-Smash version miss-match, indexing source again", flush=True) # index source file self.source_node = self.core.lsmas.LWLibavSource(self.source_file) self.reference_source_file = self.core.lsmas.LWLibavSource( self.source_file ) # if no existing index is found index source file else: cache_path = Path(Path(self.source_file).with_suffix(".lwi")) try: # create index self.source_node = self.core.lsmas.LWLibavSource( self.source_file, cachefile=cache_path ) self.reference_source_file = self.core.lsmas.LWLibavSource( self.source_file, cachefile=cache_path ) except vs.Error: # delete index Path(self.source_file).with_suffix(".lwi").unlink(missing_ok=True) # create index self.source_node = self.core.lsmas.LWLibavSource( self.source_file, cachefile=cache_path ) self.reference_source_file = self.core.lsmas.LWLibavSource( self.source_file, cachefile=cache_path ) print("Source index completed\n\nIndexing encode", flush=True) # define a path for encode index to go if self.index_dir: index_base_path = Path(self.index_dir) / Path(self.encode_file).name cache_path_enc = index_base_path.with_suffix(".lwi") else: cache_path_enc = Path(Path(self.encode_file).with_suffix(".lwi")) try: # create index self.encode_node = self.core.lsmas.LWLibavSource( self.encode_file, cachefile=cache_path_enc ) except vs.Error: # delete index cache_path_enc.unlink(missing_ok=True) # create index self.encode_node = self.core.lsmas.LWLibavSource( self.encode_file, cachefile=cache_path_enc ) print("Encode index completed", flush=True) def index_ffms2(self): print("Indexing source", flush=True) # index source file # if index is found in the StaxRip temp working directory, attempt to use it if ( Path(str(Path(self.source_file).with_suffix("")) + "_temp/").is_dir() and Path( str(Path(self.source_file).with_suffix("")) + "_temp/temp.ffindex" ).is_file() ): print("Index found in StaxRip temp, attempting to use", flush=True) # define cache path ffindex_cache_path = Path( str(Path(self.source_file).with_suffix("")) + "_temp/temp.ffindex" ) # try to use index on source file with the cache path try: self.source_node = self.core.ffms2.Source( source=self.source_file, cachefile=ffindex_cache_path ) self.reference_source_file = self.core.ffms2.Source( source=self.source_file, cachefile=ffindex_cache_path ) print("Using existing index", flush=True) # if index cannot be used except vs.Error: print("FFMS2 version miss-match, indexing source again", flush=True) # index source file self.source_node = self.core.ffms2.Source(self.source_file) self.reference_source_file = self.core.ffms2.Source(self.source_file) # if no existing index is found index source file else: try: # create index print( "FFMS2 library doesn't allow progress, please wait while the index is completed", flush=True, ) self.source_node = self.core.ffms2.Source(self.source_file) self.reference_source_file = self.core.ffms2.Source(self.source_file) except vs.Error: # delete index Path(self.source_file).with_suffix(".ffindex").unlink(missing_ok=True) # create index print( "FFMS2 library doesn't allow progress, please wait while the index is completed", flush=True, ) self.source_node = self.core.ffms2.Source(self.source_file) self.reference_source_file = self.core.ffms2.Source(self.source_file) print("Source index completed\n\nIndexing encode", flush=True) # define a path for encode index to go if self.index_dir: index_base_path = Path(self.index_dir) / Path(self.encode_file).name cache_path_enc = Path(str(index_base_path) + ".ffindex") else: cache_path_enc = Path(self.encode_file + ".ffindex") try: self.encode_node = self.core.ffms2.Source( self.encode_file, cachefile=cache_path_enc ) except vs.Error: cache_path_enc.unlink(missing_ok=True) self.encode_node = self.core.ffms2.Source( self.encode_file, cachefile=cache_path_enc ) print("Encode index completed", flush=True) def load_plugins(self): plugin_path = get_working_dir() / "img_plugins" if not plugin_path.is_dir() and not plugin_path.exists(): raise FrameForgeError("Can not detect plugin directory") else: for plugin in plugin_path.glob("*.dll"): self.core.std.LoadPlugin(Path(plugin).resolve())