diff --git a/.github/workflows/check-code-format.yml b/.github/workflows/check-code-format.yml index 6336df0..fa1b4a9 100644 --- a/.github/workflows/check-code-format.yml +++ b/.github/workflows/check-code-format.yml @@ -1,15 +1,13 @@ -# Use YAPF to check DVR-Scan formatting (run `yapf -i -r dvr_scan tests` locally to fix) -name: Check Code Format +# Check DVR-Scan code lint warnings and formatting. +name: Static Analysis on: pull_request: paths: - - .style.yapf - dvr_scan/** - tests/** push: paths: - - .style.yapf - dvr_scan/** - tests/** @@ -25,14 +23,21 @@ jobs: with: python-version: '3.13' - - name: Update pip - run: python -m pip install --upgrade pip - - name: Install yapf - run: python -m pip install --upgrade yapf toml - - name: Install DVR-Scan Dependencies - run: python -m pip install -r requirements_headless.txt + - name: Install Dependencies + run: | + python -m pip install --upgrade pip + python -m pip install --upgrade yapf toml + python -m pip install -r requirements_headless.txt - - name: Check Code Format (dvr_scan) - run: python -m yapf --diff --recursive dvr_scan/ - - name: Check Code Format (tests) - run: python -m yapf --diff --recursive tests/ + - name: Check Code Format (yapf) + if: ${{ hashFiles('.style.yapf') != '' }} + run: | + python -m yapf --diff --recursive dvr_scan/ + python -m yapf --diff --recursive tests/ + + - name: Static Analysis (ruff) + if: ${{ hashFiles('.style.yapf') == '' }} + run: | + python -m pip install --upgrade ruff + python -m ruff check + python -m ruff format --check diff --git a/.style.yapf b/.style.yapf deleted file mode 100644 index 9f089c5..0000000 --- a/.style.yapf +++ /dev/null @@ -1,6 +0,0 @@ -[style] -based_on_style = yapf -spaces_before_comment = 15, 20 -indent_width = 4 -split_before_logical_operator = true -column_limit = 100 diff --git a/dist/post_release.py b/dist/post_release.py index 0e12561..1b030df 100644 --- a/dist/post_release.py +++ b/dist/post_release.py @@ -1,8 +1,8 @@ -# -*- coding: utf-8 -*- import glob import os import shutil import sys + sys.path.append(os.path.abspath(".")) @@ -11,39 +11,39 @@ # TODO: See if some these can be excluded in the .spec file. DIRECTORY_GLOBS = [ - 'altgraph-*.dist-info', - 'certifi', - 'imageio', - 'imageio_ffmpeg', - 'importlib_metadata-*.dist-info', - 'matplotlib', - 'PIL', - 'PyQt5', - 'pip-*.dist-info', - 'psutil', - 'pyinstaller-*.dist-info', - 'setuptools-*.dist-info', - 'tcl8', - 'wheel-*.dist-info', - 'wx', + "altgraph-*.dist-info", + "certifi", + "imageio", + "imageio_ffmpeg", + "importlib_metadata-*.dist-info", + "matplotlib", + "PIL", + "PyQt5", + "pip-*.dist-info", + "psutil", + "pyinstaller-*.dist-info", + "setuptools-*.dist-info", + "tcl8", + "wheel-*.dist-info", + "wx", ] FILE_GLOBS = [ - '_bz2.pyd', - '_decimal.pyd', - '_elementtree.pyd', - '_hashlib.pyd', - '_lzma.pyd', - '_multiprocessing.pyd', - 'd3dcompiler*.dll', - 'kiwisolver.*.pyd', - 'libopenblas64_*', # There seems to be a second copy of this currently. - 'libEGL.dll', - 'libGLESv2.dll', - 'opengl32sw.dll', - 'Qt5*.dll', - 'wxbase*.dll', - 'wxmsw315u*.dll', + "_bz2.pyd", + "_decimal.pyd", + "_elementtree.pyd", + "_hashlib.pyd", + "_lzma.pyd", + "_multiprocessing.pyd", + "d3dcompiler*.dll", + "kiwisolver.*.pyd", + "libopenblas64_*", # There seems to be a second copy of this currently. + "libEGL.dll", + "libGLESv2.dll", + "opengl32sw.dll", + "Qt5*.dll", + "wxbase*.dll", + "wxmsw315u*.dll", ] for dir_glob in DIRECTORY_GLOBS: @@ -63,10 +63,9 @@ os.rmdir(os.path.join(BASE_PATH, "dvr-scan")) - def write_version_file(): - import dvr_scan + VERSION = dvr_scan.__version__ with open("dist/.version_info", "wb") as f: @@ -78,7 +77,8 @@ def write_version_file(): else: (maj, min, pat) = int(v[0]), int(v[1]), 0 - f.write(f"""# UTF-8 + f.write( + f"""# UTF-8 # # For more details about fixed file info 'ffi' see: # http://msdn.microsoft.com/en-us/library/ms646997.aspx @@ -121,4 +121,5 @@ def write_version_file(): VarFileInfo([VarStruct(u'Translation', [1033, 1200])]) ] ) -""".encode()) +""".encode() + ) diff --git a/dist/pre_release.py b/dist/pre_release.py index f45a653..d9cd6f2 100644 --- a/dist/pre_release.py +++ b/dist/pre_release.py @@ -1,9 +1,10 @@ -# -*- coding: utf-8 -*- import os import sys + sys.path.append(os.path.abspath(".")) import dvr_scan + VERSION = dvr_scan.__version__ with open("dist/.version_info", "wb") as f: @@ -13,7 +14,8 @@ minor = elements[1] patch = elements[2] if len(elements) == 3 else 0 - f.write(f"""# UTF-8 + f.write( + f"""# UTF-8 # # For more details about fixed file info 'ffi' see: # http://msdn.microsoft.com/en-us/library/ms646997.aspx @@ -56,4 +58,5 @@ VarFileInfo([VarStruct(u'Translation', [1033, 1200])]) ] ) -""".encode()) +""".encode() + ) diff --git a/dvr_scan/__init__.py b/dvr_scan/__init__.py index 2386a50..56612ba 100644 --- a/dvr_scan/__init__.py +++ b/dvr_scan/__init__.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # # DVR-Scan: Video Motion Event Detection & Extraction Tool # -------------------------------------------------------------- @@ -33,13 +32,12 @@ """ import os -import sys import pkgutil +import sys # Handle loading OpenCV. This **MUST** be first before any other DVR-Scan or third-party # packages are imported which might attempt to import the `cv2` module. -import dvr_scan.opencv_loader as _ - +import dvr_scan.opencv_loader as _ # noqa: F401 from dvr_scan.platform import init_logger # Used for module/distribution identification. @@ -54,10 +52,14 @@ def get_license_info() -> str: if getattr(sys, "frozen", False) and hasattr(sys, "_MEIPASS"): app_folder = os.path.abspath(os.path.dirname(sys.executable)) license_files = ["LICENSE", "LICENSE-THIRDPARTY"] - license_text = "\n".join([ - open(os.path.join(app_folder, license_file), "rb").read().decode("ascii", "ignore") - for license_file in license_files - ]) + license_text = "\n".join( + [ + open(os.path.join(app_folder, license_file), "rb") + .read() + .decode("ascii", "ignore") + for license_file in license_files + ] + ) # Use the LICENSE file included with the package distribution. else: license_text = pkgutil.get_data(__name__, "LICENSE").decode("ascii", "ignore") @@ -65,9 +67,11 @@ def get_license_info() -> str: # During development this is normal since the package paths won't be correct. except FileNotFoundError: pass - return ("[DVR-Scan] Error: Missing LICENSE files.\n" - "See the following URL for license/copyright information:\n" - " < https://www.dvr-scan.com/resources >\n") + return ( + "[DVR-Scan] Error: Missing LICENSE files.\n" + "See the following URL for license/copyright information:\n" + " < https://www.dvr-scan.com/resources >\n" + ) # Initialize logger. diff --git a/dvr_scan/__main__.py b/dvr_scan/__main__.py index 2fda291..d449cce 100644 --- a/dvr_scan/__main__.py +++ b/dvr_scan/__main__.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # # DVR-Scan: Video Motion Event Detection & Extraction Tool # -------------------------------------------------------------- @@ -15,13 +14,13 @@ """ import logging -from subprocess import CalledProcessError import sys - -from dvr_scan.cli.controller import parse_settings, run_dvr_scan +from subprocess import CalledProcessError from scenedetect import VideoOpenFailure -from scenedetect.platform import logging_redirect_tqdm, FakeTqdmLoggingRedirect +from scenedetect.platform import FakeTqdmLoggingRedirect, logging_redirect_tqdm + +from dvr_scan.cli.controller import parse_settings, run_dvr_scan EXIT_SUCCESS: int = 0 EXIT_ERROR: int = 1 @@ -33,9 +32,9 @@ def main(): if settings is None: sys.exit(EXIT_ERROR) logger = logging.getLogger("dvr_scan") - redirect = (FakeTqdmLoggingRedirect if settings.get("quiet-mode") else logging_redirect_tqdm) + redirect = FakeTqdmLoggingRedirect if settings.get("quiet-mode") else logging_redirect_tqdm debug_mode = settings.get("debug") - show_traceback = logging.DEBUG == getattr(logging, settings.get("verbosity").upper()) + show_traceback = getattr(logging, settings.get("verbosity").upper()) == logging.DEBUG with redirect(loggers=[logger]): try: run_dvr_scan(settings) @@ -47,7 +46,7 @@ def main(): logger.critical("Failed to load input: %s", str(ex), exc_info=show_traceback) if debug_mode: raise - except KeyboardInterrupt as ex: + except KeyboardInterrupt: logger.info("Stopping (interrupt received)...", exc_info=show_traceback) if debug_mode: raise diff --git a/dvr_scan/cli/__init__.py b/dvr_scan/cli/__init__.py index 647f940..25c4869 100644 --- a/dvr_scan/cli/__init__.py +++ b/dvr_scan/cli/__init__.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # # DVR-Scan: Video Motion Event Detection & Extraction Tool # -------------------------------------------------------------- @@ -23,7 +22,7 @@ from typing import List, Optional import dvr_scan -from dvr_scan.cli.config import ConfigRegistry, CHOICE_MAP, USER_CONFIG_FILE_PATH +from dvr_scan.cli.config import CHOICE_MAP, USER_CONFIG_FILE_PATH, ConfigRegistry from dvr_scan.region import RegionValidator # Version string shown for the -v/--version CLI argument. @@ -83,8 +82,12 @@ def _type_checker(value): # Timecode in HH:MM:SS[.nnn] format. elif ":" in value: tc_val = value.split(":") - if (len(tc_val) == 3 and tc_val[0].isdigit() and tc_val[1].isdigit() - and tc_val[2].replace(".", "").isdigit()): + if ( + len(tc_val) == 3 + and tc_val[0].isdigit() + and tc_val[1].isdigit() + and tc_val[2].replace(".", "").isdigit() + ): hrs, mins = int(tc_val[0]), int(tc_val[1]) secs = float(tc_val[2]) if "." in tc_val[2] else int(tc_val[2]) if hrs >= 0 and mins >= 0 and secs >= 0 and mins < 60 and secs < 60: @@ -93,7 +96,8 @@ def _type_checker(value): raise argparse.ArgumentTypeError( f"invalid timecode: {value}\n" "Timecode must be specified as number of frames (12345), seconds (number followed " - "by s, e.g. 123s or 123.45s), or timecode (HH:MM:SS[.nnn].") + "by s, e.g. 123s or 123.45s), or timecode (HH:MM:SS[.nnn]." + ) return value return _type_checker @@ -148,10 +152,11 @@ def _kernel_size_type_check(metavar: Optional[str] = None): def _type_checker(value): value = int(value) - if not value in (-1, 0) and (value < 3 or value % 2 == 0): + if value not in (-1, 0) and (value < 3 or value % 2 == 0): raise argparse.ArgumentTypeError( "invalid choice: %d (%s must be an odd number starting from 3, 0 to disable, or " - "-1 for auto)" % (value, metavar)) + "-1 for auto)" % (value, metavar) + ) return value return _type_checker @@ -207,9 +212,9 @@ def _type_checker(value): return _type_checker -def string_type_check(valid_strings: List[str], - case_sensitive: bool = True, - metavar: Optional[str] = None): +def string_type_check( + valid_strings: List[str], case_sensitive: bool = True, metavar: Optional[str] = None +): """Creates an argparse type for a list of strings. The passed argument is declared valid if it is a valid string which exists @@ -234,7 +239,7 @@ def _type_checker(value): valid = True if not case_sensitive: value = value.lower() - if not value in valid_strings: + if value not in valid_strings: valid = False case_msg = " (case sensitive)" if case_sensitive else "" msg = "invalid choice: %s (valid settings for %s%s are: %s)" % ( @@ -253,7 +258,6 @@ def _type_checker(value): class LicenseAction(argparse.Action): """argparse Action for displaying DVR-Scan license & copyright info.""" - # pylint: disable=redefined-builtin, too-many-arguments def __init__( self, option_strings, @@ -281,7 +285,6 @@ def __call__(self, parser, namespace, values, option_string=None): class VersionAction(argparse.Action): """argparse Action for displaying DVR-Scan version.""" - # pylint: disable=redefined-builtin, too-many-arguments def __init__( self, option_strings, @@ -307,7 +310,7 @@ def __call__(self, parser, namespace, values, option_string=None): class RegionAction(argparse.Action): - DEFAULT_ERROR_MESSAGE = ("Region must be 3 or more points of the form X0 Y0 X1 Y1 X2 Y2 ...") + DEFAULT_ERROR_MESSAGE = "Region must be 3 or more points of the form X0 Y0 X1 Y1 X2 Y2 ..." def __init__( self, @@ -342,15 +345,18 @@ def __call__(self, parser, namespace, values: List[str], option_string=None): region = RegionValidator(" ".join(values)) except ValueError as ex: message = " ".join(str(arg) for arg in ex.args) - raise (argparse.ArgumentError( - self, message if message else RegionAction.DEFAULT_ERROR_MESSAGE)) from ex + raise ( + argparse.ArgumentError( + self, message if message else RegionAction.DEFAULT_ERROR_MESSAGE + ) + ) from ex # Append this ROI to any existing ones, if any. # TODO(v1.7): Audit uses of the 'regions' constant for -a/--add-region, replace with a named # constant where possible. items = getattr(namespace, "regions", []) items += [region.value] - setattr(namespace, "regions", items) + namespace.regions = items # TODO: To help with debugging, add a `debug` option to the config file as well that, if set in the @@ -371,7 +377,6 @@ def get_cli_parser(user_config: ConfigRegistry): ) if hasattr(parser, "_optionals"): - # pylint: disable=protected-access parser._optionals.title = "arguments" parser.add_argument( @@ -396,9 +401,11 @@ def get_cli_parser(user_config: ConfigRegistry): type=str, nargs="+", action="append", - help=("[REQUIRED] Path to input video. May specify multiple inputs with the same" - " resolution and framerate, or by specifying a wildcard/glob. Output" - " filenames are generated using the first video name only."), + help=( + "[REQUIRED] Path to input video. May specify multiple inputs with the same" + " resolution and framerate, or by specifying a wildcard/glob. Output" + " filenames are generated using the first video name only." + ), ) parser.add_argument( @@ -406,9 +413,11 @@ def get_cli_parser(user_config: ConfigRegistry): "--output-dir", metavar="path", type=str, - help=("If specified, write output files in the given directory. If path does not" - " exist, it will be created. If unset, output files are written to the" - " current working directory."), + help=( + "If specified, write output files in the given directory. If path does not" + " exist, it will be created. If unset, output files are written to the" + " current working directory." + ), ) parser.add_argument( @@ -416,9 +425,11 @@ def get_cli_parser(user_config: ConfigRegistry): "--output", metavar="video.avi", type=str, - help=("If specified, all motion events will be written to a single file" - " in order (if not specified, separate files are created for each event)." - " Filename MUST end with .avi. Only supported in output mode OPENCV."), + help=( + "If specified, all motion events will be written to a single file" + " in order (if not specified, separate files are created for each event)." + " Filename MUST end with .avi. Only supported in output mode OPENCV." + ), ) parser.add_argument( @@ -426,11 +437,14 @@ def get_cli_parser(user_config: ConfigRegistry): "--output-mode", metavar="mode", type=string_type_check(VALID_OUTPUT_MODES, False, "mode"), - help=("Set mode for generating output files. Certain features may not work with " - " all output modes. Must be one of: %s.%s" % ( - ", ".join(VALID_OUTPUT_MODES), - user_config.get_help_string("output-mode"), - )), + help=( + "Set mode for generating output files. Certain features may not work with " + " all output modes. Must be one of: %s.%s" + % ( + ", ".join(VALID_OUTPUT_MODES), + user_config.get_help_string("output-mode"), + ) + ), ) parser.add_argument( @@ -438,8 +452,10 @@ def get_cli_parser(user_config: ConfigRegistry): "--scan-only", action="store_true", default=False, - help=("Only perform motion detection (does not write any files to disk)." - " If set, -m/--output-mode is ignored."), + help=( + "Only perform motion detection (does not write any files to disk)." + " If set, -m/--output-mode is ignored." + ), ) parser.add_argument( @@ -447,8 +463,9 @@ def get_cli_parser(user_config: ConfigRegistry): "--config", metavar="settings.cfg", type=str, - help=("Path to config file. If not set, tries to load one from %s" % - (USER_CONFIG_FILE_PATH)), + help=( + "Path to config file. If not set, tries to load one from %s" % (USER_CONFIG_FILE_PATH) + ), ) parser.add_argument( @@ -456,9 +473,11 @@ def get_cli_parser(user_config: ConfigRegistry): "--region-editor", dest="region_editor", action="store_true", - help=("Show region editor window. Motion detection will be limited to the enclosed area " - "during processing. Only single regions can be edited, but supports preview of " - "multiple regions if defined.%s" % user_config.get_help_string("region-editor")), + help=( + "Show region editor window. Motion detection will be limited to the enclosed area " + "during processing. Only single regions can be edited, but supports preview of " + "multiple regions if defined.%s" % user_config.get_help_string("region-editor") + ), ) parser.add_argument( @@ -472,7 +491,8 @@ def get_cli_parser(user_config: ConfigRegistry): "Limit motion detection to a region of the frame. The region is defined as a sequence " "of 3 or more points forming a closed shape inside the video. Coordinate 0 0 is top " "left of the frame, and WIDTH-1 HEIGHT-1 is bottom right. Can be specified multiple " - "times to add more regions."), + "times to add more regions." + ), ) # TODO: Consider merging the load/save region options into a single --region-file option. @@ -482,8 +502,10 @@ def get_cli_parser(user_config: ConfigRegistry): "--load-region", metavar="REGIONS.txt", type=str, - help=("Load region data from file. Each line must be a list of points in the format " - "specified by -a/--add-region. Each line is treated as a separate polygon."), + help=( + "Load region data from file. Each line must be a list of points in the format " + "specified by -a/--add-region. Each line is treated as a separate polygon." + ), ) parser.add_argument( @@ -491,8 +513,10 @@ def get_cli_parser(user_config: ConfigRegistry): "--save-region", metavar="REGIONS.txt", type=str, - help=("Save regions before processing. If REGIONS.txt exists it will be overwritten. " - "The region editor will save regions here instead of asking for a path."), + help=( + "Save regions before processing. If REGIONS.txt exists it will be overwritten. " + "The region editor will save regions here instead of asking for a path." + ), ) parser.add_argument( @@ -500,9 +524,11 @@ def get_cli_parser(user_config: ConfigRegistry): "--bg-subtractor", metavar="type", type=string_type_check(["MOG2", "CNT", "MOG2_CUDA"], False, "type"), - help=("The type of background subtractor to use, must be one of: " - " MOG2 (default), CNT (parallel), MOG2_CUDA (Nvidia GPU).%s") % - user_config.get_help_string("bg-subtractor"), + help=( + "The type of background subtractor to use, must be one of: " + " MOG2 (default), CNT (parallel), MOG2_CUDA (Nvidia GPU).%s" + ) + % user_config.get_help_string("bg-subtractor"), ) parser.add_argument( @@ -510,11 +536,13 @@ def get_cli_parser(user_config: ConfigRegistry): "--threshold", metavar="value", type=float_type_check(0.0, None, "value"), - help=("Threshold representing amount of motion in a frame required to trigger" - " motion events. Lower values are more sensitive to motion. If too high," - " some movement in the scene may not be detected, while too low of a" - " threshold can result in false detections.%s" % - (user_config.get_help_string("threshold"))), + help=( + "Threshold representing amount of motion in a frame required to trigger" + " motion events. Lower values are more sensitive to motion. If too high," + " some movement in the scene may not be detected, while too low of a" + " threshold can result in false detections.%s" + % (user_config.get_help_string("threshold")) + ), ) parser.add_argument( @@ -522,10 +550,12 @@ def get_cli_parser(user_config: ConfigRegistry): "--kernel-size", metavar="size", type=_kernel_size_type_check(metavar="size"), - help=("Size in pixels of the noise reduction kernel. Must be odd number greater than 1, " - "0 to disable, or -1 to auto-set based on video resolution (default). If the kernel " - "size is set too large, some movement in the scene may not be detected.%s" % - (user_config.get_help_string("kernel-size"))), + help=( + "Size in pixels of the noise reduction kernel. Must be odd number greater than 1, " + "0 to disable, or -1 to auto-set based on video resolution (default). If the kernel " + "size is set too large, some movement in the scene may not be detected.%s" + % (user_config.get_help_string("kernel-size")) + ), ) parser.add_argument( @@ -533,9 +563,11 @@ def get_cli_parser(user_config: ConfigRegistry): "--min-event-length", metavar="time", type=timecode_type_check("time"), - help=("Length of time that must contain motion before triggering a new event. Can be" - " specified as frames (123), seconds (12.3s), or timecode (00:00:01).%s" % - user_config.get_help_string("min-event-length")), + help=( + "Length of time that must contain motion before triggering a new event. Can be" + " specified as frames (123), seconds (12.3s), or timecode (00:00:01).%s" + % user_config.get_help_string("min-event-length") + ), ) parser.add_argument( @@ -543,9 +575,11 @@ def get_cli_parser(user_config: ConfigRegistry): "--time-before-event", metavar="time", type=timecode_type_check("time"), - help=("Maximum amount of time to include before each event. Can be specified as" - " frames (123), seconds (12.3s), or timecode (00:00:01).%s" % - user_config.get_help_string("time-before-event")), + help=( + "Maximum amount of time to include before each event. Can be specified as" + " frames (123), seconds (12.3s), or timecode (00:00:01).%s" + % user_config.get_help_string("time-before-event") + ), ) parser.add_argument( @@ -553,10 +587,12 @@ def get_cli_parser(user_config: ConfigRegistry): "--time-post-event", metavar="time", type=timecode_type_check("time"), - help=("Maximum amount of time to include after each event. The event will end once no" - " motion has been detected for this period of time. Can be specified as frames (123)," - " seconds (12.3s), or timecode (00:00:01).%s" % - user_config.get_help_string("time-post-event")), + help=( + "Maximum amount of time to include after each event. The event will end once no" + " motion has been detected for this period of time. Can be specified as frames (123)," + " seconds (12.3s), or timecode (00:00:01).%s" + % user_config.get_help_string("time-post-event") + ), ) parser.add_argument( @@ -564,9 +600,11 @@ def get_cli_parser(user_config: ConfigRegistry): "--start-time", metavar="time", type=timecode_type_check("time"), - help=("Time to seek to in video before performing detection. Can be" - " given in number of frames (12345), seconds (number followed" - " by s, e.g. 123s or 123.45s), or timecode (HH:MM:SS[.nnn])."), + help=( + "Time to seek to in video before performing detection. Can be" + " given in number of frames (12345), seconds (number followed" + " by s, e.g. 123s or 123.45s), or timecode (HH:MM:SS[.nnn])." + ), ) parser.add_argument( @@ -574,8 +612,10 @@ def get_cli_parser(user_config: ConfigRegistry): "--duration", metavar="time", type=timecode_type_check("time"), - help=("Duration stop processing the input after (see -st for valid timecode formats)." - " Overrides -et."), + help=( + "Duration stop processing the input after (see -st for valid timecode formats)." + " Overrides -et." + ), ) parser.add_argument( @@ -603,27 +643,33 @@ def get_cli_parser(user_config: ConfigRegistry): type=timecode_type_check("smooth_time"), nargs="?", const=False, - help=("If set, draws a bounding box around the area where motion was detected. The amount" - " of temporal smoothing can be specified in either frames (12345) or seconds (number" - " followed by s, e.g. 123s or 123.45s). If omitted, defaults to 0.1s. If set to 0," - " smoothing is disabled.%s" % - (user_config.get_help_string("bounding-box", show_default=False))), + help=( + "If set, draws a bounding box around the area where motion was detected. The amount" + " of temporal smoothing can be specified in either frames (12345) or seconds (number" + " followed by s, e.g. 123s or 123.45s). If omitted, defaults to 0.1s. If set to 0," + " smoothing is disabled.%s" + % (user_config.get_help_string("bounding-box", show_default=False)) + ), ) parser.add_argument( "-tc", "--time-code", action="store_true", - help=("Draw time code in top left corner of each frame.%s" % - user_config.get_help_string("time-code", show_default=False)), + help=( + "Draw time code in top left corner of each frame.%s" + % user_config.get_help_string("time-code", show_default=False) + ), ) parser.add_argument( "-fm", "--frame-metrics", action="store_true", - help=("Draw frame metrics in top right corner of each frame.%s" % - user_config.get_help_string("frame-metrics", show_default=False)), + help=( + "Draw frame metrics in top right corner of each frame.%s" + % user_config.get_help_string("frame-metrics", show_default=False) + ), ) parser.add_argument( @@ -631,8 +677,10 @@ def get_cli_parser(user_config: ConfigRegistry): "--mask-output", metavar="motion_mask.avi", type=str, - help=("Write a video containing the motion mask of each frame. Useful when tuning " - "detection parameters."), + help=( + "Write a video containing the motion mask of each frame. Useful when tuning " + "detection parameters." + ), ) parser.add_argument( @@ -640,11 +688,13 @@ def get_cli_parser(user_config: ConfigRegistry): "--downscale-factor", metavar="factor", type=int_type_check(0, None, "factor"), - help=("Integer factor to downscale (shrink) video before processing, to" - " improve performance. For example, if input video resolution" - " is 1024 x 400, and factor=2, each frame is reduced to" - " 1024/2 x 400/2=512 x 200 before processing.%s" % - (user_config.get_help_string("downscale-factor"))), + help=( + "Integer factor to downscale (shrink) video before processing, to" + " improve performance. For example, if input video resolution" + " is 1024 x 400, and factor=2, each frame is reduced to" + " 1024/2 x 400/2=512 x 200 before processing.%s" + % (user_config.get_help_string("downscale-factor")) + ), ) parser.add_argument( @@ -652,21 +702,25 @@ def get_cli_parser(user_config: ConfigRegistry): "--frame-skip", metavar="num_frames", type=int_type_check(0, None, "num_frames"), - help=("Number of frames to skip after processing a given frame." - " Improves performance, at expense of frame and time accuracy," - " and may increase probability of missing motion events." - " If set, -l, -tb, and -tp will all be scaled relative to the source" - " framerate. Values above 1 or 2 are not recommended.%s" % - (user_config.get_help_string("frame-skip"))), + help=( + "Number of frames to skip after processing a given frame." + " Improves performance, at expense of frame and time accuracy," + " and may increase probability of missing motion events." + " If set, -l, -tb, and -tp will all be scaled relative to the source" + " framerate. Values above 1 or 2 are not recommended.%s" + % (user_config.get_help_string("frame-skip")) + ), ) parser.add_argument( "-q", "--quiet", dest="quiet_mode", action="store_true", - help=("Suppress all output except for final comma-separated list of motion events." - " Useful for computing or piping output directly into other programs/scripts.%s" % - user_config.get_help_string("quiet-mode")), + help=( + "Suppress all output except for final comma-separated list of motion events." + " Useful for computing or piping output directly into other programs/scripts.%s" + % user_config.get_help_string("quiet-mode") + ), ) # Options that only take long-form. @@ -675,8 +729,10 @@ def get_cli_parser(user_config: ConfigRegistry): "--logfile", metavar="file", type=str, - help=("Path to log file for writing application output. If FILE already exists, the program" - " output will be appended to the existing contents."), + help=( + "Path to log file for writing application output. If FILE already exists, the program" + " output will be appended to the existing contents." + ), ) parser.add_argument( @@ -692,10 +748,13 @@ def get_cli_parser(user_config: ConfigRegistry): "--verbosity", metavar="type", type=string_type_check(CHOICE_MAP["verbosity"], False, "type"), - help=("Amount of verbosity to use for log output. Must be one of: %s.%s" % ( - ", ".join(CHOICE_MAP["verbosity"]), - user_config.get_help_string("verbosity"), - )), + help=( + "Amount of verbosity to use for log output. Must be one of: %s.%s" + % ( + ", ".join(CHOICE_MAP["verbosity"]), + user_config.get_help_string("verbosity"), + ) + ), ) parser.add_argument( diff --git a/dvr_scan/cli/config.py b/dvr_scan/cli/config.py index 7dc69d4..5369a36 100644 --- a/dvr_scan/cli/config.py +++ b/dvr_scan/cli/config.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # # PySceneDetect: Python-Based Video Scene Detector # --------------------------------------------------------------- @@ -19,11 +18,11 @@ in the `dvr_scan` module. """ -from abc import ABC, abstractmethod import logging import os import os.path -from configparser import ConfigParser, ParsingError, DEFAULTSECT +from abc import ABC, abstractmethod +from configparser import DEFAULTSECT, ConfigParser, ParsingError from typing import Any, AnyStr, Dict, List, Optional, Tuple, Union from platformdirs import user_config_dir @@ -43,7 +42,7 @@ DEPRECATED_CONFIG_OPTION: Dict[str, str] = { "region-of-interest": "The region-of-interest config option is deprecated and may be removed. " - "Use the load-region option instead, or specify -R/--load-region." + "Use the load-region option instead, or specify -R/--load-region." } @@ -150,8 +149,9 @@ def from_config(config_value: str, default: "RangeValue") -> "RangeValue": max_val=default.max_val, ) except ValueError as ex: - raise OptionParseFailure("Value must be between %s and %s." % - (default.min_val, default.max_val)) from ex + raise OptionParseFailure( + "Value must be between %s and %s." % (default.min_val, default.max_val) + ) from ex class KernelSizeValue(ValidatedValue): @@ -159,7 +159,7 @@ class KernelSizeValue(ValidatedValue): def __init__(self, value: int = -1): value = int(value) - if not value in (-1, 0) and (value < 3 or value % 2 == 0): + if value not in (-1, 0) and (value < 3 or value % 2 == 0): raise ValueError() self._value = value @@ -183,7 +183,8 @@ def from_config(config_value: str, default: "KernelSizeValue") -> "KernelSizeVal return KernelSizeValue(int(config_value)) except ValueError as ex: raise OptionParseFailure( - "Size must be odd number starting from 3, 0 to disable, or -1 for auto.") from ex + "Size must be odd number starting from 3, 0 to disable, or -1 for auto." + ) from ex class RegionValueDeprecated(ValidatedValue): @@ -195,11 +196,15 @@ class RegionValueDeprecated(ValidatedValue): def __init__(self, value: Optional[str] = None, allow_size: bool = False): if value is not None: translation_table = str.maketrans( - {char: " " for char in RegionValueDeprecated._IGNORE_CHARS}) + {char: " " for char in RegionValueDeprecated._IGNORE_CHARS} + ) values = value.translate(translation_table).split() valid_lengths = (2, 4) if allow_size else (4,) - if not (len(values) in valid_lengths and all([val.isdigit() for val in values]) - and all([int(val) >= 0 for val in values])): + if not ( + len(values) in valid_lengths + and all([val.isdigit() for val in values]) + and all([int(val) >= 0 for val in values]) + ): raise ValueError() self._value = [int(val) for val in values] else: @@ -227,8 +232,10 @@ def from_config(config_value: str, default: "RegionValueDeprecated") -> "RegionV try: return RegionValueDeprecated(config_value) except ValueError as ex: - raise OptionParseFailure("ROI must be four positive integers of the form (x,y)/(w,h)." - " Brackets, commas, slashes, and spaces are optional.") from ex + raise OptionParseFailure( + "ROI must be four positive integers of the form (x,y)/(w,h)." + " Brackets, commas, slashes, and spaces are optional." + ) from ex class RGBValue(ValidatedValue): @@ -254,8 +261,11 @@ def __init__(self, value: Union[int, str]): if not isinstance(value, int): translation_table = str.maketrans({char: " " for char in RGBValue._IGNORE_CHARS}) values = value.translate(translation_table).split() - if not (len(values) == 3 and all([val.isdigit() for val in values]) - and all([int(val) >= 0 for val in values])): + if not ( + len(values) == 3 + and all([val.isdigit() for val in values]) + and all([int(val) >= 0 for val in values]) + ): raise ValueError() value = int(values[0]) << 16 | int(values[1]) << 8 | int(values[2]) assert isinstance(value, int) @@ -289,7 +299,8 @@ def from_config(config_value: str, default: "RGBValue") -> "RGBValue": return RGBValue(config_value) except ValueError as ex: raise OptionParseFailure( - "Color values must be in hex (0xFFFFFF) or R,G,B (255,255,255).") from ex + "Color values must be in hex (0xFFFFFF) or R,G,B (255,255,255)." + ) from ex ConfigValue = Union[bool, int, float, str] @@ -302,22 +313,22 @@ def from_config(config_value: str, default: "RGBValue") -> "RGBValue": # TODO: Replace these default values with those set in dvr_scan.context. CONFIG_MAP: ConfigDict = { - # General Options + # General Options "region-editor": False, "quiet-mode": False, "verbosity": "info", - # Input/Output + # Input/Output "output-dir": "", "output-mode": "opencv", "ffmpeg-input-args": DEFAULT_FFMPEG_INPUT_ARGS, "ffmpeg-output-args": DEFAULT_FFMPEG_OUTPUT_ARGS, "opencv-codec": "XVID", - # Motion Events + # Motion Events "min-event-length": TimecodeValue("0.1s"), "time-before-event": TimecodeValue("1.5s"), "time-post-event": TimecodeValue("2.0s"), "use-pts": False, - # Detection Parameters + # Detection Parameters "bg-subtractor": "MOG2", "threshold": 0.15, "max-threshold": 255.0, @@ -325,12 +336,12 @@ def from_config(config_value: str, default: "RGBValue") -> "RGBValue": "kernel-size": KernelSizeValue(), "downscale-factor": 0, "learning-rate": float(-1), - # TODO(v1.7): Remove, replaced with region files. + # TODO(v1.7): Remove, replaced with region files. "region-of-interest": RegionValueDeprecated(), "load-region": "", "frame-skip": 0, - # Overlays - # Text Overlays + # Overlays + # Text Overlays "time-code": False, "frame-metrics": False, "text-border": 4, @@ -339,7 +350,7 @@ def from_config(config_value: str, default: "RGBValue") -> "RGBValue": "text-font-thickness": 2, "text-font-color": RGBValue(0xFFFFFF), "text-bg-color": RGBValue(0x000000), - # Bounding Box + # Bounding Box "bounding-box": False, "bounding-box-smooth-time": TimecodeValue("0.1s"), "bounding-box-color": RGBValue(0xFF0000), @@ -419,12 +430,12 @@ def load(self, path=None): # Try to load and parse the config file at `path`. config = ConfigParser() try: - config_file_contents = "[%s]\n%s" % (DEFAULTSECT, open(path, "r").read()) + config_file_contents = "[%s]\n%s" % (DEFAULTSECT, open(path).read()) config.read_string(config_file_contents, source=path) except ParsingError as ex: - raise ConfigLoadFailure(self._init_log, reason=ex) + raise ConfigLoadFailure(self._init_log, reason=ex) from ex except OSError as ex: - raise ConfigLoadFailure(self._init_log, reason=ex) + raise ConfigLoadFailure(self._init_log, reason=ex) from ex self._parse_config(config) if any(level >= logging.ERROR for level, _ in self._init_log): raise ConfigLoadFailure(self._init_log) @@ -449,8 +460,9 @@ def _migrate_deprecated(self, config: ConfigParser): for deprecated in deprecated_options: self._log(logging.WARNING, f"WARNING: {DEPRECATED_CONFIG_OPTION[deprecated]}") - def _parse_config(self, - config: ConfigParser) -> Tuple[Optional[ConfigDict], List[Tuple[int, str]]]: + def _parse_config( + self, config: ConfigParser + ) -> Tuple[Optional[ConfigDict], List[Tuple[int, str]]]: """Process the given configuration into a key-value mapping. Returns: @@ -459,15 +471,15 @@ def _parse_config(self, if config.sections(): self._log( logging.ERROR, - "Invalid config file: must not contain any sections, found:\n %s" % - (", ".join(["[%s]" % section for section in config.sections()])), + "Invalid config file: must not contain any sections, found:\n %s" + % (", ".join(["[%s]" % section for section in config.sections()])), ) return self._migrate_deprecated(config) for option in config[DEFAULTSECT]: - if not option in CONFIG_MAP: + if option not in CONFIG_MAP: self._log(logging.ERROR, "Unsupported config option: %s" % (option)) continue try: @@ -499,7 +511,8 @@ def _parse_config(self, if issubclass(option_type, ValidatedValue): try: self._config[option] = option_type.from_config( - config_value=config_value, default=default) + config_value=config_value, default=default + ) except OptionParseFailure as ex: self._log( logging.ERROR, @@ -510,14 +523,15 @@ def _parse_config(self, # If we didn't process the value as a given type, handle it as a string. We also # replace newlines with spaces, and strip any remaining leading/trailing whitespace. if value_type is None: - config_value = (config.get(DEFAULTSECT, option).replace("\n", " ").strip()) + config_value = config.get(DEFAULTSECT, option).replace("\n", " ").strip() if option in CHOICE_MAP: if config_value.lower() not in [ - choice.lower() for choice in CHOICE_MAP[option] + choice.lower() for choice in CHOICE_MAP[option] ]: self._log( logging.ERROR, - "Invalid setting for %s:\n %s\nMust be one of: %s." % ( + "Invalid setting for %s:\n %s\nMust be one of: %s." + % ( option, config.get(DEFAULTSECT, option), ", ".join(choice for choice in CHOICE_MAP[option]), @@ -529,7 +543,7 @@ def _parse_config(self, def is_default(self, option: str) -> bool: """True if the option is default, i.e. is NOT set by the user.""" - return not option in self._config + return option not in self._config def get_value( self, @@ -548,7 +562,7 @@ def get_value( if ignore_default: return None if issubclass(type(value), ValidatedValue): - return value.value # Extract validated value. + return value.value # Extract validated value. return value def get_help_string(self, option: str, show_default: Optional[bool] = None) -> str: @@ -567,7 +581,8 @@ def get_help_string(self, option: str, show_default: Optional[bool] = None) -> s else: value_str = str(self._config[option]) return " [setting: %s]" % (value_str) - if show_default is False or (show_default is None and is_flag - and CONFIG_MAP[option] is False): + if show_default is False or ( + show_default is None and is_flag and CONFIG_MAP[option] is False + ): return "" return " [default: %s]" % (str(CONFIG_MAP[option])) diff --git a/dvr_scan/cli/controller.py b/dvr_scan/cli/controller.py index 0928b77..d2372b6 100644 --- a/dvr_scan/cli/controller.py +++ b/dvr_scan/cli/controller.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # # DVR-Scan: Video Motion Event Detection & Extraction Tool # -------------------------------------------------------------- @@ -25,10 +24,10 @@ import dvr_scan from dvr_scan.cli import get_cli_parser -from dvr_scan.cli.config import ConfigRegistry, ConfigLoadFailure, RegionValueDeprecated -from dvr_scan.overlays import TextOverlay, BoundingBoxOverlay -from dvr_scan.scanner import DetectorType, OutputMode, MotionScanner +from dvr_scan.cli.config import ConfigLoadFailure, ConfigRegistry, RegionValueDeprecated +from dvr_scan.overlays import BoundingBoxOverlay, TextOverlay from dvr_scan.platform import init_logger +from dvr_scan.scanner import DetectorType, MotionScanner, OutputMode logger = logging.getLogger("dvr_scan") @@ -77,14 +76,15 @@ def _preprocess_args(args): input_files += expanded args.input = input_files # -o/--output - if hasattr(args, "output") and not "." in args.output: + if hasattr(args, "output") and "." not in args.output: args.output += ".avi" # -roi/--region-of-interest if hasattr(args, "region_of_interest") and args.region_of_interest: original_roi = args.region_of_interest try: args.region_of_interest = RegionValueDeprecated( - value=" ".join(original_roi), allow_size=True).value + value=" ".join(original_roi), allow_size=True + ).value except ValueError: logger.error( "Error: Invalid value for ROI: %s. ROI must be specified as a rectangle of" @@ -161,7 +161,9 @@ def parse_settings(args: ty.List[str] = None) -> ty.Optional[ProgramSettings]: logger.debug("Error loading config file:", exc_info=config_load_error) if debug_mode: raise config_load_error - return None + # Intentionally suppress the exception in release mode since we've already logged the + # failure reason to the user above. We can now exit with an error code. + return None # noqa: B012 if config.config_dict: logger.debug("Loaded configuration:\n%s", str(config.config_dict)) @@ -203,7 +205,9 @@ def parse_settings(args: ty.List[str] = None) -> ty.Optional[ProgramSettings]: # to the option in MotionScanner should be done via properties, e.g. make a property in # ProgramSettings called 'output_dir' that just returns settings.get('output_dir'). These can then # be directly referenced from the MotionScanner. -def run_dvr_scan(settings: ProgramSettings,) -> ty.List[ty.Tuple[FrameTimecode, FrameTimecode]]: +def run_dvr_scan( + settings: ProgramSettings, +) -> ty.List[ty.Tuple[FrameTimecode, FrameTimecode]]: """Run DVR-Scan scanning logic using validated `settings` from `parse_settings()`.""" logger.info("Initializing scan context...") @@ -215,7 +219,8 @@ def run_dvr_scan(settings: ProgramSettings,) -> ty.List[ty.Tuple[FrameTimecode, ) output_mode = ( - OutputMode.SCAN_ONLY if settings.get_arg("scan-only") else settings.get("output-mode")) + OutputMode.SCAN_ONLY if settings.get_arg("scan-only") else settings.get("output-mode") + ) scanner.set_output( comp_file=settings.get_arg("output"), mask_file=settings.get_arg("mask-output"), @@ -259,7 +264,8 @@ def run_dvr_scan(settings: ProgramSettings,) -> ty.List[ty.Tuple[FrameTimecode, smoothing_time = FrameTimecode(bounding_box_arg, scanner.framerate) else: smoothing_time = FrameTimecode( - settings.get("bounding-box-smooth-time"), scanner.framerate) + settings.get("bounding-box-smooth-time"), scanner.framerate + ) bounding_box = BoundingBoxOverlay( min_size_ratio=settings.get("bounding-box-min-size"), thickness_ratio=settings.get("bounding-box-thickness"), @@ -290,7 +296,9 @@ def run_dvr_scan(settings: ProgramSettings,) -> ty.List[ty.Tuple[FrameTimecode, use_pts=settings.get("use-pts"), ) - scanner.set_thumbnail_params(thumbnails=settings.get("thumbnails"),) + scanner.set_thumbnail_params( + thumbnails=settings.get("thumbnails"), + ) scanner.set_video_time( start_time=settings.get_arg("start-time"), @@ -333,12 +341,14 @@ def run_dvr_scan(settings: ProgramSettings,) -> ty.List[ty.Tuple[FrameTimecode, "-------------------------------------------------------------", ] output_strs += [ - "| Event %4d | %s | %s | %s |" % ( + "| Event %4d | %s | %s | %s |" + % ( i + 1, event.start.get_timecode(precision=1), (event.end - event.start).get_timecode(precision=1), event.end.get_timecode(precision=1), - ) for i, event in enumerate(result.event_list) + ) + for i, event in enumerate(result.event_list) ] output_strs += ["-------------------------------------------------------------"] logger.info("List of motion events:\n%s", "\n".join(output_strs)) diff --git a/dvr_scan/detector.py b/dvr_scan/detector.py index 915e9e6..388ce24 100644 --- a/dvr_scan/detector.py +++ b/dvr_scan/detector.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # # DVR-Scan: Video Motion Event Detection & Extraction Tool # -------------------------------------------------------------- @@ -15,16 +14,16 @@ represents the relative amount of movement of consecutive frames in a video. """ -from collections import namedtuple -from dataclasses import dataclass import logging import typing as ty +from collections import namedtuple +from dataclasses import dataclass import cv2 import numpy as np -from dvr_scan.subtractor import Subtractor from dvr_scan.region import Point +from dvr_scan.subtractor import Subtractor Rectangle = namedtuple("Rectangle", ["x", "y", "w", "h"]) @@ -56,7 +55,7 @@ def __init__( self._subtractor = subtractor self._frame_size = frame_size self._downscale = downscale - self._regions = list(regions) if not regions is None else [] + self._regions = list(regions) if regions is not None else [] self._mask: np.ndarray = np.ones((0, 0)) self._area: ty.Tuple[Point, Point] = ( Point(0, 0), @@ -80,13 +79,14 @@ def __init__( max_x, max_y = max(max_x, point.x), max(max_y, point.y) self._area = (Point(min_x, min_y), Point(max_x, max_y)) coverage = 100.0 * (active_pixels / float(frame_size[0] * frame_size[1])) - mask = mask[self._area[0].y:self._area[1].y, self._area[0].x:self._area[1].x] + mask = mask[self._area[0].y : self._area[1].y, self._area[0].x : self._area[1].x] logger.debug( "Region Mask: area = (" f"{self._area[0].x},{self._area[0].y}),({self._area[1].x},{self._area[1].y}" - f"), coverage = {coverage:.2f}%") + f"), coverage = {coverage:.2f}%" + ) if self._downscale > 1: - mask = mask[::self._downscale, ::self._downscale] + mask = mask[:: self._downscale, :: self._downscale] logger.debug(f"Mask Downscaled: size = {mask.shape[0]}, {mask.shape[1]}") self._mask = mask @@ -101,11 +101,11 @@ def _preprocess(self, frame: np.ndarray) -> np.ndarray: cropped = frame else: cropped = frame[ - self._area[0].y:self._area[1].y, - self._area[0].x:self._area[1].x, + self._area[0].y : self._area[1].y, + self._area[0].x : self._area[1].x, ] if self._downscale > 1: - return cropped[::self._downscale, ::self._downscale, :] + return cropped[:: self._downscale, :: self._downscale, :] return cropped @@ -114,7 +114,8 @@ def update(self, frame: np.ndarray) -> ProcessedFrame: subtracted = self._subtractor.apply(frame) if not self._regions: return ProcessedFrame( - subtracted=subtracted, masked=subtracted, score=np.average(subtracted)) + subtracted=subtracted, masked=subtracted, score=np.average(subtracted) + ) motion_mask = np.ma.array(subtracted, mask=self._mask) return ProcessedFrame( subtracted=subtracted, diff --git a/dvr_scan/opencv_loader.py b/dvr_scan/opencv_loader.py index f5fab43..f3b9151 100644 --- a/dvr_scan/opencv_loader.py +++ b/dvr_scan/opencv_loader.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # # DVR-Scan: Video Motion Event Detection & Extraction Tool # -------------------------------------------------------------- @@ -15,8 +14,9 @@ better error messaging in cases where the module isn't installed. """ +import importlib +import importlib.util import os -import sys # On Windows, make sure we include any required DLL paths. if os.name == "nt": @@ -29,10 +29,9 @@ # OpenCV is a required package, but we don't have it as an explicit dependency since we # need to support both opencv-python and opencv-python-headless. Include some additional # context with the exception if this is the case. -try: - import cv2 as _ -except ModuleNotFoundError as ex: + +if not importlib.util.find_spec("cv2"): raise ModuleNotFoundError( "OpenCV could not be found, try installing opencv-python:\n\npip install opencv-python", name="cv2", - ) from ex + ) diff --git a/dvr_scan/overlays.py b/dvr_scan/overlays.py index a829be0..cc09776 100644 --- a/dvr_scan/overlays.py +++ b/dvr_scan/overlays.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # # DVR-Scan: Video Motion Event Detection & Extraction Tool # -------------------------------------------------------------- @@ -21,7 +20,7 @@ import numpy -class TextOverlay(object): +class TextOverlay: """Renders text onto video frames, primarily used for drawing timecodes. Text is currently anchored to the top left of the frame. @@ -32,15 +31,15 @@ class Corner(Enum): TopRight = 2 def __init__( - self, - font: int = cv2.FONT_HERSHEY_SIMPLEX, - font_scale: float = 1.0, - margin: int = 4, - border: int = 4, - thickness: int = 2, - color: ty.Tuple[int, int, int] = (255, 255, 255), - bg_color: ty.Tuple[int, int, int] = (0, 0, 0), - corner: Corner = Corner.TopLeft, + self, + font: int = cv2.FONT_HERSHEY_SIMPLEX, + font_scale: float = 1.0, + margin: int = 4, + border: int = 4, + thickness: int = 2, + color: ty.Tuple[int, int, int] = (255, 255, 255), + bg_color: ty.Tuple[int, int, int] = (0, 0, 0), + corner: Corner = Corner.TopLeft, ): """Initialize a TextOverlay with the given parameters. @@ -117,7 +116,7 @@ def draw(self, frame: numpy.ndarray, text: str): y_offset += size[0][1] + line_spacing -class BoundingBoxOverlay(object): +class BoundingBoxOverlay: """Calculates and draws a bounding box onto of video frames based on a binary mask representing areas of interest/motion.""" @@ -184,8 +183,7 @@ def _get_smoothed_window(self) -> ty.Tuple[int, int, int, int]: """ assert self._smoothing_window return [ - round(sum([box[i] - for box in self._smoothing_window]) / len(self._smoothing_window)) + round(sum([box[i] for box in self._smoothing_window]) / len(self._smoothing_window)) for i in range(4) ] diff --git a/dvr_scan/platform.py b/dvr_scan/platform.py index 22bf54d..307d198 100644 --- a/dvr_scan/platform.py +++ b/dvr_scan/platform.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # # DVR-Scan: Video Motion Event Detection & Extraction Tool # -------------------------------------------------------------- @@ -14,11 +13,11 @@ Provides logging and platform/operating system compatibility. """ -from contextlib import contextmanager import logging import os import subprocess import sys +from contextlib import contextmanager from typing import AnyStr, Optional try: @@ -52,7 +51,7 @@ def get_icon_path() -> str: return "" -HAS_TKINTER = not tkinter is None +HAS_TKINTER = tkinter is not None IS_WINDOWS = os.name == "nt" @@ -65,7 +64,7 @@ def get_min_screen_bounds(): """Attempts to get the minimum screen resolution of all monitors using the `screeninfo` package. Returns the minimum of all monitor's heights and widths with 10% padding, or None if the package is unavailable.""" - if not screeninfo is None: + if screeninfo is not None: try: monitors = screeninfo.get_monitors() return ( @@ -141,9 +140,10 @@ def init_logger( # We still want to make sure we can tell the messages apart, so we add a short prefix [::]. format_str = "[DVR-Scan] :: %(message)s" if log_level == logging.DEBUG: - format_str = ("%(levelname)s: [scenedetect] %(module)s.%(funcName)s(): %(message)s") + format_str = "%(levelname)s: [scenedetect] %(module)s.%(funcName)s(): %(message)s" _init_logger_impl( - logging.getLogger("pyscenedetect"), log_level, format_str, show_stdout, log_file) + logging.getLogger("pyscenedetect"), log_level, format_str, show_stdout, log_file + ) return logging.getLogger("dvr_scan") diff --git a/dvr_scan/region.py b/dvr_scan/region.py index 13b8d43..53a99a0 100644 --- a/dvr_scan/region.py +++ b/dvr_scan/region.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # # DVR-Scan: Video Motion Event Detection & Extraction Tool # -------------------------------------------------------------- @@ -14,18 +13,18 @@ Regions are represented as a set of closed polygons defined by lists of points. """ +import math +import os +import typing as ty from collections import namedtuple from copy import deepcopy from dataclasses import dataclass from logging import getLogger -import math -import os -import typing as ty import cv2 import numpy as np -from dvr_scan.platform import HAS_TKINTER, IS_WINDOWS, temp_tk_window, set_icon +from dvr_scan.platform import HAS_TKINTER, IS_WINDOWS, set_icon, temp_tk_window if HAS_TKINTER: import tkinter @@ -68,7 +67,8 @@ def warn_if_tkinter_missing(): if not HAS_TKINTER: logger.warning( "Warning: Tkinter is not installed. Install the python3-tk package to ensure region " - "data is saved, or specify -s/--save-region.") + "data is saved, or specify -s/--save-region." + ) class RegionValidator: @@ -81,8 +81,10 @@ def __init__(self, value: str): translation_table = str.maketrans({char: " " for char in RegionValidator._IGNORE_CHARS}) values = value.translate(translation_table).split() if not all([val.isdigit() for val in values]): - raise ValueError("Regions can only contain numbers and the following characters:" - f" , / ( )\n Input: {value}") + raise ValueError( + "Regions can only contain numbers and the following characters:" + f" , / ( )\n Input: {value}" + ) if not len(values) % 2 == 0: raise ValueError(f"Could not parse region, missing X or Y component.\n Input: {value}") if not len(values) >= 6: @@ -226,7 +228,7 @@ def initial_point_list(frame_size: Size) -> ty.List[Point]: def squared_distance(a: Point, b: Point): - return (a.x - b.x)**2 + (a.y - b.y)**2 + return (a.x - b.x) ** 2 + (a.y - b.y) ** 2 def bound_point(point: Point, size: Size): @@ -235,19 +237,19 @@ def bound_point(point: Point, size: Size): def load_regions(path: ty.AnyStr) -> ty.Iterable[RegionValidator]: region_data = None - with open(path, "rt") as file: + with open(path) as file: region_data = file.readlines() if region_data: return list( RegionValidator(region).value - for region in filter(None, (region.strip() for region in region_data))) + for region in filter(None, (region.strip() for region in region_data)) + ) return [] # TODO(v1.7): Allow multiple polygons by adding new ones using keyboard. # TODO(v1.7): Allow shifting polygons by using middle mouse button. class RegionEditor: - def __init__( self, frame: np.ndarray, @@ -260,12 +262,12 @@ def __init__( # TODO: Move more fields from this class into `EditorSettings`. self._settings = EditorSettings(video_path=video_path, save_path=save_path) - self._source_frame = frame.copy() # Frame before downscaling + self._source_frame = frame.copy() # Frame before downscaling self._source_size = Size(w=frame.shape[1], h=frame.shape[0]) self._scale: int = 1 if initial_scale is None else initial_scale - self._frame = frame.copy() # Workspace + self._frame = frame.copy() # Workspace self._frame_size = Size(w=frame.shape[1], h=frame.shape[0]) - self._original_frame = frame.copy() # Copy to redraw on + self._original_frame = frame.copy() # Copy to redraw on if initial_shapes: self._regions = initial_shapes else: @@ -281,12 +283,12 @@ def __init__( self._dragging = False self._drag_start = None self._debug_mode = debug_mode - self._segment_dist = [] # Square distance of segment from point i to i+1 - self._mouse_dist = [] # Square distance of mouse to point i + self._segment_dist = [] # Square distance of segment from point i to i+1 + self._mouse_dist = [] # Square distance of mouse to point i if self._scale > 1: self._rescale() - self._persisted = True # Indicates if we've saved outstanding changes to disk. - self._commit(persisted=True) # Add initial history for undo. + self._persisted = True # Indicates if we've saved outstanding changes to disk. + self._commit(persisted=True) # Add initial history for undo. @property def shapes(self) -> ty.Iterable[ty.Iterable[Point]]: @@ -294,12 +296,15 @@ def shapes(self) -> ty.Iterable[ty.Iterable[Point]]: @property def active_region(self) -> ty.Optional[ty.List[Point]]: - return (self._regions[self._active_shape] if - (not self._active_shape is None and bool(self._regions)) else None) + return ( + self._regions[self._active_shape] + if (self._active_shape is not None and bool(self._regions)) + else None + ) def _rescale(self): assert self._scale > 0 - self._original_frame = self._source_frame[::self._scale, ::self._scale, :].copy() + self._original_frame = self._source_frame[:: self._scale, :: self._scale, :].copy() self._frame = self._original_frame.copy() self._frame_size = Size(w=self._frame.shape[1], h=self._frame.shape[0]) self._redraw = True @@ -336,7 +341,7 @@ def _commit(self, persisted=False): # rewriting history from that point. # Take a copy of the current state and put it in the history buffer. snapshot = deepcopy(Snapshot(regions=self._regions, active_shape=self._active_shape)) - self._history = self._history[self._history_pos:] + self._history = self._history[self._history_pos :] self._history.insert(0, snapshot) self._history = self._history[:MAX_HISTORY_SIZE] self._history_pos = 0 @@ -350,8 +355,7 @@ def _emit_points(self): for shape in self._regions: region_info.append("-a %s" % " ".join(f"{x} {y}" for x, y in shape)) data = " ".join(region_info) - logger.info("Command to scan region:\n" - f"dvr-scan -i {self._settings.video_path} {data}") + logger.info("Command to scan region:\n" f"dvr-scan -i {self._settings.video_path} {data}") def _draw(self): if self._recalculate: @@ -379,7 +383,9 @@ def _draw(self): points = points // self._scale line_type = ( cv2.LINE_AA - if self._settings.use_aa and self._scale <= MAX_DOWNSCALE_AA_LEVEL else cv2.LINE_4) + if self._settings.use_aa and self._scale <= MAX_DOWNSCALE_AA_LEVEL + else cv2.LINE_4 + ) # if not self._settings.mask_source: frame = cv2.polylines( @@ -390,18 +396,20 @@ def _draw(self): thickness=thickness, lineType=line_type, ) - if not self._hover_point is None and not self._settings.mask_source: + if self._hover_point is not None and not self._settings.mask_source: first, mid, last = ( (self._hover_point - 1) % len(self.active_region), self._hover_point, (self._hover_point + 1) % len(self.active_region), ) points = np.array( - [[ - self.active_region[first], - self.active_region[mid], - self.active_region[last], - ]], + [ + [ + self.active_region[first], + self.active_region[mid], + self.active_region[last], + ] + ], np.int32, ) if self._scale > 1: @@ -411,17 +419,23 @@ def _draw(self): points, isClosed=False, color=self._settings.hover_color - if not self._dragging else self._settings.hover_color_alt, + if not self._dragging + else self._settings.hover_color_alt, thickness=thickness_active, lineType=line_type, ) - elif (not self._nearest_points is None and self._settings.highlight_insert - and not self._settings.mask_source): + elif ( + self._nearest_points is not None + and self._settings.highlight_insert + and not self._settings.mask_source + ): points = np.array( - [[ - self.active_region[self._nearest_points[0]], - self.active_region[self._nearest_points[1]], - ]], + [ + [ + self.active_region[self._nearest_points[0]], + self.active_region[self._nearest_points[1]], + ] + ], np.int32, ) if self._scale > 1: @@ -435,19 +449,23 @@ def _draw(self): lineType=line_type, ) - if not self.active_region is None: + if self.active_region is not None: radius = control_handle_radius(self._scale) for i, point in enumerate(self.active_region): color = self._settings.line_color_alt - if not self._hover_point is None: + if self._hover_point is not None: if self._hover_point == i: color = ( self._settings.hover_color_alt - if not self._dragging else self._settings.interact_color) - elif not self._nearest_points is None and i in self._nearest_points: + if not self._dragging + else self._settings.interact_color + ) + elif self._nearest_points is not None and i in self._nearest_points: color = ( self._settings.hover_color - if self._dragging else self._settings.interact_color) + if self._dragging + else self._settings.interact_color + ) start, end = ( Point( (point.x // self._scale) - radius, @@ -478,15 +496,15 @@ def _find_nearest(self) -> ty.Tuple[int, int]: a_sq = min(self._mouse_dist[i], self._mouse_dist[next]) c_sq = max(self._mouse_dist[i], self._mouse_dist[next]) b_sq = self._segment_dist[i] - assert (a_sq > 0) # Should never hit this since we check _hovering_over first. + assert a_sq > 0 # Should never hit this since we check _hovering_over first. if b_sq == 0: - # Two adjacent points are overlapping, just skip this one. + # Two adjacent points are overlapping, just skip this one. continue a, b = math.sqrt(a_sq), math.sqrt(b_sq) cos_C = ((a_sq + b_sq) - c_sq) / (2.0 * a * b) - # If cos_C is between [0,1] the triangle is acute. If it's not, just take the distance - # of the closest point. - dist = int(a_sq - (int(a * cos_C)**2)) if cos_C > 0 else a_sq + # If cos_C is between [0,1] the triangle is acute. If it's not, just take the distance + # of the closest point. + dist = int(a_sq - (int(a * cos_C) ** 2)) if cos_C > 0 else a_sq if dist < nearest_dist or (dist == nearest_dist and cos_C > largest_cosine): nearest_seg, nearest_dist, largest_cosine = i, dist, cos_C last = self._settings.highlight_insert @@ -505,8 +523,12 @@ def _hovering_over(self) -> ty.Optional[int]: min_i = i # If we've shrunk the image, we need to compensate for the size difference in the image. # The control handles remain the same size but the image is smaller - return (min_i if self._mouse_dist[min_i] <= (4 * control_handle_radius(self._scale) * - self._scale)**2 else None) + return ( + min_i + if self._mouse_dist[min_i] + <= (4 * control_handle_radius(self._scale) * self._scale) ** 2 + else None + ) def _init_window(self): cv2.namedWindow(WINDOW_TITLE, self._settings.window_mode) @@ -565,8 +587,10 @@ def run(self) -> bool: self._draw() key = ( cv2.waitKey( - MAX_UPDATE_RATE_NORMAL if not self._dragging else MAX_UPDATE_RATE_DRAGGING) - & 0xFF) + MAX_UPDATE_RATE_NORMAL if not self._dragging else MAX_UPDATE_RATE_DRAGGING + ) + & 0xFF + ) if key == KEYCODE_ESCAPE: if self._prompt_save_on_quit(): break @@ -589,8 +613,12 @@ def _adjust_downscale(self, amount: int): # scale is clamped to MIN_DOWNSCALE_FACTOR/MAX_DOWNSCALE_FACTOR. scale = self._scale + amount self._scale = ( - MIN_DOWNSCALE_FACTOR if scale < MIN_DOWNSCALE_FACTOR else - scale if scale < MAX_DOWNSCALE_FACTOR else MAX_DOWNSCALE_FACTOR) + MIN_DOWNSCALE_FACTOR + if scale < MIN_DOWNSCALE_FACTOR + else scale + if scale < MAX_DOWNSCALE_FACTOR + else MAX_DOWNSCALE_FACTOR + ) logger.info(f"Downscale factor: {self._scale}") self._rescale() @@ -651,7 +679,7 @@ def _save(self, path=None): if not self._settings.save_path: return False path = self._settings.save_path - with open(path, "wt") as region_file: + with open(path, "w") as region_file: for shape in self._regions: region_file.write(" ".join(f"{x} {y}" for x, y in shape)) region_file.write("\n") @@ -700,7 +728,7 @@ def _prompt_load(self): self._active_shape = 0 if len(self._regions) > 0 else None def _delete_point(self): - if not self._hover_point is None and not self._dragging: + if self._hover_point is not None and not self._dragging: if len(self.active_region) > MIN_NUM_POINTS: hover = self._hover_point x, y = self.active_region[hover] @@ -737,9 +765,12 @@ def _toggle_window_mode(self): self._init_window() def _add_point(self) -> bool: - if not self._nearest_points is None: - insert_pos = (1 + self._nearest_points[0] if self._nearest_points[0] - < self._nearest_points[1] else self._nearest_points[1]) + if self._nearest_points is not None: + insert_pos = ( + 1 + self._nearest_points[0] + if self._nearest_points[0] < self._nearest_points[1] + else self._nearest_points[1] + ) insert_pos = insert_pos % len(self.active_region) self.active_region.insert(insert_pos, self._curr_mouse_pos) self._nearest_points = None @@ -788,8 +819,9 @@ def _handle_mouse_input(self, event, x, y, flags, param): if event == cv2.EVENT_LBUTTONDOWN: if not self._regions: logger.info( - f"No regions to edit, add a new one by pressing {KEYBIND_REGION_ADD.upper()}.") - if not self._hover_point is None: + f"No regions to edit, add a new one by pressing {KEYBIND_REGION_ADD.upper()}." + ) + if self._hover_point is not None: self._dragging = True self._drag_start = self._curr_mouse_pos self._redraw = True @@ -806,9 +838,11 @@ def _handle_mouse_input(self, event, x, y, flags, param): elif event == cv2.EVENT_LBUTTONUP: if self._dragging: - assert not self._hover_point is None - if (len(self.active_region) != len(self._history[self._history_pos].regions) - or self._curr_mouse_pos != self._drag_start): + assert self._hover_point is not None + if ( + len(self.active_region) != len(self._history[self._history_pos].regions) + or self._curr_mouse_pos != self._drag_start + ): self.active_region[self._hover_point] = self._curr_mouse_pos x, y = self.active_region[self._hover_point] logger.debug("Add: [%d] = %s", self._hover_point, f"P({x},{y})") @@ -816,7 +850,7 @@ def _handle_mouse_input(self, event, x, y, flags, param): self._redraw = True self._dragging = False - elif (event == cv2.EVENT_MBUTTONDOWN or IS_WINDOWS and event == cv2.EVENT_RBUTTONDOWN): + elif event == cv2.EVENT_MBUTTONDOWN or IS_WINDOWS and event == cv2.EVENT_RBUTTONDOWN: self._delete_point() # Only draw again if we aren't dragging (too many events to draw on each one), or if diff --git a/dvr_scan/scanner.py b/dvr_scan/scanner.py index 4f05d7f..27e2c47 100644 --- a/dvr_scan/scanner.py +++ b/dvr_scan/scanner.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # # DVR-Scan: Video Motion Event Detection & Extraction Tool # -------------------------------------------------------------- @@ -14,8 +13,6 @@ Contains the motion scanning engine (`MotionScanner`) for DVR-Scan. """ -from dataclasses import dataclass -from enum import Enum import logging import os import os.path @@ -23,6 +20,8 @@ import subprocess import sys import threading +from dataclasses import dataclass +from enum import Enum from typing import Any, AnyStr, List, Optional, Tuple, Union import cv2 @@ -34,8 +33,8 @@ from dvr_scan.detector import MotionDetector from dvr_scan.overlays import BoundingBoxOverlay, TextOverlay from dvr_scan.platform import get_filename, get_min_screen_bounds, is_ffmpeg_available -from dvr_scan.region import RegionEditor, Point, Size, bound_point, load_regions -from dvr_scan.subtractor import SubtractorMOG2, SubtractorCNT, SubtractorCudaMOG2 +from dvr_scan.region import Point, RegionEditor, Size, bound_point, load_regions +from dvr_scan.subtractor import SubtractorCNT, SubtractorCudaMOG2, SubtractorMOG2 from dvr_scan.video_joiner import VideoJoiner logger = logging.getLogger("dvr_scan") @@ -225,54 +224,50 @@ def __init__( # Scan state and options they come from: # Output Parameters (set_output) - self._comp_file: Optional[AnyStr] = None # -o/--output - self._mask_file: Optional[AnyStr] = None # -mo/--mask-output - self._fourcc: Any = None # opencv-codec - self._output_mode: OutputMode = None # -m/--output-mode / -so/--scan-only - self._ffmpeg_input_args: Optional[str] = ( - None # input args for OutputMode.FFMPEG/COPY - ) - self._ffmpeg_output_args: Optional[str] = ( - None # output args for OutputMode.FFMPEG - ) - self._output_dir: AnyStr = "" # -d/--directory - # TODO: Replace uses of self._output_dir with - # a helper function called "get_output_path". + self._comp_file: Optional[AnyStr] = None # -o/--output + self._mask_file: Optional[AnyStr] = None # -mo/--mask-output + self._fourcc: Any = None # opencv-codec + self._output_mode: OutputMode = None # -m/--output-mode / -so/--scan-only + self._ffmpeg_input_args: Optional[str] = None # input args for OutputMode.FFMPEG/COPY + self._ffmpeg_output_args: Optional[str] = None # output args for OutputMode.FFMPEG + self._output_dir: AnyStr = "" # -d/--directory + # TODO: Replace uses of self._output_dir with + # a helper function called "get_output_path". # Overlay Parameters (set_overlays) - self._timecode_overlay = None # -tc/--time-code, None or TextOverlay + self._timecode_overlay = None # -tc/--time-code, None or TextOverlay self._metrics_overlay = None # -fm/--frame-metrics, None or TextOverlay - self._bounding_box = None # -bb/--bounding-box, None or BoundingBoxOverlay + self._bounding_box = None # -bb/--bounding-box, None or BoundingBoxOverlay # Motion Detection Parameters (set_detection_params) - self._subtractor_type = DetectorType.MOG2 # -b/--bg-subtractor - self._threshold = 0.15 # -t/--threshold - self._variance_threshold = 16.0 # variance-threshold - self._kernel_size = None # -k/--kernel-size - self._downscale_factor = 1 # -df/--downscale-factor - self._learning_rate = -1 # learning-rate - self._max_threshold = 255.0 # max-threshold + self._subtractor_type = DetectorType.MOG2 # -b/--bg-subtractor + self._threshold = 0.15 # -t/--threshold + self._variance_threshold = 16.0 # variance-threshold + self._kernel_size = None # -k/--kernel-size + self._downscale_factor = 1 # -df/--downscale-factor + self._learning_rate = -1 # learning-rate + self._max_threshold = 255.0 # max-threshold # Motion Event Parameters (set_event_params) self._min_event_len = None # -l/--min-event-length self._pre_event_len = None # -tb/--time-before-event - self._post_event_len = None # -tp/--time-post-event - self._use_pts = None # --use_pts + self._post_event_len = None # -tp/--time-post-event + self._use_pts = None # --use_pts # Region Parameters (set_region) - self._region_editor = False # -w/--region-window - self._regions: List[List[Point]] = [] # -a/--add-region, -w/--region-window - self._load_region: Optional[str] = None # -R/--load-region - self._save_region: Optional[str] = None # -s/--save-region + self._region_editor = False # -w/--region-window + self._regions: List[List[Point]] = [] # -a/--add-region, -w/--region-window + self._load_region: Optional[str] = None # -R/--load-region + self._save_region: Optional[str] = None # -s/--save-region self._max_roi_size_deprecated = None self._show_roi_window_deprecated = False self._roi_deprecated = None # Input Video Parameters (set_video_time) - self._input: VideoJoiner = VideoJoiner(input_videos) # -i/--input - self._frame_skip: int = frame_skip # -fs/--frame-skip - self._start_time: FrameTimecode = None # -st/--start-time - self._end_time: FrameTimecode = None # -et/--end-time + self._input: VideoJoiner = VideoJoiner(input_videos) # -i/--input + self._frame_skip: int = frame_skip # -fs/--frame-skip + self._start_time: FrameTimecode = None # -st/--start-time + self._end_time: FrameTimecode = None # -et/--end-time # Internal Variables self._stop: threading.Event = threading.Event() @@ -347,14 +342,15 @@ def set_output( if not isinstance(output_mode, OutputMode): output_mode = OutputMode[output_mode.upper().replace("-", "_")] if len(self._input.paths) > 1 and output_mode not in ( - OutputMode.SCAN_ONLY, - OutputMode.OPENCV, + OutputMode.SCAN_ONLY, + OutputMode.OPENCV, ): raise ValueError( - "input concatenation is only supported in `scan-only` or `opencv` mode.") + "input concatenation is only supported in `scan-only` or `opencv` mode." + ) if comp_file is not None and output_mode != OutputMode.OPENCV: raise ValueError("output to single file is only supported with mode `opencv`") - if (output_mode in (OutputMode.FFMPEG, OutputMode.COPY) and not is_ffmpeg_available()): + if output_mode in (OutputMode.FFMPEG, OutputMode.COPY) and not is_ffmpeg_available(): raise ValueError("ffmpeg is required to use output mode FFMPEG/COPY") self._comp_file = comp_file self._mask_file = mask_file @@ -486,8 +482,9 @@ def set_video_time( def _handle_regions(self) -> bool: # TODO(v2.0): Remove deprecated ROI selection handlers. - if (self._show_roi_window_deprecated) and (self._load_region or self._regions - or self._region_editor): + if (self._show_roi_window_deprecated) and ( + self._load_region or self._regions or self._region_editor + ): raise ValueError("Use -r/--region-editor instead of -roi.") if not self._select_roi_deprecated(): return @@ -497,7 +494,8 @@ def _handle_regions(self) -> bool: logger.error(f"File does not exist: {self._load_region}") raise ValueError( "Could not find specified region file. Ensure the specified path is valid " - "and the file exists.") + "and the file exists." + ) try: logger.info(f"Loading regions from file: {self._load_region}") regions = load_regions(self._load_region) @@ -515,9 +513,10 @@ def _handle_regions(self) -> bool: ) self._regions += regions if self._regions: - self._regions = [[bound_point(point, Size(*self._input.resolution)) - for point in shape] - for shape in self._regions] + self._regions = [ + [bound_point(point, Size(*self._input.resolution)) for point in shape] + for shape in self._regions + ] if self._region_editor: logger.info("Selecting area of interest:") # TODO(v1.7): Ensure ROI window respects start time if set. @@ -525,14 +524,14 @@ def _handle_regions(self) -> bool: frame_for_crop = self._input.read() scale_factor = 1 screen_bounds = get_min_screen_bounds() - if not screen_bounds is None: + if screen_bounds is not None: max_w, max_h = screen_bounds[1], screen_bounds[0] frame_w, frame_h = frame_for_crop.shape[1], frame_for_crop.shape[0] if (max_h > 0 and frame_h > max_h) or (max_w > 0 and frame_w > max_w): logger.debug("Max window size: %d x %d", max_w, max_h) # Downscale the image if it's too large for the screen. - factor_h = (frame_h / float(max_h) if max_h > 0 and frame_h > max_h else 1) - factor_w = (frame_w / float(max_w) if max_w > 0 and frame_w > max_w else 1) + factor_h = frame_h / float(max_h) if max_h > 0 and frame_h > max_h else 1 + factor_w = frame_w / float(max_w) if max_w > 0 and frame_w > max_w else 1 scale_factor = round(max(factor_h, factor_w)) regions = RegionEditor( frame=frame_for_crop, @@ -547,23 +546,30 @@ def _handle_regions(self) -> bool: self._regions = list(regions.shapes) elif self._save_region: regions = ( - self._regions if self._regions else [[ - Point(0, 0), - Point(self._input.resolution[0] - 1, 0), - Point(self._input.resolution[0] - 1, self._input.resolution[1] - 1), - Point(0, self._input.resolution[1] - 1), - ]]) + self._regions + if self._regions + else [ + [ + Point(0, 0), + Point(self._input.resolution[0] - 1, 0), + Point(self._input.resolution[0] - 1, self._input.resolution[1] - 1), + Point(0, self._input.resolution[1] - 1), + ] + ] + ) path = self._save_region if self._output_dir: path = os.path.join(self._output_dir, path) - with open(path, "wt") as region_file: + with open(path, "w") as region_file: for shape in self._regions: region_file.write(" ".join(f"{x} {y}" for x, y in shape)) region_file.write("\n") logger.info(f"Saved region data to: {path}") if self._regions: - logger.info(f"Limiting detection to {len(self._regions)} " - f"region{'s' if len(self._regions) > 1 else ''}.") + logger.info( + f"Limiting detection to {len(self._regions)} " + f"region{'s' if len(self._regions) > 1 else ''}." + ) else: logger.debug("No regions selected.") return True @@ -616,8 +622,9 @@ def scan(self) -> Optional[DetectionResult]: if self._kernel_size == -1: # Calculate size of noise reduction kernel. Even if an ROI is set, the auto factor is # set based on the original video's input resolution. - kernel_size = _recommended_kernel_size(self._input.resolution[0], - self._downscale_factor) + kernel_size = _recommended_kernel_size( + self._input.resolution[0], self._downscale_factor + ) else: kernel_size = _scale_kernel_size(self._kernel_size, self._downscale_factor) @@ -636,7 +643,8 @@ def scan(self) -> Optional[DetectionResult]: ) logger.info( - "Using subtractor %s with kernel_size = %s%s, variance_threshold = %s and learning_rate = %s", + "Using subtractor %s with kernel_size = %s%s, " + "variance_threshold = %s and learning_rate = %s", self._subtractor_type.name, str(kernel_size) if kernel_size else "off", " (auto)" if self._kernel_size == -1 else "", @@ -658,10 +666,12 @@ def scan(self) -> Optional[DetectionResult]: # important as this affects the number of frames we consider for the actual motion event. if not self._use_pts: start_event_shift: int = self._pre_event_len.frame_num + min_event_len * ( - self._frame_skip + 1) + self._frame_skip + 1 + ) else: - start_event_shift_ms: float = (self._pre_event_len.get_seconds() + - self._min_event_len.get_seconds()) * 1000 + start_event_shift_ms: float = ( + self._pre_event_len.get_seconds() + self._min_event_len.get_seconds() + ) * 1000 # Length of buffer we require in memory to keep track of all frames required for -l and -tb. buff_len = pre_event_len + min_event_len @@ -685,16 +695,17 @@ def scan(self) -> Optional[DetectionResult]: # main scanning loop below, otherwise it will interrupt the progress bar. logger.info( "Scanning %s for motion events...", - "%d input videos" % - len(self._input.paths) if len(self._input.paths) > 1 else "input video", + "%d input videos" % len(self._input.paths) + if len(self._input.paths) > 1 + else "input video", ) - progress_bar = ( - FakeTqdmObject() if not self._show_progress else self._create_progress_bar()) + progress_bar = FakeTqdmObject() if not self._show_progress else self._create_progress_bar() decode_queue = queue.Queue(MAX_DECODE_QUEUE_SIZE) decode_thread = threading.Thread( - target=MotionScanner._decode_thread, args=(self, decode_queue), daemon=True) + target=MotionScanner._decode_thread, args=(self, decode_queue), daemon=True + ) decode_thread.start() encode_thread = None @@ -721,7 +732,8 @@ def scan(self) -> Optional[DetectionResult]: video_res = self._input.resolution logger.warn( f"WARNING: Frame {time.frame_num} [{time.get_timecode()}] has unexpected size: " - f"{frame_size[0]}x{frame_size[1]}, expected {video_res[0]}x{video_res[1]}") + f"{frame_size[0]}x{frame_size[1]}, expected {video_res[0]}x{video_res[1]}" + ) result = detector.update(frame.frame_bgr) frame_score = result.score # TODO(1.7): Allow disabling the rejection filter or customizing amount of @@ -749,7 +761,9 @@ def scan(self) -> Optional[DetectionResult]: if self._bounding_box: bounding_box = ( self._bounding_box.update(result.subtracted) - if above_threshold else self._bounding_box.clear()) + if above_threshold + else self._bounding_box.clear() + ) if self._mask_file and not self._stop.is_set(): encode_queue.put( @@ -758,7 +772,8 @@ def scan(self) -> Optional[DetectionResult]: timecode=frame.timecode, score=frame_score, bounding_box=bounding_box, - )) + ) + ) # Last frame was part of a motion event, or still within the post-event window. if in_motion_event: @@ -779,17 +794,22 @@ def scan(self) -> Optional[DetectionResult]: if num_frames_post_event >= post_event_len: in_motion_event = False - logger.debug("event %d high score %f" % - (1 + self._num_events, self._highscore)) + logger.debug( + "event %d high score %f" % (1 + self._num_events, self._highscore) + ) if self._thumbnails == "highscore": video_name = get_filename( - path=self._input.paths[0], include_extension=False) + path=self._input.paths[0], include_extension=False + ) output_path = ( - self._comp_file if self._comp_file else OUTPUT_FILE_TEMPLATE.format( + self._comp_file + if self._comp_file + else OUTPUT_FILE_TEMPLATE.format( VIDEO_NAME=video_name, EVENT_NUMBER="%04d" % (1 + self._num_events), EXTENSION="jpg", - )) + ) + ) if self._output_dir: output_path = os.path.join(self._output_dir, output_path) cv2.imwrite(output_path, self._highframe) @@ -802,15 +822,17 @@ def scan(self) -> Optional[DetectionResult]: # We also add 1 to include the presentation duration of the last frame. if not self._use_pts: event_end = FrameTimecode( - 1 + last_frame_above_threshold + self._post_event_len.frame_num + - self._frame_skip, + 1 + + last_frame_above_threshold + + self._post_event_len.frame_num + + self._frame_skip, self._input.framerate, ) assert event_end.frame_num >= event_start.frame_num else: event_end = FrameTimecode( - (last_frame_above_threshold_ms / 1000) + - self._post_event_len.get_seconds(), + (last_frame_above_threshold_ms / 1000) + + self._post_event_len.get_seconds(), self._input.framerate, ) assert event_end.get_seconds() >= event_start.get_seconds() @@ -826,7 +848,8 @@ def scan(self) -> Optional[DetectionResult]: timecode=frame.timecode, bounding_box=bounding_box, score=frame_score, - )) + ) + ) # Not already in a motion event, look for a new one. else: # Buffer the required amount of frames and overlay data until we find an event. @@ -837,23 +860,27 @@ def scan(self) -> Optional[DetectionResult]: timecode=frame.timecode, bounding_box=bounding_box, score=frame_score, - )) + ) + ) buffered_frames = buffered_frames[-buff_len:] # Start a new event once all frames in the event window have motion. if len(event_window) >= min_event_len and all( - score >= self._threshold for score in event_window): + score >= self._threshold for score in event_window + ): in_motion_event = True progress_bar.set_description( - PROGRESS_BAR_DESCRIPTION % (1 + len(event_list)), refresh=False) + PROGRESS_BAR_DESCRIPTION % (1 + len(event_list)), refresh=False + ) event_window = [] num_frames_post_event = 0 - frames_since_last_event = (frame.timecode.frame_num - event_end.frame_num) + frames_since_last_event = frame.timecode.frame_num - event_end.frame_num last_frame_above_threshold = frame.timecode.frame_num if not self._use_pts: shift_amount = min(frames_since_last_event, start_event_shift) - shifted_start = max(start_frame, - frame.timecode.frame_num + 1 - shift_amount) + shifted_start = max( + start_frame, frame.timecode.frame_num + 1 - shift_amount + ) event_start = FrameTimecode(shifted_start, self._input.framerate) else: ms_since_last_event = pts - (event_end.get_seconds() * 1000) @@ -900,11 +927,14 @@ def scan(self) -> Optional[DetectionResult]: if self._thumbnails == "highscore": video_name = get_filename(path=self._input.paths[0], include_extension=False) output_path = ( - self._comp_file if self._comp_file else OUTPUT_FILE_TEMPLATE.format( + self._comp_file + if self._comp_file + else OUTPUT_FILE_TEMPLATE.format( VIDEO_NAME=video_name, EVENT_NUMBER="%04d" % (1 + self._num_events), EXTENSION="jpg", - )) + ) + ) if self._output_dir: output_path = os.path.join(self._output_dir, output_path) cv2.imwrite(output_path, self._highframe) @@ -920,7 +950,8 @@ def scan(self) -> Optional[DetectionResult]: encode_thread.join() if self._encode_thread_exception is not None: raise self._encode_thread_exception[1].with_traceback( - self._encode_thread_exception[2]) + self._encode_thread_exception[2] + ) # Display an error if we got more than one decode failure / corrupt frame. # TODO: This will also fire if no frames are decoded. Add a check to make sure @@ -939,7 +970,7 @@ def scan(self) -> Optional[DetectionResult]: def _decode_thread(self, decode_queue: queue.Queue): try: while not self._stop.is_set(): - if (self._end_time is not None and self._input.position >= self._end_time): + if self._end_time is not None and self._input.position >= self._end_time: break for _ in range(self._frame_skip): if self._input.read(decode=False) is None: @@ -956,14 +987,14 @@ def _decode_thread(self, decode_queue: queue.Queue): fps=self._input.framerate, ) else: - presentation_time = FrameTimecode(self._input.position_ms / 1000, - self._input.framerate) + presentation_time = FrameTimecode( + self._input.position_ms / 1000, self._input.framerate + ) if not self._stop.is_set(): decode_queue.put(DecodeEvent(frame_bgr, presentation_time)) # We'll re-raise any exceptions from the main thread. - # pylint: disable=bare-except - except: + except: # noqa: E722 self._stop.set() logger.critical("Fatal error: Exception raised in decode thread.") logger.debug(sys.exc_info()) @@ -971,15 +1002,16 @@ def _decode_thread(self, decode_queue: queue.Queue): finally: # Make sure main thread stops processing loop. decode_queue.put(None) - # pylint: enable=bare-except def _init_video_writer(self, path: AnyStr, frame_size: Tuple[int, int]) -> cv2.VideoWriter: """Create a new cv2.VideoWriter using the correct framerate.""" if self._output_dir: path = os.path.join(self._output_dir, path) effective_framerate = ( - self._input.framerate if self._frame_skip < 1 else self._input.framerate / - (1 + self._frame_skip)) + self._input.framerate + if self._frame_skip < 1 + else self._input.framerate / (1 + self._frame_skip) + ) return cv2.VideoWriter(path, self._fourcc, effective_framerate, frame_size) def _on_encode_frame_event(self, event: EncodeFrameEvent): @@ -989,17 +1021,21 @@ def _on_encode_frame_event(self, event: EncodeFrameEvent): video = self._input.resolution logger.warn( f"WARNING: Failed to write event at frame {time.frame_num} [{time.get_timecode()}] " - f"due to size mismatch: {size[0]}x{size[1]}, expected {video[0]}x{video[1]}") + f"due to size mismatch: {size[0]}x{size[1]}, expected {video[0]}x{video[1]}" + ) return if self._video_writer is None: # Use the first input video name as a filename template. video_name = get_filename(path=self._input.paths[0], include_extension=False) output_path = ( - self._comp_file if self._comp_file else OUTPUT_FILE_TEMPLATE.format( + self._comp_file + if self._comp_file + else OUTPUT_FILE_TEMPLATE.format( VIDEO_NAME=video_name, EVENT_NUMBER="%04d" % (1 + self._num_events), EXTENSION="avi", - )) + ) + ) self._video_writer = self._init_video_writer(output_path, size) # *NOTE*: Overlays are currently rendered in-place by modifying the event itself. self._draw_overlays(event.frame_bgr, event.timecode, event.score, event.bounding_box) @@ -1015,15 +1051,15 @@ def _draw_overlays( bounding_box: Optional[Tuple[int, int, int, int]], use_shift=True, ): - if not self._timecode_overlay is None: + if self._timecode_overlay is not None: self._timecode_overlay.draw(frame, text=timecode.get_timecode()) - if not self._metrics_overlay is None: + if self._metrics_overlay is not None: to_display = "Frame: %04d\nScore: %3.2f" % ( timecode.get_frames(), frame_score, ) self._metrics_overlay.draw(frame, text=to_display) - if not self._bounding_box is None and not bounding_box is None: + if self._bounding_box is not None and bounding_box is not None: self._bounding_box.draw(frame, bounding_box, use_shift) def _on_mask_event(self, event: MotionMaskEvent): @@ -1038,11 +1074,13 @@ def _on_mask_event(self, event: MotionMaskEvent): logger.warn( f"WARNING: Failed to write mask at frame {time.frame_num} [{time.get_timecode()}] " f"due to size mismatch: {size[0]}x{size[1]}, " - f" expected {self._mask_size[0]}x{self._mask_size[1]}") + f" expected {self._mask_size[0]}x{self._mask_size[1]}" + ) return out_frame = cv2.cvtColor(event.motion_mask, cv2.COLOR_GRAY2BGR) self._draw_overlays( - out_frame, event.timecode, event.score, event.bounding_box, use_shift=False) + out_frame, event.timecode, event.score, event.bounding_box, use_shift=False + ) self._mask_writer.write(out_frame) def _on_motion_event(self, event: MotionEvent): @@ -1058,15 +1096,20 @@ def _on_motion_event(self, event: MotionEvent): # Output motion event using Ffmpeg. output_args = ( self._ffmpeg_output_args - if self._output_mode == OutputMode.FFMPEG else COPY_MODE_OUTPUT_ARGS) + if self._output_mode == OutputMode.FFMPEG + else COPY_MODE_OUTPUT_ARGS + ) # Use the first input video name as a filename template. video_name = get_filename(path=self._input.paths[0], include_extension=False) output_path = ( - self._comp_file if self._comp_file else OUTPUT_FILE_TEMPLATE.format( + self._comp_file + if self._comp_file + else OUTPUT_FILE_TEMPLATE.format( VIDEO_NAME=video_name, EVENT_NUMBER="%04d" % self._num_events, EXTENSION="mp4", - )) + ) + ) if self._output_dir: output_path = os.path.join(self._output_dir, output_path) # Only log the args passed to ffmpeg on the first event, to reduce log spam. @@ -1087,8 +1130,9 @@ def _on_motion_event(self, event: MotionEvent): def _encode_thread(self, encode_queue: queue.Queue): try: while True: - event: Optional[Union[EncodeFrameEvent, MotionMaskEvent, - MotionEvent]] = encode_queue.get() + event: Optional[Union[EncodeFrameEvent, MotionMaskEvent, MotionEvent]] = ( + encode_queue.get() + ) if event is None: break if isinstance(event, EncodeFrameEvent): @@ -1098,8 +1142,7 @@ def _encode_thread(self, encode_queue: queue.Queue): elif isinstance(event, MotionEvent): self._on_motion_event(event) # We'll re-raise any exceptions from the main thread. - # pylint: disable=bare-except - except: + except: # noqa: E722 self._stop.set() logger.critical("Fatal error: Exception raised in encode thread.") logger.debug(sys.exc_info()) @@ -1112,7 +1155,6 @@ def _encode_thread(self, encode_queue: queue.Queue): # Unblock any waiting puts if we stopped early. while not encode_queue.empty(): _ = encode_queue.get_nowait() - # pylint: enable=bare-except # TODO(v2.0): Remove deprecated function, replaced by Region Editor. def _select_roi_deprecated(self) -> bool: @@ -1120,7 +1162,8 @@ def _select_roi_deprecated(self) -> bool: if self._show_roi_window_deprecated: logger.warning( "**WARNING**: -roi/--region-of-interest is deprecated and will be removed.\n\n" - "Use -r/--region-editor instead.\n") + "Use -r/--region-editor instead.\n" + ) logger.info("Selecting area of interest:") # TODO: We should process this frame. frame_for_crop = self._input.read() @@ -1171,6 +1214,7 @@ def _select_roi_deprecated(self) -> bool: "**WARNING**: region-of-interest (-roi) is deprecated and will be removed.\n\n" "You can use the following equivalent region:\n" f"--add-region {region_arg}\n" - "For config files, save this region to a file and set the load-region option.\n") + "For config files, save this region to a file and set the load-region option.\n" + ) self._regions += [region] return True diff --git a/dvr_scan/subtractor.py b/dvr_scan/subtractor.py index 49ee87e..e7ad221 100644 --- a/dvr_scan/subtractor.py +++ b/dvr_scan/subtractor.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # # DVR-Scan: Video Motion Event Detection & Extraction Tool # -------------------------------------------------------------- @@ -58,7 +57,8 @@ def __init__( if kernel_size < 0 or (kernel_size > 1 and kernel_size % 2 == 0): raise ValueError("kernel_size must be >= 0") self._kernel = ( - numpy.ones((kernel_size, kernel_size), numpy.uint8) if kernel_size > 1 else None) + numpy.ones((kernel_size, kernel_size), numpy.uint8) if kernel_size > 1 else None + ) self._subtractor = cv2.createBackgroundSubtractorMOG2( history=history, varThreshold=variance_threshold, @@ -71,7 +71,7 @@ def __init__( def apply(self, frame: numpy.ndarray) -> numpy.ndarray: frame_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) frame_mask = self._subtractor.apply(frame_gray, learningRate=self._learning_rate) - if not self._kernel is None: + if self._kernel is not None: frame_filt = cv2.morphologyEx(frame_mask, cv2.MORPH_OPEN, self._kernel) else: frame_filt = frame_mask @@ -98,7 +98,8 @@ def __init__( if kernel_size < 0 or (kernel_size > 1 and kernel_size % 2 == 0): raise ValueError("kernel_size must be odd integer >= 1 or zero (0)") self._kernel = ( - numpy.ones((kernel_size, kernel_size), numpy.uint8) if kernel_size > 1 else None) + numpy.ones((kernel_size, kernel_size), numpy.uint8) if kernel_size > 1 else None + ) self._subtractor = cv2.bgsegm.createBackgroundSubtractorCNT( minPixelStability=min_pixel_stability, useHistory=use_history, @@ -130,7 +131,10 @@ def __init__( cv2.MORPH_OPEN, cv2.CV_8UC1, numpy.ones((kernel_size, kernel_size), numpy.uint8), - ) if kernel_size > 1 else None) + ) + if kernel_size > 1 + else None + ) self._subtractor = cv2.cuda.createBackgroundSubtractorMOG2( history=history, varThreshold=variance_threshold, @@ -146,7 +150,7 @@ def apply(self, frame: numpy.ndarray) -> numpy.ndarray: frame_bgr_dev.upload(frame, stream=stream) frame_gray_dev = cv2.cuda.cvtColor(frame_bgr_dev, cv2.COLOR_BGR2GRAY, stream=stream) frame_mask_dev = self._subtractor.apply(frame_gray_dev, self._learning_rate, stream=stream) - if not self._filter is None: + if self._filter is not None: frame_filt_dev = self._filter.apply(frame_mask_dev, stream=stream) else: frame_filt_dev = frame_mask_dev diff --git a/dvr_scan/video_joiner.py b/dvr_scan/video_joiner.py index b6bdf9a..d18def7 100644 --- a/dvr_scan/video_joiner.py +++ b/dvr_scan/video_joiner.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # # DVR-Scan: Video Motion Event Detection & Extraction Tool # -------------------------------------------------------------- @@ -97,8 +96,9 @@ def read(self, decode: bool = True) -> Optional[numpy.ndarray]: # Compensate for presentation time of last frame self._position += 1 self._decode_failures += self._cap._decode_failures - logger.debug("End of current video, loading next: %s" % - self._paths[self._curr_cap_index]) + logger.debug( + "End of current video, loading next: %s" % self._paths[self._curr_cap_index] + ) self._cap = VideoStreamCv2(self._paths[self._curr_cap_index]) self._last_cap_pos = self._cap.base_timecode return self.read(decode=decode) @@ -112,7 +112,7 @@ def read(self, decode: bool = True) -> Optional[numpy.ndarray]: def seek(self, target: FrameTimecode): """Seek to the target offset. Only seeking forward is supported (i.e. `target` must be greater than the current `position`.""" - if (len(self._paths) == 1 or self._curr_cap_index == 0 and target <= self._cap.duration): + if len(self._paths) == 1 or self._curr_cap_index == 0 and target <= self._cap.duration: self._cap.seek(target) else: # TODO: This is ineffient if we have multiple input videos. @@ -128,7 +128,7 @@ def _load_input_videos(self): video_name = os.path.basename(video_path) try: cap = VideoStreamCv2(video_path) - except VideoOpenFailure as ex: + except VideoOpenFailure: logger.error("Error: Couldn't load video %s", video_path) raise validated_paths.append(video_path) @@ -157,14 +157,17 @@ def _load_input_videos(self): logger.error("Error: Video resolution does not match the first input.") raise VideoOpenFailure("Video resolutions must match to be concatenated!") if abs(cap.frame_rate - self._cap.frame_rate) > FRAMERATE_DELTA_TOLERANCE: - logger.warning("Warning: framerate does not match first input." - " Timecodes may be incorrect.") + logger.warning( + "Warning: framerate does not match first input." " Timecodes may be incorrect." + ) if round(cap.capture.get(cv2.CAP_PROP_FOURCC)) == 0: unsupported_codec = True self._paths = validated_paths if unsupported_codec: - logger.error("Unsupported or invalid codec, output may be incorrect. Possible fixes:\n" - " - Re-encode the input video with ffmpeg\n" - " - Update OpenCV (pip install --upgrade opencv-python)") + logger.error( + "Unsupported or invalid codec, output may be incorrect. Possible fixes:\n" + " - Re-encode the input video with ffmpeg\n" + " - Update OpenCV (pip install --upgrade opencv-python)" + ) diff --git a/pyproject.toml b/pyproject.toml index fed528d..36cff7a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,3 +1,45 @@ +# +# DVR-Scan: Video Motion Event Detection & Extraction Tool +# -------------------------------------------------------------- +# [ Site: https://www.dvr-scan.com/ ] +# [ Repo: https://github.com/Breakthrough/DVR-Scan ] +# +# Copyright (C) 2014-2024 Brandon Castellano . +# DVR-Scan is licensed under the BSD 2-Clause License; see the included +# LICENSE file, or visit one of the above pages for details. +# [build-system] requires = ["setuptools"] build-backend = "setuptools.build_meta" + +[tool.ruff] +exclude = [ + "docs" +] +line-length = 100 +indent-width = 4 + +[tool.ruff.format] +quote-style = "double" +indent-style = "space" +skip-magic-trailing-comma = false +docstring-code-format = true + +[tool.ruff.lint] +select = [ + # flake8-bugbear + "B", + # pycodestyle + "E", + # Pyflakes + "F", + # isort + "I", + # TODO - Add additional rule sets (https://docs.astral.sh/ruff/rules/): + # pyupgrade + #"UP", + # flake8-simplify + #"SIM", +] +fixable = ["ALL"] +unfixable = [] diff --git a/setup.py b/setup.py index 6e504a1..0b131c3 100644 --- a/setup.py +++ b/setup.py @@ -1,5 +1,4 @@ #!/usr/bin/env python -# -*- coding: utf-8 -*- # # DVR-Scan: Video Motion Event Detection & Extraction Tool # -------------------------------------------------------------- diff --git a/tests/__init__.py b/tests/__init__.py index 07fbdbf..ad74ca3 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # # DVR-Scan: Video Motion Event Detection & Extraction Tool # -------------------------------------------------------------- diff --git a/tests/conftest.py b/tests/conftest.py index 3ada3b0..3189e19 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # # DVR-Scan: Video Motion Event Detection & Extraction Tool # -------------------------------------------------------------- @@ -16,6 +15,7 @@ """ import os + import pytest # @@ -31,8 +31,9 @@ def get_absolute_path(relative_path: str) -> str: """ abs_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), relative_path) if not os.path.exists(abs_path): - raise FileNotFoundError("Test video file (%s) must be present to run test case!" % - relative_path) + raise FileNotFoundError( + "Test video file (%s) must be present to run test case!" % relative_path + ) return abs_path diff --git a/tests/test_cli.py b/tests/test_cli.py index 937503a..1167e1b 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # # DVR-Scan: Video Motion Event Detection & Extraction Tool # -------------------------------------------------------------- @@ -20,12 +19,9 @@ from typing import List import pytest - -# We need to import the OpenCV loader before PySceneDetect as the latter imports OpenCV. -# pylint: disable=wrong-import-order, unused-import, ungrouped-imports -from dvr_scan import opencv_loader as _ from scenedetect.video_splitter import is_ffmpeg_available +# We need to import the OpenCV loader before PySceneDetect as the latter imports OpenCV. from dvr_scan.subtractor import SubtractorCNT, SubtractorCudaMOG2 MACHINE_ARCH = platform.machine().upper() @@ -68,11 +64,15 @@ # On some ARM chips (e.g. Apple M1), results are slightly different, so we allow a 1 frame # delta on the events for those platforms. -BASE_COMMAND_TIMECODE_LIST_GOLDEN = (""" +BASE_COMMAND_TIMECODE_LIST_GOLDEN = ( + """ 00:00:00.400,00:00:05.960,00:00:14.320,00:00:19.640,00:00:21.680,00:00:23.040 -"""[1:] if not ("ARM" in MACHINE_ARCH or "AARCH" in MACHINE_ARCH) else """ +"""[1:] + if not ("ARM" in MACHINE_ARCH or "AARCH" in MACHINE_ARCH) + else """ 00:00:00.400,00:00:06.000,00:00:14.320,00:00:19.640,00:00:21.680,00:00:23.040 -"""[1:]) +"""[1:] +) def test_info_commands(): @@ -85,7 +85,9 @@ def test_info_commands(): def test_default(tmp_path): """Test with all default arguments.""" output = subprocess.check_output( - args=DVR_SCAN_COMMAND + BASE_COMMAND + [ + args=DVR_SCAN_COMMAND + + BASE_COMMAND + + [ "--output-dir", tmp_path, ], @@ -94,10 +96,8 @@ def test_default(tmp_path): # Make sure the correct # of events were detected. assert "Detected %d motion events in input." % (BASE_COMMAND_NUM_EVENTS) in output - assert (BASE_COMMAND_EVENT_LIST_GOLDEN - in output), "Output event list does not match test golden." - assert (BASE_COMMAND_TIMECODE_LIST_GOLDEN - in output), "Output timecodes do not match test golden." + assert BASE_COMMAND_EVENT_LIST_GOLDEN in output, "Output event list does not match test golden." + assert BASE_COMMAND_TIMECODE_LIST_GOLDEN in output, "Output timecodes do not match test golden." # TODO: Check filenames. assert len(os.listdir(tmp_path)) == BASE_COMMAND_NUM_EVENTS @@ -106,7 +106,9 @@ def test_concatenate(tmp_path): """Test with setting -o/--output to concatenate all events to a single file.""" ouptut_file_name = "motion_events.avi" output = subprocess.check_output( - args=DVR_SCAN_COMMAND + BASE_COMMAND + [ + args=DVR_SCAN_COMMAND + + BASE_COMMAND + + [ "--output-dir", tmp_path, "--output", @@ -117,10 +119,8 @@ def test_concatenate(tmp_path): # Make sure the correct # of events were detected. assert "Detected %d motion events in input." % (BASE_COMMAND_NUM_EVENTS) in output - assert (BASE_COMMAND_EVENT_LIST_GOLDEN - in output), "Output event list does not match test golden." - assert (BASE_COMMAND_TIMECODE_LIST_GOLDEN - in output), "Output timecodes do not match test golden." + assert BASE_COMMAND_EVENT_LIST_GOLDEN in output, "Output event list does not match test golden." + assert BASE_COMMAND_TIMECODE_LIST_GOLDEN in output, "Output timecodes do not match test golden." generated_files = os.listdir(tmp_path) assert len(generated_files) == 1 assert ouptut_file_name in generated_files @@ -129,7 +129,9 @@ def test_concatenate(tmp_path): def test_scan_only(tmp_path): """Test -so/--scan-only.""" output = subprocess.check_output( - args=DVR_SCAN_COMMAND + BASE_COMMAND + [ + args=DVR_SCAN_COMMAND + + BASE_COMMAND + + [ "--output-dir", tmp_path, "--scan-only", @@ -142,16 +144,16 @@ def test_scan_only(tmp_path): # Make sure we didn't create a directory since we shouldn't write any files. assert len(os.listdir(tmp_path)) == 0, "Scan-only mode should not create any files." - assert (BASE_COMMAND_EVENT_LIST_GOLDEN - in output), "Output event list does not match test golden." - assert (BASE_COMMAND_TIMECODE_LIST_GOLDEN - in output), "Output timecodes do not match test golden." + assert BASE_COMMAND_EVENT_LIST_GOLDEN in output, "Output event list does not match test golden." + assert BASE_COMMAND_TIMECODE_LIST_GOLDEN in output, "Output timecodes do not match test golden." def test_quiet_mode(tmp_path): """Test -q/--quiet.""" output = subprocess.check_output( - args=DVR_SCAN_COMMAND + BASE_COMMAND + [ + args=DVR_SCAN_COMMAND + + BASE_COMMAND + + [ "--output-dir", tmp_path, "--scan-only", @@ -159,70 +161,100 @@ def test_quiet_mode(tmp_path): ], text=True, ) - assert (BASE_COMMAND_TIMECODE_LIST_GOLDEN - in output), "Output timecodes do not match test golden." + assert BASE_COMMAND_TIMECODE_LIST_GOLDEN in output, "Output timecodes do not match test golden." def test_mog2(tmp_path): """Test -b/--bg-subtractor MOG2 (the default).""" - assert (subprocess.call(args=DVR_SCAN_COMMAND + BASE_COMMAND + [ - "--output-dir", - tmp_path, - ]) == 0) + assert ( + subprocess.call( + args=DVR_SCAN_COMMAND + + BASE_COMMAND + + [ + "--output-dir", + tmp_path, + ] + ) + == 0 + ) # Make sure the correct # of events were detected. - assert (len( - os.listdir(tmp_path)) == BASE_COMMAND_NUM_EVENTS), "Incorrect number of events found." + assert len(os.listdir(tmp_path)) == BASE_COMMAND_NUM_EVENTS, "Incorrect number of events found." @pytest.mark.skipif(not SubtractorCNT.is_available(), reason="CNT not available") def test_cnt(tmp_path): """Test -b/--bg-subtractor CNT.""" - assert (subprocess.call(args=DVR_SCAN_COMMAND + BASE_COMMAND + [ - "--output-dir", - tmp_path, - "--bg-subtractor", - "cnt", - ]) == 0) - assert (len( - os.listdir(tmp_path)) == BASE_COMMAND_NUM_EVENTS), "Incorrect number of events found." + assert ( + subprocess.call( + args=DVR_SCAN_COMMAND + + BASE_COMMAND + + [ + "--output-dir", + tmp_path, + "--bg-subtractor", + "cnt", + ] + ) + == 0 + ) + assert len(os.listdir(tmp_path)) == BASE_COMMAND_NUM_EVENTS, "Incorrect number of events found." @pytest.mark.skipif(not SubtractorCudaMOG2.is_available(), reason="MOG2_CUDA not available") def test_mog2_cuda(tmp_path): """Test -b/--bg-subtractor MOG2_CUDA.""" - assert (subprocess.call(args=DVR_SCAN_COMMAND + BASE_COMMAND + [ - "--output-dir", - tmp_path, - "--bg-subtractor", - "mog2_cuda", - ]) == 0) - assert (len( - os.listdir(tmp_path)) == BASE_COMMAND_NUM_EVENTS), "Incorrect number of events found." + assert ( + subprocess.call( + args=DVR_SCAN_COMMAND + + BASE_COMMAND + + [ + "--output-dir", + tmp_path, + "--bg-subtractor", + "mog2_cuda", + ] + ) + == 0 + ) + assert len(os.listdir(tmp_path)) == BASE_COMMAND_NUM_EVENTS, "Incorrect number of events found." def test_overlays(tmp_path): """Test overlays -bb/--bounding-box, --fm/--frame-metrics, and -tc/--time-code.""" - assert (subprocess.call(args=DVR_SCAN_COMMAND + BASE_COMMAND + [ - "--output-dir", - tmp_path, - "--bounding-box", - "--frame-metrics", - "--time-code", - ]) == 0) - assert (len( - os.listdir(tmp_path)) == BASE_COMMAND_NUM_EVENTS), "Incorrect number of events found." + assert ( + subprocess.call( + args=DVR_SCAN_COMMAND + + BASE_COMMAND + + [ + "--output-dir", + tmp_path, + "--bounding-box", + "--frame-metrics", + "--time-code", + ] + ) + == 0 + ) + assert len(os.listdir(tmp_path)) == BASE_COMMAND_NUM_EVENTS, "Incorrect number of events found." def test_mask_output(tmp_path): """Test mask output -mo/--mask-output.""" - assert (subprocess.call(args=DVR_SCAN_COMMAND + BASE_COMMAND + [ - "--output-dir", - tmp_path, - "--scan-only", - "--mask-output", - "mask.avi", - ]) == 0) + assert ( + subprocess.call( + args=DVR_SCAN_COMMAND + + BASE_COMMAND + + [ + "--output-dir", + tmp_path, + "--scan-only", + "--mask-output", + "mask.avi", + ] + ) + == 0 + ) assert os.listdir(tmp_path) == ["mask.avi"], "Only mask file should be created with -so -mo ..." @@ -234,7 +266,9 @@ def test_config_file(tmp_path): file.write(TEST_CONFIG_FILE) output = subprocess.check_output( - args=DVR_SCAN_COMMAND + BASE_COMMAND[0:4] + [ # Only use the input from BASE_COMMAND. + args=DVR_SCAN_COMMAND + + BASE_COMMAND[0:4] + + [ # Only use the input from BASE_COMMAND. "--output-dir", tmp_path, "--config", @@ -243,43 +277,55 @@ def test_config_file(tmp_path): text=True, ) - assert (len(os.listdir(tmp_path)) == BASE_COMMAND_NUM_EVENTS + 1), "Incorrect amount of files." - assert (BASE_COMMAND_EVENT_LIST_GOLDEN - in output), "Output event list does not match test golden." - assert (BASE_COMMAND_TIMECODE_LIST_GOLDEN - in output), "Output timecodes do not match test golden." + assert len(os.listdir(tmp_path)) == BASE_COMMAND_NUM_EVENTS + 1, "Incorrect amount of files." + assert BASE_COMMAND_EVENT_LIST_GOLDEN in output, "Output event list does not match test golden." + assert BASE_COMMAND_TIMECODE_LIST_GOLDEN in output, "Output timecodes do not match test golden." @pytest.mark.skipif(not is_ffmpeg_available(), reason="ffmpeg not available") def test_ffmpeg_mode(tmp_path): """Test -m/--mode ffmpeg.""" - assert (subprocess.call(args=DVR_SCAN_COMMAND + BASE_COMMAND + [ - "--output-dir", - tmp_path, - "--output-mode", - "ffmpeg", - ]) == 0) - assert (len( - os.listdir(tmp_path)) == BASE_COMMAND_NUM_EVENTS), "Incorrect number of events found." + assert ( + subprocess.call( + args=DVR_SCAN_COMMAND + + BASE_COMMAND + + [ + "--output-dir", + tmp_path, + "--output-mode", + "ffmpeg", + ] + ) + == 0 + ) + assert len(os.listdir(tmp_path)) == BASE_COMMAND_NUM_EVENTS, "Incorrect number of events found." @pytest.mark.skipif(not is_ffmpeg_available(), reason="ffmpeg not available") def test_copy_mode(tmp_path): """Test -m/--mode copy.""" - assert (subprocess.call(args=DVR_SCAN_COMMAND + BASE_COMMAND + [ - "--output-dir", - tmp_path, - "--output-mode", - "copy", - ]) == 0) - assert (len( - os.listdir(tmp_path)) == BASE_COMMAND_NUM_EVENTS), "Incorrect number of events found." + assert ( + subprocess.call( + args=DVR_SCAN_COMMAND + + BASE_COMMAND + + [ + "--output-dir", + tmp_path, + "--output-mode", + "copy", + ] + ) + == 0 + ) + assert len(os.listdir(tmp_path)) == BASE_COMMAND_NUM_EVENTS, "Incorrect number of events found." def test_deprecated_roi(tmp_path): """Test deprecated ROI translation.""" - output = subprocess.check_output( - args=DVR_SCAN_COMMAND + BASE_COMMAND + [ + subprocess.check_output( + args=DVR_SCAN_COMMAND + + BASE_COMMAND + + [ "--output-dir", tmp_path, "--scan-only", @@ -294,6 +340,6 @@ def test_deprecated_roi(tmp_path): ) roi_path = os.path.join(tmp_path, "roi.txt") assert os.path.exists(roi_path) - with open(roi_path, "rt") as roi_file: + with open(roi_path) as roi_file: last_line_of_file = list(filter(None, roi_file.readlines()))[-1].strip() assert last_line_of_file == "10 20 20 20 20 35 10 35" diff --git a/tests/test_scan_context.py b/tests/test_scan_context.py index b18787d..6b239fb 100644 --- a/tests/test_scan_context.py +++ b/tests/test_scan_context.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # # DVR-Scan: Video Motion Event Detection & Extraction Tool # -------------------------------------------------------------- @@ -19,9 +18,9 @@ import pytest +from dvr_scan.region import Point from dvr_scan.scanner import DetectorType, MotionScanner from dvr_scan.subtractor import SubtractorCNT, SubtractorCudaMOG2 -from dvr_scan.region import Point MACHINE_ARCH = platform.machine().upper() @@ -79,9 +78,9 @@ ] -def compare_event_lists(a: ty.List[ty.Tuple[int, int]], - b: ty.List[ty.Tuple[int, int]], - tolerance: int = 0): +def compare_event_lists( + a: ty.List[ty.Tuple[int, int]], b: ty.List[ty.Tuple[int, int]], tolerance: int = 0 +): if tolerance == 0: assert a == b return @@ -90,7 +89,8 @@ def compare_event_lists(a: ty.List[ty.Tuple[int, int]], end_matches = abs(end - b[i][1]) <= tolerance assert start_matches and end_matches, ( f"Event mismatch at index {i} with tolerance {tolerance}.\n" - f"Actual = {a[i]}, Expected = {b[i]}") + f"Actual = {a[i]}, Expected = {b[i]}" + ) def test_scan_context(traffic_camera_video): @@ -161,11 +161,12 @@ def test_pre_event_shift_with_frame_skip(traffic_camera_video): # The start times should not differ from the ground truth (non-frame-skipped) by the amount # of frames that we are skipping. End times can vary more since the default value of # time_post_event is relatively large. - assert all([ - abs(x[0] - y[0]) <= frame_skip - for x, y in zip(event_list, TRAFFIC_CAMERA_EVENTS_TIME_PRE_5) - ]), "Comparison failure when frame_skip = %d" % ( - frame_skip) + assert all( + [ + abs(x[0] - y[0]) <= frame_skip + for x, y in zip(event_list, TRAFFIC_CAMERA_EVENTS_TIME_PRE_5) + ] + ), "Comparison failure when frame_skip = %d" % (frame_skip) def test_post_event_shift(traffic_camera_video): @@ -191,16 +192,17 @@ def test_post_event_shift_with_frame_skip(traffic_camera_video): assert len(event_list) == len(TRAFFIC_CAMERA_EVENTS_TIME_POST_40) event_list = [(event.start.frame_num, event.end.frame_num) for event in event_list] # The calculated end times should not differ by more than frame_skip from the ground truth. - assert all([ - abs(x[1] - y[1]) <= frame_skip - for x, y in zip(event_list, TRAFFIC_CAMERA_EVENTS_TIME_POST_40) - ]), "Comparison failure when frame_skip = %d" % ( - frame_skip) + assert all( + [ + abs(x[1] - y[1]) <= frame_skip + for x, y in zip(event_list, TRAFFIC_CAMERA_EVENTS_TIME_POST_40) + ] + ), "Comparison failure when frame_skip = %d" % (frame_skip) # The calculated end times must always be >= the ground truth's frame number, otherwise # we may be discarding frames containing motion due to skipping them. - assert all([x[1] >= y[1] for x, y in zip(event_list, TRAFFIC_CAMERA_EVENTS_TIME_POST_40) - ]), "Comparison failure when frame_skip = %d" % ( - frame_skip) + assert all( + [x[1] >= y[1] for x, y in zip(event_list, TRAFFIC_CAMERA_EVENTS_TIME_POST_40)] + ), "Comparison failure when frame_skip = %d" % (frame_skip) def test_decode_corrupt_video(corrupt_video): diff --git a/tests/test_video_joiner.py b/tests/test_video_joiner.py index 9b69764..31c1e70 100644 --- a/tests/test_video_joiner.py +++ b/tests/test_video_joiner.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # # DVR-Scan: Video Motion Event Detection & Extraction Tool # -------------------------------------------------------------- @@ -32,7 +31,7 @@ def test_decode_multiple(traffic_camera_video): splice_amount = 3 video = VideoJoiner([traffic_camera_video] * splice_amount) assert video.total_frames == TRAFFIC_CAMERA_VIDEO_TOTAL_FRAMES * splice_amount - while not video.read(False) is None: + while video.read(False) is not None: pass - assert (video.position.get_frames() == TRAFFIC_CAMERA_VIDEO_TOTAL_FRAMES * splice_amount) + assert video.position.get_frames() == TRAFFIC_CAMERA_VIDEO_TOTAL_FRAMES * splice_amount assert video.decode_failures == 0