Skip to content

Commit

Permalink
[app] Integrate scan pipeline and config loading
Browse files Browse the repository at this point in the history
Not all options are implemented yet.
  • Loading branch information
Breakthrough committed Jan 4, 2025
1 parent 8d4f269 commit 49c8e39
Show file tree
Hide file tree
Showing 8 changed files with 385 additions and 199 deletions.
88 changes: 66 additions & 22 deletions dvr_scan/app/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,17 @@
import argparse
import logging
import sys
import typing as ty

import dvr_scan
from dvr_scan import get_license_info
from dvr_scan.app.application import Application
from dvr_scan.config import CHOICE_MAP
from dvr_scan.platform import init_logger
from dvr_scan.config import CHOICE_MAP, USER_CONFIG_FILE_PATH, ConfigLoadFailure, ConfigRegistry
from dvr_scan.shared import ScanSettings, init_logging
from dvr_scan.shared.cli import VERSION_STRING, LicenseAction, VersionAction, string_type_check

logger = logging.getLogger("dvr_scan")


EXIT_SUCCESS: int = 0
EXIT_ERROR: int = 1

Expand All @@ -32,6 +35,16 @@ def get_cli_parser():
if hasattr(parser, "_optionals"):
parser._optionals.title = "arguments"

parser.add_argument(
"-c",
"--config",
metavar="settings.cfg",
type=str,
help=(
"Path to config file. If not set, tries to load one from %s" % (USER_CONFIG_FILE_PATH)
),
)

parser.add_argument(
"-V",
"--version",
Expand Down Expand Up @@ -70,26 +83,57 @@ def get_cli_parser():
return parser


def _init_logging(args: ty.Optional[argparse.ArgumentParser]):
verbosity = logging.INFO
if args is not None and hasattr(args, "verbosity"):
verbosity = getattr(logging, args.verbosity.upper())

quiet_mode = False
if args is not None and hasattr(args, "quiet_mode"):
quiet_mode = args.quiet_mode

init_logger(
log_level=verbosity,
show_stdout=not quiet_mode,
log_file=args.logfile if hasattr(args, "logfile") else None,
)


def main():
args = get_cli_parser().parse_args()
_init_logging(args)
app = Application()
"""Parse command line options and load config file settings."""

init_log = []
config_load_error = None
failed_to_load_config = False
config = ConfigRegistry()
# Try to load config from user settings folder.
try:
user_config = ConfigRegistry()
user_config.load()
config = user_config
except ConfigLoadFailure as ex:
config_load_error = ex

# Parse CLI args, override config if an override was specified on the command line.
try:
args = get_cli_parser().parse_args()
init_logging(args, config)
init_log += [(logging.INFO, "DVR-Scan Application %s" % dvr_scan.__version__)]
if config_load_error and not hasattr(args, "config"):
raise config_load_error
if hasattr(args, "config"):
config_setting = ConfigRegistry()
config_setting.load(args.config)
init_logging(args, config_setting)
config = config_setting
init_log += config.consume_init_log()
except ConfigLoadFailure as ex:
init_log += ex.init_log
if ex.reason is not None:
init_log += [(logging.ERROR, "Error: %s" % str(ex.reason).replace("\t", " "))]
failed_to_load_config = True
config_load_error = ex
finally:
for log_level, log_str in init_log:
logger.log(log_level, log_str)
if failed_to_load_config:
logger.critical("Failed to load config file.")
logger.debug("Error loading config file:", exc_info=config_load_error)
# Intentionally suppress the exception in release mode since we've already logged the
# failure reason to the user above. We can now exit with an error code.
return None # noqa: B012

if config.config_dict:
logger.debug("Loaded configuration:\n%s", str(config.config_dict))

logger.debug("Program arguments:\n%s", str(args))
settings = ScanSettings(args=args, config=config)

app = Application(settings=settings)
app.run()
sys.exit(EXIT_SUCCESS)

Expand Down
91 changes: 78 additions & 13 deletions dvr_scan/app/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
# LICENSE file, or visit one of the above pages for details.
#

import copy
import tkinter as tk
import tkinter.messagebox
import tkinter.scrolledtext
Expand All @@ -22,6 +23,7 @@
from dvr_scan.app.common import register_icon
from dvr_scan.app.scan_window import ScanWindow
from dvr_scan.config import CONFIG_MAP
from dvr_scan.shared import ScanSettings

WINDOW_TITLE = "DVR-Scan"

Expand Down Expand Up @@ -51,6 +53,7 @@

SETTING_INPUT_WIDTH = 12
PATH_INPUT_WIDTH = 32
MAX_KERNEL_SIZE = 21


class InputArea:
Expand Down Expand Up @@ -169,7 +172,7 @@ def _on_use_regions(self):
class SettingsArea:
# TODO: make this less busy by making it a notebook widget that can also include the
# output settings. Can also have an additional tab to load/save the various settings.
def __init__(self, root: tk.Widget):
def __init__(self, root: tk.Widget, settings: ScanSettings):
self._root = root

root.rowconfigure(0, pad=PADDING, weight=1)
Expand All @@ -195,11 +198,21 @@ def __init__(self, root: tk.Widget):
combo.grid(row=0, column=1, sticky=sticky)

tk.Label(root, text="Kernel Size").grid(row=1, column=0, sticky=sticky)
self._kernel_size = tk.StringVar()
combo = ttk.Combobox(root, textvariable=self._kernel_size, width=SETTING_INPUT_WIDTH)
combo["values"] = ("Auto", "Off", "3x3", "5x5", "7x7", "9x9")
combo.state(["readonly"]) # TODO: Custom kernel sizes.
combo.grid(row=1, column=1, sticky=sticky)

self._kernel_size = ttk.Combobox(root, width=SETTING_INPUT_WIDTH, state="readonly")
# 0: Auto
# 1: Off
# 2: 3x3
# 3: 5x5
# 4: 7x7
# 5: 9x9...
self._kernel_size["values"] = (
"Off",
"Auto",
*tuple(f"{n}x{n}" for n in range(3, MAX_KERNEL_SIZE + 1, 2)),
)
self._kernel_size.grid(row=1, column=1, sticky=sticky)
self._kernel_size.current(1)

tk.Label(root, text="Threshold").grid(row=2, column=0, sticky=sticky)
self._threshold = tk.StringVar()
Expand Down Expand Up @@ -260,11 +273,38 @@ def __init__(self, root: tk.Widget):

self._update_default_state()

@property
def use_default(self) -> bool:
return self._default.get()

def _update_default_state(self):
use_default = self._default.get()
for child in self._root.winfo_children():
child.configure(state=tk.DISABLED if use_default else tk.NORMAL)
self._default_button["state"] = tk.NORMAL
self._kernel_size["state"] = tk.DISABLED if use_default else "readonly"

@property
def kernel_size(self) -> int:
index = self._kernel_size.current()
if index == 0:
return 0
elif index == 1:
return -1
else:
assert index > 0
return (index * 2) - 1

@kernel_size.setter
def kernel_size(self, size):
# TODO: Handle this discrepency properly, we're clipping the user config right now.
if size > MAX_KERNEL_SIZE:
logger.warning("Kernel sizes above 21 are not supported yet, clipping to 21.")
kernel_size = min(size, MAX_KERNEL_SIZE)
auto_kernel = bool(kernel_size < 0)
none_kernel = bool(kernel_size == 0)
index = 0 if none_kernel else 1 if auto_kernel else (1 + (kernel_size // 2))
self._kernel_size.current(index)


class OutputArea:
Expand Down Expand Up @@ -359,9 +399,10 @@ def enable(self):


class Application:
def __init__(self):
def __init__(self, settings: ScanSettings):
self._root = tk.Tk()
self._root.withdraw()
self._settings: ScanSettings = None

self._root.option_add("*tearOff", False)
self._root.title(WINDOW_TITLE)
Expand All @@ -381,34 +422,58 @@ def __init__(self):
input_frame.grid(row=0, sticky=tk.NSEW, padx=PADDING, pady=(PADDING, 0))

settings_frame = ttk.Labelframe(self._root, text="Motion", padding=PADDING)
self._settings = SettingsArea(settings_frame)
self._settings_area = SettingsArea(settings_frame, settings)
settings_frame.grid(row=1, sticky=tk.EW, padx=PADDING, pady=(PADDING, 0))

output_frame = ttk.Labelframe(self._root, text="Output", padding=PADDING)
self._output = OutputArea(output_frame)
self._output_area = OutputArea(output_frame)
output_frame.grid(row=2, sticky=tk.EW, padx=PADDING, pady=(PADDING, 0))

scan_frame = ttk.Labelframe(self._root, text="Scan", padding=PADDING)
self._scan = ScanArea(self._root, scan_frame)
self._scan_area = ScanArea(self._root, scan_frame)
scan_frame.grid(row=3, sticky=tk.EW, padx=PADDING, pady=PADDING)

self._scan_window: ty.Optional[ScanWindow] = None
self._root.bind("<<StartScan>>", lambda _: self._start_new_scan())
self._root.protocol("WM_DELETE_WINDOW", self._on_delete)

self._set_from(settings)

def _set_from(self, settings: ScanSettings):
"""Initialize UI from config file."""
logger.debug("initializing UI state from settings")
self._settings = settings

# Scan Area
self._settings_area.kernel_size = self._settings.get("kernel-size")

def get_scan_settings(self) -> ScanSettings:
"""Get current UI state as a new ScanSettings."""
settings = copy.deepcopy(self._settings)

# Scan Area
if not self._settings_area.use_default:
settings.set("kernel-size", self._settings_area.kernel_size)

# HACK: Prevent output files.
settings.set("output-mode", "scan_only")
return settings

def _start_new_scan(self):
assert self._scan_window is None

settings = self.get_scan_settings()

def on_scan_window_close():
logger.debug("scan window closed, removing window and restoring focus")
self._scan_window = None
self._scan.enable()
self._scan_area.enable()
self._root.deiconify()
self._root.grab_set()
self._root.focus()

self._scan.disable()
self._scan_window = ScanWindow(self._root, on_scan_window_close)
self._scan_window = ScanWindow(self._root, settings, on_scan_window_close)
self._scan_area.disable()
self._root.grab_release()
self._scan_window.show()

Expand Down
Loading

0 comments on commit 49c8e39

Please sign in to comment.