Skip to content

Commit

Permalink
Merge branch 'master' into noah/warnings
Browse files Browse the repository at this point in the history
  • Loading branch information
Noah-Kennedy authored Feb 20, 2025
2 parents def2dd8 + a27575f commit 935b4ce
Show file tree
Hide file tree
Showing 53 changed files with 1,411 additions and 168 deletions.
2 changes: 1 addition & 1 deletion .cirrus.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
only_if: $CIRRUS_TAG == '' && ($CIRRUS_PR != '' || $CIRRUS_BRANCH == 'master' || $CIRRUS_BRANCH =~ 'tokio-.*')
auto_cancellation: $CIRRUS_BRANCH != 'master' && $CIRRUS_BRANCH !=~ 'tokio-.*'
freebsd_instance:
image_family: freebsd-14-1
image_family: freebsd-14-2
env:
RUST_STABLE: stable
RUST_NIGHTLY: nightly-2025-01-25
Expand Down
3 changes: 2 additions & 1 deletion spellcheck.dic
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
299
300
&
+
<
Expand Down Expand Up @@ -78,6 +78,7 @@ deallocate
deallocated
Deallocates
debuginfo
decrement
decrementing
demangled
dequeued
Expand Down
2 changes: 1 addition & 1 deletion tokio-util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ __docs_rs = ["futures-util"]

[dependencies]
tokio = { version = "1.28.0", path = "../tokio", features = ["sync"] }
bytes = "1.0.0"
bytes = "1.2.1"
futures-core = "0.3.0"
futures-sink = "0.3.0"
futures-io = { version = "0.3.0", optional = true }
Expand Down
253 changes: 252 additions & 1 deletion tokio-util/src/io/sync_bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,258 @@ use tokio::io::{
};

/// Use a [`tokio::io::AsyncRead`] synchronously as a [`std::io::Read`] or
/// a [`tokio::io::AsyncWrite`] as a [`std::io::Write`].
/// a [`tokio::io::AsyncWrite`] synchronously as a [`std::io::Write`].
///
/// # Alternatives
///
/// In many cases, there are better alternatives to using `SyncIoBridge`, especially
/// if you want to avoid blocking the async runtime. Consider the following scenarios:
///
/// When hashing data, using `SyncIoBridge` can lead to suboptimal performance and
/// might not fully leverage the async capabilities of the system.
///
/// ### Why It Matters:
///
/// `SyncIoBridge` allows you to use asynchronous I/O operations in an synchronous
/// context by blocking the current thread. However, this can be inefficient because:
/// - **Inefficient Resource Usage**: `SyncIoBridge` takes up an entire OS thread,
/// which is inefficient compared to asynchronous code that can multiplex many
/// tasks on a single thread.
/// - **Thread Pool Saturation**: Excessive use of `SyncIoBridge` can exhaust the
/// async runtime's thread pool, reducing the number of threads available for
/// other tasks and impacting overall performance.
/// - **Missed Concurrency Benefits**: By using synchronous operations with
/// `SyncIoBridge`, you lose the ability to interleave tasks efficiently,
/// which is a key advantage of asynchronous programming.
///
/// ## Example 1: Hashing Data
///
/// The use of `SyncIoBridge` is unnecessary when hashing data. Instead, you can
/// process the data asynchronously by reading it into memory, which avoids blocking
/// the async runtime.
///
/// There are two strategies for avoiding `SyncIoBridge` when hashing data. When
/// the data fits into memory, the easiest is to read the data into a `Vec<u8>`
/// and hash it:
///
/// Explanation: This example demonstrates how to asynchronously read data from a
/// reader into memory and hash it using a synchronous hashing function. The
/// `SyncIoBridge` is avoided, ensuring that the async runtime is not blocked.
/// ```rust
/// use tokio::io::AsyncReadExt;
/// use tokio::io::AsyncRead;
/// use std::io::Cursor;
/// # mod blake3 { pub fn hash(_: &[u8]) {} }
///
/// async fn hash_contents(mut reader: impl AsyncRead + Unpin) -> Result<(), std::io::Error> {
/// // Read all data from the reader into a Vec<u8>.
/// let mut data = Vec::new();
/// reader.read_to_end(&mut data).await?;
///
/// // Hash the data using the blake3 hashing function.
/// let hash = blake3::hash(&data);
///
/// Ok(hash)
///}
///
/// #[tokio::main]
/// async fn main() -> Result<(), std::io::Error> {
/// // Example: In-memory data.
/// let data = b"Hello, world!"; // A byte slice.
/// let reader = Cursor::new(data); // Create an in-memory AsyncRead.
/// hash_contents(reader).await
/// }
/// ```
///
/// When the data doesn't fit into memory, the hashing library will usually
/// provide a `hasher` that you can repeatedly call `update` on to hash the data
/// one chunk at the time.
///
/// Explanation: This example demonstrates how to asynchronously stream data in
/// chunks for hashing. Each chunk is read asynchronously, and the hash is updated
/// incrementally. This avoids blocking and improves performance over using
/// `SyncIoBridge`.
///
/// ```rust
/// use tokio::io::AsyncReadExt;
/// use tokio::io::AsyncRead;
/// use std::io::Cursor;
/// # struct Hasher;
/// # impl Hasher { pub fn update(&mut self, _: &[u8]) {} pub fn finalize(&self) {} }
///
/// /// Asynchronously streams data from an async reader, processes it in chunks,
/// /// and hashes the data incrementally.
/// async fn hash_stream(mut reader: impl AsyncRead + Unpin, mut hasher: Hasher) -> Result<(), std::io::Error> {
/// // Create a buffer to read data into, sized for performance.
/// let mut data = vec![0; 64 * 1024];
/// loop {
/// // Read data from the reader into the buffer.
/// let len = reader.read(&mut data).await?;
/// if len == 0 { break; } // Exit loop if no more data.
///
/// // Update the hash with the data read.
/// hasher.update(&data[..len]);
/// }
///
/// // Finalize the hash after all data has been processed.
/// let hash = hasher.finalize();
///
/// Ok(hash)
///}
///
/// #[tokio::main]
/// async fn main() -> Result<(), std::io::Error> {
/// // Example: In-memory data.
/// let data = b"Hello, world!"; // A byte slice.
/// let reader = Cursor::new(data); // Create an in-memory AsyncRead.
/// let hasher = Hasher;
/// hash_stream(reader, hasher).await
/// }
/// ```
///
///
/// ## Example 2: Compressing Data
///
/// When compressing data, the use of `SyncIoBridge` is unnecessary as it introduces
/// blocking and inefficient code. Instead, you can utilize an async compression library
/// such as the [`async-compression`](https://docs.rs/async-compression/latest/async_compression/)
/// crate, which is built to handle asynchronous data streams efficiently.
///
/// Explanation: This example shows how to asynchronously compress data using an
/// async compression library. By reading and writing asynchronously, it avoids
/// blocking and is more efficient than using `SyncIoBridge` with a non-async
/// compression library.
///
/// ```ignore
/// use async_compression::tokio::write::GzipEncoder;
/// use std::io::Cursor;
/// use tokio::io::AsyncRead;
///
/// /// Asynchronously compresses data from an async reader using Gzip and an async encoder.
/// async fn compress_data(mut reader: impl AsyncRead + Unpin) -> Result<(), std::io::Error> {
/// let writer = tokio::io::sink();
///
/// // Create a Gzip encoder that wraps the writer.
/// let mut encoder = GzipEncoder::new(writer);
///
/// // Copy data from the reader to the encoder, compressing it.
/// tokio::io::copy(&mut reader, &mut encoder).await?;
///
/// Ok(())
///}
///
/// #[tokio::main]
/// async fn main() -> Result<(), std::io::Error> {
/// // Example: In-memory data.
/// let data = b"Hello, world!"; // A byte slice.
/// let reader = Cursor::new(data); // Create an in-memory AsyncRead.
/// compress_data(reader).await?;
///
/// Ok(())
/// }
/// ```
///
///
/// ## Example 3: Parsing Data Formats
///
///
/// `SyncIoBridge` is not ideal when parsing data formats such as `JSON`, as it
/// blocks async operations. A more efficient approach is to read data asynchronously
/// into memory and then `deserialize` it, avoiding unnecessary synchronization overhead.
///
/// Explanation: This example shows how to asynchronously read data into memory
/// and then parse it as `JSON`. By avoiding `SyncIoBridge`, the asynchronous runtime
/// remains unblocked, leading to better performance when working with asynchronous
/// I/O streams.
///
/// ```rust,no_run
/// use tokio::io::AsyncRead;
/// use tokio::io::AsyncReadExt;
/// use std::io::Cursor;
/// # mod serde {
/// # pub trait DeserializeOwned: 'static {}
/// # impl<T: 'static> DeserializeOwned for T {}
/// # }
/// # mod serde_json {
/// # use super::serde::DeserializeOwned;
/// # pub fn from_slice<T: DeserializeOwned>(_: &[u8]) -> Result<T, std::io::Error> {
/// # unimplemented!()
/// # }
/// # }
/// # #[derive(Debug)] struct MyStruct;
///
///
/// async fn parse_json(mut reader: impl AsyncRead + Unpin) -> Result<MyStruct, std::io::Error> {
/// // Read all data from the reader into a Vec<u8>.
/// let mut data = Vec::new();
/// reader.read_to_end(&mut data).await?;
///
/// // Deserialize the data from the Vec<u8> into a MyStruct instance.
/// let value: MyStruct = serde_json::from_slice(&data)?;
///
/// Ok(value)
///}
///
/// #[tokio::main]
/// async fn main() -> Result<(), std::io::Error> {
/// // Example: In-memory data.
/// let data = b"Hello, world!"; // A byte slice.
/// let reader = Cursor::new(data); // Create an in-memory AsyncRead.
/// parse_json(reader).await?;
/// Ok(())
/// }
/// ```
///
/// ## Correct Usage of `SyncIoBridge` inside `spawn_blocking`
///
/// `SyncIoBridge` is mainly useful when you need to interface with synchronous
/// libraries from an asynchronous context.
///
/// Explanation: This example shows how to use `SyncIoBridge` inside a `spawn_blocking`
/// task to safely perform synchronous I/O without blocking the async runtime. The
/// `spawn_blocking` ensures that the synchronous code is offloaded to a dedicated
/// thread pool, preventing it from interfering with the async tasks.
///
/// ```rust
/// use tokio::task::spawn_blocking;
/// use tokio_util::io::SyncIoBridge;
/// use tokio::io::AsyncRead;
/// use std::marker::Unpin;
/// use std::io::Cursor;
///
/// /// Wraps an async reader with `SyncIoBridge` and performs synchronous I/O operations in a blocking task.
/// async fn process_sync_io(reader: impl AsyncRead + Unpin + Send + 'static) -> Result<Vec<u8>, std::io::Error> {
/// // Wrap the async reader with `SyncIoBridge` to allow synchronous reading.
/// let mut sync_reader = SyncIoBridge::new(reader);
///
/// // Spawn a blocking task to perform synchronous I/O operations.
/// let result = spawn_blocking(move || {
/// // Create an in-memory buffer to hold the copied data.
/// let mut buffer = Vec::new();
/// // Copy data from the sync_reader to the buffer.
/// std::io::copy(&mut sync_reader, &mut buffer)?;
/// // Return the buffer containing the copied data.
/// Ok::<_, std::io::Error>(buffer)
/// })
/// .await??;
///
/// // Return the result from the blocking task.
/// Ok(result)
///}
///
/// #[tokio::main]
/// async fn main() -> Result<(), std::io::Error> {
/// // Example: In-memory data.
/// let data = b"Hello, world!"; // A byte slice.
/// let reader = Cursor::new(data); // Create an in-memory AsyncRead.
/// let result = process_sync_io(reader).await?;
///
/// // You can use `result` here as needed.
///
/// Ok(())
/// }
/// ```
///
#[derive(Debug)]
pub struct SyncIoBridge<T> {
src: T,
Expand Down
7 changes: 4 additions & 3 deletions tokio-util/src/udp/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ use tokio::{io::ReadBuf, net::UdpSocket};

use bytes::{BufMut, BytesMut};
use futures_sink::Sink;
use std::io;
use std::pin::Pin;
use std::task::{ready, Context, Poll};
use std::{
borrow::Borrow,
net::{Ipv4Addr, SocketAddr, SocketAddrV4},
};
use std::{io, mem::MaybeUninit};

/// A unified [`Stream`] and [`Sink`] interface to an underlying `UdpSocket`, using
/// the `Encoder` and `Decoder` traits to encode and decode frames.
Expand Down Expand Up @@ -83,17 +83,18 @@ where
let addr = {
// Safety: `chunk_mut()` returns a `&mut UninitSlice`, and `UninitSlice` is a
// transparent wrapper around `[MaybeUninit<u8>]`.
let buf = unsafe { &mut *(pin.rd.chunk_mut() as *mut _ as *mut [MaybeUninit<u8>]) };
let buf = unsafe { pin.rd.chunk_mut().as_uninit_slice_mut() };
let mut read = ReadBuf::uninit(buf);
let ptr = read.filled().as_ptr();
let res = ready!(pin.socket.borrow().poll_recv_from(cx, &mut read));

assert_eq!(ptr, read.filled().as_ptr());
let addr = res?;

let filled = read.filled().len();
// Safety: This is guaranteed to be the number of initialized (and read) bytes due
// to the invariants provided by `ReadBuf::filled`.
unsafe { pin.rd.advance_mut(read.filled().len()) };
unsafe { pin.rd.advance_mut(filled) };

addr
};
Expand Down
3 changes: 1 addition & 2 deletions tokio-util/src/util/poll_buf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};

use bytes::{Buf, BufMut};
use std::io::{self, IoSlice};
use std::mem::MaybeUninit;
use std::pin::Pin;
use std::task::{ready, Context, Poll};

Expand Down Expand Up @@ -59,7 +58,7 @@ pub fn poll_read_buf<T: AsyncRead + ?Sized, B: BufMut>(

// Safety: `chunk_mut()` returns a `&mut UninitSlice`, and `UninitSlice` is a
// transparent wrapper around `[MaybeUninit<u8>]`.
let dst = unsafe { &mut *(dst as *mut _ as *mut [MaybeUninit<u8>]) };
let dst = unsafe { dst.as_uninit_slice_mut() };
let mut buf = ReadBuf::uninit(dst);
let ptr = buf.filled().as_ptr();
ready!(io.poll_read(cx, &mut buf)?);
Expand Down
3 changes: 2 additions & 1 deletion tokio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ tokio-macros = { version = "~2.5.0", path = "../tokio-macros", optional = true }
pin-project-lite = "0.2.11"

# Everything else is optional...
bytes = { version = "1.1.0", optional = true }
bytes = { version = "1.2.1", optional = true }
mio = { version = "1.0.1", optional = true, default-features = false }
parking_lot = { version = "0.12.0", optional = true }

Expand Down Expand Up @@ -133,6 +133,7 @@ tokio-stream = { version = "0.1", path = "../tokio-stream" }
futures = { version = "0.3.0", features = ["async-await"] }
mockall = "0.11.1"
async-stream = "0.3"
futures-concurrency = "7.6.3"

[target.'cfg(not(target_family = "wasm"))'.dev-dependencies]
socket2 = "0.5.5"
Expand Down
3 changes: 2 additions & 1 deletion tokio/src/fs/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,7 @@ impl AsyncRead for File {
dst: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
ready!(crate::trace::trace_leaf(cx));

let me = self.get_mut();
let inner = me.inner.get_mut();

Expand All @@ -595,7 +596,7 @@ impl AsyncRead for File {
State::Idle(ref mut buf_cell) => {
let mut buf = buf_cell.take().unwrap();

if !buf.is_empty() {
if !buf.is_empty() || dst.remaining() == 0 {
buf.copy_to(dst);
*buf_cell = Some(buf);
return Poll::Ready(Ok(()));
Expand Down
Loading

0 comments on commit 935b4ce

Please sign in to comment.