Skip to content

Commit

Permalink
feat: suport topic on data packets (#256)
Browse files Browse the repository at this point in the history
* Add support for topic in data transfer protocol (#248)

* ffi & nit api

---------

Co-authored-by: Damian Trzeciak <[email protected]>
  • Loading branch information
theomonnom and trzeciak authored Dec 4, 2023
1 parent 3685486 commit fdf5ab2
Show file tree
Hide file tree
Showing 9 changed files with 82 additions and 42 deletions.
13 changes: 6 additions & 7 deletions examples/basic_room/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use livekit_api::access_token;
use livekit::prelude::*;
use livekit_api::access_token;
use std::env;

// Connect to a room using the specified env variables
Expand All @@ -24,18 +24,17 @@ async fn main() {
.to_jwt()
.unwrap();


let (room, mut rx) = Room::connect(&url, &token, RoomOptions::default())
.await
.unwrap();
log::info!("Connected to room: {} - {}", room.name(), room.sid());

room.local_participant()
.publish_data(
"Hello world".to_owned().into_bytes(),
DataPacketKind::Reliable,
Default::default(),
)
.publish_data(DataPacket {
payload: "Hello world".to_owned().into_bytes(),
kind: DataPacketKind::Reliable,
..Default::default()
})
.await
.unwrap();

Expand Down
2 changes: 2 additions & 0 deletions livekit-ffi/protocol/room.proto
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ message PublishDataRequest {
uint64 data_len = 3;
DataPacketKind kind = 4;
repeated string destination_sids = 5; // destination
optional string topic = 6;
}
message PublishDataResponse {
uint64 async_id = 1;
Expand Down Expand Up @@ -343,6 +344,7 @@ message DataReceived {
OwnedBuffer data = 1;
optional string participant_sid = 2; // Can be empty if the data is sent a server SDK
DataPacketKind kind = 3;
optional string topic = 4;
}

message ConnectionStateChanged { ConnectionState state = 1; }
Expand Down
4 changes: 4 additions & 0 deletions livekit-ffi/src/livekit.proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2157,6 +2157,8 @@ pub struct PublishDataRequest {
/// destination
#[prost(string, repeated, tag="5")]
pub destination_sids: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
#[prost(string, optional, tag="6")]
pub topic: ::core::option::Option<::prost::alloc::string::String>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down Expand Up @@ -2541,6 +2543,8 @@ pub struct DataReceived {
pub participant_sid: ::core::option::Option<::prost::alloc::string::String>,
#[prost(enumeration="DataPacketKind", tag="3")]
pub kind: i32,
#[prost(string, optional, tag="4")]
pub topic: ::core::option::Option<::prost::alloc::string::String>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down
43 changes: 24 additions & 19 deletions livekit-ffi/src/server/room.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ pub struct FfiRoom {
pub struct RoomInner {
pub room: Room,
handle_id: FfiHandleId,
data_tx: mpsc::UnboundedSender<DataPacket>,
data_tx: mpsc::UnboundedSender<FfiDataPacket>,

// local tracks just published, it is used to synchronize the publish events:
// - make sure LocalTrackPublised is sent *after* the PublishTrack callback)
Expand All @@ -74,10 +74,8 @@ struct Handle {
close_tx: broadcast::Sender<()>,
}

struct DataPacket {
data: Vec<u8>,
kind: DataPacketKind,
destination_sids: Vec<String>,
struct FfiDataPacket {
payload: DataPacket,
async_id: u64,
}

Expand All @@ -99,9 +97,12 @@ impl FfiRoom {
Ok((room, mut events)) => {
// Successfully connected to the room
// Forward the initial state for the FfiClient
let Some(RoomEvent::Connected { participants_with_tracks}) = events.recv().await else {
unreachable!("Connected event should always be the first event");
};
let Some(RoomEvent::Connected {
participants_with_tracks,
}) = events.recv().await
else {
unreachable!("Connected event should always be the first event");
};

let (data_tx, data_rx) = mpsc::unbounded_channel();
let (close_tx, close_rx) = broadcast::channel(1);
Expand Down Expand Up @@ -201,14 +202,20 @@ impl RoomInner {
slice::from_raw_parts(publish.data_ptr as *const u8, publish.data_len as usize)
};
let kind = publish.kind();
let destination_sids: Vec<String> = publish.destination_sids;
let destination_sids = publish.destination_sids;
let async_id = server.next_id();

self.data_tx
.send(DataPacket {
data: data.to_vec(), // Avoid copy?
kind: kind.into(),
destination_sids,
.send(FfiDataPacket {
payload: DataPacket {
payload: data.to_vec(), // Avoid copy?
kind: kind.into(),
topic: publish.topic,
destination_sids: destination_sids
.into_iter()
.map(|str| str.try_into().unwrap())
.collect(),
},
async_id,
})
.map_err(|_| FfiError::InvalidRequest("failed to send data packet".into()))?;
Expand Down Expand Up @@ -384,17 +391,13 @@ impl RoomInner {
async fn data_task(
server: &'static FfiServer,
inner: Arc<RoomInner>,
mut data_rx: mpsc::UnboundedReceiver<DataPacket>,
mut data_rx: mpsc::UnboundedReceiver<FfiDataPacket>,
mut close_rx: broadcast::Receiver<()>,
) {
loop {
tokio::select! {
Some(event) = data_rx.recv() => {
let res = inner.room.local_participant().publish_data(
event.data,
event.kind,
event.destination_sids,
).await;
let res = inner.room.local_participant().publish_data(event.payload).await;

let cb = proto::PublishDataCallback {
async_id: event.async_id,
Expand Down Expand Up @@ -727,6 +730,7 @@ async fn forward_event(
payload,
kind,
participant,
topic,
} => {
let handle_id = server.next_id();
let buffer_info = proto::BufferInfo {
Expand All @@ -749,6 +753,7 @@ async fn forward_event(
}),
participant_sid: participant.map(|p| p.sid().to_string()),
kind: proto::DataPacketKind::from(kind).into(),
topic,
},
))
.await;
Expand Down
3 changes: 2 additions & 1 deletion livekit/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
pub use crate::participant::{ConnectionQuality, LocalParticipant, Participant, RemoteParticipant};

pub use crate::{
ConnectionState, DataPacketKind, Room, RoomError, RoomEvent, RoomOptions, RoomResult,
ConnectionState, DataPacket, DataPacketKind, Room, RoomError, RoomEvent, RoomOptions,
RoomResult,
};

pub use crate::publication::{LocalTrackPublication, RemoteTrackPublication, TrackPublication};
Expand Down
33 changes: 28 additions & 5 deletions livekit/src/room/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ pub enum RoomEvent {
},
DataReceived {
payload: Arc<Vec<u8>>,
topic: Option<String>,
kind: DataPacketKind,
participant: Option<RemoteParticipant>,
},
Expand Down Expand Up @@ -165,6 +166,25 @@ pub enum DataPacketKind {
Reliable,
}

#[derive(Debug, Clone)]
pub struct DataPacket {
pub payload: Vec<u8>,
pub topic: Option<String>,
pub kind: DataPacketKind,
pub destination_sids: Vec<ParticipantSid>,
}

impl Default for DataPacket {
fn default() -> Self {
Self {
payload: Vec::new(),
topic: None,
kind: DataPacketKind::Reliable,
destination_sids: Vec::new(),
}
}
}

#[derive(Clone)]
pub struct RoomOptions {
pub auto_subscribe: bool,
Expand Down Expand Up @@ -427,6 +447,10 @@ impl Room {
}
}

pub async fn simulate_scenario(&self, scenario: SimulateScenario) -> EngineResult<()> {
self.inner.rtc_engine.simulate_scenario(scenario).await
}

pub fn subscribe(&self) -> mpsc::UnboundedReceiver<RoomEvent> {
self.inner.dispatcher.register()
}
Expand Down Expand Up @@ -455,10 +479,6 @@ impl Room {
self.inner.participants.read().0.clone()
}

pub async fn simulate_scenario(&self, scenario: SimulateScenario) -> EngineResult<()> {
self.inner.rtc_engine.simulate_scenario(scenario).await
}

pub fn e2ee_manager(&self) -> &E2eeManager {
&self.inner.e2ee_manager
}
Expand Down Expand Up @@ -557,10 +577,11 @@ impl RoomSession {
EngineEvent::Disconnected { reason } => self.handle_disconnected(reason),
EngineEvent::Data {
payload,
topic,
kind,
participant_sid,
} => {
self.handle_data(payload, kind, participant_sid);
self.handle_data(payload, topic, kind, participant_sid);
}
EngineEvent::SpeakersChanged { speakers } => self.handle_speakers_changed(speakers),
EngineEvent::ConnectionQuality { updates } => {
Expand Down Expand Up @@ -970,6 +991,7 @@ impl RoomSession {
fn handle_data(
&self,
payload: Vec<u8>,
topic: Option<String>,
kind: DataPacketKind,
participant_sid: Option<ParticipantSid>,
) {
Expand All @@ -985,6 +1007,7 @@ impl RoomSession {

self.dispatcher.dispatch(&RoomEvent::DataReceived {
payload: Arc::new(payload),
topic,
kind,
participant,
});
Expand Down
21 changes: 11 additions & 10 deletions livekit/src/room/participant/local_participant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use crate::options::video_layers_from_encodings;
use crate::options::TrackPublishOptions;
use crate::prelude::*;
use crate::rtc_engine::RtcEngine;
use crate::DataPacket;
use crate::DataPacketKind;
use libwebrtc::rtp_parameters::RtpEncodingParameters;
use livekit_protocol as proto;
Expand Down Expand Up @@ -286,24 +287,24 @@ impl LocalParticipant {
}
}

pub async fn publish_data(
&self,
data: Vec<u8>,
kind: DataPacketKind,
destination_sids: Vec<String>,
) -> RoomResult<()> {
pub async fn publish_data(&self, packet: DataPacket) -> RoomResult<()> {
let data = proto::DataPacket {
kind: kind as i32,
kind: DataPacketKind::from(packet.kind) as i32,
value: Some(proto::data_packet::Value::User(proto::UserPacket {
payload: data,
destination_sids: destination_sids.to_owned(),
payload: packet.payload,
topic: packet.topic,
destination_sids: packet
.destination_sids
.into_iter()
.map(Into::into)
.collect(),
..Default::default()
})),
};

self.inner
.rtc_engine
.publish_data(&data, kind)
.publish_data(&data, packet.kind)
.await
.map_err(Into::into)
}
Expand Down
3 changes: 3 additions & 0 deletions livekit/src/rtc_engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ pub enum EngineEvent {
Data {
participant_sid: Option<ParticipantSid>,
payload: Vec<u8>,
topic: Option<String>,
kind: DataPacketKind,
},
SpeakersChanged {
Expand Down Expand Up @@ -383,11 +384,13 @@ impl EngineInner {
SessionEvent::Data {
participant_sid,
payload,
topic,
kind,
} => {
let _ = self.engine_tx.send(EngineEvent::Data {
participant_sid,
payload,
topic,
kind,
});
}
Expand Down
2 changes: 2 additions & 0 deletions livekit/src/rtc_engine/rtc_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ pub enum SessionEvent {
// None when the data comes from the ServerSDK (So no real participant)
participant_sid: Option<ParticipantSid>,
payload: Vec<u8>,
topic: Option<String>,
kind: DataPacketKind,
},
MediaTrack {
Expand Down Expand Up @@ -573,6 +574,7 @@ impl SessionInner {
kind: data.kind().into(),
participant_sid: participant_sid.map(|s| s.try_into().unwrap()),
payload: user.payload.clone(),
topic: user.topic.clone(),
});
}
proto::data_packet::Value::Speaker(_) => {}
Expand Down

0 comments on commit fdf5ab2

Please sign in to comment.