Skip to content

Commit

Permalink
Update savant-rs to 0.2.13 (#673)
Browse files Browse the repository at this point in the history
* Switch to Savant-Rs 0.2 (#622)

* #612 update savant-rs to 0.2.1

* #612 move message deserialization to zeromq_src

* #612 fix always-on-sink

* #612 move savant_rs serialization to zeromq_sink

* #612 use savant_rs.zmq

* #612 update savant-rs to 0.2.5

* #612 fix adapters

* #612 fix receiving EOS from ZeroMQ

* #612 fix adapters

* #612 fix video files sink adapter

* #612 update savant-rs to 0.2.9

* #612 don't save non-persistent attributes to VideoFrame

* #612 fix zeromq_sink gst plugin

* #612 fix adapters

* #612 fix module

* #612 fix configuring zmq

* #612 measure performance on A4000

* #612 update savant-rs to 0.2.12

* #612 fix copying bboxes

* #612 update savant-rs to 0.2.13

* #612 fix benchmark for panoptic_driving_perception

* #612 measure performance

* #612 fix typo

* Update version to 0.2.10
  • Loading branch information
tomskikh authored Feb 28, 2024
1 parent a0eb534 commit cc16746
Show file tree
Hide file tree
Showing 52 changed files with 2,085 additions and 2,299 deletions.
10 changes: 5 additions & 5 deletions adapters/ds/sinks/always_on_rtsp/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,18 +82,18 @@ def main():

if not config.source_id:
internal_socket = 'ipc:///tmp/ao-sink-internal-socket.ipc'
internal_zmq_endpoint = f'sub+connect:{internal_socket}'
zmq_reader_endpoint = f'sub+connect:{internal_socket}'
zmq_proxy = ZeroMqProxy(
input_socket=config.zmq_endpoint,
input_socket_type=config.zmq_socket_type,
input_bind=config.zmq_socket_bind,
output_socket=internal_socket,
output_socket=f'pub+bind:{internal_socket}',
)
zmq_proxy.start()
zmq_proxy_thread = Thread(target=zmq_proxy.run, daemon=True)
zmq_proxy_thread.start()
else:
internal_zmq_endpoint = config.zmq_endpoint
zmq_reader_endpoint = config.zmq_endpoint

if config.dev_mode:
mediamtx_process = Popen(
Expand All @@ -114,15 +114,15 @@ def main():
config.source_id: run_ao_sink_process(
config.source_id,
config.rtsp_uri,
internal_zmq_endpoint,
zmq_reader_endpoint,
)
}
else:
ao_sink_processes = {
source_id: run_ao_sink_process(
source_id,
f'{config.rtsp_uri.rstrip("/")}/{source_id}',
internal_zmq_endpoint,
zmq_reader_endpoint,
)
for source_id in config.source_ids
}
Expand Down
20 changes: 10 additions & 10 deletions adapters/ds/sinks/always_on_rtsp/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,16 @@ def __init__(self):
self.fps_output = opt_config('FPS_OUTPUT', 'stdout')

self.metadata_output = opt_config('METADATA_OUTPUT')
if self.metadata_output:
self.pipeline_stage_name = 'source'
self.video_pipeline: Optional[VideoPipeline] = VideoPipeline(
'always-on-sink',
[(self.pipeline_stage_name, VideoPipelineStagePayloadType.Frame)],
VideoPipelineConfiguration(),
)
else:
self.pipeline_stage_name = None
self.video_pipeline: Optional[VideoPipeline] = None
self.pipeline_source_stage_name = 'source'
self.pipeline_demux_stage_name = 'source-demux'
self.video_pipeline: Optional[VideoPipeline] = VideoPipeline(
'always-on-sink',
[
(self.pipeline_source_stage_name, VideoPipelineStagePayloadType.Frame),
(self.pipeline_demux_stage_name, VideoPipelineStagePayloadType.Frame),
],
VideoPipelineConfiguration(),
)

self.framerate = opt_config('FRAMERATE', '30/1')
self.sync = opt_config('SYNC_OUTPUT', False, strtobool)
Expand Down
41 changes: 29 additions & 12 deletions adapters/ds/sinks/always_on_rtsp/input_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,21 @@ def log_frame_metadata(pad: Gst.Pad, info: Gst.PadProbeInfo, config: Config):
return Gst.PadProbeReturn.OK


def delete_frame_from_pipeline(pad: Gst.Pad, info: Gst.PadProbeInfo, config: Config):
buffer: Gst.Buffer = info.get_buffer()
savant_frame_meta = gst_buffer_get_savant_frame_meta(buffer)
if savant_frame_meta is None:
logger.warning(
'Source %s. No Savant Frame Metadata found on buffer with PTS %s.',
config.source_id,
buffer.pts,
)
return Gst.PadProbeReturn.PASS

config.video_pipeline.delete(savant_frame_meta.idx)
return Gst.PadProbeReturn.OK


def link_added_pad(
element: Gst.Element,
src_pad: Gst.Pad,
Expand Down Expand Up @@ -61,6 +76,8 @@ def on_demuxer_pad_added(
codec = CODEC_BY_CAPS_NAME[caps[0].get_name()]
if config.metadata_output:
src_pad.add_probe(Gst.PadProbeType.BUFFER, log_frame_metadata, config)
else:
src_pad.add_probe(Gst.PadProbeType.BUFFER, delete_frame_from_pipeline, config)

if codec == Codec.RAW_RGBA:
capsfilter = factory.create(
Expand Down Expand Up @@ -93,25 +110,25 @@ def build_input_pipeline(
factory: GstElementFactory,
):
pipeline: Gst.Pipeline = Gst.Pipeline.new('input-pipeline')
savant_rs_video_demux_properties = {
zeromq_src_properties = {
'source-id': config.source_id,
'socket': config.zmq_endpoint,
'socket-type': config.zmq_socket_type.name,
'bind': config.zmq_socket_bind,
'max-width': config.max_allowed_resolution[0],
'max-height': config.max_allowed_resolution[1],
'pipeline': config.video_pipeline,
'pipeline-stage-name': config.pipeline_source_stage_name,
}
savant_rs_video_demux_properties = {
'pipeline': config.video_pipeline,
'pipeline-stage-name': config.pipeline_demux_stage_name,
}
if config.pipeline_stage_name is not None:
savant_rs_video_demux_properties[
'pipeline-stage-name'
] = config.pipeline_stage_name
savant_rs_video_demux_properties['pipeline'] = config.video_pipeline

source_elements = [
PipelineElement(
'zeromq_src',
properties={
'source-id': config.source_id,
'socket': config.zmq_endpoint,
'socket-type': config.zmq_socket_type.name,
'bind': config.zmq_socket_bind,
},
properties=zeromq_src_properties,
),
PipelineElement(
'savant_rs_video_demux',
Expand Down
26 changes: 14 additions & 12 deletions adapters/ds/sinks/always_on_rtsp/zeromq_proxy.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
from typing import Optional

import zmq
from savant_rs.zmq import BlockingWriter, WriterConfigBuilder

from savant.utils.zeromq import Defaults, SenderSocketTypes, ZeroMQSource
from savant.utils.zeromq import ZeroMQSource


class ZeroMqProxy:
"""A proxy that receives messages from a ZeroMQ socket and forwards them
to another PUB ZeroMQ socket. Needed for multi-stream Always-On-RTSP sink.
to another ZeroMQ socket. Needed for multi-stream Always-On-RTSP sink.
"""

def __init__(
Expand All @@ -22,19 +22,21 @@ def __init__(
socket_type=input_socket_type,
bind=input_bind,
)
self.output_socket = output_socket
self.sender: Optional[zmq.Socket] = None
self.output_zmq_context: Optional[zmq.Context] = None
writer_config_builder = WriterConfigBuilder(output_socket)
self.writer_config = writer_config_builder.build()
self.sender: Optional[BlockingWriter] = None

def start(self):
self.output_zmq_context = zmq.Context()
self.sender = self.output_zmq_context.socket(SenderSocketTypes.PUB.value)
self.sender.setsockopt(zmq.SNDHWM, Defaults.SEND_HWM)
self.sender.bind(self.output_socket)
self.sender = BlockingWriter(self.writer_config)
self.sender.start()
self.source.start()

def run(self):
while True:
message = self.source.next_message_without_routing_id()
message = self.source.next_message()
if message is not None:
self.sender.send_multipart(message)
self.sender.send_message(
bytes(message.topic).decode(),
message.message,
message.content,
)
Loading

0 comments on commit cc16746

Please sign in to comment.