Skip to content

Commit

Permalink
Make publish_data wait until the DataChannel's bufferedAmount becom…
Browse files Browse the repository at this point in the history
…es low. (#545)

* expose buffered_amount method to Rust

* test to implement wait_for_dc_buffer_low

* remove wait_for_low function, add functionality to wait it in publish_data

* test FFI implementation

* add callback

* revert unused changes

* not necessary to make this async

* update lock

* add nanpa changeset

* create dc_task for more reliable data publishing

* change get/set dc buffered_amount_low_threshold FFI functions to support Lossy kind as well

* fmt

* add logs if buffer amount become unexpected value

* set default threshold to 2MB

* fmt

* ignore error here

* add buffered_amount_low_threshold in RoomInfo

* remove Get ffi function for dc buffered_low_threshold, instead, add it to RoomInfo

* update changeset

* flatten DataChannelOptions in protobuf

* fmt
  • Loading branch information
typester authored Jan 17, 2025
1 parent 50cb059 commit f162413
Show file tree
Hide file tree
Showing 17 changed files with 411 additions and 54 deletions.
6 changes: 6 additions & 0 deletions .nanpa/dc-buffered-amount-low-threshold.kdl
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
patch type="added" package="libwebrtc" "Expose DataChannel.bufferedAmount property"
patch type="fixed" package="livekit" "Wait for the buffered amount to become low before sending data during publish_data for Reliable Data Channel"
patch type="added" package="livekit" "Add an API to set buffer_amount_low_threshold for DataChannel"
patch type="added" package="livekit" "Update RoomInfo to contain buffer_amount_low_threshold for DataChannel"
patch type="added" package="livekit-ffi" "Add an API to set buffer_amount_low_threshold for DataChannel"
patch type="added" package="livekit-ffi" "Update RoomInfo to contain buffer_amount_low_threshold for DataChannel"
4 changes: 4 additions & 0 deletions libwebrtc/src/data_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ impl DataChannel {
self.handle.close()
}

pub fn buffered_amount(&self) -> u64 {
self.handle.buffered_amount()
}

pub fn on_state_change(&self, callback: Option<OnStateChange>) {
self.handle.on_state_change(callback)
}
Expand Down
4 changes: 4 additions & 0 deletions libwebrtc/src/native/data_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@ impl DataChannel {
self.sys_handle.close();
}

pub fn buffered_amount(&self) -> u64 {
self.sys_handle.buffered_amount()
}

pub fn on_state_change(&self, handler: Option<OnStateChange>) {
*self.observer.state_change_handler.lock() = handler;
}
Expand Down
6 changes: 6 additions & 0 deletions livekit-ffi/protocol/ffi.proto
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@ message FfiRequest {
SendStreamHeaderRequest send_stream_header = 44;
SendStreamChunkRequest send_stream_chunk = 45;
SendStreamTrailerRequest send_stream_trailer = 46;

// Data Channel
SetDataChannelBufferedAmountLowThresholdRequest set_data_channel_buffered_amount_low_threshold = 47;
}
}

Expand Down Expand Up @@ -178,6 +181,9 @@ message FfiResponse {
SendStreamHeaderResponse send_stream_header = 43;
SendStreamChunkResponse send_stream_chunk = 44;
SendStreamTrailerResponse send_stream_trailer = 45;

// Data Channel
SetDataChannelBufferedAmountLowThresholdResponse set_data_channel_buffered_amount_low_threshold = 46;
}
}

Expand Down
17 changes: 17 additions & 0 deletions livekit-ffi/protocol/room.proto
Original file line number Diff line number Diff line change
Expand Up @@ -370,13 +370,16 @@ message RoomEvent {
DataStreamHeaderReceived stream_header_received = 30;
DataStreamChunkReceived stream_chunk_received = 31;
DataStreamTrailerReceived stream_trailer_received = 32;
DataChannelBufferedAmountLowThresholdChanged data_channel_low_threshold_changed = 33;
}
}

message RoomInfo {
optional string sid = 1;
required string name = 2;
required string metadata = 3;
required uint64 lossy_dc_buffered_amount_low_threshold = 4;
required uint64 reliable_dc_buffered_amount_low_threshold = 5;
}

message OwnedRoom {
Expand Down Expand Up @@ -647,3 +650,17 @@ message SendStreamTrailerCallback {
required uint64 async_id = 1;
optional string error = 2;
}

message SetDataChannelBufferedAmountLowThresholdRequest {
required uint64 local_participant_handle = 1;
required uint64 threshold = 2;
required DataPacketKind kind = 3;
}

message SetDataChannelBufferedAmountLowThresholdResponse {
}

message DataChannelBufferedAmountLowThresholdChanged {
required DataPacketKind kind = 1;
required uint64 threshold = 2;
}
6 changes: 6 additions & 0 deletions livekit-ffi/src/conversion/room.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,12 @@ impl From<&FfiRoom> for proto::RoomInfo {
sid: room.maybe_sid().map(|x| x.to_string()),
name: room.name(),
metadata: room.metadata(),
lossy_dc_buffered_amount_low_threshold: room
.data_channel_options(DataPacketKind::Lossy)
.buffered_amount_low_threshold,
reliable_dc_buffered_amount_low_threshold: room
.data_channel_options(DataPacketKind::Reliable)
.buffered_amount_low_threshold,
}
}
}
Expand Down
41 changes: 37 additions & 4 deletions livekit-ffi/src/livekit.proto.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
// @generated
// This file is @generated by prost-build.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FrameCryptor {
Expand Down Expand Up @@ -2635,7 +2634,7 @@ pub struct OwnedBuffer {
pub struct RoomEvent {
#[prost(uint64, required, tag="1")]
pub room_handle: u64,
#[prost(oneof="room_event::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32")]
#[prost(oneof="room_event::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33")]
pub message: ::core::option::Option<room_event::Message>,
}
/// Nested message and enum types in `RoomEvent`.
Expand Down Expand Up @@ -2707,6 +2706,8 @@ pub mod room_event {
StreamChunkReceived(super::DataStreamChunkReceived),
#[prost(message, tag="32")]
StreamTrailerReceived(super::DataStreamTrailerReceived),
#[prost(message, tag="33")]
DataChannelLowThresholdChanged(super::DataChannelBufferedAmountLowThresholdChanged),
}
}
#[allow(clippy::derive_partial_eq_without_eq)]
Expand All @@ -2718,6 +2719,10 @@ pub struct RoomInfo {
pub name: ::prost::alloc::string::String,
#[prost(string, required, tag="3")]
pub metadata: ::prost::alloc::string::String,
#[prost(uint64, required, tag="4")]
pub lossy_dc_buffered_amount_low_threshold: u64,
#[prost(uint64, required, tag="5")]
pub reliable_dc_buffered_amount_low_threshold: u64,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down Expand Up @@ -3217,6 +3222,28 @@ pub struct SendStreamTrailerCallback {
#[prost(string, optional, tag="2")]
pub error: ::core::option::Option<::prost::alloc::string::String>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SetDataChannelBufferedAmountLowThresholdRequest {
#[prost(uint64, required, tag="1")]
pub local_participant_handle: u64,
#[prost(uint64, required, tag="2")]
pub threshold: u64,
#[prost(enumeration="DataPacketKind", required, tag="3")]
pub kind: i32,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SetDataChannelBufferedAmountLowThresholdResponse {
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct DataChannelBufferedAmountLowThresholdChanged {
#[prost(enumeration="DataPacketKind", required, tag="1")]
pub kind: i32,
#[prost(uint64, required, tag="2")]
pub threshold: u64,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
#[repr(i32)]
pub enum IceTransportType {
Expand Down Expand Up @@ -3989,7 +4016,7 @@ pub struct RpcMethodInvocationEvent {
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FfiRequest {
#[prost(oneof="ffi_request::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46")]
#[prost(oneof="ffi_request::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47")]
pub message: ::core::option::Option<ffi_request::Message>,
}
/// Nested message and enum types in `FfiRequest`.
Expand Down Expand Up @@ -4094,13 +4121,16 @@ pub mod ffi_request {
SendStreamChunk(super::SendStreamChunkRequest),
#[prost(message, tag="46")]
SendStreamTrailer(super::SendStreamTrailerRequest),
/// Data Channel
#[prost(message, tag="47")]
SetDataChannelBufferedAmountLowThreshold(super::SetDataChannelBufferedAmountLowThresholdRequest),
}
}
/// This is the output of livekit_ffi_request function.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FfiResponse {
#[prost(oneof="ffi_response::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45")]
#[prost(oneof="ffi_response::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46")]
pub message: ::core::option::Option<ffi_response::Message>,
}
/// Nested message and enum types in `FfiResponse`.
Expand Down Expand Up @@ -4203,6 +4233,9 @@ pub mod ffi_response {
SendStreamChunk(super::SendStreamChunkResponse),
#[prost(message, tag="45")]
SendStreamTrailer(super::SendStreamTrailerResponse),
/// Data Channel
#[prost(message, tag="46")]
SetDataChannelBufferedAmountLowThreshold(super::SetDataChannelBufferedAmountLowThresholdResponse),
}
}
/// To minimize complexity, participant events are not included in the protocol.
Expand Down
19 changes: 19 additions & 0 deletions livekit-ffi/src/server/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -905,6 +905,20 @@ fn on_rpc_method_invocation_response(
Ok(proto::RpcMethodInvocationResponseResponse { error })
}

fn on_set_data_channel_buffered_amount_low_threshold(
server: &'static FfiServer,
set_data_channel_buffered_amount_low_threshold: proto::SetDataChannelBufferedAmountLowThresholdRequest,
) -> FfiResult<proto::SetDataChannelBufferedAmountLowThresholdResponse> {
let ffi_participant = server
.retrieve_handle::<FfiParticipant>(
set_data_channel_buffered_amount_low_threshold.local_participant_handle,
)?
.clone();
Ok(ffi_participant.room.set_data_channel_buffered_amount_low_threshold(
set_data_channel_buffered_amount_low_threshold,
))
}

#[allow(clippy::field_reassign_with_default)] // Avoid uggly format
pub fn handle_request(
server: &'static FfiServer,
Expand Down Expand Up @@ -1078,6 +1092,11 @@ pub fn handle_request(
server, request,
)?)
}
proto::ffi_request::Message::SetDataChannelBufferedAmountLowThreshold(request) => {
proto::ffi_response::Message::SetDataChannelBufferedAmountLowThreshold(
on_set_data_channel_buffered_amount_low_threshold(server, request)?,
)
}
});

Ok(res)
Expand Down
19 changes: 19 additions & 0 deletions livekit-ffi/src/server/room.rs
Original file line number Diff line number Diff line change
Expand Up @@ -774,6 +774,17 @@ impl RoomInner {
) -> Option<oneshot::Sender<Result<String, RpcError>>> {
return self.rpc_method_invocation_waiters.lock().remove(&invocation_id);
}

pub fn set_data_channel_buffered_amount_low_threshold(
&self,
request: proto::SetDataChannelBufferedAmountLowThresholdRequest,
) -> proto::SetDataChannelBufferedAmountLowThresholdResponse {
let _ = self.room.local_participant().set_data_channel_buffered_amount_low_threshold(
request.threshold,
request.kind().into(),
);
proto::SetDataChannelBufferedAmountLowThresholdResponse {}
}
}

// Task used to publish data without blocking the client thread
Expand Down Expand Up @@ -1246,6 +1257,14 @@ async fn forward_event(
proto::DataStreamTrailerReceived { trailer: trailer.into(), participant_identity },
));
}
RoomEvent::DataChannelBufferedAmountLowThresholdChanged { kind, threshold } => {
let _ = send_event(proto::room_event::Message::DataChannelLowThresholdChanged(
proto::DataChannelBufferedAmountLowThresholdChanged {
kind: proto::DataPacketKind::from(kind).into(),
threshold,
},
));
}
_ => {
log::warn!("unhandled room event: {:?}", event);
}
Expand Down
49 changes: 48 additions & 1 deletion livekit/src/room/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use crate::{
prelude::*,
rtc_engine::{
EngineError, EngineEvent, EngineEvents, EngineOptions, EngineResult, RtcEngine,
SessionStats,
SessionStats, INITIAL_BUFFERED_AMOUNT_LOW_THRESHOLD,
},
};

Expand Down Expand Up @@ -196,6 +196,10 @@ pub enum RoomEvent {
},
Reconnecting,
Reconnected,
DataChannelBufferedAmountLowThresholdChanged {
kind: DataPacketKind,
threshold: u64,
},
}

#[derive(Debug, Clone, Copy, Eq, PartialEq)]
Expand Down Expand Up @@ -360,6 +364,19 @@ impl Debug for Room {
struct RoomInfo {
metadata: String,
state: ConnectionState,
lossy_dc_options: DataChannelOptions,
reliable_dc_options: DataChannelOptions,
}

#[derive(Clone)]
pub struct DataChannelOptions {
pub buffered_amount_low_threshold: u64,
}

impl Default for DataChannelOptions {
fn default() -> Self {
Self { buffered_amount_low_threshold: INITIAL_BUFFERED_AMOUNT_LOW_THRESHOLD }
}
}

pub(crate) struct RoomSession {
Expand Down Expand Up @@ -506,6 +523,8 @@ impl Room {
info: RwLock::new(RoomInfo {
state: ConnectionState::Disconnected,
metadata: room_info.metadata,
lossy_dc_options: Default::default(),
reliable_dc_options: Default::default(),
}),
remote_participants: Default::default(),
active_speakers: Default::default(),
Expand Down Expand Up @@ -623,6 +642,13 @@ impl Room {
pub fn e2ee_manager(&self) -> &E2eeManager {
&self.inner.e2ee_manager
}

pub fn data_channel_options(&self, kind: DataPacketKind) -> DataChannelOptions {
match kind {
DataPacketKind::Lossy => self.inner.info.read().lossy_dc_options.clone(),
DataPacketKind::Reliable => self.inner.info.read().reliable_dc_options.clone(),
}
}
}

impl RoomSession {
Expand Down Expand Up @@ -741,6 +767,9 @@ impl RoomSession {
EngineEvent::DataStreamTrailer { trailer, participant_identity } => {
self.handle_data_stream_trailer(trailer, participant_identity);
}
EngineEvent::DataChannelBufferedAmountLowThresholdChanged { kind, threshold } => {
self.handle_data_channel_buffered_low_threshold_change(kind, threshold);
}
_ => {}
}

Expand Down Expand Up @@ -1278,6 +1307,24 @@ impl RoomSession {
self.dispatcher.dispatch(&event);
}

fn handle_data_channel_buffered_low_threshold_change(
&self,
kind: DataPacketKind,
threshold: u64,
) {
let mut info = self.info.write();
match kind {
DataPacketKind::Lossy => {
info.lossy_dc_options.buffered_amount_low_threshold = threshold;
}
DataPacketKind::Reliable => {
info.reliable_dc_options.buffered_amount_low_threshold = threshold;
}
}
let event = RoomEvent::DataChannelBufferedAmountLowThresholdChanged { kind, threshold };
self.dispatcher.dispatch(&event);
}

/// Create a new participant
/// Also add it to the participants list
fn create_participant(
Expand Down
Loading

0 comments on commit f162413

Please sign in to comment.