Skip to content

Commit

Permalink
create a lsp transport writer wrapper
Browse files Browse the repository at this point in the history
  • Loading branch information
pr2502 committed Oct 17, 2023
1 parent 4e7b830 commit 3ac0808
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 21 deletions.
7 changes: 4 additions & 3 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use tracing::{debug, error, info, trace, Instrument};
use crate::instance::{
InitializeCache, InstanceKey, InstanceRegistry, RaInstance, INIT_REQUEST_ID,
};
use crate::lsp::transport::{LspReader, Message};
use crate::lsp::transport::{LspReader, LspWriter, Message};
use crate::proto;

pub struct Client {
Expand Down Expand Up @@ -132,8 +132,9 @@ impl Client {
&self,
mut rx: mpsc::Receiver<Message>,
mut close_rx: mpsc::Receiver<Message>,
mut socket_write: OwnedWriteHalf,
socket_write: OwnedWriteHalf,
) {
let mut writer = LspWriter::new(socket_write);
task::spawn(
async move {
// unlike the output task, here we first wait on the channel which is going to
Expand All @@ -150,7 +151,7 @@ impl Client {
message = close_rx.recv() => message,
message = rx.recv() => message,
} {
if let Err(err) = message.to_writer(&mut socket_write).await {
if let Err(err) = writer.write_message(message).await {
match err.kind() {
// ignore benign errors, treat as socket close
ErrorKind::BrokenPipe => {}
Expand Down
10 changes: 4 additions & 6 deletions src/instance.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::io::ErrorKind;
#[cfg(unix)]
use std::os::unix::process::ExitStatusExt;
use std::path::{Path, PathBuf};
use std::process::Stdio;
use std::str::{self, FromStr};
Expand All @@ -20,7 +18,7 @@ use tracing::{debug, error, info, info_span, instrument, trace, warn, Instrument

use crate::async_once_cell::AsyncOnceCell;
use crate::config::Config;
use crate::lsp::transport::{LspReader, Message};
use crate::lsp::transport::{LspReader, LspWriter, Message};
use crate::proto;

/// keeps track of the initialize/initialized handshake for an instance
Expand Down Expand Up @@ -350,15 +348,15 @@ impl RaInstance {
/// read messages sent by clients from a channel and write them into server stdin
fn spawn_stdin_task(self: &Arc<Self>, rx: mpsc::Receiver<Message>, stdin: ChildStdin) {
let mut receiver = rx;
let mut stdin = stdin;
let mut writer = LspWriter::new(stdin);

task::spawn(
async move {
// because we (stdin task) don't keep a reference to `self` it will be dropped when the
// child closes and all the clients disconnect including the sender and this receiver
// will not keep blocking (unlike in client input task)
while let Some(message) = receiver.recv().await {
if let Err(err) = message.to_writer(&mut stdin).await {
if let Err(err) = writer.write_message(message).await {
match err.kind() {
// stdin is closed, no need to log an error
ErrorKind::BrokenPipe => {}
Expand Down Expand Up @@ -405,7 +403,7 @@ impl RaInstance {
match exit {
Ok(status) => {
#[cfg(unix)]
let signal = status.signal();
let signal = std::os::unix::process::ExitStatusExt::signal(&status);
#[cfg(not(unix))]
let signal = tracing::field::Empty;

Expand Down
34 changes: 22 additions & 12 deletions src/lsp/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ where
}
}

pub struct LspWriter<W> {
writer: W,
}

/// LSP messages
#[derive(Clone)]
pub struct Message {
Expand All @@ -127,6 +131,24 @@ impl Debug for Message {
}
}

impl<W> LspWriter<W>
where
W: AsyncWrite + Unpin,
{
pub fn new(writer: W) -> Self {
LspWriter { writer }
}

/// serialize LSP message into a writer, prepending the appropriate content-length header
pub async fn write_message(&mut self, message: Message) -> io::Result<()> {
self.writer
.write_all(format!("Content-Length: {}\r\n\r\n", message.bytes.len()).as_bytes())
.await?;
self.writer.write_all(&message.bytes).await?;
self.writer.flush().await
}
}

impl Message {
/// construct a message from a byte buffer, should only contain the message body - no headers
pub fn from_bytes(bytes: &[u8]) -> Self {
Expand All @@ -145,16 +167,4 @@ impl Message {
serde_json::to_writer(&mut *buffer, json).expect("invalid json");
Self::from_bytes(&*buffer)
}

/// serialize LSP message into a writer, prepending the appropriate content-length header
pub async fn to_writer<W>(&self, mut writer: W) -> io::Result<()>
where
W: AsyncWrite + Unpin,
{
writer
.write_all(format!("Content-Length: {}\r\n\r\n", self.bytes.len()).as_bytes())
.await?;
writer.write_all(&self.bytes).await?;
writer.flush().await
}
}

0 comments on commit 3ac0808

Please sign in to comment.