diff --git a/Cargo.lock b/Cargo.lock index f32d24dbfb6c2f..2b51a987642276 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9119,7 +9119,6 @@ name = "remote" version = "0.1.0" dependencies = [ "anyhow", - "async-trait", "collections", "fs", "futures 0.3.30", @@ -14743,7 +14742,7 @@ dependencies = [ [[package]] name = "zed_elixir" -version = "0.1.0" +version = "0.1.1" dependencies = [ "zed_extension_api 0.1.0", ] diff --git a/crates/collab/src/tests/remote_editing_collaboration_tests.rs b/crates/collab/src/tests/remote_editing_collaboration_tests.rs index a23aadf5e02a52..dae33457555ec4 100644 --- a/crates/collab/src/tests/remote_editing_collaboration_tests.rs +++ b/crates/collab/src/tests/remote_editing_collaboration_tests.rs @@ -26,7 +26,7 @@ async fn test_sharing_an_ssh_remote_project( .await; // Set up project on remote FS - let (forwarder, server_ssh) = SshRemoteClient::fake_server(server_cx); + let (client_ssh, server_ssh) = SshRemoteClient::fake(cx_a, server_cx); let remote_fs = FakeFs::new(server_cx.executor()); remote_fs .insert_tree( @@ -67,7 +67,6 @@ async fn test_sharing_an_ssh_remote_project( ) }); - let client_ssh = SshRemoteClient::fake_client(forwarder, cx_a).await; let (project_a, worktree_id) = client_a .build_ssh_project("/code/project1", client_ssh, cx_a) .await; diff --git a/crates/project/src/lsp_store.rs b/crates/project/src/lsp_store.rs index 61f959259d5ab8..fe0a6443bc8118 100644 --- a/crates/project/src/lsp_store.rs +++ b/crates/project/src/lsp_store.rs @@ -6057,6 +6057,16 @@ impl LspStore { ); })?; } + "textDocument/rename" => { + this.update(&mut cx, |this, _| { + if let Some(server) = this.language_server_for_id(server_id) + { + server.update_capabilities(|capabilities| { + capabilities.rename_provider = None + }) + } + })?; + } "textDocument/rangeFormatting" => { this.update(&mut cx, |this, _| { if let Some(server) = this.language_server_for_id(server_id) diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index 8ea9e78cb7a914..70d5962647c487 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -1243,10 +1243,6 @@ impl Project { self.client.clone() } - pub fn ssh_client(&self) -> Option> { - self.ssh_client.clone() - } - pub fn user_store(&self) -> Model { self.user_store.clone() } diff --git a/crates/proto/proto/zed.proto b/crates/proto/proto/zed.proto index e089fcdebaa3f4..09891de6fecd3b 100644 --- a/crates/proto/proto/zed.proto +++ b/crates/proto/proto/zed.proto @@ -12,7 +12,6 @@ message Envelope { uint32 id = 1; optional uint32 responding_to = 2; optional PeerId original_sender_id = 3; - optional uint32 ack_id = 266; oneof payload { Hello hello = 4; @@ -296,9 +295,7 @@ message Envelope { OpenServerSettings open_server_settings = 263; GetPermalinkToLine get_permalink_to_line = 264; - GetPermalinkToLineResponse get_permalink_to_line_response = 265; - - FlushBufferedMessages flush_buffered_messages = 267; + GetPermalinkToLineResponse get_permalink_to_line_response = 265; // current max } reserved 87 to 88; @@ -2524,6 +2521,3 @@ message GetPermalinkToLine { message GetPermalinkToLineResponse { string permalink = 1; } - -message FlushBufferedMessages {} -message FlushBufferedMessagesResponse {} diff --git a/crates/proto/src/macros.rs b/crates/proto/src/macros.rs index 2ce0c0df259d8d..4fdbfff81b6c4f 100644 --- a/crates/proto/src/macros.rs +++ b/crates/proto/src/macros.rs @@ -32,7 +32,6 @@ macro_rules! messages { responding_to, original_sender_id, payload: Some(envelope::Payload::$name(self)), - ack_id: None, } } diff --git a/crates/proto/src/proto.rs b/crates/proto/src/proto.rs index 8179473feaa5f4..ffbbeb49c2b48a 100644 --- a/crates/proto/src/proto.rs +++ b/crates/proto/src/proto.rs @@ -372,7 +372,6 @@ messages!( (OpenServerSettings, Foreground), (GetPermalinkToLine, Foreground), (GetPermalinkToLineResponse, Foreground), - (FlushBufferedMessages, Foreground), ); request_messages!( @@ -499,7 +498,6 @@ request_messages!( (RemoveWorktree, Ack), (OpenServerSettings, OpenBufferResponse), (GetPermalinkToLine, GetPermalinkToLineResponse), - (FlushBufferedMessages, Ack), ); entity_messages!( diff --git a/crates/remote/Cargo.toml b/crates/remote/Cargo.toml index 937a69ee592787..b8c5f34cc5854a 100644 --- a/crates/remote/Cargo.toml +++ b/crates/remote/Cargo.toml @@ -19,7 +19,6 @@ test-support = ["fs/test-support"] [dependencies] anyhow.workspace = true -async-trait.workspace = true collections.workspace = true fs.workspace = true futures.workspace = true diff --git a/crates/remote/src/protocol.rs b/crates/remote/src/protocol.rs index 311385f73b15b0..787094781d831b 100644 --- a/crates/remote/src/protocol.rs +++ b/crates/remote/src/protocol.rs @@ -2,7 +2,6 @@ use anyhow::Result; use futures::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; use prost::Message as _; use rpc::proto::Envelope; -use std::mem::size_of; #[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)] pub struct MessageId(pub u32); @@ -30,8 +29,10 @@ pub async fn read_message( ) -> Result { buffer.resize(MESSAGE_LEN_SIZE, 0); stream.read_exact(buffer).await?; + let len = message_len_from_buffer(buffer); - read_message_with_len(stream, buffer, len).await + let result = read_message_with_len(stream, buffer, len).await; + result } pub async fn write_message( diff --git a/crates/remote/src/ssh_session.rs b/crates/remote/src/ssh_session.rs index 53621272b523f7..2dc5b0fc217b88 100644 --- a/crates/remote/src/ssh_session.rs +++ b/crates/remote/src/ssh_session.rs @@ -6,7 +6,6 @@ use crate::{ proxy::ProxyLaunchError, }; use anyhow::{anyhow, Context as _, Result}; -use async_trait::async_trait; use collections::HashMap; use futures::{ channel::{ @@ -14,8 +13,7 @@ use futures::{ oneshot, }, future::BoxFuture, - select_biased, AsyncReadExt as _, AsyncWriteExt as _, Future, FutureExt as _, SinkExt, - StreamExt as _, + select_biased, AsyncReadExt as _, Future, FutureExt as _, SinkExt, StreamExt as _, }; use gpui::{ AppContext, AsyncAppContext, Context, EventEmitter, Model, ModelContext, SemanticVersion, Task, @@ -32,7 +30,6 @@ use smol::{ }; use std::{ any::TypeId, - collections::VecDeque, ffi::OsStr, fmt, ops::ControlFlow, @@ -278,7 +275,7 @@ async fn run_cmd(command: &mut process::Command) -> Result { } } -pub struct ChannelForwarder { +struct ChannelForwarder { quit_tx: UnboundedSender<()>, forwarding_task: Task<(UnboundedSender, UnboundedReceiver)>, } @@ -349,7 +346,7 @@ const MAX_RECONNECT_ATTEMPTS: usize = 3; enum State { Connecting, Connected { - ssh_connection: Box, + ssh_connection: SshRemoteConnection, delegate: Arc, forwarder: ChannelForwarder, @@ -359,7 +356,7 @@ enum State { HeartbeatMissed { missed_heartbeats: usize, - ssh_connection: Box, + ssh_connection: SshRemoteConnection, delegate: Arc, forwarder: ChannelForwarder, @@ -368,7 +365,7 @@ enum State { }, Reconnecting, ReconnectFailed { - ssh_connection: Box, + ssh_connection: SshRemoteConnection, delegate: Arc, forwarder: ChannelForwarder, @@ -394,11 +391,11 @@ impl fmt::Display for State { } impl State { - fn ssh_connection(&self) -> Option<&dyn SshRemoteProcess> { + fn ssh_connection(&self) -> Option<&SshRemoteConnection> { match self { - Self::Connected { ssh_connection, .. } => Some(ssh_connection.as_ref()), - Self::HeartbeatMissed { ssh_connection, .. } => Some(ssh_connection.as_ref()), - Self::ReconnectFailed { ssh_connection, .. } => Some(ssh_connection.as_ref()), + Self::Connected { ssh_connection, .. } => Some(ssh_connection), + Self::HeartbeatMissed { ssh_connection, .. } => Some(ssh_connection), + Self::ReconnectFailed { ssh_connection, .. } => Some(ssh_connection), _ => None, } } @@ -544,19 +541,23 @@ impl SshRemoteClient { let (proxy, proxy_incoming_tx, proxy_outgoing_rx) = ChannelForwarder::new(incoming_tx, outgoing_rx, &mut cx); - let (ssh_connection, io_task) = Self::establish_connection( + let (ssh_connection, ssh_proxy_process) = Self::establish_connection( unique_identifier, false, connection_options, - proxy_incoming_tx, - proxy_outgoing_rx, - connection_activity_tx, delegate.clone(), &mut cx, ) .await?; - let multiplex_task = Self::monitor(this.downgrade(), io_task, &cx); + let multiplex_task = Self::multiplex( + this.downgrade(), + ssh_proxy_process, + proxy_incoming_tx, + proxy_outgoing_rx, + connection_activity_tx, + &mut cx, + ); if let Err(error) = client.ping(HEARTBEAT_TIMEOUT).await { log::error!("failed to establish connection: {}", error); @@ -702,24 +703,30 @@ impl SshRemoteClient { }; } - if let Err(error) = ssh_connection.kill().await.context("Failed to kill ssh process") { + if let Err(error) = ssh_connection.master_process.kill() { failed!(error, attempts, ssh_connection, delegate, forwarder); }; - let connection_options = ssh_connection.connection_options(); + if let Err(error) = ssh_connection + .master_process + .status() + .await + .context("Failed to kill ssh process") + { + failed!(error, attempts, ssh_connection, delegate, forwarder); + } + + let connection_options = ssh_connection.socket.connection_options.clone(); let (incoming_tx, outgoing_rx) = forwarder.into_channels().await; let (forwarder, proxy_incoming_tx, proxy_outgoing_rx) = ChannelForwarder::new(incoming_tx, outgoing_rx, &mut cx); let (connection_activity_tx, connection_activity_rx) = mpsc::channel::<()>(1); - let (ssh_connection, io_task) = match Self::establish_connection( + let (ssh_connection, ssh_process) = match Self::establish_connection( identifier, true, connection_options, - proxy_incoming_tx, - proxy_outgoing_rx, - connection_activity_tx, delegate.clone(), &mut cx, ) @@ -731,9 +738,16 @@ impl SshRemoteClient { } }; - let multiplex_task = Self::monitor(this.clone(), io_task, &cx); + let multiplex_task = Self::multiplex( + this.clone(), + ssh_process, + proxy_incoming_tx, + proxy_outgoing_rx, + connection_activity_tx, + &mut cx, + ); - if let Err(error) = client.resync(HEARTBEAT_TIMEOUT).await { + if let Err(error) = client.ping(HEARTBEAT_TIMEOUT).await { failed!(error, attempts, ssh_connection, delegate, forwarder); }; @@ -897,108 +911,101 @@ impl SshRemoteClient { } fn multiplex( + this: WeakModel, mut ssh_proxy_process: Child, incoming_tx: UnboundedSender, mut outgoing_rx: UnboundedReceiver, mut connection_activity_tx: Sender<()>, cx: &AsyncAppContext, - ) -> Task>> { + ) -> Task> { let mut child_stderr = ssh_proxy_process.stderr.take().unwrap(); let mut child_stdout = ssh_proxy_process.stdout.take().unwrap(); let mut child_stdin = ssh_proxy_process.stdin.take().unwrap(); - cx.background_executor().spawn(async move { - let mut stdin_buffer = Vec::new(); - let mut stdout_buffer = Vec::new(); - let mut stderr_buffer = Vec::new(); - let mut stderr_offset = 0; + let mut stdin_buffer = Vec::new(); + let mut stdout_buffer = Vec::new(); + let mut stderr_buffer = Vec::new(); + let mut stderr_offset = 0; - loop { - stdout_buffer.resize(MESSAGE_LEN_SIZE, 0); - stderr_buffer.resize(stderr_offset + 1024, 0); + let stdin_task = cx.background_executor().spawn(async move { + while let Some(outgoing) = outgoing_rx.next().await { + write_message(&mut child_stdin, &mut stdin_buffer, outgoing).await?; + } + anyhow::Ok(()) + }); - select_biased! { - outgoing = outgoing_rx.next().fuse() => { - let Some(outgoing) = outgoing else { - return anyhow::Ok(None); - }; + let stdout_task = cx.background_executor().spawn({ + let mut connection_activity_tx = connection_activity_tx.clone(); + async move { + loop { + stdout_buffer.resize(MESSAGE_LEN_SIZE, 0); + let len = child_stdout.read(&mut stdout_buffer).await?; - write_message(&mut child_stdin, &mut stdin_buffer, outgoing).await?; + if len == 0 { + return anyhow::Ok(()); } - result = child_stdout.read(&mut stdout_buffer).fuse() => { - match result { - Ok(0) => { - child_stdin.close().await?; - outgoing_rx.close(); - let status = ssh_proxy_process.status().await?; - // If we don't have a code, we assume process - // has been killed and treat it as non-zero exit - // code - return Ok(status.code().or_else(|| Some(1))); - } - Ok(len) => { - if len < stdout_buffer.len() { - child_stdout.read_exact(&mut stdout_buffer[len..]).await?; - } - - let message_len = message_len_from_buffer(&stdout_buffer); - match read_message_with_len(&mut child_stdout, &mut stdout_buffer, message_len).await { - Ok(envelope) => { - connection_activity_tx.try_send(()).ok(); - incoming_tx.unbounded_send(envelope).ok(); - } - Err(error) => { - log::error!("error decoding message {error:?}"); - } - } - } - Err(error) => { - Err(anyhow!("error reading stdout: {error:?}"))?; - } - } + if len < MESSAGE_LEN_SIZE { + child_stdout.read_exact(&mut stdout_buffer[len..]).await?; } - result = child_stderr.read(&mut stderr_buffer[stderr_offset..]).fuse() => { - match result { - Ok(len) => { - stderr_offset += len; - let mut start_ix = 0; - while let Some(ix) = stderr_buffer[start_ix..stderr_offset].iter().position(|b| b == &b'\n') { - let line_ix = start_ix + ix; - let content = &stderr_buffer[start_ix..line_ix]; - start_ix = line_ix + 1; - if let Ok(record) = serde_json::from_slice::(content) { - record.log(log::logger()) - } else { - eprintln!("(remote) {}", String::from_utf8_lossy(content)); - } - } - stderr_buffer.drain(0..start_ix); - stderr_offset -= start_ix; + let message_len = message_len_from_buffer(&stdout_buffer); + let envelope = + read_message_with_len(&mut child_stdout, &mut stdout_buffer, message_len) + .await?; + connection_activity_tx.try_send(()).ok(); + incoming_tx.unbounded_send(envelope).ok(); + } + } + }); - connection_activity_tx.try_send(()).ok(); - } - Err(error) => { - Err(anyhow!("error reading stderr: {error:?}"))?; - } - } + let stderr_task: Task> = cx.background_executor().spawn(async move { + loop { + stderr_buffer.resize(stderr_offset + 1024, 0); + + let len = child_stderr + .read(&mut stderr_buffer[stderr_offset..]) + .await?; + + stderr_offset += len; + let mut start_ix = 0; + while let Some(ix) = stderr_buffer[start_ix..stderr_offset] + .iter() + .position(|b| b == &b'\n') + { + let line_ix = start_ix + ix; + let content = &stderr_buffer[start_ix..line_ix]; + start_ix = line_ix + 1; + if let Ok(record) = serde_json::from_slice::(content) { + record.log(log::logger()) + } else { + eprintln!("(remote) {}", String::from_utf8_lossy(content)); } } + stderr_buffer.drain(0..start_ix); + stderr_offset -= start_ix; + + connection_activity_tx.try_send(()).ok(); } - }) - } + }); - fn monitor( - this: WeakModel, - io_task: Task>>, - cx: &AsyncAppContext, - ) -> Task> { cx.spawn(|mut cx| async move { - let result = io_task.await; + let result = futures::select! { + result = stdin_task.fuse() => { + result.context("stdin") + } + result = stdout_task.fuse() => { + result.context("stdout") + } + result = stderr_task.fuse() => { + result.context("stderr") + } + }; match result { - Ok(Some(exit_code)) => { + Ok(_) => { + let exit_code = ssh_proxy_process.status().await?.code().unwrap_or(1); + if let Some(error) = ProxyLaunchError::from_exit_code(exit_code) { match error { ProxyLaunchError::ServerNotRunning => { @@ -1016,7 +1023,6 @@ impl SshRemoteClient { })?; } } - Ok(None) => {} Err(error) => { log::warn!("ssh io task died with error: {:?}. reconnecting...", error); this.update(&mut cx, |this, cx| { @@ -1024,6 +1030,7 @@ impl SshRemoteClient { })?; } } + Ok(()) }) } @@ -1052,40 +1059,21 @@ impl SshRemoteClient { cx.notify(); } - #[allow(clippy::too_many_arguments)] async fn establish_connection( unique_identifier: String, reconnect: bool, connection_options: SshConnectionOptions, - proxy_incoming_tx: UnboundedSender, - proxy_outgoing_rx: UnboundedReceiver, - connection_activity_tx: Sender<()>, delegate: Arc, cx: &mut AsyncAppContext, - ) -> Result<(Box, Task>>)> { - #[cfg(any(test, feature = "test-support"))] - if let Some(fake) = fake::SshRemoteConnection::new(&connection_options) { - let io_task = fake::SshRemoteConnection::multiplex( - fake.connection_options(), - proxy_incoming_tx, - proxy_outgoing_rx, - connection_activity_tx, - cx, - ) - .await; - return Ok((fake, io_task)); - } - + ) -> Result<(SshRemoteConnection, Child)> { let ssh_connection = SshRemoteConnection::new(connection_options, delegate.clone(), cx).await?; let platform = ssh_connection.query_platform().await?; let remote_binary_path = delegate.remote_server_binary_path(platform, cx)?; - if !reconnect { - ssh_connection - .ensure_server_binary(&delegate, &remote_binary_path, platform, cx) - .await?; - } + ssh_connection + .ensure_server_binary(&delegate, &remote_binary_path, platform, cx) + .await?; let socket = ssh_connection.socket.clone(); run_cmd(socket.ssh_command(&remote_binary_path).arg("version")).await?; @@ -1110,15 +1098,7 @@ impl SshRemoteClient { .spawn() .context("failed to spawn remote server")?; - let io_task = Self::multiplex( - ssh_proxy_process, - proxy_incoming_tx, - proxy_outgoing_rx, - connection_activity_tx, - &cx, - ); - - Ok((Box::new(ssh_connection), io_task)) + Ok((ssh_connection, ssh_proxy_process)) } pub fn subscribe_to_entity(&self, remote_id: u64, entity: &Model) { @@ -1130,7 +1110,7 @@ impl SshRemoteClient { .lock() .as_ref() .and_then(|state| state.ssh_connection()) - .map(|ssh_connection| ssh_connection.ssh_args()) + .map(|ssh_connection| ssh_connection.socket.ssh_args()) } pub fn proto_client(&self) -> AnyProtoClient { @@ -1145,6 +1125,7 @@ impl SshRemoteClient { self.connection_options.clone() } + #[cfg(not(any(test, feature = "test-support")))] pub fn connection_state(&self) -> ConnectionState { self.state .lock() @@ -1153,24 +1134,9 @@ impl SshRemoteClient { .unwrap_or(ConnectionState::Disconnected) } - pub fn is_disconnected(&self) -> bool { - self.connection_state() == ConnectionState::Disconnected - } - #[cfg(any(test, feature = "test-support"))] - pub fn simulate_disconnect(&self, cx: &mut AppContext) -> Task<()> { - use gpui::BorrowAppContext; - - let port = self.connection_options().port.unwrap(); - - let disconnect = - cx.update_global(|c: &mut fake::GlobalConnections, _cx| c.take(port).into_channels()); - cx.spawn(|mut cx| async move { - let (input_rx, output_tx) = disconnect.await; - let (forwarder, _, _) = ChannelForwarder::new(input_rx, output_tx, &mut cx); - cx.update_global(|c: &mut fake::GlobalConnections, _cx| c.replace(port, forwarder)) - .unwrap() - }) + pub fn connection_state(&self) -> ConnectionState { + ConnectionState::Connected } #[cfg(any(test, feature = "test-support"))] @@ -1189,33 +1155,35 @@ impl SshRemoteClient { let client = ChannelClient::new(incoming_rx, outgoing_tx, cx, "fake-server"); (forwarder, client) }) + + } + + pub fn is_disconnected(&self) -> bool { + self.connection_state() == ConnectionState::Disconnected } #[cfg(any(test, feature = "test-support"))] - pub async fn fake_client( - forwarder: ChannelForwarder, + pub fn fake( client_cx: &mut gpui::TestAppContext, - ) -> Model { - use gpui::BorrowAppContext; - client_cx - .update(|cx| { - let port = cx.update_default_global(|c: &mut fake::GlobalConnections, _cx| { - c.push(forwarder) - }); + server_cx: &mut gpui::TestAppContext, + ) -> (Model, Arc) { + use gpui::Context; - Self::new( - "fake".to_string(), - SshConnectionOptions { - host: "".to_string(), - port: Some(port), - ..Default::default() - }, - Arc::new(fake::Delegate), - cx, - ) - }) - .await - .unwrap() + let (server_to_client_tx, server_to_client_rx) = mpsc::unbounded(); + let (client_to_server_tx, client_to_server_rx) = mpsc::unbounded(); + + ( + client_cx.update(|cx| { + let client = ChannelClient::new(server_to_client_rx, client_to_server_tx, cx); + cx.new_model(|_| Self { + client, + unique_identifier: "fake".to_string(), + connection_options: SshConnectionOptions::default(), + state: Arc::new(Mutex::new(None)), + }) + }), + server_cx.update(|cx| ChannelClient::new(client_to_server_rx, server_to_client_tx, cx)), + ) } } @@ -1225,13 +1193,6 @@ impl From for AnyProtoClient { } } -#[async_trait] -trait SshRemoteProcess: Send + Sync { - async fn kill(&mut self) -> Result<()>; - fn ssh_args(&self) -> Vec; - fn connection_options(&self) -> SshConnectionOptions; -} - struct SshRemoteConnection { socket: SshSocket, master_process: process::Child, @@ -1246,25 +1207,6 @@ impl Drop for SshRemoteConnection { } } -#[async_trait] -impl SshRemoteProcess for SshRemoteConnection { - async fn kill(&mut self) -> Result<()> { - self.master_process.kill()?; - - self.master_process.status().await?; - - Ok(()) - } - - fn ssh_args(&self) -> Vec { - self.socket.ssh_args() - } - - fn connection_options(&self) -> SshConnectionOptions { - self.socket.connection_options.clone() - } -} - impl SshRemoteConnection { #[cfg(not(unix))] async fn new( @@ -1281,6 +1223,7 @@ impl SshRemoteConnection { delegate: Arc, cx: &mut AsyncAppContext, ) -> Result { + use futures::AsyncWriteExt as _; use futures::{io::BufReader, AsyncBufReadExt as _}; use smol::{fs::unix::PermissionsExt as _, net::unix::UnixListener}; use util::ResultExt as _; @@ -1547,10 +1490,8 @@ type ResponseChannels = Mutex, - buffer: Mutex>, - response_channels: ResponseChannels, - message_handlers: Mutex, - max_received: AtomicU32, + response_channels: ResponseChannels, // Lock + message_handlers: Mutex, // Lock } impl ChannelClient { @@ -1563,10 +1504,8 @@ impl ChannelClient { let this = Arc::new(Self { outgoing_tx, next_message_id: AtomicU32::new(0), - max_received: AtomicU32::new(0), response_channels: ResponseChannels::default(), message_handlers: Default::default(), - buffer: Mutex::new(VecDeque::new()), }); Self::start_handling_messages(this.clone(), incoming_rx, cx, name); @@ -1588,6 +1527,7 @@ impl ChannelClient { let Some(this) = this.upgrade() else { return anyhow::Ok(()); }; + if let Some(ack_id) = incoming.ack_id { let mut buffer = this.buffer.lock(); while buffer.front().is_some_and(|msg| msg.id <= ack_id) { @@ -1687,23 +1627,6 @@ impl ChannelClient { } } - pub async fn resync(&self, timeout: Duration) -> Result<()> { - smol::future::or( - async { - self.request(proto::FlushBufferedMessages {}).await?; - for envelope in self.buffer.lock().iter() { - self.outgoing_tx.unbounded_send(envelope.clone()).ok(); - } - Ok(()) - }, - async { - smol::Timer::after(timeout).await; - Err(anyhow!("Timeout detected")) - }, - ) - .await - } - pub async fn ping(&self, timeout: Duration) -> Result<()> { smol::future::or( async { @@ -1733,8 +1656,7 @@ impl ChannelClient { let mut response_channels_lock = self.response_channels.lock(); response_channels_lock.insert(MessageId(envelope.id), tx); drop(response_channels_lock); - - let result = self.send_buffered(envelope); + let result = self.outgoing_tx.unbounded_send(envelope); async move { if let Err(error) = &result { log::error!("failed to send message: {}", error); @@ -1751,12 +1673,6 @@ impl ChannelClient { pub fn send_dynamic(&self, mut envelope: proto::Envelope) -> Result<()> { envelope.id = self.next_message_id.fetch_add(1, SeqCst); - self.send_buffered(envelope) - } - - pub fn send_buffered(&self, mut envelope: proto::Envelope) -> Result<()> { - envelope.ack_id = Some(self.max_received.load(SeqCst)); - self.buffer.lock().push_back(envelope.clone()); self.outgoing_tx.unbounded_send(envelope)?; Ok(()) } @@ -1787,165 +1703,3 @@ impl ProtoClient for ChannelClient { false } } - -#[cfg(any(test, feature = "test-support"))] -mod fake { - use std::path::PathBuf; - - use anyhow::Result; - use async_trait::async_trait; - use futures::{ - channel::{ - mpsc::{self, Sender}, - oneshot, - }, - select_biased, FutureExt, SinkExt, StreamExt, - }; - use gpui::{AsyncAppContext, BorrowAppContext, Global, SemanticVersion, Task}; - use rpc::proto::Envelope; - - use super::{ - ChannelForwarder, SshClientDelegate, SshConnectionOptions, SshPlatform, SshRemoteProcess, - }; - - pub(super) struct SshRemoteConnection { - connection_options: SshConnectionOptions, - } - - impl SshRemoteConnection { - pub(super) fn new( - connection_options: &SshConnectionOptions, - ) -> Option> { - if connection_options.host == "" { - return Some(Box::new(Self { - connection_options: connection_options.clone(), - })); - } - return None; - } - pub(super) async fn multiplex( - connection_options: SshConnectionOptions, - mut client_tx: mpsc::UnboundedSender, - mut client_rx: mpsc::UnboundedReceiver, - mut connection_activity_tx: Sender<()>, - cx: &mut AsyncAppContext, - ) -> Task>> { - let (server_tx, server_rx) = cx - .update(|cx| { - cx.update_global(|conns: &mut GlobalConnections, _| { - conns.take(connection_options.port.unwrap()) - }) - }) - .unwrap() - .into_channels() - .await; - - let (forwarder, mut proxy_tx, mut proxy_rx) = - ChannelForwarder::new(server_tx, server_rx, cx); - - cx.update(|cx| { - cx.update_global(|conns: &mut GlobalConnections, _| { - conns.replace(connection_options.port.unwrap(), forwarder) - }) - }) - .unwrap(); - - cx.background_executor().spawn(async move { - loop { - select_biased! { - server_to_client = proxy_rx.next().fuse() => { - let Some(server_to_client) = server_to_client else { - return Ok(Some(1)) - }; - connection_activity_tx.try_send(()).ok(); - client_tx.send(server_to_client).await.ok(); - } - client_to_server = client_rx.next().fuse() => { - let Some(client_to_server) = client_to_server else { - return Ok(None) - }; - proxy_tx.send(client_to_server).await.ok(); - - } - } - } - }) - } - } - - #[async_trait] - impl SshRemoteProcess for SshRemoteConnection { - async fn kill(&mut self) -> Result<()> { - Ok(()) - } - - fn ssh_args(&self) -> Vec { - Vec::new() - } - - fn connection_options(&self) -> SshConnectionOptions { - self.connection_options.clone() - } - } - - #[derive(Default)] - pub(super) struct GlobalConnections(Vec>); - impl Global for GlobalConnections {} - - impl GlobalConnections { - pub(super) fn push(&mut self, forwarder: ChannelForwarder) -> u16 { - self.0.push(Some(forwarder)); - self.0.len() as u16 - 1 - } - - pub(super) fn take(&mut self, port: u16) -> ChannelForwarder { - self.0 - .get_mut(port as usize) - .expect("no fake server for port") - .take() - .expect("fake server is already borrowed") - } - pub(super) fn replace(&mut self, port: u16, forwarder: ChannelForwarder) { - let ret = self - .0 - .get_mut(port as usize) - .expect("no fake server for port") - .replace(forwarder); - if ret.is_some() { - panic!("fake server is already replaced"); - } - } - } - - pub(super) struct Delegate; - - impl SshClientDelegate for Delegate { - fn ask_password( - &self, - _: String, - _: &mut AsyncAppContext, - ) -> oneshot::Receiver> { - unreachable!() - } - fn remote_server_binary_path( - &self, - _: SshPlatform, - _: &mut AsyncAppContext, - ) -> Result { - unreachable!() - } - fn get_server_binary( - &self, - _: SshPlatform, - _: &mut AsyncAppContext, - ) -> oneshot::Receiver> { - unreachable!() - } - fn set_status(&self, _: Option<&str>, _: &mut AsyncAppContext) { - unreachable!() - } - fn set_error(&self, _: String, _: &mut AsyncAppContext) { - unreachable!() - } - } -} diff --git a/crates/remote_server/src/remote_editing_tests.rs b/crates/remote_server/src/remote_editing_tests.rs index 99bdb93eacdb4f..c072b972e71ff9 100644 --- a/crates/remote_server/src/remote_editing_tests.rs +++ b/crates/remote_server/src/remote_editing_tests.rs @@ -692,9 +692,9 @@ async fn init_test( cx: &mut TestAppContext, server_cx: &mut TestAppContext, ) -> (Model, Model, Arc) { + let (ssh_remote_client, ssh_server_client) = SshRemoteClient::fake(cx, server_cx); init_logger(); - let (forwarder, ssh_server_client) = SshRemoteClient::fake_server(server_cx); let fs = FakeFs::new(server_cx.executor()); fs.insert_tree( "/code", @@ -735,9 +735,8 @@ async fn init_test( cx, ) }); + let project = build_project(ssh_remote_client, cx); - let ssh = SshRemoteClient::fake_client(forwarder, cx).await; - let project = build_project(ssh, cx); project .update(cx, { let headless = headless.clone(); diff --git a/crates/remote_server/src/unix.rs b/crates/remote_server/src/unix.rs index 096d576e0b1e06..abfdb798f42376 100644 --- a/crates/remote_server/src/unix.rs +++ b/crates/remote_server/src/unix.rs @@ -12,6 +12,7 @@ use language::LanguageRegistry; use node_runtime::{NodeBinaryOptions, NodeRuntime}; use paths::logs_dir; use project::project_settings::ProjectSettings; + use remote::proxy::ProxyLaunchError; use remote::ssh_session::ChannelClient; use remote::{ @@ -213,19 +214,27 @@ fn start_server( let mut input_buffer = Vec::new(); let mut output_buffer = Vec::new(); + + let (mut stdin_msg_tx, mut stdin_msg_rx) = mpsc::unbounded::(); + cx.background_executor().spawn(async move { + while let Ok(msg) = read_message(&mut stdin_stream, &mut input_buffer).await { + if let Err(_) = stdin_msg_tx.send(msg).await { + break; + } + } + }).detach(); + loop { + select_biased! { _ = app_quit_rx.next().fuse() => { return anyhow::Ok(()); } - stdin_message = read_message(&mut stdin_stream, &mut input_buffer).fuse() => { - let message = match stdin_message { - Ok(message) => message, - Err(error) => { - log::warn!("error reading message on stdin: {}. exiting.", error); - break; - } + stdin_message = stdin_msg_rx.next().fuse() => { + let Some(message) = stdin_message else { + log::warn!("error reading message on stdin. exiting."); + break; }; if let Err(error) = incoming_tx.unbounded_send(message) { log::error!("failed to send message to application: {:?}. exiting.", error); diff --git a/extensions/elixir/Cargo.toml b/extensions/elixir/Cargo.toml index 174711b1ba1931..139d21f1c5e895 100644 --- a/extensions/elixir/Cargo.toml +++ b/extensions/elixir/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "zed_elixir" -version = "0.1.0" +version = "0.1.1" edition = "2021" publish = false license = "Apache-2.0" diff --git a/extensions/elixir/extension.toml b/extensions/elixir/extension.toml index 6f00c29a79650d..ba8a1f66872659 100644 --- a/extensions/elixir/extension.toml +++ b/extensions/elixir/extension.toml @@ -1,7 +1,7 @@ id = "elixir" name = "Elixir" description = "Elixir support." -version = "0.1.0" +version = "0.1.1" schema_version = 1 authors = ["Marshall Bowers "] repository = "https://github.com/zed-industries/zed"