From ce493410fddf9b3cd7fa3dae1797092e61257586 Mon Sep 17 00:00:00 2001 From: Breakthrough Date: Sat, 16 Nov 2024 00:52:00 -0500 Subject: [PATCH] [save-images] Make all threads exception-safe Ensure errors are re-raised safely from worker threads by using non-blocking puts and monitoring a common error queue. --- scenedetect/__init__.py | 184 ++++++++++++++++++++--------------- scenedetect/scene_manager.py | 1 - 2 files changed, 103 insertions(+), 82 deletions(-) diff --git a/scenedetect/__init__.py b/scenedetect/__init__.py index 4f8a6836..abbfa97f 100644 --- a/scenedetect/__init__.py +++ b/scenedetect/__init__.py @@ -17,6 +17,7 @@ import math import queue +import sys import threading import typing as ty from dataclasses import dataclass @@ -183,46 +184,7 @@ def detect( scene_manager.stats_manager.save_to_csv(csv_file=stats_file_path) return scene_manager.get_scene_list(start_in_scene=start_in_scene) - -# TODO: Just merge these variables into the extractor. -@dataclass -class ImageExtractorConfig: - num_images: int = 3 - """Number of images to generate for each scene. Minimum is 1.""" - frame_margin: int = 1 - """Number of frames to pad each scene around the beginning - and end (e.g. moves the first/last image into the scene by N frames). - Can set to 0, but will result in some video files failing to extract - the very last frame.""" - image_extension: str = "jpg" - """Type of image to save (must be one of 'jpg', 'png', or 'webp').""" - encoder_param: int = 95 - """Quality/compression efficiency, based on type of image: - 'jpg' / 'webp': Quality 0-100, higher is better quality. 100 is lossless for webp. - 'png': Compression from 1-9, where 9 achieves best filesize but is slower to encode.""" - image_name_template: str = "$VIDEO_NAME-Scene-$SCENE_NUMBER-$IMAGE_NUMBER" - """Template to use for naming image files. Can use the template variables - $VIDEO_NAME, $SCENE_NUMBER, $IMAGE_NUMBER, $TIMECODE, $FRAME_NUMBER, $TIMESTAMP_MS. - Should not include an extension.""" - scale: ty.Optional[float] = None - """Optional factor by which to rescale saved images. A scaling factor of 1 would - not result in rescaling. A value < 1 results in a smaller saved image, while a - value > 1 results in an image larger than the original. This value is ignored if - either the height or width values are specified.""" - height: ty.Optional[int] = None - """Optional value for the height of the saved images. Specifying both the height - and width will resize images to an exact size, regardless of aspect ratio. - Specifying only height will rescale the image to that number of pixels in height - while preserving the aspect ratio.""" - width: ty.Optional[int] = None - """Optional value for the width of the saved images. Specifying both the width - and height will resize images to an exact size, regardless of aspect ratio. - Specifying only width will rescale the image to that number of pixels wide - while preserving the aspect ratio.""" - interpolation: Interpolation = Interpolation.CUBIC - """Type of interpolation to use when resizing images.""" - - +# TODO(v1.0): Move post-processing functions into separate submodule. class ImageExtractor: def __init__( self, @@ -236,7 +198,10 @@ def __init__( width: ty.Optional[int] = None, interpolation: Interpolation = Interpolation.CUBIC, ): - """Helper type to handle saving images for a set of scenes. This object is *not* thread-safe. + """Multi-threaded implementation of save-images functionality. Uses background threads to + handle image encoding and saving images to disk to improve parallelism. + + This object is thread-safe. Arguments: num_images: Number of images to generate for each scene. Minimum is 1. @@ -275,6 +240,7 @@ def __init__( self._width = width self._interpolation = interpolation + def run( self, video: VideoStream, @@ -282,6 +248,14 @@ def run( output_dir: ty.Optional[str] = None, show_progress=False, ) -> ty.Dict[int, ty.List[str]]: + """Run image extraction on `video` using the current parameters. Thread-safe. + + Arguments: + video: The video to process. + scene_list: The scenes detected in the video. + output_dir: Directory to write files to. + show_progress: If `true` and tqdm is available, shows a progress bar. + """ if not scene_list: return {} if self._num_images <= 0 or self._frame_margin < 0: @@ -300,62 +274,108 @@ def run( total=len(scene_list) * self._num_images, unit="images", dynamic_ncols=True ) + timecode_list = self.generate_timecode_list(scene_list) + image_filenames = {i: [] for i in range(len(timecode_list))} + filename_template = Template(self._image_name_template) + logger.debug("Writing images with template %s", filename_template.template) scene_num_format = "%0" scene_num_format += str(max(3, math.floor(math.log(len(scene_list), 10)) + 1)) + "d" image_num_format = "%0" image_num_format += str(math.floor(math.log(self._num_images, 10)) + 2) + "d" - timecode_list = self.generate_timecode_list(scene_list) - image_filenames = {i: [] for i in range(len(timecode_list))} - logger.debug("Writing images with template %s", filename_template.template) + def format_filename(scene_number: int, image_number: int, image_timecode: FrameTimecode): + return "%s.%s" % ( + filename_template.safe_substitute( + VIDEO_NAME=video.name, + SCENE_NUMBER=scene_num_format % (scene_number + 1), + IMAGE_NUMBER=image_num_format % (image_number + 1), + FRAME_NUMBER=image_timecode.get_frames(), + TIMESTAMP_MS=int(image_timecode.get_seconds() * 1000), + TIMECODE=image_timecode.get_timecode().replace(":", ";"), + ), + self._image_extension, + ) MAX_QUEUED_ENCODE_FRAMES = 4 MAX_QUEUED_SAVE_IMAGES = 4 encode_queue = queue.Queue(MAX_QUEUED_ENCODE_FRAMES) save_queue = queue.Queue(MAX_QUEUED_SAVE_IMAGES) - encode_thread = threading.Thread( - target=self._image_encode_thread, - args=(video, encode_queue, save_queue, self._image_extension), - daemon=True, - ) - save_thread = threading.Thread( - target=self._save_files_thread, - args=(save_queue, progress_bar), - daemon=True, + error_queue = queue.Queue(2) # Queue size must be the same as the # of worker threads! + + def check_error_queue(): + try: + return error_queue.get(block=False) + except queue.Empty: + pass + return None + + def launch_thread(callable, *args, **kwargs): + def capture_errors(callable, *args, **kwargs): + try: + return callable(*args, **kwargs) + # Errors we capture in `error_queue` will be re-raised by this thread. + except: # noqa: E722 + error_queue.put(sys.exc_info()) + return None + + thread = threading.Thread( + target=capture_errors, + args=( + callable, + *args, + ), + kwargs=kwargs, + daemon=True, + ) + thread.start() + return thread + + def checked_put(work_queue: queue.Queue, item: ty.Any): + error = None + while True: + try: + work_queue.put(item, timeout=0.1) + break + except queue.Full: + error = check_error_queue() + if error is None: + continue + if error is not None: + raise error[1].with_traceback(error[2]) + + encode_thread = launch_thread( + self._encode_images, + video, + encode_queue, + save_queue, + self._image_extension, ) - encode_thread.start() - save_thread.start() + save_thread = launch_thread(self._save_images, save_queue, progress_bar) for i, scene_timecodes in enumerate(timecode_list): - for j, image_timecode in enumerate(scene_timecodes): - video.seek(image_timecode) + for j, timecode in enumerate(scene_timecodes): + video.seek(timecode) frame_im = video.read() if frame_im is not None and frame_im is not False: - # TODO: Add extension to template. - # TODO: Allow NUM to be a valid suffix in addition to NUMBER. - file_path = "%s.%s" % ( - filename_template.safe_substitute( - VIDEO_NAME=video.name, - SCENE_NUMBER=scene_num_format % (i + 1), - IMAGE_NUMBER=image_num_format % (j + 1), - FRAME_NUMBER=image_timecode.get_frames(), - TIMESTAMP_MS=int(image_timecode.get_seconds() * 1000), - TIMECODE=image_timecode.get_timecode().replace(":", ";"), - ), - self._image_extension, - ) + file_path = format_filename(i, j, timecode) image_filenames[i].append(file_path) - encode_queue.put((frame_im, get_and_create_path(file_path, output_dir))) + checked_put( + encode_queue, (frame_im, get_and_create_path(file_path, output_dir)) + ) else: completed = False break - # *WARNING*: We do not handle errors or exceptions yet, and this can deadlock on errors! - encode_queue.put((None, None)) - save_queue.put((None, None)) + checked_put(encode_queue, (None, None)) + checked_put(save_queue, (None, None)) encode_thread.join() save_thread.join() + + error = check_error_queue() + if error is not None: + raise error[1].with_traceback(error[2]) + if progress_bar is not None: progress_bar.close() if not completed: @@ -363,7 +383,7 @@ def run( return image_filenames - def _image_encode_thread( + def _encode_images( self, video: VideoStream, encode_queue: queue.Queue, @@ -393,7 +413,7 @@ def _image_encode_thread( continue save_queue.put((encoded, dest_path)) - def _save_files_thread(self, save_queue: queue.Queue, progress_bar: tqdm): + def _save_images(self, save_queue: queue.Queue, progress_bar: tqdm): while True: encoded, dest_path = save_queue.get() if encoded is None: @@ -457,14 +477,16 @@ def resize_image( image = cv2.resize( image, (0, 0), fx=aspect_ratio, fy=1.0, interpolation=self._interpolation.value ) + # Figure out what kind of resizing needs to be done + width = self._width + height = self._height image_height = image.shape[0] image_width = image.shape[1] - # Figure out what kind of resizing needs to be done - if self._height or self._width: - if self._height and not self._width: - factor = self._height / float(image_height) + if width or height: + if height and not width: + factor = height / float(image_height) width = int(factor * image_width) - if self._width and not self._height: + if width and not height: factor = width / float(image_width) height = int(factor * image_height) assert height > 0 and width > 0 diff --git a/scenedetect/scene_manager.py b/scenedetect/scene_manager.py index d8489d5b..58bafba7 100644 --- a/scenedetect/scene_manager.py +++ b/scenedetect/scene_manager.py @@ -86,7 +86,6 @@ def on_new_scene(frame_img: numpy.ndarray, frame_num: int): import sys import threading from enum import Enum -from pathlib import Path from typing import Callable, Dict, List, Optional, TextIO, Tuple, Union import cv2