From 3471bd3ce1a0038957e3089a6fb2c2e273034e41 Mon Sep 17 00:00:00 2001 From: max <36980911+pr2502@users.noreply.github.com> Date: Sun, 17 Mar 2024 18:56:43 +0100 Subject: [PATCH] cleanup some cobwebs around client disconnection --- src/client.rs | 38 +++++++++++++------------------ src/instance.rs | 60 ++++++++++++++++++++++++++++--------------------- 2 files changed, 50 insertions(+), 48 deletions(-) diff --git a/src/client.rs b/src/client.rs index c273615..ca39f88 100644 --- a/src/client.rs +++ b/src/client.rs @@ -9,7 +9,7 @@ use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf}; use tokio::net::TcpStream; use tokio::sync::mpsc::error::SendError; use tokio::sync::{mpsc, Mutex}; -use tokio::{select, task}; +use tokio::task; use tracing::{debug, error, info, warn, Instrument}; use uriparse::URI; @@ -78,6 +78,7 @@ pub async fn process( } } +#[derive(Clone)] pub struct Client { port: u16, sender: mpsc::Sender, @@ -101,10 +102,6 @@ impl Client { self.port } - pub fn is_connected(&self) -> bool { - !self.sender.is_closed() - } - /// Send a message to the client channel pub async fn send_message(&self, message: Message) -> Result<(), SendError> { self.sender.send(message).await @@ -259,11 +256,10 @@ async fn connect( info!("initialized client"); let (client, client_rx) = Client::new(port); - let (close_tx, close_rx) = mpsc::channel(1); - task::spawn(input_task(client_rx, close_rx, writer).in_current_span()); - instance.add_client(client).await; + task::spawn(input_task(client_rx, writer).in_current_span()); + instance.add_client(client.clone()).await; - task::spawn(output_task(reader, port, instance, close_tx).in_current_span()); + task::spawn(output_task(reader, client, instance).in_current_span()); Ok(()) } @@ -338,11 +334,7 @@ fn select_workspace_root<'a>( } /// Receive messages from channel and write them to the client input socket -async fn input_task( - mut rx: mpsc::Receiver, - mut close_rx: mpsc::Receiver, - mut writer: LspWriter, -) { +async fn input_task(mut rx: mpsc::Receiver, mut writer: LspWriter) { // Unlike the output task, here we first wait on the channel which is going // to block until the language server sends a notification, however if // we're the last client and have just closed the server is unlikely to send @@ -355,10 +347,7 @@ async fn input_task( // request was received but the client closed `close_rx` channel will be // dropped (unlike the normal rx channel which is shared) and the connection // will close without sending any response. - while let Some(message) = select! { - message = close_rx.recv() => message, - message = rx.recv() => message, - } { + while let Some(message) = rx.recv().await { if let Err(err) = writer.write_message(&message).await { match err.kind() { // ignore benign errors, treat as socket close @@ -376,9 +365,8 @@ async fn input_task( /// Read messages from client output socket and send them to the server channel async fn output_task( mut reader: LspReader>, - port: u16, + client: Client, instance: Arc, - close_tx: mpsc::Sender, ) { loop { let message = match reader.read_message().await { @@ -404,12 +392,12 @@ async fn output_task( // let res = ResponseSuccess::null(req.id); // Ignoring error because we would've closed the connection regardless - let _ = close_tx.send(res.into()).await; + let _ = client.send_message(res.into()).await; break; } Message::Request(mut req) => { - req.id = req.id.tag(Tag::Port(port)); + req.id = req.id.tag(Tag::Port(client.port)); if instance.send_message(req.into()).await.is_err() { break; } @@ -435,7 +423,7 @@ async fn output_task( } Message::Notification(notif) if notif.method == "textDocument/didOpen" => { - if let Err(err) = instance.open_file(port, notif.params).await { + if let Err(err) = instance.open_file(client.port, notif.params).await { warn!(?err, "error opening file"); } } @@ -447,4 +435,8 @@ async fn output_task( } } } + + if let Err(err) = instance.cleanup_client(client).await { + warn!(?err, "error cleaning up after a client"); + } } diff --git a/src/instance.rs b/src/instance.rs index bf11fbd..e0a3e40 100644 --- a/src/instance.rs +++ b/src/instance.rs @@ -95,26 +95,44 @@ impl Instance { let mut clients = self.clients.lock().await; let dyn_capabilities = self.dynamic_capabilities.lock().await; - // Register all currently cached dynamic capabilities. We will drop the - // client response and we need to make sure the request ID is unique. - let id = RequestId::String("replay:registerCapabilities".into()).tag(Tag::Drop); - let params = lsp::RegistrationParams { - registrations: dyn_capabilities.values().cloned().collect(), - }; - let req = Request { - id, - method: "client/registerCapability".into(), - params: serde_json::to_value(params).unwrap(), - jsonrpc: Version, - }; - debug!(?req, "replaying server request"); - let _ = client.send_message(req.into()).await; + if !dyn_capabilities.is_empty() { + // Register all currently cached dynamic capabilities if there are + // any. We will drop the client response and we need to make sure + // the request ID is unique. + let id = RequestId::String("replay:registerCapabilities".into()).tag(Tag::Drop); + let params = lsp::RegistrationParams { + registrations: dyn_capabilities.values().cloned().collect(), + }; + let req = Request { + id, + method: "client/registerCapability".into(), + params: serde_json::to_value(params).unwrap(), + jsonrpc: Version, + }; + debug!(?req, "replaying server request"); + let _ = client.send_message(req.into()).await; + } if clients.insert(client.port(), client).is_some() { unreachable!("BUG: added two clients with the same port"); } } + /// Send cleanup messages and remove remove client for client map + pub async fn cleanup_client(&self, client: Client) -> Result<()> { + debug!("cleaning up client"); + + if self.clients.lock().await.remove(&client.port()).is_none() { + // TODO This happens for example when the language server died while + // client was still connected, and the client cleanup is attempted + // with the instance being gone already. We should try notifying + // these clients immediately and handling the cleanup separately. + bail!("client was not connected"); + } + + Ok(()) + } + /// Send a message to the language server channel pub async fn send_message(&self, message: Message) -> Result<(), SendError> { self.server.send(message).await @@ -255,22 +273,14 @@ async fn gc_task( interval.tick().await; for (key, instance) in &instance_map.lock().await.0 { - let mut message_readers = instance.clients.lock().await; - - // Remove closed senders - // - // We have to check here because otherwise the senders only get - // removed when a message is sent to them which might leave them - // hanging forever if the language server is quiet and cause the - // GC to never trigger. - message_readers.retain(|_port, client| client.is_connected()); + let clients = instance.clients.lock().await; let idle = instance.idle(); - debug!(path = ?key.workspace_root, idle, readers = message_readers.len(), "check instance"); + debug!(path = ?key.workspace_root, idle, clients = clients.len(), "check instance"); if let Some(instance_timeout) = instance_timeout { // Close timed out instance - if idle > i64::from(instance_timeout) && message_readers.is_empty() { + if idle > i64::from(instance_timeout) && clients.is_empty() { info!(pid = instance.pid, path = ?key.workspace_root, idle, "instance timed out"); instance.close.notify_one(); }