Skip to content

Commit

Permalink
allow other ffi integrations
Browse files Browse the repository at this point in the history
  • Loading branch information
theomonnom committed Nov 29, 2023
1 parent c7a431b commit 1282260
Show file tree
Hide file tree
Showing 8 changed files with 103 additions and 126 deletions.
1 change: 1 addition & 0 deletions livekit-api/src/signal_client/signal_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ impl SignalStream {
log::debug!("server closed the connection: {:?}", close);
break;
}
Ok(Message::Frame(_)) => {}
_ => {
log::error!("unhandled websocket message {:?}", msg);
break;
Expand Down
2 changes: 1 addition & 1 deletion livekit-ffi/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,4 @@ webrtc-sys-build = { path = "../webrtc-sys/build", version = "0.2.0" }
livekit-api = { path = "../livekit-api", version = "0.2.0" }

[lib]
crate-type = ["cdylib"]
crate-type = ["lib", "cdylib"]
10 changes: 0 additions & 10 deletions livekit-ffi/protocol/ffi.proto
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ import "audio_frame.proto";
// We always expect a response (FFIResponse, even if it's empty)
message FfiRequest {
oneof message {
InitializeRequest initialize = 1;
DisposeRequest dispose = 2;

// Room
Expand Down Expand Up @@ -94,7 +93,6 @@ message FfiRequest {
// This is the output of livekit_ffi_request function.
message FfiResponse {
oneof message {
InitializeResponse initialize = 1;
DisposeResponse dispose = 2;

// Room
Expand Down Expand Up @@ -154,14 +152,6 @@ message FfiEvent {
}
}

// Setup the callback where the foreign language can receive events
// and responses to asynchronous requests
message InitializeRequest {
uint64 event_callback_ptr = 1;
bool capture_logs = 2; // When true, the FfiServer will forward logs using LogRecord
}
message InitializeResponse {}

// Stop all rooms synchronously (Do we need async here?).
// e.g: This is used for the Unity Editor after each assemblies reload.
// TODO(theomonnom): Implement a debug mode where we can find all leaked handles?
Expand Down
73 changes: 73 additions & 0 deletions livekit-ffi/src/cabi.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
use crate::{
proto,
server::{self, FfiConfig},
FfiHandleId, FFI_SERVER,
};
use prost::Message;
use server::FfiDataBuffer;
use std::sync::Arc;

/// # SAFTEY: The "C" callback must be threadsafe and not block
pub type FfiCallbackFn = unsafe extern "C" fn(*const u8, usize);

/// # Safety
///
/// The foreign language must only provide valid pointers
#[no_mangle]
pub unsafe extern "C" fn livekit_initialize(cb: FfiCallbackFn, capture_logs: bool) {
FFI_SERVER.setup(FfiConfig {
callback_fn: Arc::new(move |event| {
let data = event.encode_to_vec();
cb(data.as_ptr(), data.len());
}),
capture_logs,
});

log::info!("initializing ffi server v{}", env!("CARGO_PKG_VERSION"));
}

/// # Safety
///
/// The foreign language must only provide valid pointers
#[no_mangle]
pub unsafe extern "C" fn livekit_ffi_request(
data: *const u8,
len: usize,
res_ptr: *mut *const u8,
res_len: *mut usize,
) -> FfiHandleId {
let data = unsafe { std::slice::from_raw_parts(data, len) };
let res = match proto::FfiRequest::decode(data) {
Ok(res) => res,
Err(err) => {
panic!("failed to decode request: {}", err);
}
};

let res = match server::requests::handle_request(&FFI_SERVER, res) {
Ok(res) => res,
Err(err) => {
panic!("failed to handle request: {}", err);
}
}
.encode_to_vec();

unsafe {
*res_ptr = res.as_ptr();
*res_len = res.len();
}

let handle_id = FFI_SERVER.next_id();
let ffi_data = FfiDataBuffer {
handle: handle_id,
data: Arc::new(res),
};

FFI_SERVER.store_handle(handle_id, ffi_data);
handle_id
}

#[no_mangle]
pub extern "C" fn livekit_ffi_drop_handle(handle_id: FfiHandleId) -> bool {
FFI_SERVER.drop_handle(handle_id)
}
54 changes: 4 additions & 50 deletions livekit-ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ use std::{borrow::Cow, sync::Arc};
use thiserror::Error;

mod conversion;
mod proto;
mod server;

pub mod cabi;
pub mod proto;
pub mod server;

#[derive(Error, Debug)]
pub enum FfiError {
Expand All @@ -45,51 +47,3 @@ pub const INVALID_HANDLE: FfiHandleId = 0;
lazy_static! {
pub static ref FFI_SERVER: server::FfiServer = server::FfiServer::default();
}

/// # Safety
///
/// The foreign language must only provide valid pointers
#[no_mangle]
pub unsafe extern "C" fn livekit_ffi_request(
data: *const u8,
len: usize,
res_ptr: *mut *const u8,
res_len: *mut usize,
) -> FfiHandleId {
let data = unsafe { std::slice::from_raw_parts(data, len) };
let res = match proto::FfiRequest::decode(data) {
Ok(res) => res,
Err(err) => {
log::error!("failed to decode request: {}", err);
return INVALID_HANDLE;
}
};

let res = match server::requests::handle_request(&FFI_SERVER, res) {
Ok(res) => res,
Err(err) => {
log::error!("failed to handle request: {}", err);
return INVALID_HANDLE;
}
}
.encode_to_vec();

unsafe {
*res_ptr = res.as_ptr();
*res_len = res.len();
}

let handle_id = FFI_SERVER.next_id();
let ffi_data = FfiDataBuffer {
handle: handle_id,
data: Arc::new(res),
};

FFI_SERVER.store_handle(handle_id, ffi_data);
handle_id
}

#[no_mangle]
pub extern "C" fn livekit_ffi_drop_handle(handle_id: FfiHandleId) -> bool {
FFI_SERVER.drop_handle(handle_id)
}
23 changes: 2 additions & 21 deletions livekit-ffi/src/livekit.proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3015,16 +3015,14 @@ impl AudioSourceType {
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FfiRequest {
#[prost(oneof="ffi_request::Message", tags="1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26")]
#[prost(oneof="ffi_request::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26")]
pub message: ::core::option::Option<ffi_request::Message>,
}
/// Nested message and enum types in `FfiRequest`.
pub mod ffi_request {
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum Message {
#[prost(message, tag="1")]
Initialize(super::InitializeRequest),
#[prost(message, tag="2")]
Dispose(super::DisposeRequest),
/// Room
Expand Down Expand Up @@ -3085,16 +3083,14 @@ pub mod ffi_request {
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FfiResponse {
#[prost(oneof="ffi_response::Message", tags="1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26")]
#[prost(oneof="ffi_response::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26")]
pub message: ::core::option::Option<ffi_response::Message>,
}
/// Nested message and enum types in `FfiResponse`.
pub mod ffi_response {
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum Message {
#[prost(message, tag="1")]
Initialize(super::InitializeResponse),
#[prost(message, tag="2")]
Dispose(super::DisposeResponse),
/// Room
Expand Down Expand Up @@ -3197,21 +3193,6 @@ pub mod ffi_event {
Logs(super::LogBatch),
}
}
/// Setup the callback where the foreign language can receive events
/// and responses to asynchronous requests
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct InitializeRequest {
#[prost(uint64, tag="1")]
pub event_callback_ptr: u64,
/// When true, the FfiServer will forward logs using LogRecord
#[prost(bool, tag="2")]
pub capture_logs: bool,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct InitializeResponse {
}
/// Stop all rooms synchronously (Do we need async here?).
/// e.g: This is used for the Unity Editor after each assemblies reload.
/// TODO(theomonnom): Implement a debug mode where we can find all leaked handles?
Expand Down
38 changes: 21 additions & 17 deletions livekit-ffi/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::{proto, FfiCallbackFn, INVALID_HANDLE};
use crate::proto::FfiEvent;
use crate::{proto, INVALID_HANDLE};
use crate::{FfiError, FfiHandleId, FfiResult};
use dashmap::mapref::one::MappedRef;
use dashmap::DashMap;
Expand All @@ -21,7 +22,6 @@ use livekit::webrtc::native::audio_resampler::AudioResampler;
use livekit::webrtc::prelude::*;
use parking_lot::deadlock;
use parking_lot::Mutex;
use prost::Message;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;
Expand All @@ -38,8 +38,10 @@ pub mod video_stream;
//#[cfg(test)]
//mod tests;

#[derive(Clone)]
pub struct FfiConfig {
callback_fn: FfiCallbackFn,
pub callback_fn: Arc<dyn Fn(FfiEvent) + Send + Sync>,
pub capture_logs: bool,
}

/// To make sure we use the right types, only types that implement this trait
Expand Down Expand Up @@ -115,6 +117,13 @@ impl Default for FfiServer {
// Using &'static self inside the implementation, not sure if this is really idiomatic
// It simplifies the code a lot tho. In most cases the server is used until the end of the process
impl FfiServer {
pub fn setup(&self, config: FfiConfig) {
*self.config.lock() = Some(config.clone());
self.logger.set_capture_logs(config.capture_logs);

log::info!("initializing ffi server v{}", env!("CARGO_PKG_VERSION")); // TODO: Move this log
}

pub async fn dispose(&self) {
log::info!("disposing the FfiServer, closing all rooms...");

Expand All @@ -136,20 +145,15 @@ impl FfiServer {
}

pub async fn send_event(&self, message: proto::ffi_event::Message) -> FfiResult<()> {
let callback_fn = self
.config
.lock()
.as_ref()
.map_or_else(|| Err(FfiError::NotConfigured), |c| Ok(c.callback_fn))?;

// TODO(theomonnom): Don't reallocate
let message = proto::FfiEvent {
message: Some(message),
}
.encode_to_vec();

let cb_task = self.async_runtime.spawn_blocking(move || unsafe {
callback_fn(message.as_ptr(), message.len());
let cb = self.config.lock().as_ref().map_or_else(
|| Err(FfiError::NotConfigured),
|c| Ok(c.callback_fn.clone()),
)?;

let cb_task = self.async_runtime.spawn_blocking(move || {
cb(proto::FfiEvent {
message: Some(message),
});
});

tokio::select! {
Expand Down
28 changes: 1 addition & 27 deletions livekit-ffi/src/server/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@

use super::room::{FfiParticipant, FfiPublication, FfiTrack};
use super::{
audio_source, audio_stream, room, video_source, video_stream, FfiConfig, FfiError, FfiResult,
FfiServer,
audio_source, audio_stream, room, video_source, video_stream, FfiError, FfiResult, FfiServer,
};
use crate::proto;
use livekit::prelude::*;
Expand All @@ -26,28 +25,6 @@ use parking_lot::Mutex;
use std::slice;
use std::sync::Arc;

/// This is the first request called by the foreign language
/// It sets the callback function to be called when an event is received
fn on_initialize(
server: &'static FfiServer,
init: proto::InitializeRequest,
) -> FfiResult<proto::InitializeResponse> {
if server.config.lock().is_some() {
return Err(FfiError::AlreadyInitialized);
}

server.logger.set_capture_logs(init.capture_logs);

// # SAFETY: The foreign language is responsible for ensuring that the callback function is valid
*server.config.lock() = Some(FfiConfig {
callback_fn: unsafe { std::mem::transmute(init.event_callback_ptr as usize) },
});

log::info!("initializing ffi server v{}", env!("CARGO_PKG_VERSION"));

Ok(proto::InitializeResponse::default())
}

/// Dispose the server, close all rooms and clean up all handles
/// It is not mandatory to call this function.
fn on_dispose(
Expand Down Expand Up @@ -769,9 +746,6 @@ pub fn handle_request(
let mut res = proto::FfiResponse::default();

res.message = Some(match request {
proto::ffi_request::Message::Initialize(init) => {
proto::ffi_response::Message::Initialize(on_initialize(server, init)?)
}
proto::ffi_request::Message::Dispose(dispose) => {
proto::ffi_response::Message::Dispose(on_dispose(server, dispose)?)
}
Expand Down

0 comments on commit 1282260

Please sign in to comment.