Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DataStream sending support and recv fixes #533

Merged
merged 12 commits into from
Jan 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions livekit-ffi/protocol/ffi.proto
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,11 @@ message FfiRequest {
// Track Publication
EnableRemoteTrackPublicationRequest enable_remote_track_publication = 42;
UpdateRemoteTrackPublicationDimensionRequest update_remote_track_publication_dimension = 43;

// Data Streams
SendStreamHeaderRequest send_stream_header = 44;
SendStreamChunkRequest send_stream_chunk = 45;

}
}

Expand Down Expand Up @@ -168,6 +173,10 @@ message FfiResponse {
// Track Publication
EnableRemoteTrackPublicationResponse enable_remote_track_publication = 41;
UpdateRemoteTrackPublicationDimensionResponse update_remote_track_publication_dimension = 42;

// Data Streams
SendStreamHeaderResponse send_stream_header = 43;
SendStreamChunkResponse send_stream_chunk = 44;
}
}

Expand Down Expand Up @@ -199,6 +208,8 @@ message FfiEvent {
SendChatMessageCallback chat_message = 22;
PerformRpcCallback perform_rpc = 23;
RpcMethodInvocationEvent rpc_method_invocation = 24;
SendStreamHeaderCallback send_stream_header = 25;
SendStreamChunkCallback send_stream_chunk = 26;
}
}

Expand Down
66 changes: 53 additions & 13 deletions livekit-ffi/protocol/room.proto
Original file line number Diff line number Diff line change
Expand Up @@ -367,8 +367,8 @@ message RoomEvent {
DataPacketReceived data_packet_received = 27;
TranscriptionReceived transcription_received = 28;
ChatMessageReceived chat_message = 29;
DataStream.Header stream_header = 30;
DataStream.Chunk stream_chunk = 31;
DataStreamHeaderReceived stream_header_received = 30;
DataStreamChunkReceived stream_chunk_received = 31;
}
}

Expand Down Expand Up @@ -541,10 +541,10 @@ message DataStream {
// header properties specific to text streams
message TextHeader {
required OperationType operation_type = 1;
required int32 version = 2; // Optional: Version for updates/edits
required string reply_to_stream_id = 3; // Optional: Reply to specific message
optional int32 version = 2; // Optional: Version for updates/edits
optional string reply_to_stream_id = 3; // Optional: Reply to specific message
repeated string attached_stream_ids = 4; // file attachments for text streams
required bool generated = 5; // true if the text has been generated by an agent from a participant's audio transcription
optional bool generated = 5; // true if the text has been generated by an agent from a participant's audio transcription

}

Expand All @@ -557,26 +557,66 @@ message DataStream {
message Header {
required string stream_id = 1; // unique identifier for this data stream
required int64 timestamp = 2; // using int64 for Unix timestamp
required string topic = 3;
required string mime_type = 4;
required string mime_type = 3;
required string topic = 4;
optional uint64 total_length = 5; // only populated for finite streams, if it's a stream of unknown size this stays empty
optional uint64 total_chunks = 6; // only populated for finite streams, if it's a stream of unknown size this stays empty
map<string, string> extensions = 7; // user defined extensions map that can carry additional info
map<string, string> extensions = 6; // user defined extensions map that can carry additional info

// oneof to choose between specific header types
oneof content_header {
TextHeader text_header = 8;
FileHeader file_header = 9;
TextHeader text_header = 7;
FileHeader file_header = 8;
}
}

message Chunk {
required string stream_id = 1; // unique identifier for this data stream to map it to the correct header
required uint64 chunk_index = 2;
required bytes content = 3; // content as binary (bytes)
required bool complete = 4; // true only if this is the last chunk of this stream - can also be sent with empty content
required int32 version = 5; // a version indicating that this chunk_index has been retroactively modified and the original one needs to be replaced
optional bool complete = 4; // true only if this is the last chunk of this stream - can also be sent with empty content
optional int32 version = 5; // a version indicating that this chunk_index has been retroactively modified and the original one needs to be replaced
optional bytes iv = 6; // optional, initialization vector for AES-GCM encryption
}
}

message DataStreamHeaderReceived {
required string participant_identity = 1;
required DataStream.Header header = 2;
}

message DataStreamChunkReceived {
required string participant_identity = 1;
required DataStream.Chunk chunk = 2;
}

message SendStreamHeaderRequest {
required uint64 local_participant_handle = 1;
required DataStream.Header header = 2;
repeated string destination_identities = 3;
optional string sender_identity = 4;
}

message SendStreamChunkRequest {
required uint64 local_participant_handle = 1;
required DataStream.Chunk chunk = 2;
repeated string destination_identities = 3;
optional string sender_identity = 4;
}

message SendStreamHeaderResponse {
required uint64 async_id = 1;
}

message SendStreamChunkResponse {
required uint64 async_id = 1;
}

message SendStreamHeaderCallback {
required uint64 async_id = 1;
optional string error = 2;
}

message SendStreamChunkCallback {
required uint64 async_id = 1;
optional string error = 2;
}
60 changes: 54 additions & 6 deletions livekit-ffi/src/conversion/room.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,10 +289,10 @@ impl From<livekit_protocol::data_stream::Header> for proto::data_stream::Header
Some(proto::data_stream::header::ContentHeader::TextHeader(
proto::data_stream::TextHeader {
operation_type: text_header.operation_type,
version: text_header.version,
reply_to_stream_id: text_header.reply_to_stream_id,
version: Some(text_header.version),
reply_to_stream_id: Some(text_header.reply_to_stream_id),
attached_stream_ids: text_header.attached_stream_ids,
generated: text_header.generated,
generated: Some(text_header.generated),
},
))
}
Expand All @@ -309,22 +309,70 @@ impl From<livekit_protocol::data_stream::Header> for proto::data_stream::Header
timestamp: msg.timestamp,
topic: msg.topic,
mime_type: msg.mime_type,
total_chunks: msg.total_chunks,
total_length: msg.total_length,
extensions: msg.extensions,
content_header,
}
}
}

impl From<proto::data_stream::Header> for livekit_protocol::data_stream::Header {
fn from(msg: proto::data_stream::Header) -> Self {
let content_header = match msg.content_header {
Some(proto::data_stream::header::ContentHeader::TextHeader(text_header)) => {
Some(livekit_protocol::data_stream::header::ContentHeader::TextHeader(
livekit_protocol::data_stream::TextHeader {
operation_type: text_header.operation_type,
version: text_header.version.unwrap_or_default(),
reply_to_stream_id: text_header.reply_to_stream_id.unwrap_or_default(),
attached_stream_ids: text_header.attached_stream_ids,
generated: text_header.generated.unwrap_or(false),
},
))
}
Some(proto::data_stream::header::ContentHeader::FileHeader(file_header)) => {
Some(livekit_protocol::data_stream::header::ContentHeader::FileHeader(
livekit_protocol::data_stream::FileHeader { file_name: file_header.file_name },
))
}
None => None,
};

livekit_protocol::data_stream::Header {
stream_id: msg.stream_id,
timestamp: msg.timestamp,
topic: msg.topic,
mime_type: msg.mime_type,
total_length: msg.total_length,
total_chunks: None,
extensions: msg.extensions,
content_header,
encryption_type: 0,
}
}
}

impl From<livekit_protocol::data_stream::Chunk> for proto::data_stream::Chunk {
fn from(msg: livekit_protocol::data_stream::Chunk) -> Self {
proto::data_stream::Chunk {
stream_id: msg.stream_id,
content: msg.content,
complete: msg.complete,
complete: Some(msg.complete),
chunk_index: msg.chunk_index,
version: Some(msg.version),
iv: msg.iv,
}
}
}

impl From<proto::data_stream::Chunk> for livekit_protocol::data_stream::Chunk {
fn from(msg: proto::data_stream::Chunk) -> Self {
livekit_protocol::data_stream::Chunk {
stream_id: msg.stream_id,
content: msg.content,
complete: msg.complete.unwrap_or(false),
chunk_index: msg.chunk_index,
version: msg.version,
version: msg.version.unwrap_or(0),
iv: msg.iv,
}
}
Expand Down
Loading
Loading