Skip to content

Commit

Permalink
fix: redundant close on SignalClient (#224)
Browse files Browse the repository at this point in the history
  • Loading branch information
theomonnom authored Oct 19, 2023
1 parent 14a7956 commit bfa62d7
Showing 1 changed file with 6 additions and 21 deletions.
27 changes: 6 additions & 21 deletions livekit-api/src/signal_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use thiserror::Error;
use tokio::sync::mpsc;
use tokio::sync::Mutex as AsyncMutex;
use tokio::sync::RwLock as AsyncRwLock;
use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinHandle;
use tokio::time::{interval, sleep, Instant};
use tokio_tungstenite::tungstenite::Error as WsError;
Expand Down Expand Up @@ -92,7 +92,7 @@ struct SignalInner {
pub struct SignalClient {
inner: Arc<SignalInner>,
emitter: SignalEmitter,
handle: Mutex<Option<(JoinHandle<()>, oneshot::Sender<()>)>>,
handle: Mutex<Option<JoinHandle<()>>>,
}

impl Debug for SignalClient {
Expand All @@ -115,20 +115,13 @@ impl SignalClient {
SignalInner::connect(url, token, options).await?;

let (emitter, events) = mpsc::unbounded_channel();
let (close_tx, close_rx) = oneshot::channel();

let signal_task = tokio::spawn(signal_task(
inner.clone(),
emitter.clone(),
stream_events,
close_rx,
));
let signal_task = tokio::spawn(signal_task(inner.clone(), emitter.clone(), stream_events));

Ok((
Self {
inner,
emitter,
handle: Mutex::new(Some((signal_task, close_tx))),
handle: Mutex::new(Some(signal_task)),
},
join_response,
events,
Expand All @@ -141,15 +134,13 @@ impl SignalClient {
self.close().await;

let (reconnect_response, stream_events) = self.inner.restart().await?;
let (close_tx, close_rx) = oneshot::channel();
let signal_task = tokio::spawn(signal_task(
self.inner.clone(),
self.emitter.clone(),
stream_events,
close_rx,
));

*self.handle.lock() = Some((signal_task, close_tx));
*self.handle.lock() = Some(signal_task);
Ok(reconnect_response)
}

Expand All @@ -165,8 +156,7 @@ impl SignalClient {
self.inner.close().await;

let handle = self.handle.lock().take();
if let Some((signal_task, close_tx)) = handle {
let _ = close_tx.send(());
if let Some(signal_task) = handle {
let _ = signal_task.await;
}
}
Expand Down Expand Up @@ -343,7 +333,6 @@ async fn signal_task(
inner: Arc<SignalInner>,
emitter: SignalEmitter, // Public emitter
mut internal_events: mpsc::UnboundedReceiver<Box<proto::signal_response::Message>>,
mut close_rx: oneshot::Receiver<()>,
) {
let mut ping_interval = interval(Duration::from_secs(
inner.join_response.ping_interval as u64,
Expand Down Expand Up @@ -400,10 +389,6 @@ async fn signal_task(
let _ = emitter.send(SignalEvent::Close("ping timeout".into()));
break;
}
_ = &mut close_rx => {
let _ = emitter.send(SignalEvent::Close("client closed".into()));
break;
}
}
}

Expand Down

0 comments on commit bfa62d7

Please sign in to comment.