-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Revet some changes to video buffer calculation
Less optimized but less incorrect too
- Loading branch information
Showing
1 changed file
with
30 additions
and
53 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,79 +1,56 @@ | ||
""" Video analysis """ | ||
from logger import log | ||
from collections import deque | ||
import numpy as np | ||
import numpy.typing as npt | ||
from av import VideoFrame | ||
from itertools import islice | ||
|
||
BUFFER_SIZE = 10 | ||
FRAME_RATE = 50 | ||
|
||
|
||
def compute_frame_difference(frame1: npt.NDArray[np.uint8], frame2: npt.NDArray[np.uint8]) -> float: | ||
"""Compute the difference between two frames.""" | ||
diff = np.subtract(frame1, frame2, dtype=np.int16) | ||
return np.sum(np.abs(diff)) | ||
def _motion(buffer: deque[npt.NDArray[np.uint8]]): | ||
""" Returns the average motion between the given frames. """ | ||
if len(buffer) < 2: | ||
log.warning("Buffer too small to calculate motion") | ||
return 0 | ||
|
||
frame_diffs = [ | ||
np.mean(abs(buffer[i].astype(np.int16) - frame).astype(np.uint8)) | ||
for i, frame in enumerate(islice(buffer, 1, None)) | ||
] | ||
|
||
def compute_frame_brightness(frame: npt.NDArray[np.uint8]) -> float: | ||
"""Compute the brightness of a frame.""" | ||
return np.sum(frame) | ||
# Return the mean of the absolute differences | ||
return np.mean(frame_diffs) / 255 | ||
|
||
|
||
class VideoBuffer(): | ||
height = None | ||
width = None | ||
def _avg_brightness(buffer: deque[npt.NDArray[np.uint8]]): | ||
""" Returns the average brightness of the given frames. """ | ||
|
||
if len(buffer) == 0: | ||
log.warning("Buffer empty, returning 0") | ||
return 0 | ||
|
||
""" A buffer for video frames. Contains analysis functions. """ | ||
return np.mean(buffer) / 255 | ||
|
||
def __init__(self): | ||
self.buffer_size = BUFFER_SIZE * FRAME_RATE | ||
self.video_buffer = np.empty( | ||
(self.buffer_size, 1, 1), dtype=np.uint8) | ||
self.index = 0 | ||
self.total_brightness = 0.0 | ||
self.total_motion = 0.0 | ||
|
||
def _initialize_buffer(self, height, width): | ||
""" Reinitialize the video buffer with a new resolution. """ | ||
self.video_buffer = np.empty( | ||
(self.buffer_size, height, width), dtype=np.uint8) | ||
self.index = 0 | ||
self.height = height | ||
self.width = width | ||
log.info( | ||
"Initialized video buffer with resolution: {}x{}".format(height, width)) | ||
class VideoBuffer(): | ||
""" A circular buffer for video frames. | ||
Contains analysis functions. """ | ||
video_buffer = deque(maxlen=BUFFER_SIZE * FRAME_RATE) | ||
|
||
def append(self, frame: VideoFrame): | ||
""" Appends a frame to the buffer. """ | ||
new_frame = frame.reformat(format='gray8').to_ndarray() | ||
frame_height, frame_width = new_frame.shape | ||
|
||
# Check if the resolution of the incoming frame matches the buffer's resolution | ||
if frame_height != self.height or frame_width != self.width: | ||
self._initialize_buffer(frame_height, frame_width) | ||
|
||
# Adjust total brightness | ||
self.total_brightness -= compute_frame_brightness( | ||
self.video_buffer[self.index]) | ||
self.total_brightness += compute_frame_brightness(new_frame) | ||
|
||
# Adjust total motion if there's a previous frame | ||
if self.index > 0: | ||
prev_index = self.index - 1 | ||
else: | ||
prev_index = self.buffer_size - 1 | ||
self.total_motion -= compute_frame_difference( | ||
self.video_buffer[prev_index], self.video_buffer[self.index]) | ||
self.total_motion += compute_frame_difference( | ||
new_frame, self.video_buffer[prev_index]) | ||
|
||
self.video_buffer[self.index] = new_frame | ||
self.index = (self.index + 1) % self.buffer_size | ||
self.video_buffer.append(frame.reformat( | ||
format='gray8').to_ndarray().astype(np.uint8)) | ||
|
||
@property | ||
def avg_brightness(self): | ||
return self.total_brightness / (self.buffer_size * self.height * self.width) | ||
""" Returns the average brightness of the frames in the buffer. """ | ||
return _avg_brightness(self.video_buffer) | ||
|
||
@property | ||
def motion(self): | ||
return self.total_motion / (self.buffer_size * self.height * self.width) | ||
""" Returns the average motion between the frames in the buffer. """ | ||
return _motion(self.video_buffer) |