Skip to content

Commit

Permalink
Custom classes for Process and Metrics (#13950)
Browse files Browse the repository at this point in the history
* Subclass Process for audio_process

* Introduce custom mp.Process subclass

In preparation to switch the multiprocessing startup method away from
"fork", we cannot rely on os.fork cloning the log state at fork time.
Instead, we have to set up logging before we run the business logic of
each process.

* Make camera_metrics into a class

* Make ptz_metrics into a class

* Fixed PtzMotionEstimator.ptz_metrics type annotation

* Removed pointless variables

* Do not start audio processor when no audio cameras are configured
  • Loading branch information
gtsiam authored Sep 27, 2024
1 parent 1f328be commit c0bd3b3
Show file tree
Hide file tree
Showing 16 changed files with 471 additions and 448 deletions.
3 changes: 2 additions & 1 deletion benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import numpy as np

import frigate.util as util
from frigate.config import DetectorTypeEnum
from frigate.object_detection import (
ObjectDetectProcess,
Expand Down Expand Up @@ -90,7 +91,7 @@ def start(id, num_detections, detection_queue, event):
)

for x in range(0, 10):
camera_process = mp.Process(
camera_process = util.Process(
target=start, args=(x, 300, detection_queue, events[str(x)])
)
camera_process.daemon = True
Expand Down
16 changes: 3 additions & 13 deletions frigate/__main__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import argparse
import faulthandler
import logging
import signal
import sys
import threading
Expand All @@ -9,29 +8,20 @@

from frigate.app import FrigateApp
from frigate.config import FrigateConfig
from frigate.log import log_thread
from frigate.log import setup_logging


def main() -> None:
faulthandler.enable()

# Clear all existing handlers.
logging.basicConfig(
level=logging.INFO,
handlers=[],
force=True,
)
# Setup the logging thread
setup_logging()

threading.current_thread().name = "frigate"

# Make sure we exit cleanly on SIGTERM.
signal.signal(signal.SIGTERM, lambda sig, frame: sys.exit())

run()


@log_thread()
def run() -> None:
# Parse the cli arguments.
parser = argparse.ArgumentParser(
prog="Frigate",
Expand Down
132 changes: 40 additions & 92 deletions frigate/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,18 @@
import shutil
from multiprocessing import Queue
from multiprocessing.synchronize import Event as MpEvent
from typing import Any
from typing import Any, Optional

import psutil
import uvicorn
from peewee_migrate import Router
from playhouse.sqlite_ext import SqliteExtDatabase
from playhouse.sqliteq import SqliteQueueDatabase

import frigate.util as util
from frigate.api.auth import hash_password
from frigate.api.fastapi_app import create_fastapi_app
from frigate.camera import CameraMetrics, PTZMetrics
from frigate.comms.config_updater import ConfigPublisher
from frigate.comms.dispatcher import Communicator, Dispatcher
from frigate.comms.event_metadata_updater import (
Expand All @@ -37,7 +39,7 @@
RECORD_DIR,
)
from frigate.embeddings import EmbeddingsContext, manage_embeddings
from frigate.events.audio import listen_to_audio
from frigate.events.audio import AudioProcessor
from frigate.events.cleanup import EventCleanup
from frigate.events.external import ExternalEventProcessor
from frigate.events.maintainer import EventProcessor
Expand Down Expand Up @@ -65,7 +67,6 @@
from frigate.stats.util import stats_init
from frigate.storage import StorageMaintainer
from frigate.timeline import TimelineProcessor
from frigate.types import CameraMetricsTypes, PTZMetricsTypes
from frigate.util.builtin import empty_and_close_queue
from frigate.util.object import get_camera_regions_grid
from frigate.version import VERSION
Expand All @@ -76,6 +77,8 @@


class FrigateApp:
audio_process: Optional[mp.Process] = None

# TODO: Fix FrigateConfig usage, so we can properly annotate it here without mypy erroring out.
def __init__(self, config: Any) -> None:
self.stop_event: MpEvent = mp.Event()
Expand All @@ -84,8 +87,8 @@ def __init__(self, config: Any) -> None:
self.detection_out_events: dict[str, MpEvent] = {}
self.detection_shms: list[mp.shared_memory.SharedMemory] = []
self.log_queue: Queue = mp.Queue()
self.camera_metrics: dict[str, CameraMetricsTypes] = {}
self.ptz_metrics: dict[str, PTZMetricsTypes] = {}
self.camera_metrics: dict[str, CameraMetrics] = {}
self.ptz_metrics: dict[str, PTZMetrics] = {}
self.processes: dict[str, int] = {}
self.region_grids: dict[str, list[list[dict[str, int]]]] = {}
self.config = config
Expand All @@ -106,64 +109,14 @@ def ensure_dirs(self) -> None:
logger.debug(f"Skipping directory: {d}")

def init_camera_metrics(self) -> None:
# create camera_metrics
for camera_name in self.config.cameras.keys():
# create camera_metrics
self.camera_metrics[camera_name] = {
"camera_fps": mp.Value("d", 0.0), # type: ignore[typeddict-item]
# issue https://github.com/python/typeshed/issues/8799
# from mypy 0.981 onwards
"skipped_fps": mp.Value("d", 0.0), # type: ignore[typeddict-item]
# issue https://github.com/python/typeshed/issues/8799
# from mypy 0.981 onwards
"process_fps": mp.Value("d", 0.0), # type: ignore[typeddict-item]
"detection_fps": mp.Value("d", 0.0), # type: ignore[typeddict-item]
# issue https://github.com/python/typeshed/issues/8799
# from mypy 0.981 onwards
"detection_frame": mp.Value("d", 0.0), # type: ignore[typeddict-item]
# issue https://github.com/python/typeshed/issues/8799
# from mypy 0.981 onwards
"read_start": mp.Value("d", 0.0), # type: ignore[typeddict-item]
# issue https://github.com/python/typeshed/issues/8799
# from mypy 0.981 onwards
"ffmpeg_pid": mp.Value("i", 0), # type: ignore[typeddict-item]
# issue https://github.com/python/typeshed/issues/8799
# from mypy 0.981 onwards
"frame_queue": mp.Queue(maxsize=2),
"capture_process": None,
"process": None,
"audio_rms": mp.Value("d", 0.0), # type: ignore[typeddict-item]
"audio_dBFS": mp.Value("d", 0.0), # type: ignore[typeddict-item]
}
self.ptz_metrics[camera_name] = {
"ptz_autotracker_enabled": mp.Value( # type: ignore[typeddict-item]
# issue https://github.com/python/typeshed/issues/8799
# from mypy 0.981 onwards
"i",
self.config.cameras[camera_name].onvif.autotracking.enabled,
),
"ptz_tracking_active": mp.Event(),
"ptz_motor_stopped": mp.Event(),
"ptz_reset": mp.Event(),
"ptz_start_time": mp.Value("d", 0.0), # type: ignore[typeddict-item]
# issue https://github.com/python/typeshed/issues/8799
# from mypy 0.981 onwards
"ptz_stop_time": mp.Value("d", 0.0), # type: ignore[typeddict-item]
# issue https://github.com/python/typeshed/issues/8799
# from mypy 0.981 onwards
"ptz_frame_time": mp.Value("d", 0.0), # type: ignore[typeddict-item]
# issue https://github.com/python/typeshed/issues/8799
# from mypy 0.981 onwards
"ptz_zoom_level": mp.Value("d", 0.0), # type: ignore[typeddict-item]
# issue https://github.com/python/typeshed/issues/8799
# from mypy 0.981 onwards
"ptz_max_zoom": mp.Value("d", 0.0), # type: ignore[typeddict-item]
# issue https://github.com/python/typeshed/issues/8799
# from mypy 0.981 onwards
"ptz_min_zoom": mp.Value("d", 0.0), # type: ignore[typeddict-item]
# issue https://github.com/python/typeshed/issues/8799
# from mypy 0.981 onwards
}
self.ptz_metrics[camera_name]["ptz_motor_stopped"].set()
self.camera_metrics[camera_name] = CameraMetrics()
self.ptz_metrics[camera_name] = PTZMetrics(
autotracker_enabled=self.config.cameras[
camera_name
].onvif.autotracking.enabled
)

def init_queues(self) -> None:
# Queue for cameras to push tracked objects to
Expand Down Expand Up @@ -251,7 +204,7 @@ def init_go2rtc(self) -> None:
self.processes["go2rtc"] = proc.info["pid"]

def init_recording_manager(self) -> None:
recording_process = mp.Process(
recording_process = util.Process(
target=manage_recordings,
name="recording_manager",
args=(self.config,),
Expand All @@ -263,7 +216,7 @@ def init_recording_manager(self) -> None:
logger.info(f"Recording process started: {recording_process.pid}")

def init_review_segment_manager(self) -> None:
review_segment_process = mp.Process(
review_segment_process = util.Process(
target=manage_review_segments,
name="review_segment_manager",
args=(self.config,),
Expand All @@ -281,7 +234,7 @@ def init_embeddings_manager(self) -> None:

# Create a client for other processes to use
self.embeddings = EmbeddingsContext()
embedding_process = mp.Process(
embedding_process = util.Process(
target=manage_embeddings,
name="embeddings_manager",
args=(self.config,),
Expand Down Expand Up @@ -422,7 +375,7 @@ def start_detected_frames_processor(self) -> None:
self.detected_frames_processor.start()

def start_video_output_processor(self) -> None:
output_processor = mp.Process(
output_processor = util.Process(
target=output_frames,
name="output_processor",
args=(self.config,),
Expand Down Expand Up @@ -451,7 +404,7 @@ def start_camera_processors(self) -> None:
logger.info(f"Camera processor not started for disabled camera {name}")
continue

camera_process = mp.Process(
camera_process = util.Process(
target=track_camera,
name=f"camera_processor:{name}",
args=(
Expand All @@ -466,9 +419,9 @@ def start_camera_processors(self) -> None:
self.ptz_metrics[name],
self.region_grids[name],
),
daemon=True,
)
camera_process.daemon = True
self.camera_metrics[name]["process"] = camera_process
self.camera_metrics[name].process = camera_process
camera_process.start()
logger.info(f"Camera processor started for {name}: {camera_process.pid}")

Expand All @@ -478,31 +431,27 @@ def start_camera_capture_processes(self) -> None:
logger.info(f"Capture process not started for disabled camera {name}")
continue

capture_process = mp.Process(
capture_process = util.Process(
target=capture_camera,
name=f"camera_capture:{name}",
args=(name, config, self.shm_frame_count(), self.camera_metrics[name]),
)
capture_process.daemon = True
self.camera_metrics[name]["capture_process"] = capture_process
self.camera_metrics[name].capture_process = capture_process
capture_process.start()
logger.info(f"Capture process started for {name}: {capture_process.pid}")

def start_audio_processors(self) -> None:
self.audio_process = None
if len([c for c in self.config.cameras.values() if c.audio.enabled]) > 0:
self.audio_process = mp.Process(
target=listen_to_audio,
name="audio_capture",
args=(
self.config,
self.camera_metrics,
),
)
self.audio_process.daemon = True
def start_audio_processor(self) -> None:
audio_cameras = [
c
for c in self.config.cameras.values()
if c.enabled and c.audio.enabled_in_config
]

if audio_cameras:
self.audio_process = AudioProcessor(audio_cameras, self.camera_metrics)
self.audio_process.start()
self.processes["audio_detector"] = self.audio_process.pid or 0
logger.info(f"Audio process started: {self.audio_process.pid}")

def start_timeline_processor(self) -> None:
self.timeline_processor = TimelineProcessor(
Expand Down Expand Up @@ -640,7 +589,7 @@ def start(self) -> None:
self.start_detected_frames_processor()
self.start_camera_processors()
self.start_camera_capture_processes()
self.start_audio_processors()
self.start_audio_processor()
self.start_storage_maintainer()
self.init_external_event_processor()
self.start_stats_emitter()
Expand Down Expand Up @@ -686,28 +635,27 @@ def stop(self) -> None:
).execute()

# stop the audio process
if self.audio_process is not None:
if self.audio_process:
self.audio_process.terminate()
self.audio_process.join()

# ensure the capture processes are done
for camera in self.camera_metrics.keys():
capture_process = self.camera_metrics[camera]["capture_process"]
for camera, metrics in self.camera_metrics.items():
capture_process = metrics.capture_process
if capture_process is not None:
logger.info(f"Waiting for capture process for {camera} to stop")
capture_process.terminate()
capture_process.join()

# ensure the camera processors are done
for camera in self.camera_metrics.keys():
camera_process = self.camera_metrics[camera]["process"]
for camera, metrics in self.camera_metrics.items():
camera_process = metrics.process
if camera_process is not None:
logger.info(f"Waiting for process for {camera} to stop")
camera_process.terminate()
camera_process.join()
logger.info(f"Closing frame queue for {camera}")
frame_queue = self.camera_metrics[camera]["frame_queue"]
empty_and_close_queue(frame_queue)
empty_and_close_queue(metrics.frame_queue)

# ensure the detectors are done
for detector in self.detectors.values():
Expand Down
68 changes: 68 additions & 0 deletions frigate/camera/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import multiprocessing as mp
from multiprocessing.sharedctypes import Synchronized
from multiprocessing.synchronize import Event
from typing import Optional


class CameraMetrics:
camera_fps: Synchronized
detection_fps: Synchronized
detection_frame: Synchronized
process_fps: Synchronized
skipped_fps: Synchronized
read_start: Synchronized
audio_rms: Synchronized
audio_dBFS: Synchronized

frame_queue: mp.Queue

process: Optional[mp.Process]
capture_process: Optional[mp.Process]
ffmpeg_pid: Synchronized

def __init__(self):
self.camera_fps = mp.Value("d", 0)
self.detection_fps = mp.Value("d", 0)
self.detection_frame = mp.Value("d", 0)
self.process_fps = mp.Value("d", 0)
self.skipped_fps = mp.Value("d", 0)
self.read_start = mp.Value("d", 0)
self.audio_rms = mp.Value("d", 0)
self.audio_dBFS = mp.Value("d", 0)

self.frame_queue = mp.Queue(maxsize=2)

self.process = None
self.capture_process = None
self.ffmpeg_pid = mp.Value("i", 0)


class PTZMetrics:
autotracker_enabled: Synchronized

start_time: Synchronized
stop_time: Synchronized
frame_time: Synchronized
zoom_level: Synchronized
max_zoom: Synchronized
min_zoom: Synchronized

tracking_active: Event
motor_stopped: Event
reset: Event

def __init__(self, *, autotracker_enabled: bool):
self.autotracker_enabled = mp.Value("i", autotracker_enabled)

self.start_time = mp.Value("d", 0)
self.stop_time = mp.Value("d", 0)
self.frame_time = mp.Value("d", 0)
self.zoom_level = mp.Value("d", 0)
self.max_zoom = mp.Value("d", 0)
self.min_zoom = mp.Value("d", 0)

self.tracking_active = mp.Event()
self.motor_stopped = mp.Event()
self.reset = mp.Event()

self.motor_stopped.set()
Loading

0 comments on commit c0bd3b3

Please sign in to comment.