diff --git a/livekit-ffi/protocol/room.proto b/livekit-ffi/protocol/room.proto index 4b3895d1..7d341fac 100644 --- a/livekit-ffi/protocol/room.proto +++ b/livekit-ffi/protocol/room.proto @@ -183,6 +183,7 @@ message RoomOptions { bool dynacast = 3; optional E2eeOptions e2ee = 4; optional RtcConfig rtc_config = 5; // allow to setup a custom RtcConfiguration + uint32 join_retries = 6; } // diff --git a/livekit-ffi/src/conversion/room.rs b/livekit-ffi/src/conversion/room.rs index a4e04982..6677924b 100644 --- a/livekit-ffi/src/conversion/room.rs +++ b/livekit-ffi/src/conversion/room.rs @@ -166,6 +166,7 @@ impl From for RoomOptions { dynacast: value.dynacast, e2ee, rtc_config, + join_retries: value.join_retries, } } } diff --git a/livekit-ffi/src/livekit.proto.rs b/livekit-ffi/src/livekit.proto.rs index 5b07b28f..a352fb9d 100644 --- a/livekit-ffi/src/livekit.proto.rs +++ b/livekit-ffi/src/livekit.proto.rs @@ -2299,6 +2299,8 @@ pub struct RoomOptions { /// allow to setup a custom RtcConfiguration #[prost(message, optional, tag="5")] pub rtc_config: ::core::option::Option, + #[prost(uint32, tag="6")] + pub join_retries: u32, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] diff --git a/livekit/src/room/mod.rs b/livekit/src/room/mod.rs index 783c4fcd..7e642c5c 100644 --- a/livekit/src/room/mod.rs +++ b/livekit/src/room/mod.rs @@ -172,6 +172,7 @@ pub struct RoomOptions { pub dynacast: bool, pub e2ee: Option, pub rtc_config: RtcConfiguration, + pub join_retries: u32, } impl Default for RoomOptions { @@ -188,6 +189,7 @@ impl Default for RoomOptions { continual_gathering_policy: ContinualGatheringPolicy::GatherContinually, ice_transport_type: IceTransportsType::All, }, + join_retries: 3, } } } @@ -228,6 +230,7 @@ impl Room { auto_subscribe: options.auto_subscribe, adaptive_stream: options.adaptive_stream, }, + join_retries: options.join_retries, }, ) .await?; diff --git a/livekit/src/rtc_engine/mod.rs b/livekit/src/rtc_engine/mod.rs index f8b9d417..d3341c35 100644 --- a/livekit/src/rtc_engine/mod.rs +++ b/livekit/src/rtc_engine/mod.rs @@ -74,6 +74,7 @@ pub enum EngineError { pub struct EngineOptions { pub rtc_config: RtcConfiguration, pub signal_options: SignalOptions, + pub join_retries: u32, } #[derive(Debug)] @@ -258,35 +259,65 @@ impl EngineInner { options: EngineOptions, ) -> EngineResult<(Arc, proto::JoinResponse, EngineEvents)> { let lk_runtime = LkRuntime::instance(); + let max_retries = options.join_retries; + + let try_connect = { + move || { + let options = options.clone(); + let lk_runtime = lk_runtime.clone(); + async move { + let (session, join_response, session_events) = + RtcSession::connect(url, token, options.clone()).await?; + session.wait_pc_connection().await?; + + let (engine_tx, engine_rx) = mpsc::unbounded_channel(); + let inner = Arc::new(Self { + lk_runtime, + engine_tx, + close_notifier: Arc::new(Notify::new()), + running_handle: RwLock::new(EngineHandle { + session: Arc::new(session), + closed: false, + reconnecting: false, + full_reconnect: false, + engine_task: None, + }), + options, + reconnecting_lock: AsyncRwLock::default(), + reconnecting_interval: AsyncMutex::new(interval(RECONNECT_INTERVAL)), + }); - let (session, join_response, session_events) = - RtcSession::connect(url, token, options.clone()).await?; - let (engine_tx, engine_rx) = mpsc::unbounded_channel(); - - session.wait_pc_connection().await?; - - let inner = Arc::new(Self { - lk_runtime, - engine_tx, - close_notifier: Arc::new(Notify::new()), - running_handle: RwLock::new(EngineHandle { - session: Arc::new(session), - closed: false, - reconnecting: false, - full_reconnect: false, - engine_task: None, - }), - options, - reconnecting_lock: AsyncRwLock::default(), - reconnecting_interval: AsyncMutex::new(interval(RECONNECT_INTERVAL)), - }); + // Start initial tasks + let (close_tx, close_rx) = oneshot::channel(); + let session_task = + tokio::spawn(Self::engine_task(inner.clone(), session_events, close_rx)); + inner.running_handle.write().engine_task = Some((session_task, close_tx)); - // Start initial tasks - let (close_tx, close_rx) = oneshot::channel(); - let session_task = tokio::spawn(Self::engine_task(inner.clone(), session_events, close_rx)); - inner.running_handle.write().engine_task = Some((session_task, close_tx)); + Ok((inner, join_response, engine_rx)) + } + } + }; + + let mut last_error = None; + for i in 0..(max_retries + 1) { + match try_connect().await { + Ok(res) => return Ok(res), + Err(e) => { + let attempt_i = i + 1; + if i < max_retries { + log::warn!( + "failed to connect: {:?}, retrying... ({}/{})", + e, + attempt_i, + max_retries + ); + } + last_error = Some(e) + } + } + } - Ok((inner, join_response, engine_rx)) + Err(last_error.unwrap()) } async fn engine_task(