Skip to content

Commit

Permalink
Fix build by upgrading dependencies (incl hyper 1.0) (#118)
Browse files Browse the repository at this point in the history
* Fix build by upgrading dependencies (incl hyper 1.0)

* Fix the hyper 1.0 stuff

* Fix tungstenite-axum compatibility issues

* Fix more awful issues in listen.rs

* Fix build
  • Loading branch information
ekzhang authored Feb 11, 2025
1 parent 34189bf commit d9775a7
Show file tree
Hide file tree
Showing 11 changed files with 501 additions and 375 deletions.
728 changes: 418 additions & 310 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ keywords = ["ssh", "share", "terminal", "collaborative"]
[workspace.dependencies]
anyhow = "1.0.62"
clap = { version = "4.5.17", features = ["derive", "env"] }
prost = "0.12.6"
prost = "0.13.4"
rand = "0.8.5"
serde = { version = "1.0.188", features = ["derive", "rc"] }
sshx-core = { version = "0.3.1", path = "crates/sshx-core" }
tokio = { version = "1.40.0", features = ["full"] }
tokio-stream = { version = "0.1.14", features = ["sync"] }
tonic = { version = "0.11.0", features = ["tls", "tls-webpki-roots"] }
tonic = { version = "0.12.3", features = ["tls", "tls-webpki-roots"] }
tracing = "0.1.37"
tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }

Expand Down
2 changes: 1 addition & 1 deletion crates/sshx-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@ serde.workspace = true
tonic.workspace = true

[build-dependencies]
tonic-build = "0.11.0"
tonic-build = "0.12.3"
2 changes: 1 addition & 1 deletion crates/sshx-core/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
tonic_build::configure()
.file_descriptor_set_path(descriptor_path)
.bytes(["."])
.compile(&["proto/sshx.proto"], &["proto/"])?;
.compile_protos(&["proto/sshx.proto"], &["proto/"])?;
Ok(())
}
18 changes: 9 additions & 9 deletions crates/sshx-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,36 +13,36 @@ edition = "2021"
anyhow.workspace = true
async-channel = "1.9.0"
async-stream = "0.3.5"
axum = { version = "0.6.20", features = ["ws"] }
axum = { version = "0.8.1", features = ["http2", "ws"] }
base64 = "0.21.4"
bytes = { version = "1.5.0", features = ["serde"] }
ciborium = "0.2.1"
clap.workspace = true
dashmap = "5.5.3"
deadpool = "0.10.0"
deadpool-redis = "0.13.0"
deadpool = "0.12.2"
deadpool-redis = "0.18.0"
futures-util = { version = "0.3.28", features = ["sink"] }
hmac = "0.12.1"
hyper = { version = "0.14.27", features = ["full"] }
hyper = { version = "1.6.0", features = ["full"] }
parking_lot = "0.12.1"
prost.workspace = true
rand.workspace = true
redis = { version = "0.23.3", features = ["tokio-rustls-comp", "tls-rustls-webpki-roots"] }
redis = { version = "0.27.6", features = ["tokio-rustls-comp", "tls-rustls-webpki-roots"] }
serde.workspace = true
sha2 = "0.10.7"
sshx-core.workspace = true
subtle = "2.5.0"
tokio.workspace = true
tokio-stream.workspace = true
tokio-tungstenite = "0.20.0"
tokio-tungstenite = "0.26.1"
tonic.workspace = true
tonic-reflection = "0.11.0"
tonic-reflection = "0.12.3"
tower = { version = "0.4.13", features = ["steer"] }
tower-http = { version = "0.4.4", features = ["fs", "redirect", "trace"] }
tower-http = { version = "0.6.2", features = ["fs", "redirect", "trace"] }
tracing.workspace = true
tracing-subscriber.workspace = true
zstd = "0.12.4"

[dev-dependencies]
reqwest = { version = "0.11.20", default-features = false, features = ["rustls-tls"] }
reqwest = { version = "0.12.12", default-features = false, features = ["rustls-tls"] }
sshx = { path = "../sshx" }
24 changes: 19 additions & 5 deletions crates/sshx-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@
#![forbid(unsafe_code)]
#![warn(missing_docs)]

use std::{net::SocketAddr, sync::Arc};
use std::{fmt::Debug, net::SocketAddr, sync::Arc};

use anyhow::Result;
use hyper::server::conn::AddrIncoming;
use axum::serve::{Listener, ListenerExt};
use tokio::net::TcpListener;
use tracing::debug;
use utils::Shutdown;

use crate::state::ServerState;
Expand Down Expand Up @@ -65,7 +67,11 @@ impl Server {
}

/// Run the application server, listening on a stream of connections.
pub async fn listen(&self, incoming: AddrIncoming) -> Result<()> {
pub async fn listen<L>(&self, listener: L) -> Result<()>
where
L: Listener,
L::Addr: Debug,
{
let state = self.state.clone();
let terminated = self.shutdown.wait();
tokio::spawn(async move {
Expand All @@ -79,12 +85,20 @@ impl Server {
}
});

listen::start_server(self.state(), incoming, self.shutdown.wait()).await
listen::start_server(self.state(), listener, self.shutdown.wait()).await
}

/// Convenience function to call [`Server::listen`] bound to a TCP address.
///
/// This also sets `TCP_NODELAY` on the incoming connections for performance
/// reasons, as a reasonable default.
pub async fn bind(&self, addr: &SocketAddr) -> Result<()> {
self.listen(AddrIncoming::bind(addr)?).await
let listener = TcpListener::bind(addr).await?.tap_io(|tcp_stream| {
if let Err(err) = tcp_stream.set_nodelay(true) {
debug!("failed to set TCP_NODELAY on incoming connection: {err:#}");
}
});
self.listen(listener).await
}

/// Send a graceful shutdown signal to the server.
Expand Down
56 changes: 24 additions & 32 deletions crates/sshx-server/src/listen.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,12 @@
use std::{error::Error as StdError, future::Future, sync::Arc};
use std::{fmt::Debug, future::Future, sync::Arc};

use anyhow::Result;
use axum::body::HttpBody;
use hyper::{
header::CONTENT_TYPE,
server::{conn::AddrIncoming, Server as HyperServer},
service::make_service_fn,
Body, Request,
};
use axum::body::Body;
use axum::serve::Listener;
use hyper::{header::CONTENT_TYPE, Request};
use sshx_core::proto::{sshx_service_server::SshxServiceServer, FILE_DESCRIPTOR_SET};
use tonic::transport::Server as TonicServer;
use tower::{steer::Steer, ServiceBuilder, ServiceExt};
use tonic::service::Routes as TonicRoutes;
use tower::{make::Shared, steer::Steer, ServiceExt};
use tower_http::trace::TraceLayer;

use crate::{grpc::GrpcServer, web, ServerState};
Expand All @@ -19,33 +15,34 @@ use crate::{grpc::GrpcServer, web, ServerState};
///
/// This internal method is responsible for multiplexing the HTTP and gRPC
/// servers onto a single, consolidated `hyper` service.
pub(crate) async fn start_server(
pub(crate) async fn start_server<L>(
state: Arc<ServerState>,
incoming: AddrIncoming,
signal: impl Future<Output = ()>,
) -> Result<()> {
type BoxError = Box<dyn StdError + Send + Sync>;

listener: L,
signal: impl Future<Output = ()> + Send + 'static,
) -> Result<()>
where
L: Listener,
L::Addr: Debug,
{
let http_service = web::app()
.with_state(state.clone())
.layer(TraceLayer::new_for_http())
.map_response(|r| r.map(|b| b.map_err(BoxError::from).boxed_unsync()))
.map_err(BoxError::from)
.into_service()
.boxed_clone();

let grpc_service = TonicServer::builder()
let grpc_service = TonicRoutes::default()
.add_service(SshxServiceServer::new(GrpcServer::new(state)))
.add_service(
tonic_reflection::server::Builder::configure()
.register_encoded_file_descriptor_set(FILE_DESCRIPTOR_SET)
.build()?,
.build_v1()?,
)
.into_service();

let grpc_service = ServiceBuilder::new()
.into_axum_router()
.layer(TraceLayer::new_for_grpc())
.service(grpc_service)
.map_response(|r| r.map(|b| b.map_err(BoxError::from).boxed_unsync()))
.into_service()
// This type conversion is necessary because Tonic 0.12 uses Axum 0.7, so its `axum::Router`
// and `axum::Body` are based on an older `axum_core` version.
.map_response(|r| r.map(Body::new))
.boxed_clone();

let svc = Steer::new(
Expand All @@ -58,14 +55,9 @@ pub(crate) async fn start_server(
}
},
);
let make_svc = make_service_fn(move |_| {
let svc = svc.clone();
async { Ok::<_, std::convert::Infallible>(svc) }
});
let make_svc = Shared::new(svc);

HyperServer::builder(incoming)
.tcp_nodelay(true)
.serve(make_svc)
axum::serve(listener, make_svc)
.with_graceful_shutdown(signal)
.await?;

Expand Down
19 changes: 14 additions & 5 deletions crates/sshx-server/src/state/mesh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
use std::{pin::pin, sync::Arc, time::Duration};

use anyhow::Result;
use deadpool::managed::Manager;
use redis::AsyncCommands;
use tokio::time;
use tokio_stream::{Stream, StreamExt};
Expand All @@ -19,7 +18,7 @@ const STORAGE_EXPIRY: Duration = Duration::from_secs(300);

fn set_opts() -> redis::SetOptions {
redis::SetOptions::default()
.with_expiration(redis::SetExpiry::PX(STORAGE_EXPIRY.as_millis() as usize))
.with_expiration(redis::SetExpiry::PX(STORAGE_EXPIRY.as_millis() as u64))
}

/// Communication with a distributed mesh of sshx server nodes.
Expand All @@ -33,6 +32,7 @@ fn set_opts() -> redis::SetOptions {
#[derive(Clone)]
pub struct StorageMesh {
redis: deadpool_redis::Pool,
redis_pubsub: redis::Client,
host: Option<String>,
}

Expand All @@ -46,8 +46,18 @@ impl StorageMesh {
.runtime(deadpool_redis::Runtime::Tokio1)
.build()?;

// Separate `redis::Client` just for pub/sub connections.
//
// At time of writing, deadpool-redis has not been updated to support the new
// pub/sub client APIs in Rust. This is a temporary workaround that creates a
// new Redis client on the side, bypassing the connection pool.
//
// Reference: https://github.com/deadpool-rs/deadpool/issues/226
let redis_pubsub = redis::Client::open(redis_url)?;

Ok(Self {
redis,
redis_pubsub,
host: host.map(|s| s.to_string()),
})
}
Expand Down Expand Up @@ -161,15 +171,14 @@ impl StorageMesh {

loop {
// Requires an owned, non-pool connection for ownership reasons.
let conn = match self.redis.manager().create().await {
Ok(conn) => conn,
let mut pubsub = match self.redis_pubsub.get_async_pubsub().await {
Ok(pubsub) => pubsub,
Err(err) => {
error!(?err, "failed to connect to redis for pub/sub");
time::sleep(Duration::from_secs(5)).await;
continue;
}
};
let mut pubsub = conn.into_pubsub();
if let Err(err) = pubsub.subscribe(format!("transfers:{host}")).await {
error!(?err, "failed to subscribe to transfers");
time::sleep(Duration::from_secs(1)).await;
Expand Down
2 changes: 1 addition & 1 deletion crates/sshx-server/src/web.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,5 @@ pub fn app() -> Router<Arc<ServerState>> {

/// Routes for the backend web API server.
fn backend() -> Router<Arc<ServerState>> {
Router::new().route("/s/:name", get(socket::get_session_ws))
Router::new().route("/s/{name}", get(socket::get_session_ws))
}
10 changes: 5 additions & 5 deletions crates/sshx-server/src/web/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ async fn handle_socket(socket: &mut WebSocket, session: Arc<Session>) -> Result<
async fn send(socket: &mut WebSocket, msg: WsServer) -> Result<()> {
let mut buf = Vec::new();
ciborium::ser::into_writer(&msg, &mut buf)?;
socket.send(Message::Binary(buf)).await?;
socket.send(Message::Binary(Bytes::from(buf))).await?;
Ok(())
}

Expand Down Expand Up @@ -265,12 +265,12 @@ async fn proxy_redirect(socket: &mut WebSocket, host: &str, name: &str) -> Resul
tokio::select! {
Some(client_msg) = socket.recv() => {
let msg = match client_msg {
Ok(Message::Text(s)) => Some(TMessage::Text(s)),
Ok(Message::Text(s)) => Some(TMessage::Text(s.as_str().into())),
Ok(Message::Binary(b)) => Some(TMessage::Binary(b)),
Ok(Message::Close(frame)) => {
let frame = frame.map(|frame| TCloseFrame {
code: frame.code.into(),
reason: frame.reason,
reason: frame.reason.as_str().into(),
});
Some(TMessage::Close(frame))
}
Expand All @@ -285,12 +285,12 @@ async fn proxy_redirect(socket: &mut WebSocket, host: &str, name: &str) -> Resul
}
Some(server_msg) = upstream.next() => {
let msg = match server_msg {
Ok(TMessage::Text(s)) => Some(Message::Text(s)),
Ok(TMessage::Text(s)) => Some(Message::Text(s.as_str().into())),
Ok(TMessage::Binary(b)) => Some(Message::Binary(b)),
Ok(TMessage::Close(frame)) => {
let frame = frame.map(|frame| CloseFrame {
code: frame.code.into(),
reason: frame.reason,
reason: frame.reason.as_str().into(),
});
Some(Message::Close(frame))
}
Expand Down
11 changes: 7 additions & 4 deletions crates/sshx-server/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ use std::sync::Arc;
use std::time::Duration;

use anyhow::{ensure, Result};
use axum::serve::ListenerExt;
use futures_util::{SinkExt, StreamExt};
use hyper::{server::conn::AddrIncoming, StatusCode};
use hyper::StatusCode;
use sshx::encrypt::Encrypt;
use sshx_core::proto::sshx_service_client::SshxServiceClient;
use sshx_core::{Sid, Uid};
Expand Down Expand Up @@ -34,12 +35,14 @@ impl TestServer {
let listener = TcpListener::bind("[::1]:0").await.unwrap();
let local_addr = listener.local_addr().unwrap();

let incoming = AddrIncoming::from_listener(listener).unwrap();
let server = Arc::new(Server::new(Default::default()).unwrap());
{
let server = Arc::clone(&server);
let listener = listener.tap_io(|tcp_stream| {
_ = tcp_stream.set_nodelay(true);
});
tokio::spawn(async move {
server.listen(incoming).await.unwrap();
server.listen(listener).await.unwrap();
});
}

Expand Down Expand Up @@ -124,7 +127,7 @@ impl ClientSocket {
pub async fn send(&mut self, msg: WsClient) {
let mut buf = Vec::new();
ciborium::ser::into_writer(&msg, &mut buf).unwrap();
self.inner.send(Message::Binary(buf)).await.unwrap();
self.inner.send(Message::Binary(buf.into())).await.unwrap();
}

pub async fn send_input(&mut self, id: Sid, data: &[u8]) {
Expand Down

0 comments on commit d9775a7

Please sign in to comment.