From 8e2d312a2deec48f49cf5936cfe16fd77cea4c7d Mon Sep 17 00:00:00 2001 From: Ben Cherry Date: Fri, 3 Jan 2025 12:04:17 -0800 Subject: [PATCH] Fix deadlocked engine in RPC (#532) * example * Fix deadlocked engine in RPC * generated protobuf * p * c * c * nanpa * mv --------- Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com> --- .nanpa/rpc-deadlock.kdl | 1 + examples/rpc/src/main.rs | 74 ++++++++++++++++++++++++++++++++++++++-- livekit/src/room/mod.rs | 23 +++++++------ 3 files changed, 86 insertions(+), 12 deletions(-) create mode 100644 .nanpa/rpc-deadlock.kdl diff --git a/.nanpa/rpc-deadlock.kdl b/.nanpa/rpc-deadlock.kdl new file mode 100644 index 00000000..35f6dbe7 --- /dev/null +++ b/.nanpa/rpc-deadlock.kdl @@ -0,0 +1 @@ +patch type="fixed" "Fixed deadlock with nested RPC calls" package="livekit" diff --git a/examples/rpc/src/main.rs b/examples/rpc/src/main.rs index 11e0694b..bc53ab69 100644 --- a/examples/rpc/src/main.rs +++ b/examples/rpc/src/main.rs @@ -54,7 +54,7 @@ async fn main() -> Result<(), Box> { connect_participant("math-genius", &room_name, &url, &api_key, &api_secret) )?; - register_receiver_methods(&greeters_room, &math_genius_room).await; + register_receiver_methods(greeters_room.clone(), math_genius_room.clone()).await; println!("\n\nRunning greeting example..."); perform_greeting(&callers_room).await?; @@ -67,6 +67,9 @@ async fn main() -> Result<(), Box> { sleep(Duration::from_secs(2)).await; perform_quantum_hypergeometric_series(&callers_room).await?; + println!("\n\nRunning nested calculation example..."); + perform_nested_calculation(&callers_room).await?; + println!("\n\nParticipants done, disconnecting..."); callers_room.close().await?; greeters_room.close().await?; @@ -77,7 +80,7 @@ async fn main() -> Result<(), Box> { Ok(()) } -async fn register_receiver_methods(greeters_room: &Arc, math_genius_room: &Arc) { +async fn register_receiver_methods(greeters_room: Arc, math_genius_room: Arc) { greeters_room.local_participant().register_rpc_method("arrival".to_string(), |data| { Box::pin(async move { println!( @@ -133,6 +136,41 @@ async fn register_receiver_methods(greeters_room: &Arc, math_genius_room: Ok(json!({"result": result}).to_string()) }) }); + + math_genius_room.local_participant().register_rpc_method("nested-calculation".to_string(), move |data| { + let math_genius_room = math_genius_room.clone(); + Box::pin(async move { + let json_data: Value = serde_json::from_str(&data.payload).unwrap(); + let number = json_data["number"].as_f64().unwrap(); + println!( + "[{}] [Math Genius] {} wants me to do a nested calculation on {}.", + elapsed_time(), + data.caller_identity, + number + ); + + match math_genius_room.local_participant().perform_rpc(PerformRpcData { + destination_identity: data.caller_identity.to_string(), + method: "provide-intermediate".to_string(), + payload: json!({"original": number}).to_string(), + ..Default::default() + }).await { + Ok(intermediate_response) => { + let intermediate: Value = serde_json::from_str(&intermediate_response).unwrap(); + let intermediate_value = intermediate["value"].as_f64().unwrap(); + let final_result = intermediate_value * 2.0; + println!("[{}] [Math Genius] Got intermediate value {}, final result is {}", + elapsed_time(), intermediate_value, final_result); + Ok(json!({"result": final_result}).to_string()) + } + Err(e) => Err(RpcError { + code: 1, + message: "Failed to get intermediate result".to_string(), + data: None, + }), + } + }) + }); } async fn perform_greeting(room: &Arc) -> Result<(), Box> { @@ -230,6 +268,38 @@ async fn perform_division(room: &Arc) -> Result<(), Box) -> Result<(), Box> { + room.local_participant().register_rpc_method("provide-intermediate".to_string(), |data| { + Box::pin(async move { + let json_data: Value = serde_json::from_str(&data.payload).unwrap(); + let original = json_data["original"].as_f64().unwrap(); + let intermediate = original + 10.0; + println!("[{}] [Caller] Providing intermediate calculation: {} + 10 = {}", + elapsed_time(), original, intermediate); + Ok(json!({"value": intermediate}).to_string()) + }) + }); + + println!("[{}] Starting nested calculation with value 5", elapsed_time()); + match room + .local_participant() + .perform_rpc(PerformRpcData { + destination_identity: "math-genius".to_string(), + method: "nested-calculation".to_string(), + payload: json!({"number": 5.0}).to_string(), + ..Default::default() + }) + .await + { + Ok(response) => { + let parsed_response: Value = serde_json::from_str(&response)?; + println!("[{}] Final result: {}", elapsed_time(), parsed_response["result"]); + } + Err(e) => log::error!("[{}] RPC call failed: {:?}", elapsed_time(), e), + } + Ok(()) +} + async fn connect_participant( identity: &str, room_name: &str, diff --git a/livekit/src/room/mod.rs b/livekit/src/room/mod.rs index 502efb2c..5653002a 100644 --- a/livekit/src/room/mod.rs +++ b/livekit/src/room/mod.rs @@ -699,16 +699,19 @@ impl RoomSession { log::warn!("Received RPC request with null caller identity"); return Ok(()); } - self.local_participant - .handle_incoming_rpc_request( - caller_identity.unwrap(), - request_id, - method, - payload, - response_timeout, - version, - ) - .await; + let local_participant = self.local_participant.clone(); + livekit_runtime::spawn(async move { + local_participant + .handle_incoming_rpc_request( + caller_identity.unwrap(), + request_id, + method, + payload, + response_timeout, + version, + ) + .await; + }); } EngineEvent::RpcResponse { request_id, payload, error } => { self.local_participant.handle_incoming_rpc_response(request_id, payload, error);