diff --git a/dvr_scan/app/__main__.py b/dvr_scan/app/__main__.py index ec1e756..2c19780 100644 --- a/dvr_scan/app/__main__.py +++ b/dvr_scan/app/__main__.py @@ -12,14 +12,17 @@ import argparse import logging import sys -import typing as ty +import dvr_scan from dvr_scan import get_license_info from dvr_scan.app.application import Application -from dvr_scan.config import CHOICE_MAP -from dvr_scan.platform import init_logger +from dvr_scan.config import CHOICE_MAP, USER_CONFIG_FILE_PATH, ConfigLoadFailure, ConfigRegistry +from dvr_scan.shared import ScanSettings, init_logging from dvr_scan.shared.cli import VERSION_STRING, LicenseAction, VersionAction, string_type_check +logger = logging.getLogger("dvr_scan") + + EXIT_SUCCESS: int = 0 EXIT_ERROR: int = 1 @@ -32,6 +35,16 @@ def get_cli_parser(): if hasattr(parser, "_optionals"): parser._optionals.title = "arguments" + parser.add_argument( + "-c", + "--config", + metavar="settings.cfg", + type=str, + help=( + "Path to config file. If not set, tries to load one from %s" % (USER_CONFIG_FILE_PATH) + ), + ) + parser.add_argument( "-V", "--version", @@ -70,26 +83,57 @@ def get_cli_parser(): return parser -def _init_logging(args: ty.Optional[argparse.ArgumentParser]): - verbosity = logging.INFO - if args is not None and hasattr(args, "verbosity"): - verbosity = getattr(logging, args.verbosity.upper()) - - quiet_mode = False - if args is not None and hasattr(args, "quiet_mode"): - quiet_mode = args.quiet_mode - - init_logger( - log_level=verbosity, - show_stdout=not quiet_mode, - log_file=args.logfile if hasattr(args, "logfile") else None, - ) - - def main(): - args = get_cli_parser().parse_args() - _init_logging(args) - app = Application() + """Parse command line options and load config file settings.""" + + init_log = [] + config_load_error = None + failed_to_load_config = False + config = ConfigRegistry() + # Try to load config from user settings folder. + try: + user_config = ConfigRegistry() + user_config.load() + config = user_config + except ConfigLoadFailure as ex: + config_load_error = ex + + # Parse CLI args, override config if an override was specified on the command line. + try: + args = get_cli_parser().parse_args() + init_logging(args, config) + init_log += [(logging.INFO, "DVR-Scan Application %s" % dvr_scan.__version__)] + if config_load_error and not hasattr(args, "config"): + raise config_load_error + if hasattr(args, "config"): + config_setting = ConfigRegistry() + config_setting.load(args.config) + init_logging(args, config_setting) + config = config_setting + init_log += config.consume_init_log() + except ConfigLoadFailure as ex: + init_log += ex.init_log + if ex.reason is not None: + init_log += [(logging.ERROR, "Error: %s" % str(ex.reason).replace("\t", " "))] + failed_to_load_config = True + config_load_error = ex + finally: + for log_level, log_str in init_log: + logger.log(log_level, log_str) + if failed_to_load_config: + logger.critical("Failed to load config file.") + logger.debug("Error loading config file:", exc_info=config_load_error) + # Intentionally suppress the exception in release mode since we've already logged the + # failure reason to the user above. We can now exit with an error code. + return None # noqa: B012 + + if config.config_dict: + logger.debug("Loaded configuration:\n%s", str(config.config_dict)) + + logger.debug("Program arguments:\n%s", str(args)) + settings = ScanSettings(args=args, config=config) + + app = Application(settings=settings) app.run() sys.exit(EXIT_SUCCESS) diff --git a/dvr_scan/app/application.py b/dvr_scan/app/application.py index b1ecf73..7a37592 100644 --- a/dvr_scan/app/application.py +++ b/dvr_scan/app/application.py @@ -9,6 +9,7 @@ # LICENSE file, or visit one of the above pages for details. # +import copy import tkinter as tk import tkinter.messagebox import tkinter.scrolledtext @@ -22,6 +23,7 @@ from dvr_scan.app.common import register_icon from dvr_scan.app.scan_window import ScanWindow from dvr_scan.config import CONFIG_MAP +from dvr_scan.shared import ScanSettings WINDOW_TITLE = "DVR-Scan" @@ -51,6 +53,7 @@ SETTING_INPUT_WIDTH = 12 PATH_INPUT_WIDTH = 32 +MAX_KERNEL_SIZE = 21 class InputArea: @@ -169,7 +172,7 @@ def _on_use_regions(self): class SettingsArea: # TODO: make this less busy by making it a notebook widget that can also include the # output settings. Can also have an additional tab to load/save the various settings. - def __init__(self, root: tk.Widget): + def __init__(self, root: tk.Widget, settings: ScanSettings): self._root = root root.rowconfigure(0, pad=PADDING, weight=1) @@ -195,11 +198,21 @@ def __init__(self, root: tk.Widget): combo.grid(row=0, column=1, sticky=sticky) tk.Label(root, text="Kernel Size").grid(row=1, column=0, sticky=sticky) - self._kernel_size = tk.StringVar() - combo = ttk.Combobox(root, textvariable=self._kernel_size, width=SETTING_INPUT_WIDTH) - combo["values"] = ("Auto", "Off", "3x3", "5x5", "7x7", "9x9") - combo.state(["readonly"]) # TODO: Custom kernel sizes. - combo.grid(row=1, column=1, sticky=sticky) + + self._kernel_size = ttk.Combobox(root, width=SETTING_INPUT_WIDTH, state="readonly") + # 0: Auto + # 1: Off + # 2: 3x3 + # 3: 5x5 + # 4: 7x7 + # 5: 9x9... + self._kernel_size["values"] = ( + "Off", + "Auto", + *tuple(f"{n}x{n}" for n in range(3, MAX_KERNEL_SIZE + 1, 2)), + ) + self._kernel_size.grid(row=1, column=1, sticky=sticky) + self._kernel_size.current(1) tk.Label(root, text="Threshold").grid(row=2, column=0, sticky=sticky) self._threshold = tk.StringVar() @@ -260,11 +273,38 @@ def __init__(self, root: tk.Widget): self._update_default_state() + @property + def use_default(self) -> bool: + return self._default.get() + def _update_default_state(self): use_default = self._default.get() for child in self._root.winfo_children(): child.configure(state=tk.DISABLED if use_default else tk.NORMAL) self._default_button["state"] = tk.NORMAL + self._kernel_size["state"] = tk.DISABLED if use_default else "readonly" + + @property + def kernel_size(self) -> int: + index = self._kernel_size.current() + if index == 0: + return 0 + elif index == 1: + return -1 + else: + assert index > 0 + return (index * 2) - 1 + + @kernel_size.setter + def kernel_size(self, size): + # TODO: Handle this discrepency properly, we're clipping the user config right now. + if size > MAX_KERNEL_SIZE: + logger.warning("Kernel sizes above 21 are not supported yet, clipping to 21.") + kernel_size = min(size, MAX_KERNEL_SIZE) + auto_kernel = bool(kernel_size < 0) + none_kernel = bool(kernel_size == 0) + index = 0 if none_kernel else 1 if auto_kernel else (1 + (kernel_size // 2)) + self._kernel_size.current(index) class OutputArea: @@ -359,9 +399,10 @@ def enable(self): class Application: - def __init__(self): + def __init__(self, settings: ScanSettings): self._root = tk.Tk() self._root.withdraw() + self._settings: ScanSettings = None self._root.option_add("*tearOff", False) self._root.title(WINDOW_TITLE) @@ -381,34 +422,58 @@ def __init__(self): input_frame.grid(row=0, sticky=tk.NSEW, padx=PADDING, pady=(PADDING, 0)) settings_frame = ttk.Labelframe(self._root, text="Motion", padding=PADDING) - self._settings = SettingsArea(settings_frame) + self._settings_area = SettingsArea(settings_frame, settings) settings_frame.grid(row=1, sticky=tk.EW, padx=PADDING, pady=(PADDING, 0)) output_frame = ttk.Labelframe(self._root, text="Output", padding=PADDING) - self._output = OutputArea(output_frame) + self._output_area = OutputArea(output_frame) output_frame.grid(row=2, sticky=tk.EW, padx=PADDING, pady=(PADDING, 0)) scan_frame = ttk.Labelframe(self._root, text="Scan", padding=PADDING) - self._scan = ScanArea(self._root, scan_frame) + self._scan_area = ScanArea(self._root, scan_frame) scan_frame.grid(row=3, sticky=tk.EW, padx=PADDING, pady=PADDING) self._scan_window: ty.Optional[ScanWindow] = None self._root.bind("<>", lambda _: self._start_new_scan()) self._root.protocol("WM_DELETE_WINDOW", self._on_delete) + self._set_from(settings) + + def _set_from(self, settings: ScanSettings): + """Initialize UI from config file.""" + logger.debug("initializing UI state from settings") + self._settings = settings + + # Scan Area + self._settings_area.kernel_size = self._settings.get("kernel-size") + + def get_scan_settings(self) -> ScanSettings: + """Get current UI state as a new ScanSettings.""" + settings = copy.deepcopy(self._settings) + + # Scan Area + if not self._settings_area.use_default: + settings.set("kernel-size", self._settings_area.kernel_size) + + # HACK: Prevent output files. + settings.set("output-mode", "scan_only") + return settings + def _start_new_scan(self): assert self._scan_window is None + settings = self.get_scan_settings() + def on_scan_window_close(): logger.debug("scan window closed, removing window and restoring focus") self._scan_window = None - self._scan.enable() + self._scan_area.enable() self._root.deiconify() self._root.grab_set() self._root.focus() - self._scan.disable() - self._scan_window = ScanWindow(self._root, on_scan_window_close) + self._scan_window = ScanWindow(self._root, settings, on_scan_window_close) + self._scan_area.disable() self._root.grab_release() self._scan_window.show() diff --git a/dvr_scan/app/scan_window.py b/dvr_scan/app/scan_window.py index 9480cef..bb16af9 100644 --- a/dvr_scan/app/scan_window.py +++ b/dvr_scan/app/scan_window.py @@ -10,30 +10,34 @@ # import threading -import time import tkinter as tk import tkinter.messagebox as messagebox import tkinter.ttk as ttk import typing as ty from logging import getLogger +from dvr_scan.shared import ScanSettings, init_scanner + TITLE = "Scanning..." logger = getLogger("dvr_scan") class ScanWindow: - def __init__(self, root: tk.Tk, on_destroyed: ty.Callable[[], None]): + def __init__(self, root: tk.Tk, settings: ScanSettings, on_destroyed: ty.Callable[[], None]): self._root = tk.Toplevel(master=root) self._root.withdraw() self._root.title(TITLE) self._root.resizable(True, True) self._root.minsize(320, 240) + self._scanner = init_scanner(settings) + self._scanner.set_callbacks( + scan_started=self._on_scan_started, + processed_frame=self._on_processed_frame, + ) # Widgets self._scan_thread = threading.Thread(target=self._do_scan) - self._stop_scan = threading.Event() - self._paused = threading.Event() self._state_lock = threading.Lock() self._on_destroyed = on_destroyed @@ -44,8 +48,6 @@ def __init__(self, root: tk.Tk, on_destroyed: ty.Callable[[], None]): self._root.columnconfigure(0, weight=1) self._root.columnconfigure(1, weight=1) self._progress_bar.grid(sticky=tk.NSEW, row=0, columnspan=2) - self._root.bind("<>", lambda _: self._on_progress()) - self._root.bind("<>", lambda _: self._on_complete()) self._root.minsize(width=self._root.winfo_reqwidth(), height=self._root.winfo_reqheight()) self._root.bind("<>", self.stop) @@ -61,20 +63,39 @@ def __init__(self, root: tk.Tk, on_destroyed: ty.Callable[[], None]): self._prompt_shown = False + self._scan_started = threading.Event() + self._expected_num_frames = 0 + self._num_events = 0 + self._frames_processed = 0 + + def _update(self): + if self._scan_started.is_set(): + self._progress_bar = ttk.Progressbar( + self._root, variable=self._progress, maximum=self._expected_num_frames + ) + self._progress_bar.grid(sticky=tk.NSEW, row=0, columnspan=2) + self._scan_started.clear() + with self._state_lock: + self._progress.set(self._frames_processed) + self._root.after(100, self._update) + def _on_complete(self): logger.debug("scan completed") self._stop_button["state"] = tk.DISABLED self._close_button["state"] = tk.NORMAL def _destroy(self): - self._root.after(10, self.stop) + logger.debug("scheduling stop") with self._state_lock: + logger.debug("destroying root") self._root.destroy() self._root = None self._on_destroyed() + self.stop() + logger.debug("root destroyed") def prompt_stop(self): - if self._stop_scan.is_set(): + if self._scanner.is_stopped(): return if messagebox.askyesno( title="Stop scan?", @@ -84,34 +105,30 @@ def prompt_stop(self): self._destroy() def stop(self): - if not self._stop_scan.is_set(): - self._stop_scan.set() + if not self._scanner.is_stopped(): + logger.debug("stopping scan thread") + self._scanner.stop() self._scan_thread.join() - def _on_progress(self): - self._progress.set(self._progress.get() + 1) - def show(self): logger.debug("starting scan thread showing scan window.") self._scan_thread.start() self._root.focus() self._root.grab_set() self._root.deiconify() + self._root.after(10, self._update) self._root.wait_window() + def _on_scan_started(self, num_frames: int): + self._expected_num_frames = num_frames + self._scan_started.set() + + def _on_processed_frame(self, num_events: int): + # TODO: See if we can get the estimated time remaining from the tqdm progress bar. + with self._state_lock: + self._num_events = num_events + self._frames_processed += 1 + def _do_scan(self): - complete = True - for _ in range(10): - if self._stop_scan.is_set(): - complete = False - logger.debug("stop event set") - break - logger.debug("ping...") - with self._state_lock: - if self._root is None: - logger.debug("root was destroyed, bailing") - return - self._root.event_generate("<>") - time.sleep(0.3) - logger.debug("scan complete" if complete else "scan NOT complete") - self._root.event_generate("<>") + self._scanner.scan() + # TODO: Handle scan completion. diff --git a/dvr_scan/config.py b/dvr_scan/config.py index 6b3863a..f348acc 100644 --- a/dvr_scan/config.py +++ b/dvr_scan/config.py @@ -545,7 +545,7 @@ def is_default(self, option: str) -> bool: """True if the option is default, i.e. is NOT set by the user.""" return option not in self._config - def get_value( + def get( self, option: str, override: Optional[ConfigValue] = None, diff --git a/dvr_scan/controller.py b/dvr_scan/controller.py index c7d6b2d..55e11e6 100644 --- a/dvr_scan/controller.py +++ b/dvr_scan/controller.py @@ -13,7 +13,6 @@ This module manages the DVR-Scan program control flow, starting with `run_dvr_scan()`. """ -import argparse import glob import logging import time @@ -24,10 +23,8 @@ import dvr_scan from dvr_scan.cli import get_cli_parser from dvr_scan.config import ConfigLoadFailure, ConfigRegistry, RegionValueDeprecated -from dvr_scan.overlays import BoundingBoxOverlay, TextOverlay -from dvr_scan.platform import init_logger -from dvr_scan.scanner import DetectorType, MotionScanner, OutputMode -from dvr_scan.shared.settings import ScanSettings +from dvr_scan.scanner import DetectorType, OutputMode +from dvr_scan.shared import ScanSettings, init_logging, init_scanner logger = logging.getLogger("dvr_scan") @@ -67,26 +64,6 @@ def _preprocess_args(args): return True, args -def _init_logging(args: ty.Optional[argparse.ArgumentParser], config: ty.Optional[ScanSettings]): - verbosity = logging.INFO - if args is not None and hasattr(args, "verbosity"): - verbosity = getattr(logging, args.verbosity.upper()) - elif config is not None: - verbosity = getattr(logging, config.get_value("verbosity").upper()) - - quiet_mode = False - if args is not None and hasattr(args, "quiet_mode"): - quiet_mode = args.quiet_mode - elif config is not None: - quiet_mode = config.get_value("quiet-mode") - - init_logger( - log_level=verbosity, - show_stdout=not quiet_mode, - log_file=args.logfile if hasattr(args, "logfile") else None, - ) - - def parse_settings(args: ty.List[str] = None) -> ty.Optional[ScanSettings]: """Parse command line options and load config file settings.""" init_log = [] @@ -101,12 +78,12 @@ def parse_settings(args: ty.List[str] = None) -> ty.Optional[ScanSettings]: config = user_config except ConfigLoadFailure as ex: config_load_error = ex - _init_logging(args, config) + init_logging(args, config) # Parse CLI args, override config if an override was specified on the command line. try: args = get_cli_parser(config).parse_args(args=args) debug_mode = args.debug - _init_logging(args, config) + init_logging(args, config) init_log += [(logging.INFO, "DVR-Scan %s" % dvr_scan.__version__)] if config_load_error and not hasattr(args, "config"): raise config_load_error @@ -115,7 +92,7 @@ def parse_settings(args: ty.List[str] = None) -> ty.Optional[ScanSettings]: if hasattr(args, "config"): config_setting = ConfigRegistry() config_setting.load(args.config) - _init_logging(args, config_setting) + init_logging(args, config_setting) config = config_setting init_log += config.consume_init_log() except ConfigLoadFailure as ex: @@ -175,114 +152,13 @@ def run_dvr_scan( """Run DVR-Scan scanning logic using validated `settings` from `parse_settings()`.""" logger.info("Initializing scan context...") - scanner = MotionScanner( - input_videos=settings.get_arg("input"), - frame_skip=settings.get("frame-skip"), - show_progress=not settings.get("quiet-mode"), - debug_mode=settings.get("debug"), - ) - - output_mode = ( - OutputMode.SCAN_ONLY if settings.get_arg("scan-only") else settings.get("output-mode") - ) - scanner.set_output( - comp_file=settings.get_arg("output"), - mask_file=settings.get_arg("mask-output"), - output_mode=output_mode, - opencv_fourcc=settings.get("opencv-codec"), - ffmpeg_input_args=settings.get("ffmpeg-input-args"), - ffmpeg_output_args=settings.get("ffmpeg-output-args"), - output_dir=settings.get("output-dir"), - ) - - timecode_overlay = None - if settings.get("time-code"): - timecode_overlay = TextOverlay( - font_scale=settings.get("text-font-scale"), - margin=settings.get("text-margin"), - border=settings.get("text-border"), - thickness=settings.get("text-font-thickness"), - color=settings.get("text-font-color"), - bg_color=settings.get("text-bg-color"), - corner=TextOverlay.Corner.TopLeft, - ) - - metrics_overlay = None - if settings.get("frame-metrics"): - metrics_overlay = TextOverlay( - font_scale=settings.get("text-font-scale"), - margin=settings.get("text-margin"), - border=settings.get("text-border"), - thickness=settings.get("text-font-thickness"), - color=settings.get("text-font-color"), - bg_color=settings.get("text-bg-color"), - corner=TextOverlay.Corner.TopRight, - ) - - bounding_box = None - # bounding_box_arg will be None if -bb was not set, False if -bb was set without any args, - # otherwise it represents the desired smooth time. - bounding_box_arg = settings.get_arg("bounding-box") - if bounding_box_arg is not None or settings.get("bounding-box"): - if bounding_box_arg is not None and bounding_box_arg is not False: - smoothing_time = FrameTimecode(bounding_box_arg, scanner.framerate) - else: - smoothing_time = FrameTimecode( - settings.get("bounding-box-smooth-time"), scanner.framerate - ) - bounding_box = BoundingBoxOverlay( - min_size_ratio=settings.get("bounding-box-min-size"), - thickness_ratio=settings.get("bounding-box-thickness"), - color=settings.get("bounding-box-color"), - smoothing=smoothing_time.frame_num, - ) - - scanner.set_overlays( - timecode_overlay=timecode_overlay, - metrics_overlay=metrics_overlay, - bounding_box=bounding_box, - ) - - scanner.set_detection_params( - detector_type=DetectorType[settings.get("bg-subtractor").upper()], - threshold=settings.get("threshold"), - max_threshold=settings.get("max-threshold"), - variance_threshold=settings.get("variance-threshold"), - kernel_size=settings.get("kernel-size"), - downscale_factor=settings.get("downscale-factor"), - learning_rate=settings.get("learning-rate"), - ) - - scanner.set_event_params( - min_event_len=settings.get("min-event-length"), - time_pre_event=settings.get("time-before-event"), - time_post_event=settings.get("time-post-event"), - use_pts=settings.get("use-pts"), - ) - - scanner.set_thumbnail_params( - thumbnails=settings.get("thumbnails"), - ) - - scanner.set_video_time( - start_time=settings.get_arg("start-time"), - end_time=settings.get_arg("end-time"), - duration=settings.get_arg("duration"), - ) - - scanner.set_regions( - region_editor=settings.get("region-editor"), - regions=settings.get_arg("regions"), - load_region=settings.get("load-region"), - save_region=settings.get_arg("save-region"), - roi_deprecated=settings.get("region-of-interest"), - ) + scanner = init_scanner(settings) # Scan video for motion with specified parameters. processing_start = time.time() result = scanner.scan() if result is None: - logging.debug("Exiting early, scan() returned None.") + logger.debug("Exiting early, scan() returned None.") return processing_time = time.time() - processing_start @@ -327,5 +203,6 @@ def run_dvr_scan( # start1-end1[,[+]start2-end2[,[+]start3-end3...]] print(",".join(timecode_list)) - if output_mode != OutputMode.SCAN_ONLY: + # TODO: Fix private variable access. + if scanner._output_mode != OutputMode.SCAN_ONLY: logger.info("Motion events written to disk.") diff --git a/dvr_scan/scanner.py b/dvr_scan/scanner.py index 3174988..60a532d 100644 --- a/dvr_scan/scanner.py +++ b/dvr_scan/scanner.py @@ -20,6 +20,7 @@ import subprocess import sys import threading +import typing as ty from dataclasses import dataclass from enum import Enum from typing import Any, AnyStr, List, Optional, Tuple, Union @@ -267,6 +268,8 @@ def __init__( self._roi_deprecated = None # Input Video Parameters (set_video_time) + if not input_videos: + input_videos = ["tests/resources/simple_movement.mp4"] self._input: VideoJoiner = VideoJoiner(input_videos) # -i/--input self._frame_skip: int = frame_skip # -fs/--frame-skip self._start_time: FrameTimecode = None # -st/--start-time @@ -286,6 +289,10 @@ def __init__( self._highscore = 0 self._highframe = None + # Callbacks for UI integration + self._scan_started = None + self._processed_frame = None + # Make sure we initialize defaults now that we loaded the input videos. self.set_detection_params() self.set_event_params() @@ -584,6 +591,16 @@ def _handle_regions(self) -> bool: logger.debug("No regions selected.") return True + @property + def frames_remaining(self) -> int: + num_frames = self._input.total_frames + # Correct for end time. + if self._end_time and self._end_time.frame_num < num_frames: + num_frames = self._end_time.frame_num + # Correct for current seek position. + num_frames = max(0, num_frames - self._input.position.frame_num) + return num_frames + def _create_progress_bar(self) -> tqdm: num_frames = self._input.total_frames # Correct for end time. @@ -592,17 +609,27 @@ def _create_progress_bar(self) -> tqdm: # Correct for current seek position. num_frames = max(0, num_frames - self._input.position.frame_num) return tqdm( - total=num_frames, + total=self.frames_remaining, unit=" frames", desc=PROGRESS_BAR_DESCRIPTION % 0, dynamic_ncols=True, ) def stop(self): - """Stop the current scan call. This is the only thread-safe public method.""" + """Stop the current scan call. Thread-safe.""" self._stop.set() logger.debug("Stop event set.") + def is_stopped(self): + """Check if the current scan call was stopped, or `False` if one wasn't run. Thread-safe.""" + return self._stop.is_set() + + def set_callbacks( + self, scan_started: ty.Callable[[int], None], processed_frame: ty.Callable[[int], None] + ): + self._scan_started = scan_started + self._processed_frame = processed_frame + def scan(self) -> Optional[DetectionResult]: """Performs motion analysis on the MotionScanner's input video(s).""" self._stop.clear() @@ -709,8 +736,10 @@ def scan(self) -> Optional[DetectionResult]: if len(self._input.paths) > 1 else "input video", ) + logger.debug(f"output mode = {self._output_mode}") progress_bar = FakeTqdmObject() if not self._show_progress else self._create_progress_bar() + num_frames_to_process = self.frames_remaining decode_queue = queue.Queue(MAX_DECODE_QUEUE_SIZE) decode_thread = threading.Thread( @@ -728,8 +757,13 @@ def scan(self) -> Optional[DetectionResult]: ) encode_thread.start() + if self._scan_started: + self._scan_started(num_frames=num_frames_to_process) + # TODO: The main scanning loop should be refactored into a state machine. while not self._stop.is_set(): + if self._processed_frame: + self._processed_frame(num_events=len(event_list)) # Keep polling decode queue until it's empty (signaled via None). frame: Optional[DecodeEvent] = decode_queue.get() if frame is None: diff --git a/dvr_scan/shared/__init__.py b/dvr_scan/shared/__init__.py index 0c08f77..9b2da99 100644 --- a/dvr_scan/shared/__init__.py +++ b/dvr_scan/shared/__init__.py @@ -10,3 +10,145 @@ # """Business logic shared between the DVR-Scan CLI and the DVR-Scan GUI.""" + +import argparse +import logging +import typing as ty + +from scenedetect import FrameTimecode + +from dvr_scan.overlays import BoundingBoxOverlay, TextOverlay +from dvr_scan.platform import init_logger as _init_logger +from dvr_scan.scanner import DetectorType, MotionScanner, OutputMode +from dvr_scan.shared.settings import ScanSettings + +logger = logging.getLogger("dvr_scan") + + +def init_logging(args: ty.Optional[argparse.ArgumentParser], config: ty.Optional[ScanSettings]): + verbosity = logging.INFO + if args is not None and hasattr(args, "verbosity"): + verbosity = getattr(logging, args.verbosity.upper()) + elif config is not None: + verbosity = getattr(logging, config.get("verbosity").upper()) + + quiet_mode = False + if args is not None and hasattr(args, "quiet_mode"): + quiet_mode = args.quiet_mode + elif config is not None: + quiet_mode = config.get("quiet-mode") + + _init_logger( + log_level=verbosity, + show_stdout=not quiet_mode, + log_file=args.logfile if hasattr(args, "logfile") else None, + ) + + +def init_scanner( + settings: ScanSettings, +) -> MotionScanner: + logger.info("initializing motion scan") + scanner = MotionScanner( + input_videos=settings.get_arg("input"), + frame_skip=settings.get("frame-skip"), + show_progress=not settings.get("quiet-mode"), + # debug_mode=settings.get("debug"), + ) + + scanner.set_output( + comp_file=settings.get_arg("output"), + mask_file=settings.get_arg("mask-output"), + output_mode=OutputMode.SCAN_ONLY + if settings.get_arg("scan-only") + else settings.get("output-mode"), + opencv_fourcc=settings.get("opencv-codec"), + ffmpeg_input_args=settings.get("ffmpeg-input-args"), + ffmpeg_output_args=settings.get("ffmpeg-output-args"), + output_dir=settings.get("output-dir"), + ) + + timecode_overlay = None + if settings.get("time-code"): + timecode_overlay = TextOverlay( + font_scale=settings.get("text-font-scale"), + margin=settings.get("text-margin"), + border=settings.get("text-border"), + thickness=settings.get("text-font-thickness"), + color=settings.get("text-font-color"), + bg_color=settings.get("text-bg-color"), + corner=TextOverlay.Corner.TopLeft, + ) + + metrics_overlay = None + if settings.get("frame-metrics"): + metrics_overlay = TextOverlay( + font_scale=settings.get("text-font-scale"), + margin=settings.get("text-margin"), + border=settings.get("text-border"), + thickness=settings.get("text-font-thickness"), + color=settings.get("text-font-color"), + bg_color=settings.get("text-bg-color"), + corner=TextOverlay.Corner.TopRight, + ) + + bounding_box = None + # bounding_box_arg will be None if -bb was not set, False if -bb was set without any args, + # otherwise it represents the desired smooth time. + bounding_box_arg = settings.get_arg("bounding-box") + if bounding_box_arg is not None or settings.get("bounding-box"): + if bounding_box_arg is not None and bounding_box_arg is not False: + smoothing_time = FrameTimecode(bounding_box_arg, scanner.framerate) + else: + smoothing_time = FrameTimecode( + settings.get("bounding-box-smooth-time"), scanner.framerate + ) + bounding_box = BoundingBoxOverlay( + min_size_ratio=settings.get("bounding-box-min-size"), + thickness_ratio=settings.get("bounding-box-thickness"), + color=settings.get("bounding-box-color"), + smoothing=smoothing_time.frame_num, + ) + + scanner.set_overlays( + timecode_overlay=timecode_overlay, + metrics_overlay=metrics_overlay, + bounding_box=bounding_box, + ) + + scanner.set_detection_params( + detector_type=DetectorType[settings.get("bg-subtractor").upper()], + threshold=settings.get("threshold"), + max_threshold=settings.get("max-threshold"), + variance_threshold=settings.get("variance-threshold"), + kernel_size=settings.get("kernel-size"), + downscale_factor=settings.get("downscale-factor"), + learning_rate=settings.get("learning-rate"), + ) + + scanner.set_event_params( + min_event_len=settings.get("min-event-length"), + time_pre_event=settings.get("time-before-event"), + time_post_event=settings.get("time-post-event"), + use_pts=settings.get("use-pts"), + ) + + scanner.set_thumbnail_params( + thumbnails=settings.get("thumbnails"), + ) + + scanner.set_video_time( + start_time=settings.get_arg("start-time"), + end_time=settings.get_arg("end-time"), + duration=settings.get_arg("duration"), + ) + + scanner.set_regions( + region_editor=settings.get("region-editor"), + regions=settings.get_arg("regions"), + load_region=settings.get("load-region"), + save_region=settings.get_arg("save-region"), + roi_deprecated=settings.get("region-of-interest"), + ) + + return scanner diff --git a/dvr_scan/shared/settings.py b/dvr_scan/shared/settings.py index 31f18bc..e89b04b 100644 --- a/dvr_scan/shared/settings.py +++ b/dvr_scan/shared/settings.py @@ -22,6 +22,7 @@ class ScanSettings: def __init__(self, args: argparse.Namespace, config: ConfigRegistry): self._args = args self._config = config + self._app_settings = dict() @property def config(self) -> ConfigRegistry: @@ -39,7 +40,13 @@ def get(self, option: str) -> ty.Union[str, int, float, bool]: the dvr-scan.cfg file in the user's settings folder). 3. Default value specified in the config map (`dvr_scan.config.CONFIG_MAP`). """ + if option in self._app_settings: + return self._app_settings[option] arg_val = self.get_arg(option) if arg_val is not None: return arg_val - return self.config.get_value(option) + return self.config.get(option) + + def set(self, option: str, value: ty.Union[str, int, float, bool]): + """Set application overrides for any setting.""" + self._app_settings[option] = value