From 185721f98462a6f53c28bfede06ea56d582597e4 Mon Sep 17 00:00:00 2001 From: yukang Date: Thu, 30 Nov 2023 11:00:26 +0800 Subject: [PATCH] use with_shutdown for tcp connection --- rpc/src/server.rs | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/rpc/src/server.rs b/rpc/src/server.rs index 8107415978..3909785ced 100644 --- a/rpc/src/server.rs +++ b/rpc/src/server.rs @@ -34,6 +34,7 @@ impl RpcServer { /// /// * `config` - RPC config options. /// * `io_handler` - RPC methods handler. See [ServiceBuilder](../service_builder/struct.ServiceBuilder.html). + /// * `handler` - Tokio runtime handle. pub fn new(config: RpcConfig, io_handler: IoHandler, handler: Handle) -> Self { let rpc = Arc::new(io_handler); @@ -141,7 +142,11 @@ impl RpcServer { let codec = LinesCodec::new_with_max_length(2 * 1024 * 1024); let stream_config = StreamServerConfig::default() .with_channel_size(4) - .with_pipeline_size(4); + .with_pipeline_size(4) + .with_shutdown(async move { + let exit = new_tokio_exit_rx(); + exit.cancelled().await; + }); let exit_signal: CancellationToken = new_tokio_exit_rx(); tokio::select! { @@ -160,14 +165,8 @@ impl RpcServer { }) }); tokio::pin!(w); - let exit_signal: CancellationToken = new_tokio_exit_rx(); - tokio::select! { - result = serve_stream_sink(&rpc, w, r, stream_config) => { - if let Err(err) = result { - info!("TCP RPCServer error: {:?}", err); - } - } - _ = exit_signal.cancelled() => {} + if let Err(err) = serve_stream_sink(&rpc, w, r, stream_config).await { + info!("TCP RPCServer error: {:?}", err); } }); }