Skip to content

Commit

Permalink
131 Reference to the last keyframe (#132)
Browse files Browse the repository at this point in the history
* wip previous keyframe uuid

* fixed benchmark code
  • Loading branch information
bwsw authored Apr 4, 2024
1 parent 095e05e commit 00dad69
Show file tree
Hide file tree
Showing 9 changed files with 111 additions and 3 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ savant_core = { path = "savant_core" }
savant_core_py = { path = "savant_core_py" }

[workspace.package]
version = "0.2.16"
version = "0.2.17"
edition = "2021"
authors = ["Ivan Kudriavtsev <[email protected]>"]
description = "Savant Rust core functions library"
Expand Down
49 changes: 49 additions & 0 deletions python/pipeline_prev_uuid.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import time
from threading import Thread, current_thread

import savant_plugin_sample
import savant_rs
from savant_rs.logging import log, LogLevel, log_level_enabled
from savant_rs.logging import set_log_level
from savant_rs.pipeline import VideoPipelineStagePayloadType, VideoPipeline, VideoPipelineConfiguration, StageFunction
from savant_rs.primitives import AttributeValue

set_log_level(LogLevel.Trace)

from savant_rs.utils import gen_frame, TelemetrySpan, enable_dl_detection

if __name__ == "__main__":
savant_rs.savant_rs.version()
enable_dl_detection() # enables internal DL detection (checks every 5 secs)
log(LogLevel.Info, "root", "Begin operation", dict(savant_rs_version=savant_rs.version()))

# from savant_rs import init_jaeger_tracer
# init_jaeger_tracer("demo-pipeline", "localhost:6831")

conf = VideoPipelineConfiguration()
conf.append_frame_meta_to_otlp_span = True
conf.frame_period = 1 # every single frame, insane
conf.timestamp_period = 1000 # every sec

p = VideoPipeline("video-pipeline-root", [
("input", VideoPipelineStagePayloadType.Frame, StageFunction.none(), StageFunction.none()),
], conf)
p.sampling_period = 10

assert p.get_stage_type("input") == VideoPipelineStagePayloadType.Frame
frame1 = gen_frame()
frame1.keyframe = True
frame1.source_id = "test1"
frame_id1 = p.add_frame("input", frame1)
frame1, _ = p.get_independent_frame(frame_id1)
assert frame1.previous_keyframe_uuid is None
uuid = frame1.uuid

frame2 = gen_frame()
frame2.keyframe = False
frame2.source_id = "test1"
frame_id2 = p.add_frame("input", frame2)
frame2, _ = p.get_independent_frame(frame_id2)
assert frame2.previous_keyframe_uuid == uuid

del p
7 changes: 6 additions & 1 deletion savant_core/benches/bench_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,12 @@ fn get_pipeline(
.frame_period(Some(100))
.build()?;

let pipeline = Pipeline::new(stages.clone(), conf)?;
let pipeline_stages = stages
.iter()
.map(|(name, payload)| (name.clone(), payload.clone(), None, None))
.collect();

let pipeline = Pipeline::new(pipeline_stages, conf)?;
stages.pop();
pipeline.set_root_span_name("bench".to_owned())?;
Ok((pipeline, stages))
Expand Down
17 changes: 16 additions & 1 deletion savant_core/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ pub(super) mod implementation {
stages: Vec<PipelineStage>,
frame_locations: SavantRwLock<HashMap<i64, usize>>,
frame_ordering: SavantRwLock<LruCache<String, i64>>,
keyframe_tracking: SavantRwLock<LruCache<String, u128>>,
sampling_period: OnceLock<i64>,
root_span_name: OnceLock<String>,
configuration: PipelineConfiguration,
Expand All @@ -273,6 +274,9 @@ pub(super) mod implementation {
frame_ordering: SavantRwLock::new(LruCache::new(
NonZeroUsize::try_from(MAX_TRACKED_STREAMS).unwrap(),
)),
keyframe_tracking: SavantRwLock::new(LruCache::new(
NonZeroUsize::try_from(MAX_TRACKED_STREAMS).unwrap(),
)),
sampling_period: OnceLock::new(),
root_span_name: OnceLock::new(),
configuration: PipelineConfiguration::default(),
Expand Down Expand Up @@ -530,7 +534,18 @@ pub(super) mod implementation {
} else {
frame.set_previous_frame_seq_id(None);
}
ordering.put(source_id, id_counter);
ordering.put(source_id.clone(), id_counter);

let mut keyframe_tracking = self.keyframe_tracking.write();
let last_keyframe = keyframe_tracking.get(&source_id);
if let Some(last) = last_keyframe {
frame.set_previous_keyframe(Some(*last));
} else {
frame.set_previous_keyframe(None);
}
if let Some(true) = frame.get_keyframe() {
keyframe_tracking.put(source_id, frame.get_uuid_u128());
}

let ctx = self.get_stage_span(id_counter, format!("add/{}", stage_name));
let frame_payload = PipelinePayload::Frame(frame, Vec::new(), ctx);
Expand Down
23 changes: 23 additions & 0 deletions savant_core/src/primitives/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ impl ToSerdeJsonValue for VideoFrameTransformation {
pub struct VideoFrame {
#[builder(setter(skip))]
pub previous_frame_seq_id: Option<i64>,
#[builder(setter(skip))]
pub previous_keyframe: Option<u128>,
pub source_id: String,
pub uuid: u128,
#[builder(setter(skip))]
Expand Down Expand Up @@ -133,6 +135,7 @@ impl Default for VideoFrame {
fn default() -> Self {
Self {
previous_frame_seq_id: None,
previous_keyframe: None,
source_id: String::new(),
uuid: Uuid::new_v4().as_u128(),
creation_timestamp_ns: SystemTime::now()
Expand Down Expand Up @@ -164,6 +167,8 @@ impl ToSerdeJsonValue for VideoFrame {
let version = version();
serde_json::json!(
{
"previous_frame_seq_id": self.previous_frame_seq_id,
"previous_keyframe": self.previous_keyframe,
"version": version,
"uuid": frame_uuid,
"creation_timestamp_ns": if self.creation_timestamp_ns > 2^53 { 2^53 } else { self.creation_timestamp_ns },
Expand Down Expand Up @@ -805,6 +810,20 @@ impl VideoFrameProxy {
inner.previous_frame_seq_id = previous_frame_seq_id;
}

pub fn get_previous_keyframe(&self) -> Option<u128> {
trace!(self.inner.read_recursive()).previous_keyframe
}

pub fn get_previous_keyframe_as_string(&self) -> Option<String> {
self.get_previous_keyframe()
.map(|ku| Uuid::from_u128(ku).to_string())
}

pub(crate) fn set_previous_keyframe(&mut self, previous_keyframe: Option<u128>) {
let mut inner = trace!(self.inner.write());
inner.previous_keyframe = previous_keyframe;
}

pub fn set_source_id(&mut self, source_id: &str) {
let mut inner = trace!(self.inner.write());
inner.source_id = source_id.to_string();
Expand All @@ -822,6 +841,10 @@ impl VideoFrameProxy {
Uuid::from_u128(trace!(self.inner.read_recursive()).uuid)
}

pub fn get_uuid_u128(&self) -> u128 {
trace!(self.inner.read_recursive()).uuid
}

pub fn get_uuid_as_string(&self) -> String {
self.get_uuid().to_string()
}
Expand Down
2 changes: 2 additions & 0 deletions savant_core/src/protobuf/generated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,8 @@ pub struct VideoFrame {
pub attributes: ::prost::alloc::vec::Vec<Attribute>,
#[prost(message, repeated, tag = "25")]
pub objects: ::prost::alloc::vec::Vec<VideoObject>,
#[prost(string, optional, tag = "26")]
pub previous_keyframe: ::core::option::Option<::prost::alloc::string::String>,
#[prost(oneof = "video_frame::Content", tags = "17, 18, 19")]
pub content: ::core::option::Option<video_frame::Content>,
}
Expand Down
1 change: 1 addition & 0 deletions savant_core/src/protobuf/savant_rs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,7 @@ message VideoFrame {
repeated VideoFrameTransformation transformations = 23;
repeated Attribute attributes = 24;
repeated VideoObject objects = 25;
optional string previous_keyframe = 26;
}

message VideoFrameBatch {
Expand Down
8 changes: 8 additions & 0 deletions savant_core/src/protobuf/serialize/video_frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ impl From<&Box<VideoFrame>> for generated::VideoFrame {

generated::VideoFrame {
previous_frame_seq_id: video_frame.previous_frame_seq_id,
previous_keyframe: video_frame
.previous_keyframe
.map(|ku| Uuid::from_u128(ku).to_string()),
source_id: video_frame.source_id.clone(),
uuid: Uuid::from_u128(video_frame.uuid).to_string(),
creation_timestamp_ns_high: (video_frame.creation_timestamp_ns >> 64) as u64,
Expand Down Expand Up @@ -102,6 +105,11 @@ impl TryFrom<&generated::VideoFrame> for VideoFrame {

Ok(VideoFrame {
previous_frame_seq_id: value.previous_frame_seq_id,
previous_keyframe: value
.previous_keyframe
.as_ref()
.map(|ku| Uuid::from_str(ku).map(|u| u.as_u128()))
.transpose()?,
source_id: value.source_id.clone(),
uuid: Uuid::from_str(&value.uuid)?.as_u128(),
creation_timestamp_ns: (value.creation_timestamp_ns_high as u128) << 64
Expand Down
5 changes: 5 additions & 0 deletions savant_core_py/src/primitives/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -704,6 +704,11 @@ impl VideoFrame {
self.0.get_previous_frame_seq_id()
}

#[getter]
pub fn get_previous_keyframe_uuid(&self) -> Option<String> {
self.0.get_previous_keyframe_as_string()
}

#[getter]
#[pyo3(name = "json")]
pub fn json_gil(&self) -> String {
Expand Down

0 comments on commit 00dad69

Please sign in to comment.