Skip to content

Commit

Permalink
Merge pull request #7 from Myriad-Dreamin/shutdown-sender
Browse files Browse the repository at this point in the history
feat: correctly drop sender after the server shutting down
  • Loading branch information
Myriad-Dreamin authored Mar 11, 2024
2 parents 746afdd + 92b1789 commit 9823dfa
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 11 deletions.
29 changes: 23 additions & 6 deletions crates/tinymist/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ use lsp_types::notification::{Notification as NotificationTrait, PublishDiagnost
use lsp_types::request::{RegisterCapability, UnregisterCapability, WorkspaceConfiguration};
use lsp_types::*;
use once_cell::sync::OnceCell;
use parking_lot::Mutex;
use parking_lot::{Mutex, RwLock};
use paste::paste;
use query::MemoryFileMeta;
use serde_json::{Map, Value as JsonValue};
Expand Down Expand Up @@ -79,13 +79,13 @@ type ReqQueue = lsp_server::ReqQueue<(String, Instant), ReqHandler>;
/// The host for the language server, or known as the LSP client.
#[derive(Debug, Clone)]
pub struct LspHost {
sender: crossbeam_channel::Sender<Message>,
sender: Arc<RwLock<Option<crossbeam_channel::Sender<Message>>>>,
req_queue: Arc<Mutex<ReqQueue>>,
}

impl LspHost {
/// Creates a new language server host.
pub fn new(sender: crossbeam_channel::Sender<Message>) -> Self {
pub fn new(sender: Arc<RwLock<Option<crossbeam_channel::Sender<Message>>>>) -> Self {
Self {
sender,
req_queue: Arc::new(Mutex::new(ReqQueue::default())),
Expand All @@ -98,10 +98,15 @@ impl LspHost {
handler: ReqHandler,
) {
let mut req_queue = self.req_queue.lock();
let sender = self.sender.read();
let Some(sender) = sender.as_ref() else {
warn!("closed connection, failed to send request");
return;
};
let request = req_queue
.outgoing
.register(R::METHOD.to_owned(), params, handler);
let Err(res) = self.sender.send(request.into()) else {
let Err(res) = sender.send(request.into()) else {
return;
};
warn!("failed to send request: {res:?}");
Expand All @@ -123,7 +128,13 @@ impl LspHost {

pub fn send_notification<N: lsp_types::notification::Notification>(&self, params: N::Params) {
let not = lsp_server::Notification::new(N::METHOD.to_owned(), params);
let Err(res) = self.sender.send(not.into()) else {

let sender = self.sender.read();
let Some(sender) = sender.as_ref() else {
warn!("closed connection, failed to send request");
return;
};
let Err(res) = sender.send(not.into()) else {
return;
};
warn!("failed to send notification: {res:?}");
Expand All @@ -143,6 +154,12 @@ impl LspHost {
pub fn respond(&self, response: lsp_server::Response) {
let mut req_queue = self.req_queue.lock();
if let Some((method, start)) = req_queue.incoming.complete(response.id.clone()) {
let sender = self.sender.read();
let Some(sender) = sender.as_ref() else {
warn!("closed connection, failed to send request");
return;
};

// if let Some(err) = &response.error {
// if err.message.starts_with("server panicked") {
// self.poke_rust_analyzer_developer(format!("{}, check the log",
Expand All @@ -154,7 +171,7 @@ impl LspHost {
"handled {} - ({}) in {:0.2?}",
method, response.id, duration
);
let Err(res) = self.sender.send(response.into()) else {
let Err(res) = sender.send(response.into()) else {
return;
};
warn!("failed to send response: {res:?}");
Expand Down
13 changes: 11 additions & 2 deletions crates/tinymist/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,15 @@

mod args;

use std::io::{self, BufRead, Read, Write};
use std::{
io::{self, BufRead, Read, Write},
sync::Arc,
};

use clap::Parser;
use log::{info, trace, warn};
use lsp_types::{InitializeParams, InitializedParams};
use parking_lot::RwLock;
use serde::de::DeserializeOwned;
use tinymist::{init::Init, transport::io_transport, LspHost};
use typst_ts_core::config::CompileOpts;
Expand Down Expand Up @@ -99,7 +103,8 @@ async fn main() -> anyhow::Result<()> {
};
let request_received = std::time::Instant::now();
trace!("InitializeParams: {initialize_params}");
let host = LspHost::new(connection.sender);
let sender = Arc::new(RwLock::new(Some(connection.sender)));
let host = LspHost::new(sender.clone());

let req = lsp_server::Request::new(initialize_id, "initialize".to_owned(), initialize_params);
host.register_request(&req, request_received);
Expand Down Expand Up @@ -168,6 +173,10 @@ async fn main() -> anyhow::Result<()> {

service.main_loop(connection.receiver)?;

// Drop it on the main thread
{
sender.write().take();
}
io_threads.join()?;
info!("server did shut down");
Ok(())
Expand Down
11 changes: 8 additions & 3 deletions crates/tinymist/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{
thread,
};

use log::trace;
use log::{info, trace};

use crossbeam_channel::{bounded, Receiver, Sender};

Expand All @@ -29,9 +29,12 @@ pub fn io_transport<I: BufRead, O: Write>(
let (writer_sender, writer_receiver) = bounded::<Message>(0);
let writer = thread::spawn(move || {
let mut out = out();
writer_receiver
let res = writer_receiver
.into_iter()
.try_for_each(|it| it.write(&mut out))
.try_for_each(|it| it.write(&mut out));

info!("writer thread finished");
res
});
let (reader_sender, reader_receiver) = bounded::<Message>(0);
let reader = thread::spawn(move || {
Expand All @@ -48,6 +51,8 @@ pub fn io_transport<I: BufRead, O: Write>(
break;
}
}

info!("reader thread finished");
Ok(())
});
let threads = IoThreads { reader, writer };
Expand Down

0 comments on commit 9823dfa

Please sign in to comment.