Skip to content

Commit

Permalink
Added stop_event to util.Process (#14142)
Browse files Browse the repository at this point in the history
* Added stop_event to util.Process

util.Process will take care of receiving signals when the stop_event is
accessed in the subclass. If it never is, SystemExit is raised instead.

This has the effect of still behaving like multiprocessing.Process when
stop_event is not accessed, while still allowing subclasses to not deal
with the hassle of setting it up.

* Give each util.Process their own logger

This will help to reduce boilerplate in subclasses.

* Give explicit types to util.Process.__init__

This gives better type hinting in the editor.

* Use util.Process facilities in AudioProcessor

Boilerplate begone!

* Removed pointless check in util.Process

The log_listener.queue should never be None, unless something has gone
extremely wrong in the log setup code. If we're that far gone, crashing
is better.

* Make sure faulthandler is enabled in all processes

This has no effect currently since we're using the fork start_method.
However, when we inevidably switch to forkserver (either by choice, or
by upgrading to python 3.14+) not having this makes for some really fun
failure modes :D
  • Loading branch information
gtsiam authored Oct 3, 2024
1 parent e725730 commit a468ed3
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 37 deletions.
56 changes: 25 additions & 31 deletions frigate/events/audio.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

import datetime
import logging
import signal
import sys
import threading
import time
from typing import Tuple
Expand Down Expand Up @@ -73,46 +71,42 @@ def __init__(
):
super().__init__(name="frigate.audio_manager", daemon=True)

self.logger = logging.getLogger(self.name)
self.camera_metrics = camera_metrics
self.cameras = cameras

def run(self) -> None:
stop_event = threading.Event()
audio_threads: list[AudioEventMaintainer] = []

threading.current_thread().name = "process:audio_manager"
signal.signal(signal.SIGTERM, lambda sig, frame: sys.exit())

if len(self.cameras) == 0:
return

try:
for camera in self.cameras:
audio_thread = AudioEventMaintainer(
camera,
self.camera_metrics,
stop_event,
)
audio_threads.append(audio_thread)
audio_thread.start()

self.logger.info(f"Audio processor started (pid: {self.pid})")

while True:
signal.pause()
finally:
stop_event.set()
for thread in audio_threads:
thread.join(1)
if thread.is_alive():
self.logger.info(f"Waiting for thread {thread.name:s} to exit")
thread.join(10)

for thread in audio_threads:
if thread.is_alive():
self.logger.warning(f"Thread {thread.name} is still alive")
self.logger.info("Exiting audio processor")
for camera in self.cameras:
audio_thread = AudioEventMaintainer(
camera,
self.camera_metrics,
self.stop_event,
)
audio_threads.append(audio_thread)
audio_thread.start()

self.logger.info(f"Audio processor started (pid: {self.pid})")

while not self.stop_event.wait():
pass

for thread in audio_threads:
thread.join(1)
if thread.is_alive():
self.logger.info(f"Waiting for thread {thread.name:s} to exit")
thread.join(10)

for thread in audio_threads:
if thread.is_alive():
self.logger.warning(f"Thread {thread.name} is still alive")

self.logger.info("Exiting audio processor")


class AudioEventMaintainer(threading.Thread):
Expand Down
52 changes: 46 additions & 6 deletions frigate/util/process.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,29 @@
import faulthandler
import logging
import multiprocessing as mp
import signal
import sys
import threading
from functools import wraps
from logging.handlers import QueueHandler
from typing import Any
from typing import Any, Callable, Optional

import frigate.log


class BaseProcess(mp.Process):
def __init__(self, **kwargs):
super().__init__(**kwargs)
def __init__(
self,
*,
name: Optional[str] = None,
target: Optional[Callable] = None,
args: tuple = (),
kwargs: dict = {},
daemon: Optional[bool] = None,
):
super().__init__(
name=name, target=target, args=args, kwargs=kwargs, daemon=daemon
)

def start(self, *args, **kwargs):
self.before_start()
Expand Down Expand Up @@ -46,10 +60,36 @@ def after_run(self) -> None:


class Process(BaseProcess):
logger: logging.Logger

@property
def stop_event(self) -> threading.Event:
# Lazily create the stop_event. This allows the signal handler to tell if anyone is
# monitoring the stop event, and to raise a SystemExit if not.
if "stop_event" not in self.__dict__:
self.__dict__["stop_event"] = threading.Event()
return self.__dict__["stop_event"]

def before_start(self) -> None:
self.__log_queue = frigate.log.log_listener.queue

def before_run(self) -> None:
if self.__log_queue:
logging.basicConfig(handlers=[], force=True)
logging.getLogger().addHandler(QueueHandler(self.__log_queue))
faulthandler.enable()

def receiveSignal(signalNumber, frame):
# Get the stop_event through the dict to bypass lazy initialization.
stop_event = self.__dict__.get("stop_event")
if stop_event is not None:
# Someone is monitoring stop_event. We should set it.
stop_event.set()
else:
# Nobody is monitoring stop_event. We should raise SystemExit.
sys.exit()

signal.signal(signal.SIGTERM, receiveSignal)
signal.signal(signal.SIGINT, receiveSignal)

self.logger = logging.getLogger(self.name)

logging.basicConfig(handlers=[], force=True)
logging.getLogger().addHandler(QueueHandler(self.__log_queue))

0 comments on commit a468ed3

Please sign in to comment.