Skip to content

Commit

Permalink
[save-images] Make all threads exception-safe
Browse files Browse the repository at this point in the history
Ensure errors raised in worker threads are safely re-raised on the calling thread by using
puts with a timeout (instead of blocking indefinitely) and monitoring a common error queue.
  • Loading branch information
Breakthrough committed Nov 17, 2024
1 parent 86b31aa commit ce49341
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 82 deletions.
184 changes: 103 additions & 81 deletions scenedetect/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import math
import queue
import sys
import threading
import typing as ty
from dataclasses import dataclass
Expand Down Expand Up @@ -183,46 +184,7 @@ def detect(
scene_manager.stats_manager.save_to_csv(csv_file=stats_file_path)
return scene_manager.get_scene_list(start_in_scene=start_in_scene)


# TODO: Just merge these variables into the extractor.
@dataclass
class ImageExtractorConfig:
    """Options controlling how images are generated and saved for each scene.

    Every field has a default, so the config can be constructed with no arguments.
    Each field is described by the string literal that follows it.
    """

    num_images: int = 3
    """Number of images to generate for each scene. Minimum is 1."""
    frame_margin: int = 1
    """Number of frames to pad each scene around the beginning
    and end (e.g. moves the first/last image into the scene by N frames).
    Can set to 0, but will result in some video files failing to extract
    the very last frame."""
    image_extension: str = "jpg"
    """Type of image to save (must be one of 'jpg', 'png', or 'webp')."""
    encoder_param: int = 95
    """Quality/compression efficiency, based on type of image:
    'jpg' / 'webp': Quality 0-100, higher is better quality. 100 is lossless for webp.
    'png': Compression from 1-9, where 9 achieves best filesize but is slower to encode."""
    image_name_template: str = "$VIDEO_NAME-Scene-$SCENE_NUMBER-$IMAGE_NUMBER"
    """Template to use for naming image files. Can use the template variables
    $VIDEO_NAME, $SCENE_NUMBER, $IMAGE_NUMBER, $TIMECODE, $FRAME_NUMBER, $TIMESTAMP_MS.
    Should not include an extension."""
    scale: ty.Optional[float] = None
    """Optional factor by which to rescale saved images. A scaling factor of 1 would
    not result in rescaling. A value < 1 results in a smaller saved image, while a
    value > 1 results in an image larger than the original. This value is ignored if
    either the height or width values are specified."""
    height: ty.Optional[int] = None
    """Optional value for the height of the saved images. Specifying both the height
    and width will resize images to an exact size, regardless of aspect ratio.
    Specifying only height will rescale the image to that number of pixels in height
    while preserving the aspect ratio."""
    width: ty.Optional[int] = None
    """Optional value for the width of the saved images. Specifying both the width
    and height will resize images to an exact size, regardless of aspect ratio.
    Specifying only width will rescale the image to that number of pixels wide
    while preserving the aspect ratio."""
    interpolation: Interpolation = Interpolation.CUBIC
    """Type of interpolation to use when resizing images."""


# TODO(v1.0): Move post-processing functions into separate submodule.
class ImageExtractor:
def __init__(
self,
Expand All @@ -236,7 +198,10 @@ def __init__(
width: ty.Optional[int] = None,
interpolation: Interpolation = Interpolation.CUBIC,
):
"""Helper type to handle saving images for a set of scenes. This object is *not* thread-safe.
"""Multi-threaded implementation of save-images functionality. Uses background threads to
handle image encoding and saving images to disk to improve parallelism.
This object is thread-safe.
Arguments:
num_images: Number of images to generate for each scene. Minimum is 1.
Expand Down Expand Up @@ -275,13 +240,22 @@ def __init__(
self._width = width
self._interpolation = interpolation


def run(
self,
video: VideoStream,
scene_list: SceneList,
output_dir: ty.Optional[str] = None,
show_progress=False,
) -> ty.Dict[int, ty.List[str]]:
"""Run image extraction on `video` using the current parameters. Thread-safe.
Arguments:
video: The video to process.
scene_list: The scenes detected in the video.
output_dir: Directory to write files to.
show_progress: If `true` and tqdm is available, shows a progress bar.
"""
if not scene_list:
return {}
if self._num_images <= 0 or self._frame_margin < 0:
Expand All @@ -300,70 +274,116 @@ def run(
total=len(scene_list) * self._num_images, unit="images", dynamic_ncols=True
)

timecode_list = self.generate_timecode_list(scene_list)
image_filenames = {i: [] for i in range(len(timecode_list))}

filename_template = Template(self._image_name_template)
logger.debug("Writing images with template %s", filename_template.template)
scene_num_format = "%0"
scene_num_format += str(max(3, math.floor(math.log(len(scene_list), 10)) + 1)) + "d"
image_num_format = "%0"
image_num_format += str(math.floor(math.log(self._num_images, 10)) + 2) + "d"

timecode_list = self.generate_timecode_list(scene_list)
image_filenames = {i: [] for i in range(len(timecode_list))}
logger.debug("Writing images with template %s", filename_template.template)
def format_filename(scene_number: int, image_number: int, image_timecode: FrameTimecode):
    """Build the file name, including extension, for one image of one scene.

    Arguments:
        scene_number: Index of the scene; incremented by one before formatting.
        image_number: Index of the image within the scene; incremented by one
            before formatting.
        image_timecode: Position of the image in the video, used to fill in the
            frame number, millisecond timestamp and timecode template variables.

    Returns:
        The substituted file name template followed by "." and the image extension.
    """
    template_values = dict(
        VIDEO_NAME=video.name,
        SCENE_NUMBER=scene_num_format % (scene_number + 1),
        IMAGE_NUMBER=image_num_format % (image_number + 1),
        FRAME_NUMBER=image_timecode.get_frames(),
        TIMESTAMP_MS=int(image_timecode.get_seconds() * 1000),
        # Colons in the timecode are swapped for semicolons (presumably because ":"
        # is not a valid file name character on every platform).
        TIMECODE=image_timecode.get_timecode().replace(":", ";"),
    )
    # `safe_substitute` leaves unknown placeholders in place instead of raising.
    base_name = filename_template.safe_substitute(**template_values)
    return "%s.%s" % (base_name, self._image_extension)

MAX_QUEUED_ENCODE_FRAMES = 4
MAX_QUEUED_SAVE_IMAGES = 4
encode_queue = queue.Queue(MAX_QUEUED_ENCODE_FRAMES)
save_queue = queue.Queue(MAX_QUEUED_SAVE_IMAGES)
encode_thread = threading.Thread(
target=self._image_encode_thread,
args=(video, encode_queue, save_queue, self._image_extension),
daemon=True,
)
save_thread = threading.Thread(
target=self._save_files_thread,
args=(save_queue, progress_bar),
daemon=True,
error_queue = queue.Queue(2) # Queue size must be the same as the # of worker threads!

def check_error_queue():
    """Return the next pending entry from `error_queue` without blocking.

    Returns:
        The entry removed from `error_queue`, or None when the queue is empty.
    """
    try:
        pending_error = error_queue.get_nowait()
    except queue.Empty:
        return None
    return pending_error

def launch_thread(callable, *args, **kwargs):
    """Run `callable(*args, **kwargs)` on a new daemon thread and return that thread.

    Anything raised by `callable` is caught inside the worker and stored on
    `error_queue` as a `sys.exc_info()` tuple, so that the thread monitoring
    `error_queue` can re-raise it.

    Arguments:
        callable: The function to run on the worker thread.
        *args: Positional arguments forwarded to `callable`.
        **kwargs: Keyword arguments forwarded to `callable`.

    Returns:
        The `threading.Thread` object, which has already been started.
    """

    def capture_errors():
        try:
            return callable(*args, **kwargs)
        # Deliberately catches BaseException (not just Exception): every failure in the
        # worker, including SystemExit/KeyboardInterrupt, must be forwarded through
        # `error_queue` rather than silently ending the thread.
        except BaseException:
            error_queue.put(sys.exc_info())
        return None

    # The wrapper is a closure over `callable`, `args` and `kwargs`, so forwarded
    # keyword arguments can never clash with the wrapper's own parameter names.
    thread = threading.Thread(target=capture_errors, daemon=True)
    thread.start()
    return thread

def checked_put(work_queue: queue.Queue, item: ty.Any):
    """Put `item` on `work_queue`, re-raising any captured worker error while waiting.

    The put is retried with a short timeout. Each time the queue is found full,
    `check_error_queue()` is consulted; if it returns an error, that error is raised
    immediately instead of waiting for space that may never become available.

    Arguments:
        work_queue: Queue to place the item on.
        item: The item to enqueue.

    Raises:
        The exception captured from a worker thread, with its original traceback.
    """
    while True:
        try:
            # Bounded wait so that a full queue cannot block this thread forever.
            work_queue.put(item, timeout=0.1)
            return
        except queue.Full:
            error = check_error_queue()
        # Raise outside of the `except` block so that `queue.Full` is not attached as
        # the context of the worker's exception.
        if error is not None:
            # `error` is a `sys.exc_info()` tuple: (type, value, traceback).
            raise error[1].with_traceback(error[2])

encode_thread = launch_thread(
self._encode_images,
video,
encode_queue,
save_queue,
self._image_extension,
)
encode_thread.start()
save_thread.start()
save_thread = launch_thread(self._save_images, save_queue, progress_bar)

for i, scene_timecodes in enumerate(timecode_list):
for j, image_timecode in enumerate(scene_timecodes):
video.seek(image_timecode)
for j, timecode in enumerate(scene_timecodes):
video.seek(timecode)
frame_im = video.read()
if frame_im is not None and frame_im is not False:
# TODO: Add extension to template.
# TODO: Allow NUM to be a valid suffix in addition to NUMBER.
file_path = "%s.%s" % (
filename_template.safe_substitute(
VIDEO_NAME=video.name,
SCENE_NUMBER=scene_num_format % (i + 1),
IMAGE_NUMBER=image_num_format % (j + 1),
FRAME_NUMBER=image_timecode.get_frames(),
TIMESTAMP_MS=int(image_timecode.get_seconds() * 1000),
TIMECODE=image_timecode.get_timecode().replace(":", ";"),
),
self._image_extension,
)
file_path = format_filename(i, j, timecode)
image_filenames[i].append(file_path)
encode_queue.put((frame_im, get_and_create_path(file_path, output_dir)))
checked_put(
encode_queue, (frame_im, get_and_create_path(file_path, output_dir))
)
else:
completed = False
break

# *WARNING*: We do not handle errors or exceptions yet, and this can deadlock on errors!
encode_queue.put((None, None))
save_queue.put((None, None))
checked_put(encode_queue, (None, None))
checked_put(save_queue, (None, None))
encode_thread.join()
save_thread.join()

error = check_error_queue()
if error is not None:
raise error[1].with_traceback(error[2])

if progress_bar is not None:
progress_bar.close()
if not completed:
logger.error("Could not generate all output images.")

return image_filenames

def _image_encode_thread(
def _encode_images(
self,
video: VideoStream,
encode_queue: queue.Queue,
Expand Down Expand Up @@ -393,7 +413,7 @@ def _image_encode_thread(
continue
save_queue.put((encoded, dest_path))

def _save_files_thread(self, save_queue: queue.Queue, progress_bar: tqdm):
def _save_images(self, save_queue: queue.Queue, progress_bar: tqdm):
while True:
encoded, dest_path = save_queue.get()
if encoded is None:
Expand Down Expand Up @@ -457,14 +477,16 @@ def resize_image(
image = cv2.resize(
image, (0, 0), fx=aspect_ratio, fy=1.0, interpolation=self._interpolation.value
)
# Figure out what kind of resizing needs to be done
width = self._width
height = self._height
image_height = image.shape[0]
image_width = image.shape[1]
# Figure out what kind of resizing needs to be done
if self._height or self._width:
if self._height and not self._width:
factor = self._height / float(image_height)
if width or height:
if height and not width:
factor = height / float(image_height)
width = int(factor * image_width)
if self._width and not self._height:
if width and not height:
factor = width / float(image_width)
height = int(factor * image_height)
assert height > 0 and width > 0
Expand Down
1 change: 0 additions & 1 deletion scenedetect/scene_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ def on_new_scene(frame_img: numpy.ndarray, frame_num: int):
import sys
import threading
from enum import Enum
from pathlib import Path
from typing import Callable, Dict, List, Optional, TextIO, Tuple, Union

import cv2
Expand Down

0 comments on commit ce49341

Please sign in to comment.