From a5492b3ea6b00577a8461223844f1a3c47ac1d4f Mon Sep 17 00:00:00 2001 From: Conrad Irwin Date: Fri, 18 Oct 2024 16:08:56 -0600 Subject: [PATCH 1/4] Revert "SSH reconnect reliability (#19398)" (#19440) This reverts commit 98ecb43b2dac3d70f43d745ab32be5a3d4bf323b. Tests fail on main?! Closes #ISSUE Release Notes: - N/A --- Cargo.lock | 1 - .../remote_editing_collaboration_tests.rs | 3 +- crates/project/src/project.rs | 4 - crates/proto/proto/zed.proto | 8 +- crates/proto/src/macros.rs | 1 - crates/proto/src/proto.rs | 2 - crates/remote/Cargo.toml | 1 - crates/remote/src/ssh_session.rs | 435 +++--------------- .../remote_server/src/remote_editing_tests.rs | 46 +- 9 files changed, 79 insertions(+), 422 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f32d24dbfb6c2f..a27005e5d12df9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9119,7 +9119,6 @@ name = "remote" version = "0.1.0" dependencies = [ "anyhow", - "async-trait", "collections", "fs", "futures 0.3.30", diff --git a/crates/collab/src/tests/remote_editing_collaboration_tests.rs b/crates/collab/src/tests/remote_editing_collaboration_tests.rs index a23aadf5e02a52..dae33457555ec4 100644 --- a/crates/collab/src/tests/remote_editing_collaboration_tests.rs +++ b/crates/collab/src/tests/remote_editing_collaboration_tests.rs @@ -26,7 +26,7 @@ async fn test_sharing_an_ssh_remote_project( .await; // Set up project on remote FS - let (forwarder, server_ssh) = SshRemoteClient::fake_server(server_cx); + let (client_ssh, server_ssh) = SshRemoteClient::fake(cx_a, server_cx); let remote_fs = FakeFs::new(server_cx.executor()); remote_fs .insert_tree( @@ -67,7 +67,6 @@ async fn test_sharing_an_ssh_remote_project( ) }); - let client_ssh = SshRemoteClient::fake_client(forwarder, cx_a).await; let (project_a, worktree_id) = client_a .build_ssh_project("/code/project1", client_ssh, cx_a) .await; diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index 8ea9e78cb7a914..70d5962647c487 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -1243,10 +1243,6 @@ impl Project { self.client.clone() } - pub fn ssh_client(&self) -> Option> { - self.ssh_client.clone() - } - pub fn user_store(&self) -> Model { self.user_store.clone() } diff --git a/crates/proto/proto/zed.proto b/crates/proto/proto/zed.proto index e089fcdebaa3f4..09891de6fecd3b 100644 --- a/crates/proto/proto/zed.proto +++ b/crates/proto/proto/zed.proto @@ -12,7 +12,6 @@ message Envelope { uint32 id = 1; optional uint32 responding_to = 2; optional PeerId original_sender_id = 3; - optional uint32 ack_id = 266; oneof payload { Hello hello = 4; @@ -296,9 +295,7 @@ message Envelope { OpenServerSettings open_server_settings = 263; GetPermalinkToLine get_permalink_to_line = 264; - GetPermalinkToLineResponse get_permalink_to_line_response = 265; - - FlushBufferedMessages flush_buffered_messages = 267; + GetPermalinkToLineResponse get_permalink_to_line_response = 265; // current max } reserved 87 to 88; @@ -2524,6 +2521,3 @@ message GetPermalinkToLine { message GetPermalinkToLineResponse { string permalink = 1; } - -message FlushBufferedMessages {} -message FlushBufferedMessagesResponse {} diff --git a/crates/proto/src/macros.rs b/crates/proto/src/macros.rs index 2ce0c0df259d8d..4fdbfff81b6c4f 100644 --- a/crates/proto/src/macros.rs +++ b/crates/proto/src/macros.rs @@ -32,7 +32,6 @@ macro_rules! 
messages { responding_to, original_sender_id, payload: Some(envelope::Payload::$name(self)), - ack_id: None, } } diff --git a/crates/proto/src/proto.rs b/crates/proto/src/proto.rs index 8179473feaa5f4..ffbbeb49c2b48a 100644 --- a/crates/proto/src/proto.rs +++ b/crates/proto/src/proto.rs @@ -372,7 +372,6 @@ messages!( (OpenServerSettings, Foreground), (GetPermalinkToLine, Foreground), (GetPermalinkToLineResponse, Foreground), - (FlushBufferedMessages, Foreground), ); request_messages!( @@ -499,7 +498,6 @@ request_messages!( (RemoveWorktree, Ack), (OpenServerSettings, OpenBufferResponse), (GetPermalinkToLine, GetPermalinkToLineResponse), - (FlushBufferedMessages, Ack), ); entity_messages!( diff --git a/crates/remote/Cargo.toml b/crates/remote/Cargo.toml index 937a69ee592787..b8c5f34cc5854a 100644 --- a/crates/remote/Cargo.toml +++ b/crates/remote/Cargo.toml @@ -19,7 +19,6 @@ test-support = ["fs/test-support"] [dependencies] anyhow.workspace = true -async-trait.workspace = true collections.workspace = true fs.workspace = true futures.workspace = true diff --git a/crates/remote/src/ssh_session.rs b/crates/remote/src/ssh_session.rs index 74ee837e46584f..f7ef74ce392398 100644 --- a/crates/remote/src/ssh_session.rs +++ b/crates/remote/src/ssh_session.rs @@ -6,7 +6,6 @@ use crate::{ proxy::ProxyLaunchError, }; use anyhow::{anyhow, Context as _, Result}; -use async_trait::async_trait; use collections::HashMap; use futures::{ channel::{ @@ -32,7 +31,6 @@ use smol::{ }; use std::{ any::TypeId, - collections::VecDeque, ffi::OsStr, fmt, ops::ControlFlow, @@ -278,7 +276,7 @@ async fn run_cmd(command: &mut process::Command) -> Result { } } -pub struct ChannelForwarder { +struct ChannelForwarder { quit_tx: UnboundedSender<()>, forwarding_task: Task<(UnboundedSender, UnboundedReceiver)>, } @@ -349,7 +347,7 @@ const MAX_RECONNECT_ATTEMPTS: usize = 3; enum State { Connecting, Connected { - ssh_connection: Box, + ssh_connection: SshRemoteConnection, delegate: Arc, forwarder: ChannelForwarder, @@ -359,7 +357,7 @@ enum State { HeartbeatMissed { missed_heartbeats: usize, - ssh_connection: Box, + ssh_connection: SshRemoteConnection, delegate: Arc, forwarder: ChannelForwarder, @@ -368,7 +366,7 @@ enum State { }, Reconnecting, ReconnectFailed { - ssh_connection: Box, + ssh_connection: SshRemoteConnection, delegate: Arc, forwarder: ChannelForwarder, @@ -394,11 +392,11 @@ impl fmt::Display for State { } impl State { - fn ssh_connection(&self) -> Option<&dyn SshRemoteProcess> { + fn ssh_connection(&self) -> Option<&SshRemoteConnection> { match self { - Self::Connected { ssh_connection, .. } => Some(ssh_connection.as_ref()), - Self::HeartbeatMissed { ssh_connection, .. } => Some(ssh_connection.as_ref()), - Self::ReconnectFailed { ssh_connection, .. } => Some(ssh_connection.as_ref()), + Self::Connected { ssh_connection, .. } => Some(ssh_connection), + Self::HeartbeatMissed { ssh_connection, .. } => Some(ssh_connection), + Self::ReconnectFailed { ssh_connection, .. 
} => Some(ssh_connection), _ => None, } } @@ -543,19 +541,23 @@ impl SshRemoteClient { let (proxy, proxy_incoming_tx, proxy_outgoing_rx) = ChannelForwarder::new(incoming_tx, outgoing_rx, &mut cx); - let (ssh_connection, io_task) = Self::establish_connection( + let (ssh_connection, ssh_proxy_process) = Self::establish_connection( unique_identifier, false, connection_options, - proxy_incoming_tx, - proxy_outgoing_rx, - connection_activity_tx, delegate.clone(), &mut cx, ) .await?; - let multiplex_task = Self::monitor(this.downgrade(), io_task, &cx); + let multiplex_task = Self::multiplex( + this.downgrade(), + ssh_proxy_process, + proxy_incoming_tx, + proxy_outgoing_rx, + connection_activity_tx, + &mut cx, + ); if let Err(error) = client.ping(HEARTBEAT_TIMEOUT).await { log::error!("failed to establish connection: {}", error); @@ -701,24 +703,30 @@ impl SshRemoteClient { }; } - if let Err(error) = ssh_connection.kill().await.context("Failed to kill ssh process") { + if let Err(error) = ssh_connection.master_process.kill() { failed!(error, attempts, ssh_connection, delegate, forwarder); }; - let connection_options = ssh_connection.connection_options(); + if let Err(error) = ssh_connection + .master_process + .status() + .await + .context("Failed to kill ssh process") + { + failed!(error, attempts, ssh_connection, delegate, forwarder); + } + + let connection_options = ssh_connection.socket.connection_options.clone(); let (incoming_tx, outgoing_rx) = forwarder.into_channels().await; let (forwarder, proxy_incoming_tx, proxy_outgoing_rx) = ChannelForwarder::new(incoming_tx, outgoing_rx, &mut cx); let (connection_activity_tx, connection_activity_rx) = mpsc::channel::<()>(1); - let (ssh_connection, io_task) = match Self::establish_connection( + let (ssh_connection, ssh_process) = match Self::establish_connection( identifier, true, connection_options, - proxy_incoming_tx, - proxy_outgoing_rx, - connection_activity_tx, delegate.clone(), &mut cx, ) @@ -730,9 +738,16 @@ impl SshRemoteClient { } }; - let multiplex_task = Self::monitor(this.clone(), io_task, &cx); + let multiplex_task = Self::multiplex( + this.clone(), + ssh_process, + proxy_incoming_tx, + proxy_outgoing_rx, + connection_activity_tx, + &mut cx, + ); - if let Err(error) = client.resync(HEARTBEAT_TIMEOUT).await { + if let Err(error) = client.ping(HEARTBEAT_TIMEOUT).await { failed!(error, attempts, ssh_connection, delegate, forwarder); }; @@ -896,17 +911,18 @@ impl SshRemoteClient { } fn multiplex( + this: WeakModel, mut ssh_proxy_process: Child, incoming_tx: UnboundedSender, mut outgoing_rx: UnboundedReceiver, mut connection_activity_tx: Sender<()>, cx: &AsyncAppContext, - ) -> Task>> { + ) -> Task> { let mut child_stderr = ssh_proxy_process.stderr.take().unwrap(); let mut child_stdout = ssh_proxy_process.stdout.take().unwrap(); let mut child_stdin = ssh_proxy_process.stdin.take().unwrap(); - cx.background_executor().spawn(async move { + let io_task = cx.background_executor().spawn(async move { let mut stdin_buffer = Vec::new(); let mut stdout_buffer = Vec::new(); let mut stderr_buffer = Vec::new(); @@ -985,14 +1001,8 @@ impl SshRemoteClient { } } } - }) - } + }); - fn monitor( - this: WeakModel, - io_task: Task>>, - cx: &AsyncAppContext, - ) -> Task> { cx.spawn(|mut cx| async move { let result = io_task.await; @@ -1051,40 +1061,21 @@ impl SshRemoteClient { cx.notify(); } - #[allow(clippy::too_many_arguments)] async fn establish_connection( unique_identifier: String, reconnect: bool, connection_options: SshConnectionOptions, - 
proxy_incoming_tx: UnboundedSender, - proxy_outgoing_rx: UnboundedReceiver, - connection_activity_tx: Sender<()>, delegate: Arc, cx: &mut AsyncAppContext, - ) -> Result<(Box, Task>>)> { - #[cfg(any(test, feature = "test-support"))] - if let Some(fake) = fake::SshRemoteConnection::new(&connection_options) { - let io_task = fake::SshRemoteConnection::multiplex( - fake.connection_options(), - proxy_incoming_tx, - proxy_outgoing_rx, - connection_activity_tx, - cx, - ) - .await; - return Ok((fake, io_task)); - } - + ) -> Result<(SshRemoteConnection, Child)> { let ssh_connection = SshRemoteConnection::new(connection_options, delegate.clone(), cx).await?; let platform = ssh_connection.query_platform().await?; let remote_binary_path = delegate.remote_server_binary_path(platform, cx)?; - if !reconnect { - ssh_connection - .ensure_server_binary(&delegate, &remote_binary_path, platform, cx) - .await?; - } + ssh_connection + .ensure_server_binary(&delegate, &remote_binary_path, platform, cx) + .await?; let socket = ssh_connection.socket.clone(); run_cmd(socket.ssh_command(&remote_binary_path).arg("version")).await?; @@ -1109,15 +1100,7 @@ impl SshRemoteClient { .spawn() .context("failed to spawn remote server")?; - let io_task = Self::multiplex( - ssh_proxy_process, - proxy_incoming_tx, - proxy_outgoing_rx, - connection_activity_tx, - &cx, - ); - - Ok((Box::new(ssh_connection), io_task)) + Ok((ssh_connection, ssh_proxy_process)) } pub fn subscribe_to_entity(&self, remote_id: u64, entity: &Model) { @@ -1129,7 +1112,7 @@ impl SshRemoteClient { .lock() .as_ref() .and_then(|state| state.ssh_connection()) - .map(|ssh_connection| ssh_connection.ssh_args()) + .map(|ssh_connection| ssh_connection.socket.ssh_args()) } pub fn proto_client(&self) -> AnyProtoClient { @@ -1144,6 +1127,7 @@ impl SshRemoteClient { self.connection_options.clone() } + #[cfg(not(any(test, feature = "test-support")))] pub fn connection_state(&self) -> ConnectionState { self.state .lock() @@ -1152,69 +1136,37 @@ impl SshRemoteClient { .unwrap_or(ConnectionState::Disconnected) } - pub fn is_disconnected(&self) -> bool { - self.connection_state() == ConnectionState::Disconnected - } - #[cfg(any(test, feature = "test-support"))] - pub fn simulate_disconnect(&self, cx: &mut AppContext) -> Task<()> { - use gpui::BorrowAppContext; - - let port = self.connection_options().port.unwrap(); - - let disconnect = - cx.update_global(|c: &mut fake::GlobalConnections, _cx| c.take(port).into_channels()); - cx.spawn(|mut cx| async move { - let (input_rx, output_tx) = disconnect.await; - let (forwarder, _, _) = ChannelForwarder::new(input_rx, output_tx, &mut cx); - cx.update_global(|c: &mut fake::GlobalConnections, _cx| c.replace(port, forwarder)) - .unwrap() - }) + pub fn connection_state(&self) -> ConnectionState { + ConnectionState::Connected } - #[cfg(any(test, feature = "test-support"))] - pub fn fake_server( - server_cx: &mut gpui::TestAppContext, - ) -> (ChannelForwarder, Arc) { - server_cx.update(|cx| { - let (outgoing_tx, outgoing_rx) = mpsc::unbounded::(); - let (incoming_tx, incoming_rx) = mpsc::unbounded::(); - - // We use the forwarder on the server side (in production we only use one on the client side) - // the idea is that we can simulate a disconnect/reconnect by just messing with the forwarder. 
- let (forwarder, _, _) = - ChannelForwarder::new(incoming_tx, outgoing_rx, &mut cx.to_async()); - - let client = ChannelClient::new(incoming_rx, outgoing_tx, cx); - (forwarder, client) - }) + pub fn is_disconnected(&self) -> bool { + self.connection_state() == ConnectionState::Disconnected } #[cfg(any(test, feature = "test-support"))] - pub async fn fake_client( - forwarder: ChannelForwarder, + pub fn fake( client_cx: &mut gpui::TestAppContext, - ) -> Model { - use gpui::BorrowAppContext; - client_cx - .update(|cx| { - let port = cx.update_default_global(|c: &mut fake::GlobalConnections, _cx| { - c.push(forwarder) - }); + server_cx: &mut gpui::TestAppContext, + ) -> (Model, Arc) { + use gpui::Context; - Self::new( - "fake".to_string(), - SshConnectionOptions { - host: "".to_string(), - port: Some(port), - ..Default::default() - }, - Arc::new(fake::Delegate), - cx, - ) - }) - .await - .unwrap() + let (server_to_client_tx, server_to_client_rx) = mpsc::unbounded(); + let (client_to_server_tx, client_to_server_rx) = mpsc::unbounded(); + + ( + client_cx.update(|cx| { + let client = ChannelClient::new(server_to_client_rx, client_to_server_tx, cx); + cx.new_model(|_| Self { + client, + unique_identifier: "fake".to_string(), + connection_options: SshConnectionOptions::default(), + state: Arc::new(Mutex::new(None)), + }) + }), + server_cx.update(|cx| ChannelClient::new(client_to_server_rx, server_to_client_tx, cx)), + ) } } @@ -1224,13 +1176,6 @@ impl From for AnyProtoClient { } } -#[async_trait] -trait SshRemoteProcess: Send + Sync { - async fn kill(&mut self) -> Result<()>; - fn ssh_args(&self) -> Vec; - fn connection_options(&self) -> SshConnectionOptions; -} - struct SshRemoteConnection { socket: SshSocket, master_process: process::Child, @@ -1245,25 +1190,6 @@ impl Drop for SshRemoteConnection { } } -#[async_trait] -impl SshRemoteProcess for SshRemoteConnection { - async fn kill(&mut self) -> Result<()> { - self.master_process.kill()?; - - self.master_process.status().await?; - - Ok(()) - } - - fn ssh_args(&self) -> Vec { - self.socket.ssh_args() - } - - fn connection_options(&self) -> SshConnectionOptions { - self.socket.connection_options.clone() - } -} - impl SshRemoteConnection { #[cfg(not(unix))] async fn new( @@ -1546,10 +1472,8 @@ type ResponseChannels = Mutex, - buffer: Mutex>, - response_channels: ResponseChannels, - message_handlers: Mutex, - max_received: AtomicU32, + response_channels: ResponseChannels, // Lock + message_handlers: Mutex, // Lock } impl ChannelClient { @@ -1561,10 +1485,8 @@ impl ChannelClient { let this = Arc::new(Self { outgoing_tx, next_message_id: AtomicU32::new(0), - max_received: AtomicU32::new(0), response_channels: ResponseChannels::default(), message_handlers: Default::default(), - buffer: Mutex::new(VecDeque::new()), }); Self::start_handling_messages(this.clone(), incoming_rx, cx); @@ -1585,27 +1507,6 @@ impl ChannelClient { let Some(this) = this.upgrade() else { return anyhow::Ok(()); }; - if let Some(ack_id) = incoming.ack_id { - let mut buffer = this.buffer.lock(); - while buffer.front().is_some_and(|msg| msg.id <= ack_id) { - buffer.pop_front(); - } - } - if let Some(proto::envelope::Payload::FlushBufferedMessages(_)) = - &incoming.payload - { - { - let buffer = this.buffer.lock(); - for envelope in buffer.iter() { - this.outgoing_tx.unbounded_send(envelope.clone()).ok(); - } - } - let response = proto::Ack {}.into_envelope(0, Some(incoming.id), None); - this.send_dynamic(response).ok(); - continue; - } - - this.max_received.store(incoming.id, 
SeqCst); if let Some(request_id) = incoming.responding_to { let request_id = MessageId(request_id); @@ -1682,23 +1583,6 @@ impl ChannelClient { } } - pub async fn resync(&self, timeout: Duration) -> Result<()> { - smol::future::or( - async { - self.request(proto::FlushBufferedMessages {}).await?; - for envelope in self.buffer.lock().iter() { - self.outgoing_tx.unbounded_send(envelope.clone()).ok(); - } - Ok(()) - }, - async { - smol::Timer::after(timeout).await; - Err(anyhow!("Timeout detected")) - }, - ) - .await - } - pub async fn ping(&self, timeout: Duration) -> Result<()> { smol::future::or( async { @@ -1728,8 +1612,7 @@ impl ChannelClient { let mut response_channels_lock = self.response_channels.lock(); response_channels_lock.insert(MessageId(envelope.id), tx); drop(response_channels_lock); - - let result = self.send_buffered(envelope); + let result = self.outgoing_tx.unbounded_send(envelope); async move { if let Err(error) = &result { log::error!("failed to send message: {}", error); @@ -1746,12 +1629,6 @@ impl ChannelClient { pub fn send_dynamic(&self, mut envelope: proto::Envelope) -> Result<()> { envelope.id = self.next_message_id.fetch_add(1, SeqCst); - self.send_buffered(envelope) - } - - pub fn send_buffered(&self, mut envelope: proto::Envelope) -> Result<()> { - envelope.ack_id = Some(self.max_received.load(SeqCst)); - self.buffer.lock().push_back(envelope.clone()); self.outgoing_tx.unbounded_send(envelope)?; Ok(()) } @@ -1782,165 +1659,3 @@ impl ProtoClient for ChannelClient { false } } - -#[cfg(any(test, feature = "test-support"))] -mod fake { - use std::path::PathBuf; - - use anyhow::Result; - use async_trait::async_trait; - use futures::{ - channel::{ - mpsc::{self, Sender}, - oneshot, - }, - select_biased, FutureExt, SinkExt, StreamExt, - }; - use gpui::{AsyncAppContext, BorrowAppContext, Global, SemanticVersion, Task}; - use rpc::proto::Envelope; - - use super::{ - ChannelForwarder, SshClientDelegate, SshConnectionOptions, SshPlatform, SshRemoteProcess, - }; - - pub(super) struct SshRemoteConnection { - connection_options: SshConnectionOptions, - } - - impl SshRemoteConnection { - pub(super) fn new( - connection_options: &SshConnectionOptions, - ) -> Option> { - if connection_options.host == "" { - return Some(Box::new(Self { - connection_options: connection_options.clone(), - })); - } - return None; - } - pub(super) async fn multiplex( - connection_options: SshConnectionOptions, - mut client_tx: mpsc::UnboundedSender, - mut client_rx: mpsc::UnboundedReceiver, - mut connection_activity_tx: Sender<()>, - cx: &mut AsyncAppContext, - ) -> Task>> { - let (server_tx, server_rx) = cx - .update(|cx| { - cx.update_global(|conns: &mut GlobalConnections, _| { - conns.take(connection_options.port.unwrap()) - }) - }) - .unwrap() - .into_channels() - .await; - - let (forwarder, mut proxy_tx, mut proxy_rx) = - ChannelForwarder::new(server_tx, server_rx, cx); - - cx.update(|cx| { - cx.update_global(|conns: &mut GlobalConnections, _| { - conns.replace(connection_options.port.unwrap(), forwarder) - }) - }) - .unwrap(); - - cx.background_executor().spawn(async move { - loop { - select_biased! 
{ - server_to_client = proxy_rx.next().fuse() => { - let Some(server_to_client) = server_to_client else { - return Ok(Some(1)) - }; - connection_activity_tx.try_send(()).ok(); - client_tx.send(server_to_client).await.ok(); - } - client_to_server = client_rx.next().fuse() => { - let Some(client_to_server) = client_to_server else { - return Ok(None) - }; - proxy_tx.send(client_to_server).await.ok(); - - } - } - } - }) - } - } - - #[async_trait] - impl SshRemoteProcess for SshRemoteConnection { - async fn kill(&mut self) -> Result<()> { - Ok(()) - } - - fn ssh_args(&self) -> Vec { - Vec::new() - } - - fn connection_options(&self) -> SshConnectionOptions { - self.connection_options.clone() - } - } - - #[derive(Default)] - pub(super) struct GlobalConnections(Vec>); - impl Global for GlobalConnections {} - - impl GlobalConnections { - pub(super) fn push(&mut self, forwarder: ChannelForwarder) -> u16 { - self.0.push(Some(forwarder)); - self.0.len() as u16 - 1 - } - - pub(super) fn take(&mut self, port: u16) -> ChannelForwarder { - self.0 - .get_mut(port as usize) - .expect("no fake server for port") - .take() - .expect("fake server is already borrowed") - } - pub(super) fn replace(&mut self, port: u16, forwarder: ChannelForwarder) { - let ret = self - .0 - .get_mut(port as usize) - .expect("no fake server for port") - .replace(forwarder); - if ret.is_some() { - panic!("fake server is already replaced"); - } - } - } - - pub(super) struct Delegate; - - impl SshClientDelegate for Delegate { - fn ask_password( - &self, - _: String, - _: &mut AsyncAppContext, - ) -> oneshot::Receiver> { - unreachable!() - } - fn remote_server_binary_path( - &self, - _: SshPlatform, - _: &mut AsyncAppContext, - ) -> Result { - unreachable!() - } - fn get_server_binary( - &self, - _: SshPlatform, - _: &mut AsyncAppContext, - ) -> oneshot::Receiver> { - unreachable!() - } - fn set_status(&self, _: Option<&str>, _: &mut AsyncAppContext) { - unreachable!() - } - fn set_error(&self, _: String, _: &mut AsyncAppContext) { - unreachable!() - } - } -} diff --git a/crates/remote_server/src/remote_editing_tests.rs b/crates/remote_server/src/remote_editing_tests.rs index bfe58931c9e89b..41065ad5508310 100644 --- a/crates/remote_server/src/remote_editing_tests.rs +++ b/crates/remote_server/src/remote_editing_tests.rs @@ -641,47 +641,6 @@ async fn test_open_server_settings(cx: &mut TestAppContext, server_cx: &mut Test }) } -#[gpui::test(iterations = 10)] -async fn test_reconnect(cx: &mut TestAppContext, server_cx: &mut TestAppContext) { - let (project, _headless, fs) = init_test(cx, server_cx).await; - - let (worktree, _) = project - .update(cx, |project, cx| { - project.find_or_create_worktree("/code/project1", true, cx) - }) - .await - .unwrap(); - - let worktree_id = worktree.read_with(cx, |worktree, _| worktree.id()); - let buffer = project - .update(cx, |project, cx| { - project.open_buffer((worktree_id, Path::new("src/lib.rs")), cx) - }) - .await - .unwrap(); - - buffer.update(cx, |buffer, cx| { - assert_eq!(buffer.text(), "fn one() -> usize { 1 }"); - let ix = buffer.text().find('1').unwrap(); - buffer.edit([(ix..ix + 1, "100")], None, cx); - }); - - let client = cx.read(|cx| project.read(cx).ssh_client().unwrap()); - client - .update(cx, |client, cx| client.simulate_disconnect(cx)) - .detach(); - - project - .update(cx, |project, cx| project.save_buffer(buffer.clone(), cx)) - .await - .unwrap(); - - assert_eq!( - fs.load("/code/project1/src/lib.rs".as_ref()).await.unwrap(), - "fn one() -> usize { 100 }" - ); -} - fn 
init_logger() { if std::env::var("RUST_LOG").is_ok() { env_logger::try_init().ok(); @@ -692,9 +651,9 @@ async fn init_test( cx: &mut TestAppContext, server_cx: &mut TestAppContext, ) -> (Model, Model, Arc) { + let (ssh_remote_client, ssh_server_client) = SshRemoteClient::fake(cx, server_cx); init_logger(); - let (forwarder, ssh_server_client) = SshRemoteClient::fake_server(server_cx); let fs = FakeFs::new(server_cx.executor()); fs.insert_tree( "/code", @@ -735,9 +694,8 @@ async fn init_test( cx, ) }); + let project = build_project(ssh_remote_client, cx); - let ssh = SshRemoteClient::fake_client(forwarder, cx).await; - let project = build_project(ssh, cx); project .update(cx, { let headless = headless.clone(); From 30e081b3f738846cf26b68efabc6e72b2df8ed0d Mon Sep 17 00:00:00 2001 From: Marshall Bowers Date: Fri, 18 Oct 2024 18:31:08 -0400 Subject: [PATCH 2/4] elixir: Bump to v0.1.1 (#19437) This PR bumps the Elixir extension to v0.1.1. Changes: - https://github.com/zed-industries/zed/pull/19135 Release Notes: - N/A --- Cargo.lock | 2 +- extensions/elixir/Cargo.toml | 2 +- extensions/elixir/extension.toml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a27005e5d12df9..2b51a987642276 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -14742,7 +14742,7 @@ dependencies = [ [[package]] name = "zed_elixir" -version = "0.1.0" +version = "0.1.1" dependencies = [ "zed_extension_api 0.1.0", ] diff --git a/extensions/elixir/Cargo.toml b/extensions/elixir/Cargo.toml index 174711b1ba1931..139d21f1c5e895 100644 --- a/extensions/elixir/Cargo.toml +++ b/extensions/elixir/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "zed_elixir" -version = "0.1.0" +version = "0.1.1" edition = "2021" publish = false license = "Apache-2.0" diff --git a/extensions/elixir/extension.toml b/extensions/elixir/extension.toml index 6f00c29a79650d..ba8a1f66872659 100644 --- a/extensions/elixir/extension.toml +++ b/extensions/elixir/extension.toml @@ -1,7 +1,7 @@ id = "elixir" name = "Elixir" description = "Elixir support." -version = "0.1.0" +version = "0.1.1" schema_version = 1 authors = ["Marshall Bowers "] repository = "https://github.com/zed-industries/zed" From 8a912726d7dd4136aae10fcc868e36e59db9c434 Mon Sep 17 00:00:00 2001 From: Mikayla Maki Date: Fri, 18 Oct 2024 15:41:43 -0700 Subject: [PATCH 3/4] Fix flakey SSH connection (#19439) Fixes a bug due to the `select!` macro tossing futures that had partially read messages, causing us to desync our message reading with the input stream. 
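For context, a minimal sketch (not code from this patch) of the failure mode being described: `select!` drops the losing branch's future on every iteration, and a dropped read future such as `read_exact` may already have consumed part of a frame from the stream, so the next read starts mid-message and the length-prefixed framing desyncs. The channel receiver, stream type, and buffer size here are illustrative assumptions.

```rust
use anyhow::Result;
use futures::{
    channel::mpsc, select_biased, AsyncRead, AsyncReadExt as _, FutureExt as _, StreamExt as _,
};

// Illustrative only: a loop that races an outgoing-message channel against a
// length-prefixed read on the same stream inside `select_biased!`.
async fn read_loop(
    mut stdout: impl AsyncRead + Unpin,
    mut outgoing_rx: mpsc::UnboundedReceiver<Vec<u8>>,
) -> Result<()> {
    let mut len_buf = [0u8; 4];
    loop {
        select_biased! {
            _msg = outgoing_rx.next().fuse() => {
                // ... forward the outgoing message to the child's stdin ...
            }
            result = stdout.read_exact(&mut len_buf).fuse() => {
                // If the other branch wins this iteration, the read_exact future is
                // dropped. Bytes it already pulled off `stdout` are lost to the caller,
                // so the next iteration decodes a bogus length prefix.
                result?;
                // ... read the message body of the decoded length ...
            }
        }
    }
}
```

The patch below sidesteps the hazard by giving stdin, stdout, and stderr their own dedicated tasks, so no read future is ever dropped partway through a frame.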
Release Notes: - N/A --------- Co-authored-by: Conrad Irwin Co-authored-by: conrad --- crates/remote/src/protocol.rs | 5 +- crates/remote/src/ssh_session.rs | 144 +++++++++++++++---------------- crates/remote_server/src/unix.rs | 23 +++-- 3 files changed, 90 insertions(+), 82 deletions(-) diff --git a/crates/remote/src/protocol.rs b/crates/remote/src/protocol.rs index 311385f73b15b0..787094781d831b 100644 --- a/crates/remote/src/protocol.rs +++ b/crates/remote/src/protocol.rs @@ -2,7 +2,6 @@ use anyhow::Result; use futures::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; use prost::Message as _; use rpc::proto::Envelope; -use std::mem::size_of; #[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)] pub struct MessageId(pub u32); @@ -30,8 +29,10 @@ pub async fn read_message( ) -> Result { buffer.resize(MESSAGE_LEN_SIZE, 0); stream.read_exact(buffer).await?; + let len = message_len_from_buffer(buffer); - read_message_with_len(stream, buffer, len).await + let result = read_message_with_len(stream, buffer, len).await; + result } pub async fn write_message( diff --git a/crates/remote/src/ssh_session.rs b/crates/remote/src/ssh_session.rs index f7ef74ce392398..1d8006a0608a70 100644 --- a/crates/remote/src/ssh_session.rs +++ b/crates/remote/src/ssh_session.rs @@ -13,8 +13,7 @@ use futures::{ oneshot, }, future::BoxFuture, - select_biased, AsyncReadExt as _, AsyncWriteExt as _, Future, FutureExt as _, SinkExt, - StreamExt as _, + select_biased, AsyncReadExt as _, Future, FutureExt as _, SinkExt, StreamExt as _, }; use gpui::{ AppContext, AsyncAppContext, Context, EventEmitter, Model, ModelContext, SemanticVersion, Task, @@ -922,92 +921,90 @@ impl SshRemoteClient { let mut child_stdout = ssh_proxy_process.stdout.take().unwrap(); let mut child_stdin = ssh_proxy_process.stdin.take().unwrap(); - let io_task = cx.background_executor().spawn(async move { - let mut stdin_buffer = Vec::new(); - let mut stdout_buffer = Vec::new(); - let mut stderr_buffer = Vec::new(); - let mut stderr_offset = 0; + let mut stdin_buffer = Vec::new(); + let mut stdout_buffer = Vec::new(); + let mut stderr_buffer = Vec::new(); + let mut stderr_offset = 0; - loop { - stdout_buffer.resize(MESSAGE_LEN_SIZE, 0); - stderr_buffer.resize(stderr_offset + 1024, 0); + let stdin_task = cx.background_executor().spawn(async move { + while let Some(outgoing) = outgoing_rx.next().await { + write_message(&mut child_stdin, &mut stdin_buffer, outgoing).await?; + } + anyhow::Ok(()) + }); - select_biased! 
{ - outgoing = outgoing_rx.next().fuse() => { - let Some(outgoing) = outgoing else { - return anyhow::Ok(None); - }; + let stdout_task = cx.background_executor().spawn({ + let mut connection_activity_tx = connection_activity_tx.clone(); + async move { + loop { + stdout_buffer.resize(MESSAGE_LEN_SIZE, 0); + let len = child_stdout.read(&mut stdout_buffer).await?; - write_message(&mut child_stdin, &mut stdin_buffer, outgoing).await?; + if len == 0 { + return anyhow::Ok(()); } - result = child_stdout.read(&mut stdout_buffer).fuse() => { - match result { - Ok(0) => { - child_stdin.close().await?; - outgoing_rx.close(); - let status = ssh_proxy_process.status().await?; - // If we don't have a code, we assume process - // has been killed and treat it as non-zero exit - // code - return Ok(status.code().or_else(|| Some(1))); - } - Ok(len) => { - if len < stdout_buffer.len() { - child_stdout.read_exact(&mut stdout_buffer[len..]).await?; - } - - let message_len = message_len_from_buffer(&stdout_buffer); - match read_message_with_len(&mut child_stdout, &mut stdout_buffer, message_len).await { - Ok(envelope) => { - connection_activity_tx.try_send(()).ok(); - incoming_tx.unbounded_send(envelope).ok(); - } - Err(error) => { - log::error!("error decoding message {error:?}"); - } - } - } - Err(error) => { - Err(anyhow!("error reading stdout: {error:?}"))?; - } - } + if len < MESSAGE_LEN_SIZE { + child_stdout.read_exact(&mut stdout_buffer[len..]).await?; } - result = child_stderr.read(&mut stderr_buffer[stderr_offset..]).fuse() => { - match result { - Ok(len) => { - stderr_offset += len; - let mut start_ix = 0; - while let Some(ix) = stderr_buffer[start_ix..stderr_offset].iter().position(|b| b == &b'\n') { - let line_ix = start_ix + ix; - let content = &stderr_buffer[start_ix..line_ix]; - start_ix = line_ix + 1; - if let Ok(record) = serde_json::from_slice::(content) { - record.log(log::logger()) - } else { - eprintln!("(remote) {}", String::from_utf8_lossy(content)); - } - } - stderr_buffer.drain(0..start_ix); - stderr_offset -= start_ix; + let message_len = message_len_from_buffer(&stdout_buffer); + let envelope = + read_message_with_len(&mut child_stdout, &mut stdout_buffer, message_len) + .await?; + connection_activity_tx.try_send(()).ok(); + incoming_tx.unbounded_send(envelope).ok(); + } + } + }); - connection_activity_tx.try_send(()).ok(); - } - Err(error) => { - Err(anyhow!("error reading stderr: {error:?}"))?; - } - } + let stderr_task: Task> = cx.background_executor().spawn(async move { + loop { + stderr_buffer.resize(stderr_offset + 1024, 0); + + let len = child_stderr + .read(&mut stderr_buffer[stderr_offset..]) + .await?; + + stderr_offset += len; + let mut start_ix = 0; + while let Some(ix) = stderr_buffer[start_ix..stderr_offset] + .iter() + .position(|b| b == &b'\n') + { + let line_ix = start_ix + ix; + let content = &stderr_buffer[start_ix..line_ix]; + start_ix = line_ix + 1; + if let Ok(record) = serde_json::from_slice::(content) { + record.log(log::logger()) + } else { + eprintln!("(remote) {}", String::from_utf8_lossy(content)); } } + stderr_buffer.drain(0..start_ix); + stderr_offset -= start_ix; + + connection_activity_tx.try_send(()).ok(); } }); cx.spawn(|mut cx| async move { - let result = io_task.await; + let result = futures::select! 
{ + result = stdin_task.fuse() => { + result.context("stdin") + } + result = stdout_task.fuse() => { + result.context("stdout") + } + result = stderr_task.fuse() => { + result.context("stderr") + } + }; match result { - Ok(Some(exit_code)) => { + Ok(_) => { + let exit_code = ssh_proxy_process.status().await?.code().unwrap_or(1); + if let Some(error) = ProxyLaunchError::from_exit_code(exit_code) { match error { ProxyLaunchError::ServerNotRunning => { @@ -1025,7 +1022,6 @@ impl SshRemoteClient { })?; } } - Ok(None) => {} Err(error) => { log::warn!("ssh io task died with error: {:?}. reconnecting...", error); this.update(&mut cx, |this, cx| { @@ -1033,6 +1029,7 @@ impl SshRemoteClient { })?; } } + Ok(()) }) } @@ -1206,6 +1203,7 @@ impl SshRemoteConnection { delegate: Arc, cx: &mut AsyncAppContext, ) -> Result { + use futures::AsyncWriteExt as _; use futures::{io::BufReader, AsyncBufReadExt as _}; use smol::{fs::unix::PermissionsExt as _, net::unix::UnixListener}; use util::ResultExt as _; diff --git a/crates/remote_server/src/unix.rs b/crates/remote_server/src/unix.rs index 60b7fc458d0b43..30b2bacd0a6133 100644 --- a/crates/remote_server/src/unix.rs +++ b/crates/remote_server/src/unix.rs @@ -12,6 +12,7 @@ use language::LanguageRegistry; use node_runtime::{NodeBinaryOptions, NodeRuntime}; use paths::logs_dir; use project::project_settings::ProjectSettings; + use remote::proxy::ProxyLaunchError; use remote::ssh_session::ChannelClient; use remote::{ @@ -213,19 +214,27 @@ fn start_server( let mut input_buffer = Vec::new(); let mut output_buffer = Vec::new(); + + let (mut stdin_msg_tx, mut stdin_msg_rx) = mpsc::unbounded::(); + cx.background_executor().spawn(async move { + while let Ok(msg) = read_message(&mut stdin_stream, &mut input_buffer).await { + if let Err(_) = stdin_msg_tx.send(msg).await { + break; + } + } + }).detach(); + loop { + select_biased! { _ = app_quit_rx.next().fuse() => { return anyhow::Ok(()); } - stdin_message = read_message(&mut stdin_stream, &mut input_buffer).fuse() => { - let message = match stdin_message { - Ok(message) => message, - Err(error) => { - log::warn!("error reading message on stdin: {}. exiting.", error); - break; - } + stdin_message = stdin_msg_rx.next().fuse() => { + let Some(message) = stdin_message else { + log::warn!("error reading message on stdin. exiting."); + break; }; if let Err(error) = incoming_tx.unbounded_send(message) { log::error!("failed to send message to application: {:?}. exiting.", error); From 3e0c5c10b73fe8db87780ab63c3be2ee995d6870 Mon Sep 17 00:00:00 2001 From: Vitaly Slobodin Date: Sat, 19 Oct 2024 00:52:17 +0200 Subject: [PATCH 4/4] lsp: Handle unregistration "textDocument/rename" from a server (#19427) Hi. While working on https://github.com/zed-industries/zed/pull/19230 I noticed that some servers send a request to unregistered the `textDocument/rename` capability. I thought it would be good to handle that message in Zed: ```plaintext [2024-10-18T21:25:07+02:00 WARN project::lsp_store] unhandled capability unregistration: Unregistration { id: "biome_rename", method: "textDocument/rename" } ``` So this pull request implements that. Thanks. 
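As an aside, a hypothetical sketch of the shape of this handling (local stand-in types, not Zed's or `lsp-types`' actual definitions): the server-initiated `client/unregisterCapability` request names a method, and the client clears the matching capability, which is what the patch below does for `textDocument/rename`.

```rust
// Stand-in for the LSP unregistration payload seen in the log line above.
struct Unregistration {
    id: String,
    method: String,
}

// Hypothetical dispatch: clear the capability named by `method`, warn otherwise.
fn handle_unregistration(unreg: &Unregistration, rename_provider: &mut Option<bool>) {
    match unreg.method.as_str() {
        "textDocument/rename" => *rename_provider = None,
        other => log::warn!(
            "unhandled capability unregistration: {} (id {})",
            other,
            unreg.id
        ),
    }
}
```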
Release Notes: - N/A --- crates/project/src/lsp_store.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/crates/project/src/lsp_store.rs b/crates/project/src/lsp_store.rs index 61f959259d5ab8..fe0a6443bc8118 100644 --- a/crates/project/src/lsp_store.rs +++ b/crates/project/src/lsp_store.rs @@ -6057,6 +6057,16 @@ impl LspStore { ); })?; } + "textDocument/rename" => { + this.update(&mut cx, |this, _| { + if let Some(server) = this.language_server_for_id(server_id) + { + server.update_capabilities(|capabilities| { + capabilities.rename_provider = None + }) + } + })?; + } "textDocument/rangeFormatting" => { this.update(&mut cx, |this, _| { if let Some(server) = this.language_server_for_id(server_id)