From 7044ba5a02561cf650143e76dc2d6780dacb46e6 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Thu, 10 Oct 2024 12:58:10 +0200 Subject: [PATCH] Add chat API (#436) * Add chat APIs - wip * generated protobuf * updates * fix types and build * Add conversion traits and update API * fix unwrapping * Populate timestamps and add edit support * fix default values * generated protobuf * fix match * remove explicit resolver * move conversion * error handling * no imports --------- Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com> --- Cargo.lock | 57 ++++++++++++- livekit-ffi/protocol/ffi.proto | 6 ++ livekit-ffi/protocol/room.proto | 37 ++++++++ livekit-ffi/src/conversion/room.rs | 29 ++++++- livekit-ffi/src/livekit.proto.rs | 84 ++++++++++++++++++- livekit-ffi/src/server/requests.rs | 28 +++++++ livekit-ffi/src/server/room.rs | 74 +++++++++++++++- livekit-protocol/Cargo.toml | 6 +- livekit/Cargo.toml | 1 + livekit/src/proto.rs | 30 ++++++- livekit/src/room/mod.rs | 32 +++++++ .../src/room/participant/local_participant.rs | 58 ++++++++++++- livekit/src/rtc_engine/mod.rs | 10 ++- livekit/src/rtc_engine/rtc_session.rs | 16 +++- 14 files changed, 452 insertions(+), 16 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 34fcf828..dc6e29d1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -37,6 +37,21 @@ dependencies = [ "memchr", ] +[[package]] +name = "android-tzdata" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0" + +[[package]] +name = "android_system_properties" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311" +dependencies = [ + "libc", +] + [[package]] name = "anyhow" version = "1.0.75" @@ -447,11 +462,16 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "chrono" -version = "0.4.31" +version = "0.4.38" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f2c685bad3eb3d45a01354cedb7d5faa66194d1d58ba6e267a8de788f79db38" +checksum = "a21f936df1771bf62b77f047b726c4625ff2e8aa607c01ec06e5a05bd8463401" dependencies = [ + "android-tzdata", + "iana-time-zone", + "js-sys", "num-traits", + "wasm-bindgen", + "windows-targets 0.52.0", ] [[package]] @@ -1266,6 +1286,29 @@ dependencies = [ "tokio-native-tls", ] +[[package]] +name = "iana-time-zone" +version = "0.1.61" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "235e081f3925a06703c2d0117ea8b91f042756fd6e7a6e5d901e8ca1a996b220" +dependencies = [ + "android_system_properties", + "core-foundation-sys", + "iana-time-zone-haiku", + "js-sys", + "wasm-bindgen", + "windows-core", +] + +[[package]] +name = "iana-time-zone-haiku" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f" +dependencies = [ + "cc", +] + [[package]] name = "idna" version = "0.5.0" @@ -1554,6 +1597,7 @@ checksum = "c4cd1a83af159aa67994778be9070f0ae1bd732942279cabb14f86f986a21456" name = "livekit" version = "0.6.0" dependencies = [ + "chrono", "futures-util", "lazy_static", "libwebrtc", @@ -3315,6 +3359,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows-core" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" +dependencies = [ + "windows-targets 0.52.0", +] + [[package]] name = "windows-sys" version = "0.45.0" diff --git a/livekit-ffi/protocol/ffi.proto b/livekit-ffi/protocol/ffi.proto index 5fead41c..2c9f2181 100644 --- a/livekit-ffi/protocol/ffi.proto +++ b/livekit-ffi/protocol/ffi.proto @@ -96,6 +96,9 @@ message FfiRequest { NewSoxResamplerRequest new_sox_resampler = 33; PushSoxResamplerRequest push_sox_resampler = 34; FlushSoxResamplerRequest flush_sox_resampler = 35; + + SendChatMessageRequest send_chat_message = 36; + EditChatMessageRequest edit_chat_message = 37; } } @@ -144,6 +147,8 @@ message FfiResponse { NewSoxResamplerResponse new_sox_resampler = 33; PushSoxResamplerResponse push_sox_resampler = 34; FlushSoxResamplerResponse flush_sox_resampler = 35; + + SendChatMessageResponse send_chat_message = 36; } } @@ -172,6 +177,7 @@ message FfiEvent { GetSessionStatsCallback get_session_stats = 19; Panic panic = 20; PublishSipDtmfCallback publish_sip_dtmf = 21; + SendChatMessageCallback chat_message = 22; } } diff --git a/livekit-ffi/protocol/room.proto b/livekit-ffi/protocol/room.proto index 1cdbb977..6c46e6d9 100644 --- a/livekit-ffi/protocol/room.proto +++ b/livekit-ffi/protocol/room.proto @@ -144,6 +144,28 @@ message SetLocalMetadataCallback { optional string error = 2; } +message SendChatMessageRequest { + uint64 local_participant_handle = 1; + string message = 2; + repeated string destination_identities = 3; + optional string sender_identity = 4; +} +message EditChatMessageRequest { + uint64 local_participant_handle = 1; + string edit_text = 2; + ChatMessage original_message = 3; + repeated string destination_identities = 4; + optional string sender_identity = 5; +} +message SendChatMessageResponse { + uint64 async_id = 1; +} +message SendChatMessageCallback { + uint64 async_id = 1; + optional string error = 2; + optional ChatMessage chat_message = 3; +} + // Change the local participant's attributes message SetLocalAttributesRequest { uint64 local_participant_handle = 1; @@ -343,6 +365,7 @@ message RoomEvent { RoomEOS eos = 26; // The stream of room events has ended DataPacketReceived data_packet_received = 27; TranscriptionReceived transcription_received = 28; + ChatMessageReceived chat_message = 29; } } @@ -457,6 +480,20 @@ message UserPacket { optional string topic = 2; } +message ChatMessage { + string id = 1; + int64 timestamp = 2; + string message = 3; + optional int64 edit_timestamp = 4; + optional bool deleted = 5; + optional bool generated = 6; +} + +message ChatMessageReceived { + ChatMessage message = 1; + string participant_identity = 2; +} + message SipDTMF { uint32 code = 1; optional string digit = 2; diff --git a/livekit-ffi/src/conversion/room.rs b/livekit-ffi/src/conversion/room.rs index a42d58fc..199fd734 100644 --- a/livekit-ffi/src/conversion/room.rs +++ b/livekit-ffi/src/conversion/room.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use crate::{proto, server::room::FfiRoom}; use livekit::{ e2ee::{ key_provider::{KeyProvider, KeyProviderOptions}, @@ -25,8 +26,6 @@ use livekit::{ }, }; -use crate::{proto, server::room::FfiRoom}; - impl From for proto::EncryptionState { fn from(value: EncryptionState) -> Self { match value { @@ -241,3 +240,29 @@ impl From<&FfiRoom> for proto::RoomInfo { } } } + +impl From for livekit::ChatMessage { + fn from(proto_msg: proto::ChatMessage) -> Self { + livekit::ChatMessage { + id: proto_msg.id, + message: proto_msg.message, + timestamp: proto_msg.timestamp, + edit_timestamp: proto_msg.edit_timestamp, + deleted: proto_msg.deleted, + generated: proto_msg.generated, + } + } +} + +impl From for proto::ChatMessage { + fn from(msg: livekit::ChatMessage) -> Self { + proto::ChatMessage { + id: msg.id, + message: msg.message, + timestamp: msg.timestamp, + edit_timestamp: msg.edit_timestamp, + deleted: msg.deleted.into(), + generated: msg.generated.into(), + } + } +} diff --git a/livekit-ffi/src/livekit.proto.rs b/livekit-ffi/src/livekit.proto.rs index 6f003c09..b4cda5cd 100644 --- a/livekit-ffi/src/livekit.proto.rs +++ b/livekit-ffi/src/livekit.proto.rs @@ -2210,6 +2210,48 @@ pub struct SetLocalMetadataCallback { #[prost(string, optional, tag="2")] pub error: ::core::option::Option<::prost::alloc::string::String>, } +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct SendChatMessageRequest { + #[prost(uint64, tag="1")] + pub local_participant_handle: u64, + #[prost(string, tag="2")] + pub message: ::prost::alloc::string::String, + #[prost(string, repeated, tag="3")] + pub destination_identities: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, + #[prost(string, optional, tag="4")] + pub sender_identity: ::core::option::Option<::prost::alloc::string::String>, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct EditChatMessageRequest { + #[prost(uint64, tag="1")] + pub local_participant_handle: u64, + #[prost(string, tag="2")] + pub edit_text: ::prost::alloc::string::String, + #[prost(message, optional, tag="3")] + pub original_message: ::core::option::Option, + #[prost(string, repeated, tag="4")] + pub destination_identities: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, + #[prost(string, optional, tag="5")] + pub sender_identity: ::core::option::Option<::prost::alloc::string::String>, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct SendChatMessageResponse { + #[prost(uint64, tag="1")] + pub async_id: u64, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct SendChatMessageCallback { + #[prost(uint64, tag="1")] + pub async_id: u64, + #[prost(string, optional, tag="2")] + pub error: ::core::option::Option<::prost::alloc::string::String>, + #[prost(message, optional, tag="3")] + pub chat_message: ::core::option::Option, +} /// Change the local participant's attributes #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -2407,7 +2449,7 @@ pub struct OwnedBuffer { pub struct RoomEvent { #[prost(uint64, tag="1")] pub room_handle: u64, - #[prost(oneof="room_event::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28")] + #[prost(oneof="room_event::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29")] pub message: ::core::option::Option, } /// Nested message and enum types in `RoomEvent`. @@ -2471,6 +2513,8 @@ pub mod room_event { DataPacketReceived(super::DataPacketReceived), #[prost(message, tag="28")] TranscriptionReceived(super::TranscriptionReceived), + #[prost(message, tag="29")] + ChatMessage(super::ChatMessageReceived), } } #[allow(clippy::derive_partial_eq_without_eq)] @@ -2655,6 +2699,30 @@ pub struct UserPacket { } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] +pub struct ChatMessage { + #[prost(string, tag="1")] + pub id: ::prost::alloc::string::String, + #[prost(int64, tag="2")] + pub timestamp: i64, + #[prost(string, tag="3")] + pub message: ::prost::alloc::string::String, + #[prost(int64, optional, tag="4")] + pub edit_timestamp: ::core::option::Option, + #[prost(bool, optional, tag="5")] + pub deleted: ::core::option::Option, + #[prost(bool, optional, tag="6")] + pub generated: ::core::option::Option, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ChatMessageReceived { + #[prost(message, optional, tag="1")] + pub message: ::core::option::Option, + #[prost(string, tag="2")] + pub participant_identity: ::prost::alloc::string::String, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] pub struct SipDtmf { #[prost(uint32, tag="1")] pub code: u32, @@ -3443,7 +3511,7 @@ impl AudioSourceType { #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct FfiRequest { - #[prost(oneof="ffi_request::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35")] + #[prost(oneof="ffi_request::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37")] pub message: ::core::option::Option, } /// Nested message and enum types in `FfiRequest`. @@ -3523,13 +3591,17 @@ pub mod ffi_request { PushSoxResampler(super::PushSoxResamplerRequest), #[prost(message, tag="35")] FlushSoxResampler(super::FlushSoxResamplerRequest), + #[prost(message, tag="36")] + SendChatMessage(super::SendChatMessageRequest), + #[prost(message, tag="37")] + EditChatMessage(super::EditChatMessageRequest), } } /// This is the output of livekit_ffi_request function. #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct FfiResponse { - #[prost(oneof="ffi_response::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35")] + #[prost(oneof="ffi_response::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36")] pub message: ::core::option::Option, } /// Nested message and enum types in `FfiResponse`. @@ -3609,6 +3681,8 @@ pub mod ffi_response { PushSoxResampler(super::PushSoxResamplerResponse), #[prost(message, tag="35")] FlushSoxResampler(super::FlushSoxResamplerResponse), + #[prost(message, tag="36")] + SendChatMessage(super::SendChatMessageResponse), } } /// To minimize complexity, participant events are not included in the protocol. @@ -3617,7 +3691,7 @@ pub mod ffi_response { #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct FfiEvent { - #[prost(oneof="ffi_event::Message", tags="1, 2, 3, 4, 5, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21")] + #[prost(oneof="ffi_event::Message", tags="1, 2, 3, 4, 5, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22")] pub message: ::core::option::Option, } /// Nested message and enum types in `FfiEvent`. @@ -3665,6 +3739,8 @@ pub mod ffi_event { Panic(super::Panic), #[prost(message, tag="21")] PublishSipDtmf(super::PublishSipDtmfCallback), + #[prost(message, tag="22")] + ChatMessage(super::SendChatMessageCallback), } } /// Stop all rooms synchronously (Do we need async here?). diff --git a/livekit-ffi/src/server/requests.rs b/livekit-ffi/src/server/requests.rs index a1043c8c..febb0cec 100644 --- a/livekit-ffi/src/server/requests.rs +++ b/livekit-ffi/src/server/requests.rs @@ -182,6 +182,28 @@ fn on_set_local_attributes( Ok(ffi_participant.room.set_local_attributes(server, set_local_attributes)) } +fn on_send_chat_message( + server: &'static FfiServer, + send_chat_message: proto::SendChatMessageRequest, +) -> FfiResult { + let ffi_participant = server + .retrieve_handle::(send_chat_message.local_participant_handle)? + .clone(); + + Ok(ffi_participant.room.send_chat_message(server, send_chat_message)) +} + +fn on_edit_chat_message( + server: &'static FfiServer, + edit_chat_message: proto::EditChatMessageRequest, +) -> FfiResult { + let ffi_participant = server + .retrieve_handle::(edit_chat_message.local_participant_handle)? + .clone(); + + Ok(ffi_participant.room.edit_chat_message(server, edit_chat_message)) +} + /// Create a new video track from a source fn on_create_video_track( server: &'static FfiServer, @@ -813,6 +835,12 @@ pub fn handle_request( server, update, )?) } + proto::ffi_request::Message::SendChatMessage(update) => { + proto::ffi_response::Message::SendChatMessage(on_send_chat_message(server, update)?) + } + proto::ffi_request::Message::EditChatMessage(update) => { + proto::ffi_response::Message::SendChatMessage(on_edit_chat_message(server, update)?) + } proto::ffi_request::Message::CreateVideoTrack(create) => { proto::ffi_response::Message::CreateVideoTrack(on_create_video_track(server, create)?) } diff --git a/livekit-ffi/src/server/room.rs b/livekit-ffi/src/server/room.rs index cf6fe02f..92a6fcb1 100644 --- a/livekit-ffi/src/server/room.rs +++ b/livekit-ffi/src/server/room.rs @@ -16,7 +16,7 @@ use std::collections::HashMap; use std::{collections::HashSet, slice, sync::Arc, time::Duration}; use livekit::prelude::*; -use livekit::{participant, track}; +use livekit::{participant, track, ChatMessage}; use parking_lot::Mutex; use tokio::sync::{broadcast, mpsc, oneshot, Mutex as AsyncMutex}; use tokio::task::JoinHandle; @@ -569,6 +569,67 @@ impl RoomInner { server.watch_panic(handle); proto::SetLocalAttributesResponse { async_id } } + + pub fn send_chat_message( + self: &Arc, + server: &'static FfiServer, + send_chat_message: proto::SendChatMessageRequest, + ) -> proto::SendChatMessageResponse { + let async_id = server.next_id(); + let inner = self.clone(); + let handle = server.async_runtime.spawn(async move { + let res = inner + .room + .local_participant() + .send_chat_message( + send_chat_message.message, + send_chat_message.destination_identities.into(), + send_chat_message.sender_identity, + ) + .await; + let sent_message = res.as_ref().unwrap().clone(); + let _ = server.send_event(proto::ffi_event::Message::ChatMessage( + proto::SendChatMessageCallback { + async_id, + error: res.err().map(|e| e.to_string()), + chat_message: proto::ChatMessage::from(sent_message).into(), + }, + )); + }); + server.watch_panic(handle); + proto::SendChatMessageResponse { async_id } + } + + pub fn edit_chat_message( + self: &Arc, + server: &'static FfiServer, + edit_chat_message: proto::EditChatMessageRequest, + ) -> proto::SendChatMessageResponse { + let async_id = server.next_id(); + let inner = self.clone(); + let handle = server.async_runtime.spawn(async move { + let res = inner + .room + .local_participant() + .edit_chat_message( + edit_chat_message.edit_text, + edit_chat_message.original_message.unwrap().into(), + edit_chat_message.destination_identities.into(), + edit_chat_message.sender_identity, + ) + .await; + let sent_message: ChatMessage = res.as_ref().unwrap().clone(); + let _ = server.send_event(proto::ffi_event::Message::ChatMessage( + proto::SendChatMessageCallback { + async_id, + error: res.err().map(|e| e.to_string()), + chat_message: proto::ChatMessage::from(sent_message).into(), + }, + )); + }); + server.watch_panic(handle); + proto::SendChatMessageResponse { async_id } + } } // Task used to publish data without blocking the client thread @@ -977,6 +1038,17 @@ async fn forward_event( }, )); } + RoomEvent::ChatMessage { message, participant } => { + let (sid, identity) = match participant { + Some(p) => (Some(p.sid().to_string()), p.identity().to_string()), + None => (None, String::new()), + }; + let _ = + send_event(proto::room_event::Message::ChatMessage(proto::ChatMessageReceived { + message: proto::ChatMessage::from(message).into(), + participant_identity: identity, + })); + } RoomEvent::ConnectionStateChanged(state) => { let _ = send_event(proto::room_event::Message::ConnectionStateChanged( proto::ConnectionStateChanged { state: proto::ConnectionState::from(state).into() }, diff --git a/livekit-protocol/Cargo.toml b/livekit-protocol/Cargo.toml index 7831959a..7d44495a 100644 --- a/livekit-protocol/Cargo.toml +++ b/livekit-protocol/Cargo.toml @@ -8,7 +8,11 @@ repository = "https://github.com/livekit/rust-sdks" [dependencies] livekit-runtime = { path = "../livekit-runtime", version = "0.3.0" } -tokio = { version = "1", default-features = false, features = [ "sync", "macros", "rt" ] } +tokio = { version = "1", default-features = false, features = [ + "sync", + "macros", + "rt", +] } futures-util = { version = "0.3", features = ["sink"] } parking_lot = "0.12" prost = "0.12" diff --git a/livekit/Cargo.toml b/livekit/Cargo.toml index be31f592..269ffdf1 100644 --- a/livekit/Cargo.toml +++ b/livekit/Cargo.toml @@ -40,3 +40,4 @@ futures-util = { version = "0.3", default-features = false, features = ["sink"] thiserror = "1.0" lazy_static = "1.4" log = "0.4" +chrono = "0.4.38" diff --git a/livekit/src/proto.rs b/livekit/src/proto.rs index ed6a4aaf..b96ae2c5 100644 --- a/livekit/src/proto.rs +++ b/livekit/src/proto.rs @@ -14,7 +14,9 @@ use livekit_protocol::*; -use crate::{e2ee::EncryptionType, participant, track, DataPacketKind}; +use crate::{ + e2ee::EncryptionType, participant, room::ChatMessage as RoomChatMessage, track, DataPacketKind, +}; // Conversions impl From for participant::ConnectionQuality { @@ -122,3 +124,29 @@ impl From for participant::ParticipantKind { } } } + +impl From for RoomChatMessage { + fn from(proto_msg: ChatMessage) -> Self { + RoomChatMessage { + id: proto_msg.id, + message: proto_msg.message, + timestamp: proto_msg.timestamp, + edit_timestamp: proto_msg.edit_timestamp, + deleted: proto_msg.deleted.into(), + generated: proto_msg.generated.into(), + } + } +} + +impl From for ChatMessage { + fn from(msg: RoomChatMessage) -> Self { + ChatMessage { + id: msg.id, + message: msg.message, + timestamp: msg.timestamp, + edit_timestamp: msg.edit_timestamp, + deleted: msg.deleted.unwrap_or(false), + generated: msg.generated.unwrap_or(false), + } + } +} diff --git a/livekit/src/room/mod.rs b/livekit/src/room/mod.rs index 9b3e1bdc..346d4145 100644 --- a/livekit/src/room/mod.rs +++ b/livekit/src/room/mod.rs @@ -162,6 +162,10 @@ pub enum RoomEvent { digit: Option, participant: Option, }, + ChatMessage { + message: ChatMessage, + participant: Option, + }, E2eeStateChanged { participant: Participant, state: EncryptionState, @@ -236,6 +240,16 @@ pub struct SipDTMF { pub destination_identities: Vec, } +#[derive(Default, Debug, Clone)] +pub struct ChatMessage { + pub id: String, + pub message: String, + pub timestamp: i64, + pub edit_timestamp: Option, + pub deleted: Option, + pub generated: Option, +} + #[derive(Debug, Clone)] pub struct RoomOptions { pub auto_subscribe: bool, @@ -606,6 +620,9 @@ impl RoomSession { EngineEvent::Data { payload, topic, kind, participant_sid, participant_identity } => { self.handle_data(payload, topic, kind, participant_sid, participant_identity); } + EngineEvent::ChatMessage { participant_identity, message } => { + self.handle_chat_message(participant_identity, message); + } EngineEvent::Transcription { participant_identity, track_sid, segments } => { self.handle_transcription(participant_identity, track_sid, segments); } @@ -1068,6 +1085,21 @@ impl RoomSession { }); } + fn handle_chat_message( + &self, + participant_identity: ParticipantIdentity, + chat_message: ChatMessage, + ) { + let participant = self.get_participant_by_identity(&participant_identity); + + if participant.is_none() { + // We received a data packet from a participant that is not in the participants list + return; + } + + self.dispatcher.dispatch(&RoomEvent::ChatMessage { message: chat_message, participant }); + } + fn handle_dtmf( &self, code: u32, diff --git a/livekit/src/room/participant/local_participant.rs b/livekit/src/room/participant/local_participant.rs index b9dde45d..6b334abb 100644 --- a/livekit/src/room/participant/local_participant.rs +++ b/livekit/src/room/participant/local_participant.rs @@ -19,7 +19,9 @@ use std::{ time::Duration, }; -use libwebrtc::rtp_parameters::RtpEncodingParameters; +use chrono::{TimeZone, Utc}; + +use libwebrtc::{native::create_random_uuid, rtp_parameters::RtpEncodingParameters}; use livekit_api::signal_client::SignalError; use livekit_protocol as proto; use livekit_runtime::timeout; @@ -32,7 +34,7 @@ use crate::{ options::{self, compute_video_encodings, video_layers_from_encodings, TrackPublishOptions}, prelude::*, rtc_engine::{EngineError, RtcEngine}, - DataPacket, SipDTMF, Transcription, + ChatMessage, DataPacket, SipDTMF, Transcription, }; const REQUEST_TIMEOUT: Duration = Duration::from_secs(5); @@ -331,6 +333,58 @@ impl LocalParticipant { } } + pub async fn send_chat_message( + &self, + text: String, + destination_identities: Option>, + sender_identity: Option, + ) -> RoomResult { + let chat_message = proto::ChatMessage { + id: create_random_uuid(), + timestamp: Utc::now().timestamp_millis(), + message: text, + ..Default::default() + }; + + let data = proto::DataPacket { + value: Some(proto::data_packet::Value::ChatMessage(chat_message.clone())), + participant_identity: sender_identity.unwrap_or_default(), + destination_identities: destination_identities.unwrap_or_default(), + ..Default::default() + }; + + match self.inner.rtc_engine.publish_data(&data, DataPacketKind::Reliable).await { + Ok(_) => Ok(ChatMessage::from(chat_message)), + Err(e) => Err(Into::into(e)), + } + } + + pub async fn edit_chat_message( + &self, + edit_text: String, + original_message: ChatMessage, + destination_identities: Option>, + sender_identity: Option, + ) -> RoomResult { + let edited_message = ChatMessage { + message: edit_text, + edit_timestamp: Utc::now().timestamp_millis().into(), + ..original_message + }; + let proto_msg = proto::ChatMessage::from(edited_message); + let data = proto::DataPacket { + value: Some(proto::data_packet::Value::ChatMessage(proto_msg.clone())), + participant_identity: sender_identity.unwrap_or_default(), + destination_identities: destination_identities.unwrap_or_default(), + ..Default::default() + }; + + match self.inner.rtc_engine.publish_data(&data, DataPacketKind::Reliable).await { + Ok(_) => Ok(ChatMessage::from(proto_msg)), + Err(e) => Err(Into::into(e)), + } + } + pub async fn unpublish_track( &self, sid: &TrackSid, diff --git a/livekit/src/rtc_engine/mod.rs b/livekit/src/rtc_engine/mod.rs index d0a23551..efaf2874 100644 --- a/livekit/src/rtc_engine/mod.rs +++ b/livekit/src/rtc_engine/mod.rs @@ -27,7 +27,6 @@ use tokio::sync::{ pub use self::rtc_session::SessionStats; use crate::prelude::ParticipantIdentity; -use crate::TranscriptionSegment; use crate::{ id::ParticipantSid, options::TrackPublishOptions, @@ -39,6 +38,7 @@ use crate::{ }, DataPacketKind, }; +use crate::{ChatMessage, TranscriptionSegment}; pub mod lk_runtime; mod peer_transport; @@ -99,6 +99,10 @@ pub enum EngineEvent { topic: Option, kind: DataPacketKind, }, + ChatMessage { + participant_identity: ParticipantIdentity, + message: ChatMessage, + }, Transcription { participant_identity: ParticipantIdentity, track_sid: String, @@ -443,6 +447,10 @@ impl EngineInner { kind, }); } + SessionEvent::ChatMessage { participant_identity, message } => { + let _ = + self.engine_tx.send(EngineEvent::ChatMessage { participant_identity, message }); + } SessionEvent::SipDTMF { participant_identity, code, digit } => { let _ = self.engine_tx.send(EngineEvent::SipDTMF { participant_identity, code, digit }); diff --git a/livekit/src/rtc_engine/rtc_session.rs b/livekit/src/rtc_engine/rtc_session.rs index 100ef839..58168359 100644 --- a/livekit/src/rtc_engine/rtc_session.rs +++ b/livekit/src/rtc_engine/rtc_session.rs @@ -34,11 +34,11 @@ use proto::{ debouncer::{self, Debouncer}, SignalTarget, }; -use serde::{Deserialize, Serialize}; +use serde::{de::IntoDeserializer, Deserialize, Serialize}; use tokio::sync::{mpsc, oneshot, watch}; use super::{rtc_events, EngineError, EngineOptions, EngineResult, SimulateScenario}; -use crate::{id::ParticipantIdentity, TranscriptionSegment}; +use crate::{id::ParticipantIdentity, ChatMessage, TranscriptionSegment}; use crate::{ id::ParticipantSid, options::TrackPublishOptions, @@ -81,6 +81,10 @@ pub enum SessionEvent { topic: Option, kind: DataPacketKind, }, + ChatMessage { + participant_identity: ParticipantIdentity, + message: ChatMessage, + }, Transcription { participant_identity: ParticipantIdentity, track_sid: String, @@ -657,6 +661,14 @@ impl SessionInner { segments, }); } + proto::data_packet::Value::ChatMessage(message) => { + let _ = self.emitter.send(SessionEvent::ChatMessage { + participant_identity: ParticipantIdentity( + data.participant_identity, + ), + message: ChatMessage::from(message.clone()), + }); + } _ => {} } }