From 82d62981e42f7ce13015774a3ea33933b2d1fa5b Mon Sep 17 00:00:00 2001 From: Daisuke Murase Date: Mon, 13 Jan 2025 13:11:00 -0800 Subject: [PATCH] test FFI implementation --- livekit-ffi/protocol/ffi.proto | 5 ++++ livekit-ffi/protocol/room.proto | 11 ++++++++- livekit-ffi/src/livekit.proto.rs | 24 +++++++++++++++++-- livekit-ffi/src/server/requests.rs | 20 ++++++++++++++++ livekit-ffi/src/server/room.rs | 20 ++++++++++++++++ .../src/room/participant/local_participant.rs | 5 +++- livekit/src/rtc_engine/rtc_session.rs | 7 +++--- 7 files changed, 84 insertions(+), 8 deletions(-) diff --git a/livekit-ffi/protocol/ffi.proto b/livekit-ffi/protocol/ffi.proto index 8c715bbf..29a7712b 100644 --- a/livekit-ffi/protocol/ffi.proto +++ b/livekit-ffi/protocol/ffi.proto @@ -116,6 +116,8 @@ message FfiRequest { SendStreamHeaderRequest send_stream_header = 44; SendStreamChunkRequest send_stream_chunk = 45; + // Data Channel + SetDataChannelBufferedAmountLowThresholdRequest set_data_channel_buffered_amount_low_threshold = 46; } } @@ -177,6 +179,9 @@ message FfiResponse { // Data Streams SendStreamHeaderResponse send_stream_header = 43; SendStreamChunkResponse send_stream_chunk = 44; + + // Data Channel + SetDataChannelBufferedAmountLowThresholdResponse set_data_channel_buffered_amount_low_threshold = 45; } } diff --git a/livekit-ffi/protocol/room.proto b/livekit-ffi/protocol/room.proto index 90f14089..c9722daa 100644 --- a/livekit-ffi/protocol/room.proto +++ b/livekit-ffi/protocol/room.proto @@ -619,4 +619,13 @@ message SendStreamHeaderCallback { message SendStreamChunkCallback { required uint64 async_id = 1; optional string error = 2; -} \ No newline at end of file +} + +message SetDataChannelBufferedAmountLowThresholdRequest { + required uint64 local_participant_handle = 1; + required uint64 threshold = 2; +} + +message SetDataChannelBufferedAmountLowThresholdResponse { + required uint64 async_id = 1; +} diff --git a/livekit-ffi/src/livekit.proto.rs b/livekit-ffi/src/livekit.proto.rs index 13b49548..b509ded0 100644 --- a/livekit-ffi/src/livekit.proto.rs +++ b/livekit-ffi/src/livekit.proto.rs @@ -3171,6 +3171,20 @@ pub struct SendStreamChunkCallback { #[prost(string, optional, tag="2")] pub error: ::core::option::Option<::prost::alloc::string::String>, } +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct SetDataChannelBufferedAmountLowThresholdRequest { + #[prost(uint64, required, tag="1")] + pub local_participant_handle: u64, + #[prost(uint64, required, tag="2")] + pub threshold: u64, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct SetDataChannelBufferedAmountLowThresholdResponse { + #[prost(uint64, required, tag="1")] + pub async_id: u64, +} #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] #[repr(i32)] pub enum IceTransportType { @@ -3943,7 +3957,7 @@ pub struct RpcMethodInvocationEvent { #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct FfiRequest { - #[prost(oneof="ffi_request::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45")] + #[prost(oneof="ffi_request::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46")] pub message: ::core::option::Option, } /// Nested message and enum types in `FfiRequest`. @@ -4046,13 +4060,16 @@ pub mod ffi_request { SendStreamHeader(super::SendStreamHeaderRequest), #[prost(message, tag="45")] SendStreamChunk(super::SendStreamChunkRequest), + /// Data Channel + #[prost(message, tag="46")] + SetDataChannelBufferedAmountLowThreshold(super::SetDataChannelBufferedAmountLowThresholdRequest), } } /// This is the output of livekit_ffi_request function. #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct FfiResponse { - #[prost(oneof="ffi_response::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44")] + #[prost(oneof="ffi_response::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45")] pub message: ::core::option::Option, } /// Nested message and enum types in `FfiResponse`. @@ -4153,6 +4170,9 @@ pub mod ffi_response { SendStreamHeader(super::SendStreamHeaderResponse), #[prost(message, tag="44")] SendStreamChunk(super::SendStreamChunkResponse), + /// Data Channel + #[prost(message, tag="45")] + SetDataChannelBufferedAmountLowThreshold(super::SetDataChannelBufferedAmountLowThresholdResponse), } } /// To minimize complexity, participant events are not included in the protocol. diff --git a/livekit-ffi/src/server/requests.rs b/livekit-ffi/src/server/requests.rs index 16739361..a553dea8 100644 --- a/livekit-ffi/src/server/requests.rs +++ b/livekit-ffi/src/server/requests.rs @@ -895,6 +895,21 @@ fn on_rpc_method_invocation_response( Ok(proto::RpcMethodInvocationResponseResponse { error }) } +fn on_set_data_channel_buffered_amount_low_threshold( + server: &'static FfiServer, + set_data_channel_buffered_amount_low_threshold: proto::SetDataChannelBufferedAmountLowThresholdRequest, +) -> FfiResult { + let ffi_participant = server + .retrieve_handle::( + set_data_channel_buffered_amount_low_threshold.local_participant_handle, + )? + .clone(); + Ok(ffi_participant.room.set_data_channel_buffered_amount_low_threshold( + server, + set_data_channel_buffered_amount_low_threshold, + )) +} + #[allow(clippy::field_reassign_with_default)] // Avoid uggly format pub fn handle_request( server: &'static FfiServer, @@ -1063,6 +1078,11 @@ pub fn handle_request( proto::ffi_request::Message::SendStreamChunk(request) => { proto::ffi_response::Message::SendStreamChunk(on_send_stream_chunk(server, request)?) } + proto::ffi_request::Message::SetDataChannelBufferedAmountLowThreshold(request) => { + proto::ffi_response::Message::SetDataChannelBufferedAmountLowThreshold( + on_set_data_channel_buffered_amount_low_threshold(server, request)?, + ) + } }); Ok(res) diff --git a/livekit-ffi/src/server/room.rs b/livekit-ffi/src/server/room.rs index 75fbfa76..4ae02b82 100644 --- a/livekit-ffi/src/server/room.rs +++ b/livekit-ffi/src/server/room.rs @@ -746,6 +746,26 @@ impl RoomInner { ) -> Option>> { return self.rpc_method_invocation_waiters.lock().remove(&invocation_id); } + + pub fn set_data_channel_buffered_amount_low_threshold( + self: &Arc, + server: &'static FfiServer, + set_data_channel_buffered_amount_low_threshold: proto::SetDataChannelBufferedAmountLowThresholdRequest, + ) -> proto::SetDataChannelBufferedAmountLowThresholdResponse { + let async_id = server.next_id(); + let inner = self.clone(); + let handle = server.async_runtime.spawn(async move { + let _ = inner + .room + .local_participant() + .set_data_channel_buffered_amount_low_threshold( + set_data_channel_buffered_amount_low_threshold.threshold, + ) + .await; + }); + server.watch_panic(handle); + proto::SetDataChannelBufferedAmountLowThresholdResponse { async_id } + } } // Task used to publish data without blocking the client thread diff --git a/livekit/src/room/participant/local_participant.rs b/livekit/src/room/participant/local_participant.rs index 861b19b5..ab57b21f 100644 --- a/livekit/src/room/participant/local_participant.rs +++ b/livekit/src/room/participant/local_participant.rs @@ -481,7 +481,10 @@ impl LocalParticipant { self.inner.rtc_engine.publish_data(&data, kind).await.map_err(Into::into) } - pub async fn set_data_channel_buffered_amount_low_threshold(&self, threshold: u64) -> RoomResult<()> { + pub async fn set_data_channel_buffered_amount_low_threshold( + &self, + threshold: u64, + ) -> RoomResult<()> { self.inner.rtc_engine.session().set_data_channel_buffered_amount_low_threshold(threshold); Ok(()) } diff --git a/livekit/src/rtc_engine/rtc_session.rs b/livekit/src/rtc_engine/rtc_session.rs index 5b888d6b..6466148c 100644 --- a/livekit/src/rtc_engine/rtc_session.rs +++ b/livekit/src/rtc_engine/rtc_session.rs @@ -391,8 +391,6 @@ impl RtcSession { pub async fn get_response(&self, request_id: u32) -> proto::RequestResponse { self.inner.get_response(request_id).await } - - } impl SessionInner { @@ -774,7 +772,8 @@ impl SessionInner { } DataPacketKind::Reliable => { self.reliable_dc_buffered_amount.store(amount, Ordering::Relaxed); - let threshold = self.reliable_dc_buffered_amount_low_threshold.load(Ordering::Relaxed); + let threshold = + self.reliable_dc_buffered_amount_low_threshold.load(Ordering::Relaxed); if amount <= threshold { self.reliable_dc_buffered_amount_low_notify.notify_one(); } @@ -1007,7 +1006,7 @@ impl SessionInner { let amount = self.reliable_dc_buffered_amount.load(Ordering::Relaxed); let threshold = self.reliable_dc_buffered_amount_low_threshold.load(Ordering::Relaxed); if amount <= threshold { - return Ok(()) + return Ok(()); } self.reliable_dc_buffered_amount_low_notify.notified().await; Ok(())