Skip to content

Commit

Permalink
Merge pull request #1 from Frikanalen/optimizations
Browse files Browse the repository at this point in the history
Optimizations
  • Loading branch information
toresbe authored Aug 10, 2023
2 parents 8f00bb4 + 3d40e86 commit 220dccc
Show file tree
Hide file tree
Showing 3 changed files with 250 additions and 62 deletions.
125 changes: 125 additions & 0 deletions benchmark.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
import time
from dataclasses import dataclass
from typing import List

@dataclass
class DurationSample:
begin : float
end : float

def trace(self, key):
return '{"name":"%s","ph":"X","ts":%f,"dur":%f,"tid":1,"pid":1}\n' % (key, self.begin * 1000 * 1000, (self.end-self.begin) * 1000 * 1000)

class DurationSeries:
def __init__(self, key:str, time_beginning:float):
self.key:str = key
self.time_beginning = time_beginning
self.samples:List(DurationSample) = []
self.sample_capacitiy:int = 0
self.sample_count:int = 0
self.sampling:bool = False

def add_capacity(self):
for n in range(100):
self.samples.append(DurationSample(0, 0))
self.sample_capacitiy += 100

def begin(self):
if (self.sample_count >= self.sample_capacitiy):
self.add_capacity()
self.samples[self.sample_count].begin = time.time() - self.time_beginning
self.sampling = True

def end(self):
assert self.sampling
self.samples[self.sample_count].end = time.time() - self.time_beginning
self.sample_count += 1
self.sampling = False

@dataclass
class MetricSample:
time : float
value : float
id : str

def trace(self, key):
return '{"name":"%s","ts":%f,"ph":"C","pid":1,"args": {"%s":%f}}\n' % (key, self.time * 1000 * 1000, self.id, self.value)

class MetricSeries:
def __init__(self, key:str, time_beginning:float):
self.key:str = key
self.time_beginning = time_beginning
self.samples:List(MetricSample) = []
self.sample_capacitiy:int = 0
self.sample_count:int = 0

def add_capacity(self):
for n in range(100):
self.samples.append(MetricSample(0, 0, ""))
self.sample_capacitiy += 100

def add(self, _id:str, value:float|int):
if (self.sample_count >= self.sample_capacitiy):
self.add_capacity()
self.samples[self.sample_count].time = time.time() - self.time_beginning
self.samples[self.sample_count].value = value
self.samples[self.sample_count].id = _id
self.sample_count += 1

@dataclass
class DummySample:
def end(self):
pass

class Benchmarker:
"Simple benchmarker that generates a trace json file"
def __init__(self, run:bool):
"Run: Whether to generate a trace or not"
self.run = run
self.time_beginning = time.time()
self.timings = {}
self.metrics = {}

def sample_begin(self, key:str):
if not self.run:
return DummySample()
sampler = self.timings.setdefault(key, DurationSeries(key, self.time_beginning))
sampler.begin()
return sampler

def sample_end(self, key:str):
if not self.run:
return
assert key in self.timings
sampler = self.timings[key]
sampler.end(key)

def add_metric_sample(self, key:str, value_id:str, value:float|int):
if not self.run:
return DummySample()
sampler = self.metrics.setdefault(key, MetricSeries(key, self.time_beginning))
sampler.add(value_id, value)
return sampler

def report(self, stem:str):
# Chrome trace format
# https://docs.google.com/document/d/1CvAClvFfyA5R-PhYUmn5OOQtYMH4h6I0nSsKchNAySU/preview
if not self.run:
return
fn = f"{stem}.json"
with open(fn, "w") as f:
# Manually write out as JSON, as we want it fairly compact
f.write("{\n")
f.write(' "displayTimeUnit": "ms",\n')
f.write(' "traceEvents": [\n')
for key, series in self.timings.items():
for n in range(series.sample_count):
sample = series.samples[n]
f.write(sample.trace(key))
for key, series in self.metrics.items():
for n in range(series.sample_count):
sample = series.samples[n]
f.write(sample.trace(key))
f.write("]\n")
f.write("}\n")
pass
112 changes: 83 additions & 29 deletions ts_probe.py
Original file line number Diff line number Diff line change
@@ -1,72 +1,126 @@
""" Transport stream probe utility """
import os
import time
from collections import deque
import av
from av import VideoFrame, AudioFrame
from audiobuffer import AudioBuffer
from videobuffer import VideoBuffer
from prometheus import Prometheus
from logger import log
from benchmark import Benchmarker


prom = Prometheus()
DEFAULT_VIDEO_URL = 'http://simula.frikanalen.no:9094/frikanalen.ts'
PROMETHEUS_PORT = 8000
# Generates a trace-ts-probe.json that can be opened in about:tracing or https://ui.perfetto.dev/
BENCHMARK = False
# Use threading.
DO_THREADING = False
# Interval between analysis
VIDEO_PROBE_INTERVAL = 5
AUDIO_PROBE_INTERVAL = 5

# Benchmark tuning
BENCHMARK_STOP_FRAME = -1
if BENCHMARK:
BENCHMARK_STOP_FRAME = 100
if BENCHMARK:
VIDEO_PROBE_INTERVAL = 1
AUDIO_PROBE_INTERVAL = 1

# MPEG-2 transport stream URL
url = os.environ.get('VIDEO_URL', None)

if url is None:
url = DEFAULT_VIDEO_URL
log.warning("No video URL specified, using default: %s" % url)

# Create buffers
audio_buffer = AudioBuffer()
video_buffer = VideoBuffer()
START_TIME = time.time()


# Start Prometheus HTTP server
prom = Prometheus()
prom.listen(PROMETHEUS_PORT)


def run():
VIDEO_PROBE_INTERVAL = 10
AUDIO_PROBE_INTERVAL = 10
bench = Benchmarker(BENCHMARK)
smp = bench.sample_begin("Warmup")
log.info("Opening stream: %s" % url)
stream = av.open(url)
if DO_THREADING:
stream.streams.video[0].thread_type = "AUTO"
log.info("Stream is open")
smp.end()
# Bookkeeping
local_video_frame_count = 0 # Used by benchmark to track frame
video_interval_counter = 0 # Used to track intervals
audio_interval_counter = 0
# Start the clock (for benchmarking)
START_TIME = time.time()
while True:
log.info("Stream is open")
frame_smp = bench.sample_begin("Frame")
frame_decode_smp = bench.sample_begin("Frame-Decode")
frame_iter = iter(stream.decode())
try:
for frame in stream.decode():
if isinstance(frame, VideoFrame):
prom.video_frame_count.inc()
frame = next(frame_iter)
frame_decode_smp.end()
if isinstance(frame, VideoFrame):
prom.video_frame_count.inc()
local_video_frame_count += 1

VIDEO_PROBE_INTERVAL -= 1
if VIDEO_PROBE_INTERVAL == 0:
video_buffer.append(frame)
prom.video_brightness_gauge.set(
video_buffer.avg_brightness)
prom.motion_gauge.set(
video_buffer.motion)
VIDEO_PROBE_INTERVAL = 20
video_interval_counter += 1
if VIDEO_PROBE_INTERVAL == video_interval_counter:
#
video_analysis_smp = bench.sample_begin("Video-Analysis")
# Add frame
video_buffer.append(frame)
avg_brightness = video_buffer.avg_brightness
motion = video_buffer.motion
prom.video_brightness_gauge.set(avg_brightness)
prom.motion_gauge.set(motion)
video_interval_counter = 0
video_analysis_smp.end()
if BENCHMARK:
motion *= 1000
avg_brightness *= 1000
print(f"Motion: {motion:12.3f} Brightness: {avg_brightness:12.3f}")
bench.add_metric_sample("video", "motion", motion)
bench.add_metric_sample("video", "brightness", avg_brightness)

elif isinstance(frame, AudioFrame):
audio_buffer.append(frame)
AUDIO_PROBE_INTERVAL -= 1

if AUDIO_PROBE_INTERVAL == 0:
prom.audio_amplitude_lufs_gauge.set(
audio_buffer.lufs())
prom.audio_amplitude_dbfs_gauge.set(
audio_buffer.dbfs(0))
AUDIO_PROBE_INTERVAL = 20
elif isinstance(frame, AudioFrame):
audio_buffer.append(frame)
audio_interval_counter += 1
if AUDIO_PROBE_INTERVAL == audio_interval_counter:
audio_analysis_smp = bench.sample_begin("Audio-Analysis")
lufs = audio_buffer.lufs()
dbfs = audio_buffer.dbfs(0)
prom.audio_amplitude_lufs_gauge.set(lufs)
prom.audio_amplitude_dbfs_gauge.set(dbfs)
audio_interval_counter = 0
audio_analysis_smp.end()
bench.add_metric_sample("audio", "lufs", lufs)
bench.add_metric_sample("audio", "dbfs", dbfs)

if BENCHMARK:
if local_video_frame_count == BENCHMARK_STOP_FRAME:
dt = time.time() - START_TIME
avg = dt / local_video_frame_count * 1000;
print(f"Average: {avg:.3}ms")
break
except av.AVError as e:
frame_decode_smp.end()
log.error(e)
prom.decode_error_count.inc()
reopen_smp = bench.sample_begin("Re-open")
stream = av.open(url)
reopen_smp.end()

except KeyboardInterrupt:
log.info("Keyboard interrupt")
break

frame_smp.end()
bench.report("trace-ts-probe")

run()
75 changes: 42 additions & 33 deletions videobuffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,51 +6,60 @@
from av import VideoFrame
from itertools import islice

BUFFER_SIZE = 10
FRAME_RATE = 50


def _motion(buffer: deque[npt.NDArray[np.uint8]]):
""" Returns the average motion between the given frames. """
if len(buffer) < 2:
log.warning("Buffer too small to calculate motion")
return 0

frame_diffs = [
np.mean(abs(buffer[i].astype(np.int16) - frame).astype(np.uint8))
for i, frame in enumerate(islice(buffer, 1, None))
]

# Return the mean of the absolute differences
return np.mean(frame_diffs) / 255


def _avg_brightness(buffer: deque[npt.NDArray[np.uint8]]):
""" Returns the average brightness of the given frames. """

if len(buffer) == 0:
log.warning("Buffer empty, returning 0")
return 0

return np.mean(buffer) / 255
BUFFER_SIZE = 25
REDUCE_BEFORE_MOTION = True
BLUR_BEFORE_MOTION = False
BLUR_KERNEL = np.array([0.20, 0.20, 0.20, 0.20, 0.20])

class ProbeFrame:
" A frame including metrics of the videoframe"
def __init__(self, frame: VideoFrame, last_frame: "ProbeFrame"):
self.frame:npt.NDArray[np.fl] = frame.reformat(format='gray8').to_ndarray().astype(np.uint8)
# Calculate brightness
self.brightness = np.mean(self.frame)
# Resize
if REDUCE_BEFORE_MOTION:
# Non-interpolating reduction. This is to reduce the chance of triggering on interlaced stalls
# If one wants to reduce further, I recommend proper resampling
self.frame = self.frame[::2, ::2].astype("float32") / 255
# Box-blur
if BLUR_BEFORE_MOTION:
self.frame = np.apply_along_axis(lambda x: np.convolve(x, BLUR_KERNEL, mode='same'), 0, self.frame)
self.frame = np.apply_along_axis(lambda x: np.convolve(x, BLUR_KERNEL, mode='same'), 1, self.frame)
# Calculate motion
if last_frame:
self.motion = np.mean(np.power(np.abs(self.frame - last_frame.frame), 0.5))
self.has_motion:bool = last_frame is not None

class VideoBuffer():
""" A circular buffer for video frames.
""" A circular buffer for video frames.
Contains analysis functions. """
video_buffer = deque(maxlen=BUFFER_SIZE * FRAME_RATE)
video_buffer = deque(maxlen=BUFFER_SIZE)
last_frame = None

def append(self, frame: VideoFrame):
""" Appends a frame to the buffer. """
self.video_buffer.append(frame.reformat(
format='gray8').to_ndarray().astype(np.uint8))
pb = ProbeFrame(frame, self.last_frame)
self.video_buffer.append(pb)
self.last_frame = pb

@property
def avg_brightness(self):
""" Returns the average brightness of the frames in the buffer. """
return _avg_brightness(self.video_buffer)
#return _avg_brightness(self.video_buffer)
l = [f.brightness for f in self.video_buffer]
nda = np.array(l, dtype="float")
if len(nda) == 0:
return 0
return np.mean(nda) / 255

@property
def motion(self):
""" Returns the average motion between the frames in the buffer. """
return _motion(self.video_buffer)
#return _motion(self.video_buffer)
l = [f.motion for f in self.video_buffer if f.has_motion]
nda = np.array(l, dtype="float")
if len(nda) == 0:
return 0
return np.mean(nda)

0 comments on commit 220dccc

Please sign in to comment.