Commit 3471bd3
cleanup some cobwebs around client disconnection
pr2502 committed Mar 17, 2024
1 parent ae92954 commit 3471bd3
Showing 2 changed files with 50 additions and 48 deletions.
src/client.rs: 38 changes (15 additions & 23 deletions)
@@ -9,7 +9,7 @@ use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
 use tokio::net::TcpStream;
 use tokio::sync::mpsc::error::SendError;
 use tokio::sync::{mpsc, Mutex};
-use tokio::{select, task};
+use tokio::task;
 use tracing::{debug, error, info, warn, Instrument};
 use uriparse::URI;

@@ -78,6 +78,7 @@ pub async fn process(
     }
 }
 
+#[derive(Clone)]
 pub struct Client {
     port: u16,
     sender: mpsc::Sender<Message>,
@@ -101,10 +102,6 @@ impl Client {
         self.port
     }
 
-    pub fn is_connected(&self) -> bool {
-        !self.sender.is_closed()
-    }
-
     /// Send a message to the client channel
     pub async fn send_message(&self, message: Message) -> Result<(), SendError<Message>> {
         self.sender.send(message).await
@@ -259,11 +256,10 @@ async fn connect(
     info!("initialized client");
 
     let (client, client_rx) = Client::new(port);
-    let (close_tx, close_rx) = mpsc::channel(1);
-    task::spawn(input_task(client_rx, close_rx, writer).in_current_span());
-    instance.add_client(client).await;
+    task::spawn(input_task(client_rx, writer).in_current_span());
+    instance.add_client(client.clone()).await;
 
-    task::spawn(output_task(reader, port, instance, close_tx).in_current_span());
+    task::spawn(output_task(reader, client, instance).in_current_span());
 
     Ok(())
 }
@@ -338,11 +334,7 @@ fn select_workspace_root<'a>(
 }
 
 /// Receive messages from channel and write them to the client input socket
-async fn input_task(
-    mut rx: mpsc::Receiver<Message>,
-    mut close_rx: mpsc::Receiver<Message>,
-    mut writer: LspWriter<OwnedWriteHalf>,
-) {
+async fn input_task(mut rx: mpsc::Receiver<Message>, mut writer: LspWriter<OwnedWriteHalf>) {
     // Unlike the output task, here we first wait on the channel which is going
     // to block until the language server sends a notification, however if
     // we're the last client and have just closed the server is unlikely to send
@@ -355,10 +347,7 @@ async fn input_task(
     // request was received but the client closed `close_rx` channel will be
     // dropped (unlike the normal rx channel which is shared) and the connection
     // will close without sending any response.
-    while let Some(message) = select! {
-        message = close_rx.recv() => message,
-        message = rx.recv() => message,
-    } {
+    while let Some(message) = rx.recv().await {
         if let Err(err) = writer.write_message(&message).await {
             match err.kind() {
                 // ignore benign errors, treat as socket close
@@ -376,9 +365,8 @@
 /// Read messages from client output socket and send them to the server channel
 async fn output_task(
     mut reader: LspReader<BufReader<OwnedReadHalf>>,
-    port: u16,
+    client: Client,
     instance: Arc<Instance>,
-    close_tx: mpsc::Sender<Message>,
 ) {
     loop {
         let message = match reader.read_message().await {
@@ -404,12 +392,12 @@
                 // <https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#shutdown>
                 let res = ResponseSuccess::null(req.id);
                 // Ignoring error because we would've closed the connection regardless
-                let _ = close_tx.send(res.into()).await;
+                let _ = client.send_message(res.into()).await;
                 break;
             }
 
             Message::Request(mut req) => {
-                req.id = req.id.tag(Tag::Port(port));
+                req.id = req.id.tag(Tag::Port(client.port));
                 if instance.send_message(req.into()).await.is_err() {
                     break;
                 }
@@ -435,7 +423,7 @@
             }
 
             Message::Notification(notif) if notif.method == "textDocument/didOpen" => {
-                if let Err(err) = instance.open_file(port, notif.params).await {
+                if let Err(err) = instance.open_file(client.port, notif.params).await {
                     warn!(?err, "error opening file");
                 }
             }
@@ -447,4 +435,8 @@
             }
         }
     }
+
+    if let Err(err) = instance.cleanup_client(client).await {
+        warn!(?err, "error cleaning up after a client");
+    }
 }
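
For orientation, here is the new ownership flow from this diff, condensed into a standalone sketch. The shape of Client, Instance, add_client, and cleanup_client follows the diff above; the String message type, the channel capacity, and the main wiring are invented for the example. Because Client now derives Clone, the output (reader) task can own a copy of the handle and run the cleanup itself when its socket loop ends, which is what replaces the old dedicated close_tx/close_rx channel.

// A minimal sketch, assuming tokio = { version = "1", features = ["full"] }.
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{mpsc, Mutex};
use tokio::task;

#[derive(Clone)]
struct Client {
    port: u16,
    sender: mpsc::Sender<String>, // stand-in for mpsc::Sender<Message>
}

#[derive(Default)]
struct Instance {
    clients: Mutex<HashMap<u16, Client>>,
}

impl Instance {
    async fn add_client(&self, client: Client) {
        self.clients.lock().await.insert(client.port, client);
    }

    // Mirrors the new `cleanup_client`: removing the map entry drops one
    // sender clone; once every clone is gone the writer task's receiver
    // returns None and that task exits on its own.
    async fn cleanup_client(&self, client: Client) -> Result<(), &'static str> {
        match self.clients.lock().await.remove(&client.port) {
            Some(_) => Ok(()),
            None => Err("client was not connected"),
        }
    }
}

#[tokio::main]
async fn main() {
    let instance = Arc::new(Instance::default());
    let (sender, mut rx) = mpsc::channel(64);
    let client = Client { port: 4000, sender };

    // input_task stand-in: owns the receiver, writes to the client socket.
    let writer = task::spawn(async move {
        while let Some(message) = rx.recv().await {
            println!("-> client: {message}");
        }
        // recv() returned None: all sender clones were dropped.
    });

    instance.add_client(client.clone()).await;

    // output_task stand-in: owns a Client clone, cleans up when done.
    let inst = Arc::clone(&instance);
    let reader = task::spawn(async move {
        let _ = client.sender.send("shutdown response".into()).await;
        // ...read loop ends when the client socket closes...
        inst.cleanup_client(client).await.expect("was registered");
    });

    reader.await.unwrap();
    writer.await.unwrap();
}

Once cleanup_client removes the map entry and the reader's clone is dropped, no mpsc::Sender copy remains, so input_task's rx.recv() returns None and it exits without any extra signalling.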
src/instance.rs: 60 changes (35 additions & 25 deletions)
@@ -95,26 +95,44 @@ impl Instance {
         let mut clients = self.clients.lock().await;
         let dyn_capabilities = self.dynamic_capabilities.lock().await;
 
-        // Register all currently cached dynamic capabilities. We will drop the
-        // client response and we need to make sure the request ID is unique.
-        let id = RequestId::String("replay:registerCapabilities".into()).tag(Tag::Drop);
-        let params = lsp::RegistrationParams {
-            registrations: dyn_capabilities.values().cloned().collect(),
-        };
-        let req = Request {
-            id,
-            method: "client/registerCapability".into(),
-            params: serde_json::to_value(params).unwrap(),
-            jsonrpc: Version,
-        };
-        debug!(?req, "replaying server request");
-        let _ = client.send_message(req.into()).await;
+        if !dyn_capabilities.is_empty() {
+            // Register all currently cached dynamic capabilities if there are
+            // any. We will drop the client response and we need to make sure
+            // the request ID is unique.
+            let id = RequestId::String("replay:registerCapabilities".into()).tag(Tag::Drop);
+            let params = lsp::RegistrationParams {
+                registrations: dyn_capabilities.values().cloned().collect(),
+            };
+            let req = Request {
+                id,
+                method: "client/registerCapability".into(),
+                params: serde_json::to_value(params).unwrap(),
+                jsonrpc: Version,
+            };
+            debug!(?req, "replaying server request");
+            let _ = client.send_message(req.into()).await;
+        }
 
         if clients.insert(client.port(), client).is_some() {
             unreachable!("BUG: added two clients with the same port");
         }
     }
 
+    /// Send cleanup messages and remove the client from the client map
+    pub async fn cleanup_client(&self, client: Client) -> Result<()> {
+        debug!("cleaning up client");
+
+        if self.clients.lock().await.remove(&client.port()).is_none() {
+            // TODO This happens for example when the language server died while
+            // a client was still connected, and the client cleanup is attempted
+            // with the instance being gone already. We should try notifying
+            // these clients immediately and handle the cleanup separately.
+            bail!("client was not connected");
+        }
+
+        Ok(())
+    }
+
     /// Send a message to the language server channel
     pub async fn send_message(&self, message: Message) -> Result<(), SendError<Message>> {
         self.server.send(message).await
@@ -255,22 +273,14 @@ async fn gc_task(
         interval.tick().await;
 
         for (key, instance) in &instance_map.lock().await.0 {
-            let mut message_readers = instance.clients.lock().await;
-
-            // Remove closed senders
-            //
-            // We have to check here because otherwise the senders only get
-            // removed when a message is sent to them which might leave them
-            // hanging forever if the language server is quiet and cause the
-            // GC to never trigger.
-            message_readers.retain(|_port, client| client.is_connected());
+            let clients = instance.clients.lock().await;
 
             let idle = instance.idle();
-            debug!(path = ?key.workspace_root, idle, readers = message_readers.len(), "check instance");
+            debug!(path = ?key.workspace_root, idle, clients = clients.len(), "check instance");
 
             if let Some(instance_timeout) = instance_timeout {
                 // Close timed out instance
-                if idle > i64::from(instance_timeout) && message_readers.is_empty() {
+                if idle > i64::from(instance_timeout) && clients.is_empty() {
                     info!(pid = instance.pid, path = ?key.workspace_root, idle, "instance timed out");
                     instance.close.notify_one();
                 }
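
The deleted GC comment names the old problem: a closed sender was only noticed when a message was sent to it, so the GC had to sweep with retain(|_port, client| client.is_connected()) on every tick or an idle instance might never time out. With cleanup now an explicit event in the output task, the sweep is unnecessary. A toy demonstration (assuming tokio 1.x, outside the crate) of the mpsc close semantics both versions rely on:

// Dropping every Sender clone is what closes the channel.
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<u32>(8);
    let tx2 = tx.clone(); // second handle, like the entry in the client map

    tx.send(1).await.unwrap();
    assert!(!tx2.is_closed()); // receiver is still alive

    drop(tx); // output task finished with its clone
    drop(tx2); // map entry removed by cleanup_client

    assert_eq!(rx.recv().await, Some(1)); // buffered messages still drain
    assert_eq!(rx.recv().await, None); // then the channel reports closed
    println!("receiver observed a clean shutdown");
}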