Skip to content

Commit

Permalink
[save-images] Make all threads exception-safe
Browse files Browse the repository at this point in the history
Ensure errors are re-raised safely from worker threads by using non-blocking
puts and monitoring a common error queue.
  • Loading branch information
Breakthrough committed Nov 16, 2024
1 parent 86b31aa commit 921f535
Showing 1 changed file with 81 additions and 35 deletions.
116 changes: 81 additions & 35 deletions scenedetect/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import math
import queue
import sys
import threading
import typing as ty
from dataclasses import dataclass
Expand Down Expand Up @@ -300,70 +301,115 @@ def run(
total=len(scene_list) * self._num_images, unit="images", dynamic_ncols=True
)

timecode_list = self.generate_timecode_list(scene_list)
image_filenames = {i: [] for i in range(len(timecode_list))}

filename_template = Template(self._image_name_template)
logger.debug("Writing images with template %s", filename_template.template)
scene_num_format = "%0"
scene_num_format += str(max(3, math.floor(math.log(len(scene_list), 10)) + 1)) + "d"
image_num_format = "%0"
image_num_format += str(math.floor(math.log(self._num_images, 10)) + 2) + "d"

timecode_list = self.generate_timecode_list(scene_list)
image_filenames = {i: [] for i in range(len(timecode_list))}
logger.debug("Writing images with template %s", filename_template.template)
def format_filename(scene_number: int, image_number: int, image_timecode: FrameTimecode):
return "%s.%s" % (
filename_template.safe_substitute(
VIDEO_NAME=video.name,
SCENE_NUMBER=scene_num_format % (scene_number + 1),
IMAGE_NUMBER=image_num_format % (image_number + 1),
FRAME_NUMBER=image_timecode.get_frames(),
TIMESTAMP_MS=int(image_timecode.get_seconds() * 1000),
TIMECODE=image_timecode.get_timecode().replace(":", ";"),
),
self._image_extension,
)

MAX_QUEUED_ENCODE_FRAMES = 4
MAX_QUEUED_SAVE_IMAGES = 4
encode_queue = queue.Queue(MAX_QUEUED_ENCODE_FRAMES)
save_queue = queue.Queue(MAX_QUEUED_SAVE_IMAGES)
encode_thread = threading.Thread(
target=self._image_encode_thread,
args=(video, encode_queue, save_queue, self._image_extension),
daemon=True,
)
save_thread = threading.Thread(
target=self._save_files_thread,
args=(save_queue, progress_bar),
daemon=True,
error_queue = queue.Queue(2) # Queue size must be the same as the # of worker threads!

def check_error_queue():
try:
return error_queue.get(block=False)
except queue.Empty:
pass
return None

def launch_thread(callable, *args, **kwargs):
def capture_errors(callable, *args, **kwargs):

Check notice

Code scanning / CodeQL

Explicit returns mixed with implicit (fall through) returns Note

Mixing implicit and explicit returns may indicate an error as implicit returns always return None.
try:
return callable(*args, **kwargs)
# Errors we capture in `error_queue` will be re-raised by this thread.
except: # noqa: E722

Check notice

Code scanning / CodeQL

Except block handles 'BaseException' Note

Except block directly handles BaseException.
error_queue.put(sys.exc_info())

thread = threading.Thread(
target=capture_errors,
args=(
callable,
*args,
),
kwargs=kwargs,
daemon=True,
)
thread.start()
return thread

def checked_put(work_queue: queue.Queue, item: ty.Any):
error = None
while True:
try:
work_queue.put(item, timeout=0.1)
break
except queue.Full:
error = check_error_queue()
if error is None:
continue
if error is not None:
raise error[1].with_traceback(error[2])

encode_thread = launch_thread(
self._encode_images,
video,
encode_queue,
save_queue,
self._image_extension,
)
encode_thread.start()
save_thread.start()
save_thread = launch_thread(self._save_images, save_queue, progress_bar)

for i, scene_timecodes in enumerate(timecode_list):
for j, image_timecode in enumerate(scene_timecodes):
video.seek(image_timecode)
for j, timecode in enumerate(scene_timecodes):
video.seek(timecode)
frame_im = video.read()
if frame_im is not None and frame_im is not False:
# TODO: Add extension to template.
# TODO: Allow NUM to be a valid suffix in addition to NUMBER.
file_path = "%s.%s" % (
filename_template.safe_substitute(
VIDEO_NAME=video.name,
SCENE_NUMBER=scene_num_format % (i + 1),
IMAGE_NUMBER=image_num_format % (j + 1),
FRAME_NUMBER=image_timecode.get_frames(),
TIMESTAMP_MS=int(image_timecode.get_seconds() * 1000),
TIMECODE=image_timecode.get_timecode().replace(":", ";"),
),
self._image_extension,
)
file_path = format_filename(i, j, timecode)
image_filenames[i].append(file_path)
encode_queue.put((frame_im, get_and_create_path(file_path, output_dir)))
checked_put(
encode_queue, (frame_im, get_and_create_path(file_path, output_dir))
)
else:
completed = False
break

# *WARNING*: We do not handle errors or exceptions yet, and this can deadlock on errors!
encode_queue.put((None, None))
save_queue.put((None, None))
checked_put(encode_queue, (None, None))
checked_put(save_queue, (None, None))
encode_thread.join()
save_thread.join()

error = check_error_queue()
if error is not None:
raise error[1].with_traceback(error[2])

if progress_bar is not None:
progress_bar.close()
if not completed:
logger.error("Could not generate all output images.")

return image_filenames

def _image_encode_thread(
def _encode_images(
self,
video: VideoStream,
encode_queue: queue.Queue,
Expand Down Expand Up @@ -393,7 +439,7 @@ def _image_encode_thread(
continue
save_queue.put((encoded, dest_path))

def _save_files_thread(self, save_queue: queue.Queue, progress_bar: tqdm):
def _save_images(self, save_queue: queue.Queue, progress_bar: tqdm):
while True:
encoded, dest_path = save_queue.get()
if encoded is None:
Expand Down

0 comments on commit 921f535

Please sign in to comment.