Skip to content

Commit

Permalink
test FFI implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
typester committed Jan 13, 2025
1 parent ee64923 commit 82d6298
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 8 deletions.
5 changes: 5 additions & 0 deletions livekit-ffi/protocol/ffi.proto
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ message FfiRequest {
SendStreamHeaderRequest send_stream_header = 44;
SendStreamChunkRequest send_stream_chunk = 45;

// Data Channel
SetDataChannelBufferedAmountLowThresholdRequest set_data_channel_buffered_amount_low_threshold = 46;
}
}

Expand Down Expand Up @@ -177,6 +179,9 @@ message FfiResponse {
// Data Streams
SendStreamHeaderResponse send_stream_header = 43;
SendStreamChunkResponse send_stream_chunk = 44;

// Data Channel
SetDataChannelBufferedAmountLowThresholdResponse set_data_channel_buffered_amount_low_threshold = 45;
}
}

Expand Down
11 changes: 10 additions & 1 deletion livekit-ffi/protocol/room.proto
Original file line number Diff line number Diff line change
Expand Up @@ -619,4 +619,13 @@ message SendStreamHeaderCallback {
message SendStreamChunkCallback {
required uint64 async_id = 1;
optional string error = 2;
}
}

message SetDataChannelBufferedAmountLowThresholdRequest {
required uint64 local_participant_handle = 1;
required uint64 threshold = 2;
}

message SetDataChannelBufferedAmountLowThresholdResponse {
required uint64 async_id = 1;
}
24 changes: 22 additions & 2 deletions livekit-ffi/src/livekit.proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3171,6 +3171,20 @@ pub struct SendStreamChunkCallback {
#[prost(string, optional, tag="2")]
pub error: ::core::option::Option<::prost::alloc::string::String>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SetDataChannelBufferedAmountLowThresholdRequest {
#[prost(uint64, required, tag="1")]
pub local_participant_handle: u64,
#[prost(uint64, required, tag="2")]
pub threshold: u64,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SetDataChannelBufferedAmountLowThresholdResponse {
#[prost(uint64, required, tag="1")]
pub async_id: u64,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
#[repr(i32)]
pub enum IceTransportType {
Expand Down Expand Up @@ -3943,7 +3957,7 @@ pub struct RpcMethodInvocationEvent {
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FfiRequest {
#[prost(oneof="ffi_request::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45")]
#[prost(oneof="ffi_request::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46")]
pub message: ::core::option::Option<ffi_request::Message>,
}
/// Nested message and enum types in `FfiRequest`.
Expand Down Expand Up @@ -4046,13 +4060,16 @@ pub mod ffi_request {
SendStreamHeader(super::SendStreamHeaderRequest),
#[prost(message, tag="45")]
SendStreamChunk(super::SendStreamChunkRequest),
/// Data Channel
#[prost(message, tag="46")]
SetDataChannelBufferedAmountLowThreshold(super::SetDataChannelBufferedAmountLowThresholdRequest),
}
}
/// This is the output of livekit_ffi_request function.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FfiResponse {
#[prost(oneof="ffi_response::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44")]
#[prost(oneof="ffi_response::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45")]
pub message: ::core::option::Option<ffi_response::Message>,
}
/// Nested message and enum types in `FfiResponse`.
Expand Down Expand Up @@ -4153,6 +4170,9 @@ pub mod ffi_response {
SendStreamHeader(super::SendStreamHeaderResponse),
#[prost(message, tag="44")]
SendStreamChunk(super::SendStreamChunkResponse),
/// Data Channel
#[prost(message, tag="45")]
SetDataChannelBufferedAmountLowThreshold(super::SetDataChannelBufferedAmountLowThresholdResponse),
}
}
/// To minimize complexity, participant events are not included in the protocol.
Expand Down
20 changes: 20 additions & 0 deletions livekit-ffi/src/server/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -895,6 +895,21 @@ fn on_rpc_method_invocation_response(
Ok(proto::RpcMethodInvocationResponseResponse { error })
}

fn on_set_data_channel_buffered_amount_low_threshold(
server: &'static FfiServer,
set_data_channel_buffered_amount_low_threshold: proto::SetDataChannelBufferedAmountLowThresholdRequest,
) -> FfiResult<proto::SetDataChannelBufferedAmountLowThresholdResponse> {
let ffi_participant = server
.retrieve_handle::<FfiParticipant>(
set_data_channel_buffered_amount_low_threshold.local_participant_handle,
)?
.clone();
Ok(ffi_participant.room.set_data_channel_buffered_amount_low_threshold(
server,
set_data_channel_buffered_amount_low_threshold,
))
}

#[allow(clippy::field_reassign_with_default)] // Avoid uggly format
pub fn handle_request(
server: &'static FfiServer,
Expand Down Expand Up @@ -1063,6 +1078,11 @@ pub fn handle_request(
proto::ffi_request::Message::SendStreamChunk(request) => {
proto::ffi_response::Message::SendStreamChunk(on_send_stream_chunk(server, request)?)
}
proto::ffi_request::Message::SetDataChannelBufferedAmountLowThreshold(request) => {
proto::ffi_response::Message::SetDataChannelBufferedAmountLowThreshold(
on_set_data_channel_buffered_amount_low_threshold(server, request)?,
)
}
});

Ok(res)
Expand Down
20 changes: 20 additions & 0 deletions livekit-ffi/src/server/room.rs
Original file line number Diff line number Diff line change
Expand Up @@ -746,6 +746,26 @@ impl RoomInner {
) -> Option<oneshot::Sender<Result<String, RpcError>>> {
return self.rpc_method_invocation_waiters.lock().remove(&invocation_id);
}

pub fn set_data_channel_buffered_amount_low_threshold(
self: &Arc<Self>,
server: &'static FfiServer,
set_data_channel_buffered_amount_low_threshold: proto::SetDataChannelBufferedAmountLowThresholdRequest,
) -> proto::SetDataChannelBufferedAmountLowThresholdResponse {
let async_id = server.next_id();
let inner = self.clone();
let handle = server.async_runtime.spawn(async move {
let _ = inner
.room
.local_participant()
.set_data_channel_buffered_amount_low_threshold(
set_data_channel_buffered_amount_low_threshold.threshold,
)
.await;
});
server.watch_panic(handle);
proto::SetDataChannelBufferedAmountLowThresholdResponse { async_id }
}
}

// Task used to publish data without blocking the client thread
Expand Down
5 changes: 4 additions & 1 deletion livekit/src/room/participant/local_participant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,10 @@ impl LocalParticipant {
self.inner.rtc_engine.publish_data(&data, kind).await.map_err(Into::into)
}

pub async fn set_data_channel_buffered_amount_low_threshold(&self, threshold: u64) -> RoomResult<()> {
pub async fn set_data_channel_buffered_amount_low_threshold(
&self,
threshold: u64,
) -> RoomResult<()> {
self.inner.rtc_engine.session().set_data_channel_buffered_amount_low_threshold(threshold);
Ok(())
}
Expand Down
7 changes: 3 additions & 4 deletions livekit/src/rtc_engine/rtc_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -391,8 +391,6 @@ impl RtcSession {
pub async fn get_response(&self, request_id: u32) -> proto::RequestResponse {
self.inner.get_response(request_id).await
}


}

impl SessionInner {
Expand Down Expand Up @@ -774,7 +772,8 @@ impl SessionInner {
}
DataPacketKind::Reliable => {
self.reliable_dc_buffered_amount.store(amount, Ordering::Relaxed);
let threshold = self.reliable_dc_buffered_amount_low_threshold.load(Ordering::Relaxed);
let threshold =
self.reliable_dc_buffered_amount_low_threshold.load(Ordering::Relaxed);
if amount <= threshold {
self.reliable_dc_buffered_amount_low_notify.notify_one();
}
Expand Down Expand Up @@ -1007,7 +1006,7 @@ impl SessionInner {
let amount = self.reliable_dc_buffered_amount.load(Ordering::Relaxed);
let threshold = self.reliable_dc_buffered_amount_low_threshold.load(Ordering::Relaxed);
if amount <= threshold {
return Ok(())
return Ok(());
}
self.reliable_dc_buffered_amount_low_notify.notified().await;
Ok(())
Expand Down

0 comments on commit 82d6298

Please sign in to comment.