Skip to content

Commit

Permalink
Fix deadlocked engine in RPC (#532)
Browse files Browse the repository at this point in the history
* example

* Fix deadlocked engine in RPC

* generated protobuf

* p

* c

* c

* nanpa

* mv

---------

Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
  • Loading branch information
bcherry and github-actions[bot] authored Jan 3, 2025
1 parent cc889da commit 8e2d312
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 12 deletions.
1 change: 1 addition & 0 deletions .nanpa/rpc-deadlock.kdl
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
patch type="fixed" "Fixed deadlock with nested RPC calls" package="livekit"
74 changes: 72 additions & 2 deletions examples/rpc/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
connect_participant("math-genius", &room_name, &url, &api_key, &api_secret)
)?;

register_receiver_methods(&greeters_room, &math_genius_room).await;
register_receiver_methods(greeters_room.clone(), math_genius_room.clone()).await;

println!("\n\nRunning greeting example...");
perform_greeting(&callers_room).await?;
Expand All @@ -67,6 +67,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
sleep(Duration::from_secs(2)).await;
perform_quantum_hypergeometric_series(&callers_room).await?;

println!("\n\nRunning nested calculation example...");
perform_nested_calculation(&callers_room).await?;

println!("\n\nParticipants done, disconnecting...");
callers_room.close().await?;
greeters_room.close().await?;
Expand All @@ -77,7 +80,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
Ok(())
}

async fn register_receiver_methods(greeters_room: &Arc<Room>, math_genius_room: &Arc<Room>) {
async fn register_receiver_methods(greeters_room: Arc<Room>, math_genius_room: Arc<Room>) {
greeters_room.local_participant().register_rpc_method("arrival".to_string(), |data| {
Box::pin(async move {
println!(
Expand Down Expand Up @@ -133,6 +136,41 @@ async fn register_receiver_methods(greeters_room: &Arc<Room>, math_genius_room:
Ok(json!({"result": result}).to_string())
})
});

math_genius_room.local_participant().register_rpc_method("nested-calculation".to_string(), move |data| {
let math_genius_room = math_genius_room.clone();
Box::pin(async move {
let json_data: Value = serde_json::from_str(&data.payload).unwrap();
let number = json_data["number"].as_f64().unwrap();
println!(
"[{}] [Math Genius] {} wants me to do a nested calculation on {}.",
elapsed_time(),
data.caller_identity,
number
);

match math_genius_room.local_participant().perform_rpc(PerformRpcData {
destination_identity: data.caller_identity.to_string(),
method: "provide-intermediate".to_string(),
payload: json!({"original": number}).to_string(),
..Default::default()
}).await {
Ok(intermediate_response) => {
let intermediate: Value = serde_json::from_str(&intermediate_response).unwrap();
let intermediate_value = intermediate["value"].as_f64().unwrap();
let final_result = intermediate_value * 2.0;
println!("[{}] [Math Genius] Got intermediate value {}, final result is {}",
elapsed_time(), intermediate_value, final_result);
Ok(json!({"result": final_result}).to_string())
}
Err(e) => Err(RpcError {
code: 1,
message: "Failed to get intermediate result".to_string(),
data: None,
}),
}
})
});
}

async fn perform_greeting(room: &Arc<Room>) -> Result<(), Box<dyn std::error::Error>> {
Expand Down Expand Up @@ -230,6 +268,38 @@ async fn perform_division(room: &Arc<Room>) -> Result<(), Box<dyn std::error::Er
Ok(())
}

async fn perform_nested_calculation(room: &Arc<Room>) -> Result<(), Box<dyn std::error::Error>> {
room.local_participant().register_rpc_method("provide-intermediate".to_string(), |data| {
Box::pin(async move {
let json_data: Value = serde_json::from_str(&data.payload).unwrap();
let original = json_data["original"].as_f64().unwrap();
let intermediate = original + 10.0;
println!("[{}] [Caller] Providing intermediate calculation: {} + 10 = {}",
elapsed_time(), original, intermediate);
Ok(json!({"value": intermediate}).to_string())
})
});

println!("[{}] Starting nested calculation with value 5", elapsed_time());
match room
.local_participant()
.perform_rpc(PerformRpcData {
destination_identity: "math-genius".to_string(),
method: "nested-calculation".to_string(),
payload: json!({"number": 5.0}).to_string(),
..Default::default()
})
.await
{
Ok(response) => {
let parsed_response: Value = serde_json::from_str(&response)?;
println!("[{}] Final result: {}", elapsed_time(), parsed_response["result"]);
}
Err(e) => log::error!("[{}] RPC call failed: {:?}", elapsed_time(), e),
}
Ok(())
}

async fn connect_participant(
identity: &str,
room_name: &str,
Expand Down
23 changes: 13 additions & 10 deletions livekit/src/room/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -699,16 +699,19 @@ impl RoomSession {
log::warn!("Received RPC request with null caller identity");
return Ok(());
}
self.local_participant
.handle_incoming_rpc_request(
caller_identity.unwrap(),
request_id,
method,
payload,
response_timeout,
version,
)
.await;
let local_participant = self.local_participant.clone();
livekit_runtime::spawn(async move {
local_participant
.handle_incoming_rpc_request(
caller_identity.unwrap(),
request_id,
method,
payload,
response_timeout,
version,
)
.await;
});
}
EngineEvent::RpcResponse { request_id, payload, error } => {
self.local_participant.handle_incoming_rpc_response(request_id, payload, error);
Expand Down

0 comments on commit 8e2d312

Please sign in to comment.