Skip to content

Commit

Permalink
ssh remote: Only send a single FlushBufferedMessages (zed-industries#…
Browse files Browse the repository at this point in the history
…19541)

Release Notes:

- N/A

---------

Co-authored-by: Bennet <[email protected]>
  • Loading branch information
mrnugget and bennetbo authored Oct 22, 2024
1 parent 27d1a56 commit a367c6d
Showing 1 changed file with 28 additions and 6 deletions.
34 changes: 28 additions & 6 deletions crates/remote/src/ssh_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1613,9 +1613,18 @@ impl ChannelClient {
pub fn request<T: RequestMessage>(
&self,
payload: T,
) -> impl 'static + Future<Output = Result<T::Response>> {
self.request_internal(payload, true)
}

fn request_internal<T: RequestMessage>(
&self,
payload: T,
use_buffer: bool,
) -> impl 'static + Future<Output = Result<T::Response>> {
log::debug!("ssh request start. name:{}", T::NAME);
let response = self.request_dynamic(payload.into_envelope(0, None, None), T::NAME);
let response =
self.request_dynamic(payload.into_envelope(0, None, None), T::NAME, use_buffer);
async move {
let response = response.await?;
log::debug!("ssh request finish. name:{}", T::NAME);
Expand All @@ -1627,7 +1636,9 @@ impl ChannelClient {
pub async fn resync(&self, timeout: Duration) -> Result<()> {
smol::future::or(
async {
self.request(proto::FlushBufferedMessages {}).await?;
self.request_internal(proto::FlushBufferedMessages {}, false)
.await?;

for envelope in self.buffer.lock().iter() {
self.outgoing_tx
.lock()
Expand Down Expand Up @@ -1663,18 +1674,23 @@ impl ChannelClient {
self.send_dynamic(payload.into_envelope(0, None, None))
}

pub fn request_dynamic(
fn request_dynamic(
&self,
mut envelope: proto::Envelope,
type_name: &'static str,
use_buffer: bool,
) -> impl 'static + Future<Output = Result<proto::Envelope>> {
envelope.id = self.next_message_id.fetch_add(1, SeqCst);
let (tx, rx) = oneshot::channel();
let mut response_channels_lock = self.response_channels.lock();
response_channels_lock.insert(MessageId(envelope.id), tx);
drop(response_channels_lock);

let result = self.send_buffered(envelope);
let result = if use_buffer {
self.send_buffered(envelope)
} else {
self.send_unbuffered(envelope)
};
async move {
if let Err(error) = &result {
log::error!("failed to send message: {}", error);
Expand All @@ -1694,14 +1710,20 @@ impl ChannelClient {
self.send_buffered(envelope)
}

pub fn send_buffered(&self, mut envelope: proto::Envelope) -> Result<()> {
fn send_buffered(&self, mut envelope: proto::Envelope) -> Result<()> {
envelope.ack_id = Some(self.max_received.load(SeqCst));
self.buffer.lock().push_back(envelope.clone());
// ignore errors on send (happen while we're reconnecting)
// assume that the global "disconnected" overlay is sufficient.
self.outgoing_tx.lock().unbounded_send(envelope).ok();
Ok(())
}

fn send_unbuffered(&self, mut envelope: proto::Envelope) -> Result<()> {
envelope.ack_id = Some(self.max_received.load(SeqCst));
self.outgoing_tx.lock().unbounded_send(envelope).ok();
Ok(())
}
}

impl ProtoClient for ChannelClient {
Expand All @@ -1710,7 +1732,7 @@ impl ProtoClient for ChannelClient {
envelope: proto::Envelope,
request_type: &'static str,
) -> BoxFuture<'static, Result<proto::Envelope>> {
self.request_dynamic(envelope, request_type).boxed()
self.request_dynamic(envelope, request_type, true).boxed()
}

fn send(&self, envelope: proto::Envelope, _message_type: &'static str) -> Result<()> {
Expand Down

0 comments on commit a367c6d

Please sign in to comment.