diff --git a/Cargo.toml b/Cargo.toml index 3d44566..bec9c3a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,7 @@ savant_core = { path = "savant_core" } savant_core_py = { path = "savant_core_py" } [workspace.package] -version = "0.2.16" +version = "0.2.17" edition = "2021" authors = ["Ivan Kudriavtsev "] description = "Savant Rust core functions library" diff --git a/python/pipeline_prev_uuid.py b/python/pipeline_prev_uuid.py new file mode 100644 index 0000000..a7a5e96 --- /dev/null +++ b/python/pipeline_prev_uuid.py @@ -0,0 +1,49 @@ +import time +from threading import Thread, current_thread + +import savant_plugin_sample +import savant_rs +from savant_rs.logging import log, LogLevel, log_level_enabled +from savant_rs.logging import set_log_level +from savant_rs.pipeline import VideoPipelineStagePayloadType, VideoPipeline, VideoPipelineConfiguration, StageFunction +from savant_rs.primitives import AttributeValue + +set_log_level(LogLevel.Trace) + +from savant_rs.utils import gen_frame, TelemetrySpan, enable_dl_detection + +if __name__ == "__main__": + savant_rs.savant_rs.version() + enable_dl_detection() # enables internal DL detection (checks every 5 secs) + log(LogLevel.Info, "root", "Begin operation", dict(savant_rs_version=savant_rs.version())) + + # from savant_rs import init_jaeger_tracer + # init_jaeger_tracer("demo-pipeline", "localhost:6831") + + conf = VideoPipelineConfiguration() + conf.append_frame_meta_to_otlp_span = True + conf.frame_period = 1 # every single frame, insane + conf.timestamp_period = 1000 # every sec + + p = VideoPipeline("video-pipeline-root", [ + ("input", VideoPipelineStagePayloadType.Frame, StageFunction.none(), StageFunction.none()), + ], conf) + p.sampling_period = 10 + + assert p.get_stage_type("input") == VideoPipelineStagePayloadType.Frame + frame1 = gen_frame() + frame1.keyframe = True + frame1.source_id = "test1" + frame_id1 = p.add_frame("input", frame1) + frame1, _ = p.get_independent_frame(frame_id1) + assert frame1.previous_keyframe_uuid is None + uuid = frame1.uuid + + frame2 = gen_frame() + frame2.keyframe = False + frame2.source_id = "test1" + frame_id2 = p.add_frame("input", frame2) + frame2, _ = p.get_independent_frame(frame_id2) + assert frame2.previous_keyframe_uuid == uuid + + del p diff --git a/savant_core/benches/bench_pipeline.rs b/savant_core/benches/bench_pipeline.rs index 5a5ada6..594083c 100644 --- a/savant_core/benches/bench_pipeline.rs +++ b/savant_core/benches/bench_pipeline.rs @@ -58,7 +58,12 @@ fn get_pipeline( .frame_period(Some(100)) .build()?; - let pipeline = Pipeline::new(stages.clone(), conf)?; + let pipeline_stages = stages + .iter() + .map(|(name, payload)| (name.clone(), payload.clone(), None, None)) + .collect(); + + let pipeline = Pipeline::new(pipeline_stages, conf)?; stages.pop(); pipeline.set_root_span_name("bench".to_owned())?; Ok((pipeline, stages)) diff --git a/savant_core/src/pipeline.rs b/savant_core/src/pipeline.rs index 7a2a144..ad6892b 100644 --- a/savant_core/src/pipeline.rs +++ b/savant_core/src/pipeline.rs @@ -256,6 +256,7 @@ pub(super) mod implementation { stages: Vec, frame_locations: SavantRwLock>, frame_ordering: SavantRwLock>, + keyframe_tracking: SavantRwLock>, sampling_period: OnceLock, root_span_name: OnceLock, configuration: PipelineConfiguration, @@ -273,6 +274,9 @@ pub(super) mod implementation { frame_ordering: SavantRwLock::new(LruCache::new( NonZeroUsize::try_from(MAX_TRACKED_STREAMS).unwrap(), )), + keyframe_tracking: SavantRwLock::new(LruCache::new( + NonZeroUsize::try_from(MAX_TRACKED_STREAMS).unwrap(), + )), sampling_period: OnceLock::new(), root_span_name: OnceLock::new(), configuration: PipelineConfiguration::default(), @@ -530,7 +534,18 @@ pub(super) mod implementation { } else { frame.set_previous_frame_seq_id(None); } - ordering.put(source_id, id_counter); + ordering.put(source_id.clone(), id_counter); + + let mut keyframe_tracking = self.keyframe_tracking.write(); + let last_keyframe = keyframe_tracking.get(&source_id); + if let Some(last) = last_keyframe { + frame.set_previous_keyframe(Some(*last)); + } else { + frame.set_previous_keyframe(None); + } + if let Some(true) = frame.get_keyframe() { + keyframe_tracking.put(source_id, frame.get_uuid_u128()); + } let ctx = self.get_stage_span(id_counter, format!("add/{}", stage_name)); let frame_payload = PipelinePayload::Frame(frame, Vec::new(), ctx); diff --git a/savant_core/src/primitives/frame.rs b/savant_core/src/primitives/frame.rs index d918758..6eb7e0d 100644 --- a/savant_core/src/primitives/frame.rs +++ b/savant_core/src/primitives/frame.rs @@ -97,6 +97,8 @@ impl ToSerdeJsonValue for VideoFrameTransformation { pub struct VideoFrame { #[builder(setter(skip))] pub previous_frame_seq_id: Option, + #[builder(setter(skip))] + pub previous_keyframe: Option, pub source_id: String, pub uuid: u128, #[builder(setter(skip))] @@ -133,6 +135,7 @@ impl Default for VideoFrame { fn default() -> Self { Self { previous_frame_seq_id: None, + previous_keyframe: None, source_id: String::new(), uuid: Uuid::new_v4().as_u128(), creation_timestamp_ns: SystemTime::now() @@ -164,6 +167,8 @@ impl ToSerdeJsonValue for VideoFrame { let version = version(); serde_json::json!( { + "previous_frame_seq_id": self.previous_frame_seq_id, + "previous_keyframe": self.previous_keyframe, "version": version, "uuid": frame_uuid, "creation_timestamp_ns": if self.creation_timestamp_ns > 2^53 { 2^53 } else { self.creation_timestamp_ns }, @@ -805,6 +810,20 @@ impl VideoFrameProxy { inner.previous_frame_seq_id = previous_frame_seq_id; } + pub fn get_previous_keyframe(&self) -> Option { + trace!(self.inner.read_recursive()).previous_keyframe + } + + pub fn get_previous_keyframe_as_string(&self) -> Option { + self.get_previous_keyframe() + .map(|ku| Uuid::from_u128(ku).to_string()) + } + + pub(crate) fn set_previous_keyframe(&mut self, previous_keyframe: Option) { + let mut inner = trace!(self.inner.write()); + inner.previous_keyframe = previous_keyframe; + } + pub fn set_source_id(&mut self, source_id: &str) { let mut inner = trace!(self.inner.write()); inner.source_id = source_id.to_string(); @@ -822,6 +841,10 @@ impl VideoFrameProxy { Uuid::from_u128(trace!(self.inner.read_recursive()).uuid) } + pub fn get_uuid_u128(&self) -> u128 { + trace!(self.inner.read_recursive()).uuid + } + pub fn get_uuid_as_string(&self) -> String { self.get_uuid().to_string() } diff --git a/savant_core/src/protobuf/generated.rs b/savant_core/src/protobuf/generated.rs index 03a2796..91e022e 100644 --- a/savant_core/src/protobuf/generated.rs +++ b/savant_core/src/protobuf/generated.rs @@ -425,6 +425,8 @@ pub struct VideoFrame { pub attributes: ::prost::alloc::vec::Vec, #[prost(message, repeated, tag = "25")] pub objects: ::prost::alloc::vec::Vec, + #[prost(string, optional, tag = "26")] + pub previous_keyframe: ::core::option::Option<::prost::alloc::string::String>, #[prost(oneof = "video_frame::Content", tags = "17, 18, 19")] pub content: ::core::option::Option, } diff --git a/savant_core/src/protobuf/savant_rs.proto b/savant_core/src/protobuf/savant_rs.proto index 2ef2749..40ce09c 100644 --- a/savant_core/src/protobuf/savant_rs.proto +++ b/savant_core/src/protobuf/savant_rs.proto @@ -281,6 +281,7 @@ message VideoFrame { repeated VideoFrameTransformation transformations = 23; repeated Attribute attributes = 24; repeated VideoObject objects = 25; + optional string previous_keyframe = 26; } message VideoFrameBatch { diff --git a/savant_core/src/protobuf/serialize/video_frame.rs b/savant_core/src/protobuf/serialize/video_frame.rs index bd05555..81a24f7 100644 --- a/savant_core/src/protobuf/serialize/video_frame.rs +++ b/savant_core/src/protobuf/serialize/video_frame.rs @@ -29,6 +29,9 @@ impl From<&Box> for generated::VideoFrame { generated::VideoFrame { previous_frame_seq_id: video_frame.previous_frame_seq_id, + previous_keyframe: video_frame + .previous_keyframe + .map(|ku| Uuid::from_u128(ku).to_string()), source_id: video_frame.source_id.clone(), uuid: Uuid::from_u128(video_frame.uuid).to_string(), creation_timestamp_ns_high: (video_frame.creation_timestamp_ns >> 64) as u64, @@ -102,6 +105,11 @@ impl TryFrom<&generated::VideoFrame> for VideoFrame { Ok(VideoFrame { previous_frame_seq_id: value.previous_frame_seq_id, + previous_keyframe: value + .previous_keyframe + .as_ref() + .map(|ku| Uuid::from_str(ku).map(|u| u.as_u128())) + .transpose()?, source_id: value.source_id.clone(), uuid: Uuid::from_str(&value.uuid)?.as_u128(), creation_timestamp_ns: (value.creation_timestamp_ns_high as u128) << 64 diff --git a/savant_core_py/src/primitives/frame.rs b/savant_core_py/src/primitives/frame.rs index e0ddf85..9ca361c 100644 --- a/savant_core_py/src/primitives/frame.rs +++ b/savant_core_py/src/primitives/frame.rs @@ -704,6 +704,11 @@ impl VideoFrame { self.0.get_previous_frame_seq_id() } + #[getter] + pub fn get_previous_keyframe_uuid(&self) -> Option { + self.0.get_previous_keyframe_as_string() + } + #[getter] #[pyo3(name = "json")] pub fn json_gil(&self) -> String {