diff --git a/livekit-ffi/protocol/room.proto b/livekit-ffi/protocol/room.proto index f1d68e5c..aa10986d 100644 --- a/livekit-ffi/protocol/room.proto +++ b/livekit-ffi/protocol/room.proto @@ -369,6 +369,7 @@ message RoomEvent { ChatMessageReceived chat_message = 29; DataStreamHeaderReceived stream_header_received = 30; DataStreamChunkReceived stream_chunk_received = 31; + DataStreamTrailerReceived stream_trailer_received = 32; } } @@ -594,6 +595,11 @@ message DataStreamChunkReceived { required DataStream.Chunk chunk = 2; } +message DataStreamTrailerReceived { + required string participant_identity = 1; + required DataStream.Trailer trailer = 2; +} + message SendStreamHeaderRequest { required uint64 local_participant_handle = 1; required DataStream.Header header = 2; diff --git a/livekit-ffi/src/livekit.proto.rs b/livekit-ffi/src/livekit.proto.rs index f987e3b4..45e0cd05 100644 --- a/livekit-ffi/src/livekit.proto.rs +++ b/livekit-ffi/src/livekit.proto.rs @@ -1,4 +1,5 @@ // @generated +// This file is @generated by prost-build. #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct FrameCryptor { @@ -2634,7 +2635,7 @@ pub struct OwnedBuffer { pub struct RoomEvent { #[prost(uint64, required, tag="1")] pub room_handle: u64, - #[prost(oneof="room_event::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31")] + #[prost(oneof="room_event::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32")] pub message: ::core::option::Option, } /// Nested message and enum types in `RoomEvent`. @@ -2704,6 +2705,8 @@ pub mod room_event { StreamHeaderReceived(super::DataStreamHeaderReceived), #[prost(message, tag="31")] StreamChunkReceived(super::DataStreamChunkReceived), + #[prost(message, tag="32")] + StreamTrailerReceived(super::DataStreamTrailerReceived), } } #[allow(clippy::derive_partial_eq_without_eq)] @@ -3130,6 +3133,14 @@ pub struct DataStreamChunkReceived { } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] +pub struct DataStreamTrailerReceived { + #[prost(string, required, tag="1")] + pub participant_identity: ::prost::alloc::string::String, + #[prost(message, required, tag="2")] + pub trailer: data_stream::Trailer, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] pub struct SendStreamHeaderRequest { #[prost(uint64, required, tag="1")] pub local_participant_handle: u64, diff --git a/livekit-ffi/src/server/room.rs b/livekit-ffi/src/server/room.rs index 61d32161..fcfeee26 100644 --- a/livekit-ffi/src/server/room.rs +++ b/livekit-ffi/src/server/room.rs @@ -1241,6 +1241,11 @@ async fn forward_event( proto::DataStreamChunkReceived { chunk: chunk.into(), participant_identity }, )); } + RoomEvent::StreamTrailerReceived { trailer, participant_identity } => { + let _ = send_event(proto::room_event::Message::StreamTrailerReceived( + proto::DataStreamTrailerReceived { trailer: trailer.into(), participant_identity }, + )); + } _ => { log::warn!("unhandled room event: {:?}", event); } diff --git a/livekit/src/room/mod.rs b/livekit/src/room/mod.rs index 051b56f0..9437e5ff 100644 --- a/livekit/src/room/mod.rs +++ b/livekit/src/room/mod.rs @@ -176,6 +176,10 @@ pub enum RoomEvent { chunk: proto::data_stream::Chunk, participant_identity: String, }, + StreamTrailerReceived { + trailer: proto::data_stream::Trailer, + participant_identity: String, + }, E2eeStateChanged { participant: Participant, state: EncryptionState, @@ -734,6 +738,9 @@ impl RoomSession { EngineEvent::DataStreamChunk { chunk, participant_identity } => { self.handle_data_stream_chunk(chunk, participant_identity); } + EngineEvent::DataStreamTrailer { trailer, participant_identity } => { + self.handle_data_stream_trailer(trailer, participant_identity); + } _ => {} } @@ -1262,6 +1269,15 @@ impl RoomSession { self.dispatcher.dispatch(&event); } + fn handle_data_stream_trailer( + &self, + trailer: proto::data_stream::Trailer, + participant_identity: String, + ) { + let event = RoomEvent::StreamTrailerReceived { trailer, participant_identity }; + self.dispatcher.dispatch(&event); + } + /// Create a new participant /// Also add it to the participants list fn create_participant( diff --git a/livekit/src/rtc_engine/mod.rs b/livekit/src/rtc_engine/mod.rs index 2cd54521..0b205909 100644 --- a/livekit/src/rtc_engine/mod.rs +++ b/livekit/src/rtc_engine/mod.rs @@ -167,6 +167,10 @@ pub enum EngineEvent { chunk: proto::data_stream::Chunk, participant_identity: String, }, + DataStreamTrailer { + trailer: proto::data_stream::Trailer, + participant_identity: String, + }, } /// Represents a running RtcSession with the ability to close the session @@ -542,6 +546,11 @@ impl EngineInner { .engine_tx .send(EngineEvent::DataStreamChunk { chunk, participant_identity }); } + SessionEvent::DataStreamTrailer { trailer, participant_identity } => { + let _ = self + .engine_tx + .send(EngineEvent::DataStreamTrailer { trailer, participant_identity }); + } } Ok(()) } diff --git a/livekit/src/rtc_engine/rtc_session.rs b/livekit/src/rtc_engine/rtc_session.rs index 91dcbfdd..8de9d959 100644 --- a/livekit/src/rtc_engine/rtc_session.rs +++ b/livekit/src/rtc_engine/rtc_session.rs @@ -143,6 +143,10 @@ pub enum SessionEvent { chunk: proto::data_stream::Chunk, participant_identity: String, }, + DataStreamTrailer { + trailer: proto::data_stream::Trailer, + participant_identity: String, + }, } #[derive(Serialize, Deserialize)] @@ -743,6 +747,12 @@ impl SessionInner { participant_identity: data.participant_identity.clone(), }); } + proto::data_packet::Value::StreamTrailer(message) => { + let _ = self.emitter.send(SessionEvent::DataStreamTrailer { + trailer: message.clone(), + participant_identity: data.participant_identity.clone(), + }); + } _ => {} } }