From 1282260f60ee4c3dfe0bff9f4bd153a088edfd65 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?The=CC=81o=20Monnom?= Date: Wed, 29 Nov 2023 15:47:43 +0100 Subject: [PATCH] allow other ffi integrations --- .../src/signal_client/signal_stream.rs | 1 + livekit-ffi/Cargo.toml | 2 +- livekit-ffi/protocol/ffi.proto | 10 --- livekit-ffi/src/cabi.rs | 73 +++++++++++++++++++ livekit-ffi/src/lib.rs | 54 +------------- livekit-ffi/src/livekit.proto.rs | 23 +----- livekit-ffi/src/server/mod.rs | 38 +++++----- livekit-ffi/src/server/requests.rs | 28 +------ 8 files changed, 103 insertions(+), 126 deletions(-) create mode 100644 livekit-ffi/src/cabi.rs diff --git a/livekit-api/src/signal_client/signal_stream.rs b/livekit-api/src/signal_client/signal_stream.rs index 7d21e3a2..eb771c24 100644 --- a/livekit-api/src/signal_client/signal_stream.rs +++ b/livekit-api/src/signal_client/signal_stream.rs @@ -190,6 +190,7 @@ impl SignalStream { log::debug!("server closed the connection: {:?}", close); break; } + Ok(Message::Frame(_)) => {} _ => { log::error!("unhandled websocket message {:?}", msg); break; diff --git a/livekit-ffi/Cargo.toml b/livekit-ffi/Cargo.toml index 276a272a..ead7095d 100644 --- a/livekit-ffi/Cargo.toml +++ b/livekit-ffi/Cargo.toml @@ -40,4 +40,4 @@ webrtc-sys-build = { path = "../webrtc-sys/build", version = "0.2.0" } livekit-api = { path = "../livekit-api", version = "0.2.0" } [lib] -crate-type = ["cdylib"] +crate-type = ["lib", "cdylib"] diff --git a/livekit-ffi/protocol/ffi.proto b/livekit-ffi/protocol/ffi.proto index 3f97ff5d..6b0b1a77 100644 --- a/livekit-ffi/protocol/ffi.proto +++ b/livekit-ffi/protocol/ffi.proto @@ -54,7 +54,6 @@ import "audio_frame.proto"; // We always expect a response (FFIResponse, even if it's empty) message FfiRequest { oneof message { - InitializeRequest initialize = 1; DisposeRequest dispose = 2; // Room @@ -94,7 +93,6 @@ message FfiRequest { // This is the output of livekit_ffi_request function. message FfiResponse { oneof message { - InitializeResponse initialize = 1; DisposeResponse dispose = 2; // Room @@ -154,14 +152,6 @@ message FfiEvent { } } -// Setup the callback where the foreign language can receive events -// and responses to asynchronous requests -message InitializeRequest { - uint64 event_callback_ptr = 1; - bool capture_logs = 2; // When true, the FfiServer will forward logs using LogRecord -} -message InitializeResponse {} - // Stop all rooms synchronously (Do we need async here?). // e.g: This is used for the Unity Editor after each assemblies reload. // TODO(theomonnom): Implement a debug mode where we can find all leaked handles? diff --git a/livekit-ffi/src/cabi.rs b/livekit-ffi/src/cabi.rs new file mode 100644 index 00000000..f8a71999 --- /dev/null +++ b/livekit-ffi/src/cabi.rs @@ -0,0 +1,73 @@ +use crate::{ + proto, + server::{self, FfiConfig}, + FfiHandleId, FFI_SERVER, +}; +use prost::Message; +use server::FfiDataBuffer; +use std::sync::Arc; + +/// # SAFTEY: The "C" callback must be threadsafe and not block +pub type FfiCallbackFn = unsafe extern "C" fn(*const u8, usize); + +/// # Safety +/// +/// The foreign language must only provide valid pointers +#[no_mangle] +pub unsafe extern "C" fn livekit_initialize(cb: FfiCallbackFn, capture_logs: bool) { + FFI_SERVER.setup(FfiConfig { + callback_fn: Arc::new(move |event| { + let data = event.encode_to_vec(); + cb(data.as_ptr(), data.len()); + }), + capture_logs, + }); + + log::info!("initializing ffi server v{}", env!("CARGO_PKG_VERSION")); +} + +/// # Safety +/// +/// The foreign language must only provide valid pointers +#[no_mangle] +pub unsafe extern "C" fn livekit_ffi_request( + data: *const u8, + len: usize, + res_ptr: *mut *const u8, + res_len: *mut usize, +) -> FfiHandleId { + let data = unsafe { std::slice::from_raw_parts(data, len) }; + let res = match proto::FfiRequest::decode(data) { + Ok(res) => res, + Err(err) => { + panic!("failed to decode request: {}", err); + } + }; + + let res = match server::requests::handle_request(&FFI_SERVER, res) { + Ok(res) => res, + Err(err) => { + panic!("failed to handle request: {}", err); + } + } + .encode_to_vec(); + + unsafe { + *res_ptr = res.as_ptr(); + *res_len = res.len(); + } + + let handle_id = FFI_SERVER.next_id(); + let ffi_data = FfiDataBuffer { + handle: handle_id, + data: Arc::new(res), + }; + + FFI_SERVER.store_handle(handle_id, ffi_data); + handle_id +} + +#[no_mangle] +pub extern "C" fn livekit_ffi_drop_handle(handle_id: FfiHandleId) -> bool { + FFI_SERVER.drop_handle(handle_id) +} diff --git a/livekit-ffi/src/lib.rs b/livekit-ffi/src/lib.rs index b9959a06..3fbf12b8 100644 --- a/livekit-ffi/src/lib.rs +++ b/livekit-ffi/src/lib.rs @@ -20,8 +20,10 @@ use std::{borrow::Cow, sync::Arc}; use thiserror::Error; mod conversion; -mod proto; -mod server; + +pub mod cabi; +pub mod proto; +pub mod server; #[derive(Error, Debug)] pub enum FfiError { @@ -45,51 +47,3 @@ pub const INVALID_HANDLE: FfiHandleId = 0; lazy_static! { pub static ref FFI_SERVER: server::FfiServer = server::FfiServer::default(); } - -/// # Safety -/// -/// The foreign language must only provide valid pointers -#[no_mangle] -pub unsafe extern "C" fn livekit_ffi_request( - data: *const u8, - len: usize, - res_ptr: *mut *const u8, - res_len: *mut usize, -) -> FfiHandleId { - let data = unsafe { std::slice::from_raw_parts(data, len) }; - let res = match proto::FfiRequest::decode(data) { - Ok(res) => res, - Err(err) => { - log::error!("failed to decode request: {}", err); - return INVALID_HANDLE; - } - }; - - let res = match server::requests::handle_request(&FFI_SERVER, res) { - Ok(res) => res, - Err(err) => { - log::error!("failed to handle request: {}", err); - return INVALID_HANDLE; - } - } - .encode_to_vec(); - - unsafe { - *res_ptr = res.as_ptr(); - *res_len = res.len(); - } - - let handle_id = FFI_SERVER.next_id(); - let ffi_data = FfiDataBuffer { - handle: handle_id, - data: Arc::new(res), - }; - - FFI_SERVER.store_handle(handle_id, ffi_data); - handle_id -} - -#[no_mangle] -pub extern "C" fn livekit_ffi_drop_handle(handle_id: FfiHandleId) -> bool { - FFI_SERVER.drop_handle(handle_id) -} diff --git a/livekit-ffi/src/livekit.proto.rs b/livekit-ffi/src/livekit.proto.rs index 4d318a76..aca0d99e 100644 --- a/livekit-ffi/src/livekit.proto.rs +++ b/livekit-ffi/src/livekit.proto.rs @@ -3015,7 +3015,7 @@ impl AudioSourceType { #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct FfiRequest { - #[prost(oneof="ffi_request::Message", tags="1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26")] + #[prost(oneof="ffi_request::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26")] pub message: ::core::option::Option, } /// Nested message and enum types in `FfiRequest`. @@ -3023,8 +3023,6 @@ pub mod ffi_request { #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Oneof)] pub enum Message { - #[prost(message, tag="1")] - Initialize(super::InitializeRequest), #[prost(message, tag="2")] Dispose(super::DisposeRequest), /// Room @@ -3085,7 +3083,7 @@ pub mod ffi_request { #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct FfiResponse { - #[prost(oneof="ffi_response::Message", tags="1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26")] + #[prost(oneof="ffi_response::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26")] pub message: ::core::option::Option, } /// Nested message and enum types in `FfiResponse`. @@ -3093,8 +3091,6 @@ pub mod ffi_response { #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Oneof)] pub enum Message { - #[prost(message, tag="1")] - Initialize(super::InitializeResponse), #[prost(message, tag="2")] Dispose(super::DisposeResponse), /// Room @@ -3197,21 +3193,6 @@ pub mod ffi_event { Logs(super::LogBatch), } } -/// Setup the callback where the foreign language can receive events -/// and responses to asynchronous requests -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct InitializeRequest { - #[prost(uint64, tag="1")] - pub event_callback_ptr: u64, - /// When true, the FfiServer will forward logs using LogRecord - #[prost(bool, tag="2")] - pub capture_logs: bool, -} -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct InitializeResponse { -} /// Stop all rooms synchronously (Do we need async here?). /// e.g: This is used for the Unity Editor after each assemblies reload. /// TODO(theomonnom): Implement a debug mode where we can find all leaked handles? diff --git a/livekit-ffi/src/server/mod.rs b/livekit-ffi/src/server/mod.rs index bf337bb4..9eaed35d 100644 --- a/livekit-ffi/src/server/mod.rs +++ b/livekit-ffi/src/server/mod.rs @@ -12,7 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::{proto, FfiCallbackFn, INVALID_HANDLE}; +use crate::proto::FfiEvent; +use crate::{proto, INVALID_HANDLE}; use crate::{FfiError, FfiHandleId, FfiResult}; use dashmap::mapref::one::MappedRef; use dashmap::DashMap; @@ -21,7 +22,6 @@ use livekit::webrtc::native::audio_resampler::AudioResampler; use livekit::webrtc::prelude::*; use parking_lot::deadlock; use parking_lot::Mutex; -use prost::Message; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use std::thread; @@ -38,8 +38,10 @@ pub mod video_stream; //#[cfg(test)] //mod tests; +#[derive(Clone)] pub struct FfiConfig { - callback_fn: FfiCallbackFn, + pub callback_fn: Arc, + pub capture_logs: bool, } /// To make sure we use the right types, only types that implement this trait @@ -115,6 +117,13 @@ impl Default for FfiServer { // Using &'static self inside the implementation, not sure if this is really idiomatic // It simplifies the code a lot tho. In most cases the server is used until the end of the process impl FfiServer { + pub fn setup(&self, config: FfiConfig) { + *self.config.lock() = Some(config.clone()); + self.logger.set_capture_logs(config.capture_logs); + + log::info!("initializing ffi server v{}", env!("CARGO_PKG_VERSION")); // TODO: Move this log + } + pub async fn dispose(&self) { log::info!("disposing the FfiServer, closing all rooms..."); @@ -136,20 +145,15 @@ impl FfiServer { } pub async fn send_event(&self, message: proto::ffi_event::Message) -> FfiResult<()> { - let callback_fn = self - .config - .lock() - .as_ref() - .map_or_else(|| Err(FfiError::NotConfigured), |c| Ok(c.callback_fn))?; - - // TODO(theomonnom): Don't reallocate - let message = proto::FfiEvent { - message: Some(message), - } - .encode_to_vec(); - - let cb_task = self.async_runtime.spawn_blocking(move || unsafe { - callback_fn(message.as_ptr(), message.len()); + let cb = self.config.lock().as_ref().map_or_else( + || Err(FfiError::NotConfigured), + |c| Ok(c.callback_fn.clone()), + )?; + + let cb_task = self.async_runtime.spawn_blocking(move || { + cb(proto::FfiEvent { + message: Some(message), + }); }); tokio::select! { diff --git a/livekit-ffi/src/server/requests.rs b/livekit-ffi/src/server/requests.rs index 6534aee4..eb54a9ae 100644 --- a/livekit-ffi/src/server/requests.rs +++ b/livekit-ffi/src/server/requests.rs @@ -14,8 +14,7 @@ use super::room::{FfiParticipant, FfiPublication, FfiTrack}; use super::{ - audio_source, audio_stream, room, video_source, video_stream, FfiConfig, FfiError, FfiResult, - FfiServer, + audio_source, audio_stream, room, video_source, video_stream, FfiError, FfiResult, FfiServer, }; use crate::proto; use livekit::prelude::*; @@ -26,28 +25,6 @@ use parking_lot::Mutex; use std::slice; use std::sync::Arc; -/// This is the first request called by the foreign language -/// It sets the callback function to be called when an event is received -fn on_initialize( - server: &'static FfiServer, - init: proto::InitializeRequest, -) -> FfiResult { - if server.config.lock().is_some() { - return Err(FfiError::AlreadyInitialized); - } - - server.logger.set_capture_logs(init.capture_logs); - - // # SAFETY: The foreign language is responsible for ensuring that the callback function is valid - *server.config.lock() = Some(FfiConfig { - callback_fn: unsafe { std::mem::transmute(init.event_callback_ptr as usize) }, - }); - - log::info!("initializing ffi server v{}", env!("CARGO_PKG_VERSION")); - - Ok(proto::InitializeResponse::default()) -} - /// Dispose the server, close all rooms and clean up all handles /// It is not mandatory to call this function. fn on_dispose( @@ -769,9 +746,6 @@ pub fn handle_request( let mut res = proto::FfiResponse::default(); res.message = Some(match request { - proto::ffi_request::Message::Initialize(init) => { - proto::ffi_response::Message::Initialize(on_initialize(server, init)?) - } proto::ffi_request::Message::Dispose(dispose) => { proto::ffi_response::Message::Dispose(on_dispose(server, dispose)?) }