diff --git a/benchmark.py b/benchmark.py
new file mode 100644
index 0000000..3e7d558
--- /dev/null
+++ b/benchmark.py
@@ -0,0 +1,125 @@
+import time
+from dataclasses import dataclass
+from typing import List
+
+@dataclass
+class DurationSample:
+    begin: float
+    end: float
+
+    def trace(self, key):
+        # One complete ("X") duration event; times are converted from seconds to microseconds
+        return '{"name":"%s","ph":"X","ts":%f,"dur":%f,"tid":1,"pid":1}' % (key, self.begin * 1000 * 1000, (self.end - self.begin) * 1000 * 1000)
+
+class DurationSeries:
+    def __init__(self, key: str, time_beginning: float):
+        self.key: str = key
+        self.time_beginning = time_beginning
+        self.samples: List[DurationSample] = []
+        self.sample_capacity: int = 0
+        self.sample_count: int = 0
+        self.sampling: bool = False
+
+    def add_capacity(self):
+        # Grow the pre-allocated sample pool in chunks of 100
+        for n in range(100):
+            self.samples.append(DurationSample(0, 0))
+        self.sample_capacity += 100
+
+    def begin(self):
+        if self.sample_count >= self.sample_capacity:
+            self.add_capacity()
+        self.samples[self.sample_count].begin = time.time() - self.time_beginning
+        self.sampling = True
+
+    def end(self):
+        assert self.sampling
+        self.samples[self.sample_count].end = time.time() - self.time_beginning
+        self.sample_count += 1
+        self.sampling = False
+
+@dataclass
+class MetricSample:
+    time: float
+    value: float
+    id: str
+
+    def trace(self, key):
+        # One counter ("C") event; the value is graphed over time in the trace viewer
+        return '{"name":"%s","ts":%f,"ph":"C","pid":1,"args": {"%s":%f}}' % (key, self.time * 1000 * 1000, self.id, self.value)
+
+class MetricSeries:
+    def __init__(self, key: str, time_beginning: float):
+        self.key: str = key
+        self.time_beginning = time_beginning
+        self.samples: List[MetricSample] = []
+        self.sample_capacity: int = 0
+        self.sample_count: int = 0
+
+    def add_capacity(self):
+        for n in range(100):
+            self.samples.append(MetricSample(0, 0, ""))
+        self.sample_capacity += 100
+
+    def add(self, _id: str, value: float | int):
+        if self.sample_count >= self.sample_capacity:
+            self.add_capacity()
+        self.samples[self.sample_count].time = time.time() - self.time_beginning
+        self.samples[self.sample_count].value = value
+        self.samples[self.sample_count].id = _id
+        self.sample_count += 1
+
+@dataclass
+class DummySample:
+    "No-op stand-in returned when benchmarking is disabled"
+    def end(self):
+        pass
+
+class Benchmarker:
+    "Simple benchmarker that generates a Chrome trace JSON file"
+    def __init__(self, run: bool):
+        "run: whether to collect samples and generate a trace"
+        self.run = run
+        self.time_beginning = time.time()
+        self.timings = {}
+        self.metrics = {}
+
+    def sample_begin(self, key: str):
+        if not self.run:
+            return DummySample()
+        sampler = self.timings.setdefault(key, DurationSeries(key, self.time_beginning))
+        sampler.begin()
+        return sampler
+
+    def sample_end(self, key: str):
+        if not self.run:
+            return
+        assert key in self.timings
+        sampler = self.timings[key]
+        sampler.end()
+
+    def add_metric_sample(self, key: str, value_id: str, value: float | int):
+        if not self.run:
+            return DummySample()
+        sampler = self.metrics.setdefault(key, MetricSeries(key, self.time_beginning))
+        sampler.add(value_id, value)
+        return sampler
+
+    def report(self, stem: str):
+        # Chrome trace format
+        # https://docs.google.com/document/d/1CvAClvFfyA5R-PhYUmn5OOQtYMH4h6I0nSsKchNAySU/preview
+        if not self.run:
+            return
+        fn = f"{stem}.json"
+        with open(fn, "w") as f:
+            # Manually write out as JSON, as we want it fairly compact.
+            # Events are comma-joined so the output is valid JSON.
+            f.write("{\n")
+            f.write('  "displayTimeUnit": "ms",\n')
+            f.write('  "traceEvents": [\n')
+            events = []
+            for key, series in self.timings.items():
+                for n in range(series.sample_count):
+                    sample = series.samples[n]
+                    events.append(sample.trace(key))
+            for key, series in self.metrics.items():
+                for n in range(series.sample_count):
+                    sample = series.samples[n]
+                    events.append(sample.trace(key))
+            f.write(",\n".join(events))
+            f.write("\n]\n")
+            f.write("}\n")
diff --git a/ts_probe.py b/ts_probe.py
index fb62df4..c28b4ba 100644
--- a/ts_probe.py
+++ b/ts_probe.py
@@ -1,72 +1,126 @@
 """ Transport stream probe utility """
 import os
 import time
-from collections import deque
 import av
 from av import VideoFrame, AudioFrame
 from audiobuffer import AudioBuffer
 from videobuffer import VideoBuffer
 from prometheus import Prometheus
 from logger import log
+from benchmark import Benchmarker
+
-prom = Prometheus()
 DEFAULT_VIDEO_URL = 'http://simula.frikanalen.no:9094/frikanalen.ts'
 PROMETHEUS_PORT = 8000
+# Generates a trace-ts-probe.json that can be opened in about:tracing or https://ui.perfetto.dev/
+BENCHMARK = False
+# Use threaded decoding for the video stream
+DO_THREADING = False
+# Interval (in frames) between analysis runs
+VIDEO_PROBE_INTERVAL = 5
+AUDIO_PROBE_INTERVAL = 5
+
+# Benchmark tuning: stop after a fixed number of frames and analyse every frame
+BENCHMARK_STOP_FRAME = -1
+if BENCHMARK:
+    BENCHMARK_STOP_FRAME = 100
+    VIDEO_PROBE_INTERVAL = 1
+    AUDIO_PROBE_INTERVAL = 1
+
 # MPEG-2 transport stream URL
 url = os.environ.get('VIDEO_URL', None)
-
 if url is None:
     url = DEFAULT_VIDEO_URL
    log.warning("No video URL specified, using default: %s" % url)
+# Create buffers
 audio_buffer = AudioBuffer()
 video_buffer = VideoBuffer()
-START_TIME = time.time()
+
 # Start Prometheus HTTP server
+prom = Prometheus()
 prom.listen(PROMETHEUS_PORT)
 
 def run():
-    VIDEO_PROBE_INTERVAL = 10
-    AUDIO_PROBE_INTERVAL = 10
+    bench = Benchmarker(BENCHMARK)
+    smp = bench.sample_begin("Warmup")
     log.info("Opening stream: %s" % url)
     stream = av.open(url)
+    if DO_THREADING:
+        stream.streams.video[0].thread_type = "AUTO"
+    log.info("Stream is open")
+    smp.end()
+    # Bookkeeping
+    local_video_frame_count = 0  # Used by the benchmark to count decoded video frames
+    video_interval_counter = 0   # Frames since the last video analysis
+    audio_interval_counter = 0   # Frames since the last audio analysis
+    # Start the clock (for benchmarking)
+    START_TIME = time.time()
     while True:
-        log.info("Stream is open")
+        frame_smp = bench.sample_begin("Frame")
+        frame_decode_smp = bench.sample_begin("Frame-Decode")
+        frame_iter = iter(stream.decode())
         try:
-            for frame in stream.decode():
-                if isinstance(frame, VideoFrame):
-                    prom.video_frame_count.inc()
+            frame = next(frame_iter)
+            frame_decode_smp.end()
+            if isinstance(frame, VideoFrame):
+                prom.video_frame_count.inc()
+                local_video_frame_count += 1
-                    VIDEO_PROBE_INTERVAL -= 1
-                    if VIDEO_PROBE_INTERVAL == 0:
-                        video_buffer.append(frame)
-                        prom.video_brightness_gauge.set(
-                            video_buffer.avg_brightness)
-                        prom.motion_gauge.set(
-                            video_buffer.motion)
-                        VIDEO_PROBE_INTERVAL = 20
+                video_interval_counter += 1
+                if VIDEO_PROBE_INTERVAL == video_interval_counter:
+                    video_analysis_smp = bench.sample_begin("Video-Analysis")
+                    # Add the frame and read back the metrics
+                    video_buffer.append(frame)
+                    avg_brightness = video_buffer.avg_brightness
+                    motion = video_buffer.motion
+                    prom.video_brightness_gauge.set(avg_brightness)
+                    prom.motion_gauge.set(motion)
+                    video_interval_counter = 0
+                    video_analysis_smp.end()
+                    if BENCHMARK:
+                        motion *= 1000
+                        avg_brightness *= 1000
+                        print(f"Motion: {motion:12.3f} Brightness: {avg_brightness:12.3f}")
+                        bench.add_metric_sample("video", "motion", motion)
+                        bench.add_metric_sample("video", "brightness", avg_brightness)
-                elif isinstance(frame, AudioFrame):
-                    audio_buffer.append(frame)
-                    AUDIO_PROBE_INTERVAL -= 1
-
-                    if AUDIO_PROBE_INTERVAL == 0:
-                        prom.audio_amplitude_lufs_gauge.set(
-                            audio_buffer.lufs())
-                        prom.audio_amplitude_dbfs_gauge.set(
-                            audio_buffer.dbfs(0))
-                        AUDIO_PROBE_INTERVAL = 20
+            elif isinstance(frame, AudioFrame):
+                audio_buffer.append(frame)
+                audio_interval_counter += 1
+                if AUDIO_PROBE_INTERVAL == audio_interval_counter:
+                    audio_analysis_smp = bench.sample_begin("Audio-Analysis")
+                    lufs = audio_buffer.lufs()
+                    dbfs = audio_buffer.dbfs(0)
+                    prom.audio_amplitude_lufs_gauge.set(lufs)
+                    prom.audio_amplitude_dbfs_gauge.set(dbfs)
+                    audio_interval_counter = 0
+                    audio_analysis_smp.end()
+                    bench.add_metric_sample("audio", "lufs", lufs)
+                    bench.add_metric_sample("audio", "dbfs", dbfs)
+            if BENCHMARK:
+                if local_video_frame_count == BENCHMARK_STOP_FRAME:
+                    dt = time.time() - START_TIME
+                    avg = dt / local_video_frame_count * 1000
+                    print(f"Average: {avg:.3f}ms")
+                    break
         except av.AVError as e:
+            frame_decode_smp.end()
             log.error(e)
             prom.decode_error_count.inc()
+            reopen_smp = bench.sample_begin("Re-open")
             stream = av.open(url)
+            reopen_smp.end()
         except KeyboardInterrupt:
            log.info("Keyboard interrupt")
            break
-
+        frame_smp.end()
+    bench.report("trace-ts-probe")
 
 run()
diff --git a/videobuffer.py b/videobuffer.py
index 00ef0d1..a754c95 100644
--- a/videobuffer.py
+++ b/videobuffer.py
@@ -6,51 +6,60 @@
 from av import VideoFrame
 from itertools import islice
 
-BUFFER_SIZE = 10
-FRAME_RATE = 50
-
-
-def _motion(buffer: deque[npt.NDArray[np.uint8]]):
-    """ Returns the average motion between the given frames. """
-    if len(buffer) < 2:
-        log.warning("Buffer too small to calculate motion")
-        return 0
-
-    frame_diffs = [
-        np.mean(abs(buffer[i].astype(np.int16) - frame).astype(np.uint8))
-        for i, frame in enumerate(islice(buffer, 1, None))
-    ]
-
-    # Return the mean of the absolute differences
-    return np.mean(frame_diffs) / 255
-
-
-def _avg_brightness(buffer: deque[npt.NDArray[np.uint8]]):
-    """ Returns the average brightness of the given frames. """
-
-    if len(buffer) == 0:
-        log.warning("Buffer empty, returning 0")
-        return 0
-
-    return np.mean(buffer) / 255
+BUFFER_SIZE = 25
+REDUCE_BEFORE_MOTION = True
+BLUR_BEFORE_MOTION = False
+BLUR_KERNEL = np.array([0.20, 0.20, 0.20, 0.20, 0.20])
 
+class ProbeFrame:
+    """ A video frame together with its per-frame metrics (brightness, motion). """
+    def __init__(self, frame: VideoFrame, last_frame: "ProbeFrame"):
+        self.frame: npt.NDArray = frame.reformat(format='gray8').to_ndarray().astype(np.uint8)
+        # Calculate brightness on the full-resolution luma plane
+        self.brightness = np.mean(self.frame)
+        # Resize
+        if REDUCE_BEFORE_MOTION:
+            # Non-interpolating 2x reduction, to reduce the chance of triggering on interlaced stalls.
+            # If one wants to reduce further, proper resampling is recommended.
+            self.frame = self.frame[::2, ::2].astype("float32") / 255
+        else:
+            # Convert to float so the frame differences below cannot wrap around
+            self.frame = self.frame.astype("float32") / 255
+        # Box-blur
+        if BLUR_BEFORE_MOTION:
+            self.frame = np.apply_along_axis(lambda x: np.convolve(x, BLUR_KERNEL, mode='same'), 0, self.frame)
+            self.frame = np.apply_along_axis(lambda x: np.convolve(x, BLUR_KERNEL, mode='same'), 1, self.frame)
+        # Calculate motion against the previous frame, if any
+        self.motion = 0.0
+        if last_frame:
+            self.motion = np.mean(np.power(np.abs(self.frame - last_frame.frame), 0.5))
+        self.has_motion: bool = last_frame is not None
 
 class VideoBuffer():
-    """ A circular buffer for video frames.
+    """ A circular buffer for video frames. Contains analysis functions.
     """
-    video_buffer = deque(maxlen=BUFFER_SIZE * FRAME_RATE)
+    video_buffer = deque(maxlen=BUFFER_SIZE)
+    last_frame = None
 
     def append(self, frame: VideoFrame):
         """ Appends a frame to the buffer.
""" - self.video_buffer.append(frame.reformat( - format='gray8').to_ndarray().astype(np.uint8)) + pb = ProbeFrame(frame, self.last_frame) + self.video_buffer.append(pb) + self.last_frame = pb @property def avg_brightness(self): """ Returns the average brightness of the frames in the buffer. """ - return _avg_brightness(self.video_buffer) + #return _avg_brightness(self.video_buffer) + l = [f.brightness for f in self.video_buffer] + nda = np.array(l, dtype="float") + if len(nda) == 0: + return 0 + return np.mean(nda) / 255 @property def motion(self): """ Returns the average motion between the frames in the buffer. """ - return _motion(self.video_buffer) + #return _motion(self.video_buffer) + l = [f.motion for f in self.video_buffer if f.has_motion] + nda = np.array(l, dtype="float") + if len(nda) == 0: + return 0 + return np.mean(nda) +