From cf5ea3ea31109c1397c2313c1a5aa311b75e0390 Mon Sep 17 00:00:00 2001 From: sundengyu Date: Fri, 7 Feb 2025 15:34:18 +0800 Subject: [PATCH 1/4] feat(aio): use lockless linked list to store aio tasks Signed-off-by: sundengyu --- Cargo.toml | 1 - src/bin/bench/main.rs | 2 +- src/bindings/async_sys.rs | 4 +- src/io/async_io.rs | 234 +++++++++++++++++++++++++++----------- src/io/async_io_c.rs | 71 ++---------- zfs | 2 +- 6 files changed, 180 insertions(+), 134 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 1470707..5e389d7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,7 +23,6 @@ prometheus-client = { git = "https://github.com/iomesh/client_rust.git", tag = " strum = "0.26.2" strum_macros = "0.26.4" crossbeam-utils = "0.8.20" -kanal = "0.1.0-pre8" [dev-dependencies] tokio-test = "0.4.2" diff --git a/src/bin/bench/main.rs b/src/bin/bench/main.rs index 415fe24..92d9be4 100644 --- a/src/bin/bench/main.rs +++ b/src/bin/bench/main.rs @@ -60,7 +60,7 @@ async fn main() { uzfs_env_init().await; let dev_path = std::env::args().nth(1).unwrap(); let sync: bool = std::env::args().nth(2).unwrap().parse().unwrap(); - let concurrency = 64; + let concurrency = 48; let blksize = 1 << 20; let file_size = 1 << 30; diff --git a/src/bindings/async_sys.rs b/src/bindings/async_sys.rs index 170bc1e..aebc323 100644 --- a/src/bindings/async_sys.rs +++ b/src/bindings/async_sys.rs @@ -82,9 +82,7 @@ pub(crate) unsafe fn set_libuzfs_ops(log_func: Option c_int; + #[inline] unsafe fn io_setup(nr_events: i64) -> Result { let mut io_ctx: aio_context_t = 0; @@ -66,56 +70,158 @@ unsafe fn io_submit(io_ctx: aio_context_t, iocbs: &[iocb]) -> Result<(), Error> Ok(()) } -#[derive(Default)] -pub(super) struct IoContent { - pub(super) data_ptr: usize, - pub(super) offset: u64, - pub(super) size: u64, +const SEM_CLOSED: usize = usize::MAX; + +struct Semaphore { + count: Mutex, + cond: Condvar, } -pub(super) enum IoType { - Read, - Write, - Sync, +impl Semaphore { + fn new(count: usize) -> Self { + Self { + count: Mutex::new(count), + cond: Condvar::new(), + } + } + + fn post(&self) { + let mut count = self.count.lock().unwrap(); + *count += 1; + self.cond.notify_all(); + } + + fn close(&self) { + let mut count = self.count.lock().unwrap(); + *count = SEM_CLOSED; + self.cond.notify_all(); + } + + fn acquire_all(&self) -> Result<(), ()> { + let mut count = self.count.lock().unwrap(); + loop { + if *count == SEM_CLOSED { + return Err(()); + } + + if *count > 0 { + *count = 0; + return Ok(()); + } + + count = self.cond.wait(count).unwrap(); + } + } } -pub(super) struct AioCallback { - pub(super) io_type: IoType, - pub(super) io_content: IoContent, - pub(super) arg: *mut libc::c_void, +const AIO_READ: c_int = 0; +const AIO_WRITE: c_int = 1; +const AIO_FSYNC: c_int = 2; + +pub(super) struct TaskList { + head: AtomicPtr, + sem: Semaphore, + next_off: usize, + init_io_arg: InitArgsFunc, + io_fd: i32, } -unsafe impl Send for AioCallback {} -unsafe impl Sync for AioCallback {} - -impl AioCallback { - fn into_iocb(self, io_fd: i32) -> iocb { - let opcode = match self.io_type { - IoType::Read => IOCB_CMD_PREAD, - IoType::Write => IOCB_CMD_PWRITE, - IoType::Sync => IOCB_CMD_FSYNC, - }; - - iocb { - aio_data: self.arg as u64, - aio_key: 0, - aio_rw_flags: 0, - aio_lio_opcode: opcode as u16, - aio_reqprio: 0, - aio_fildes: io_fd as u32, - aio_buf: self.io_content.data_ptr as u64, - aio_nbytes: self.io_content.size, - aio_offset: self.io_content.offset as i64, - aio_reserved2: 0, - aio_flags: 0, - aio_resfd: 0, +impl TaskList { + pub(super) 
fn new_arc(next_off: usize, init_io_arg: InitArgsFunc, io_fd: i32) -> Arc { + Arc::new(Self { + head: AtomicPtr::new(null_mut()), + sem: Semaphore::new(0), + next_off, + init_io_arg, + io_fd, + }) + } + + #[inline] + pub(super) unsafe fn push(&self, arg: *mut c_void) { + let next = arg.byte_add(self.next_off) as *mut *mut c_void; + + // Fetch the current head and update it with the new task node + // `Ordering::Release` ensures that the writes to the task node (like `(*task).next`) + // happen before updating the head pointer. + // This prevents other threads from seeing an incomplete task node with an uninitialized `next`. + let was_empty = self + .head + .fetch_update(Ordering::Release, Ordering::Relaxed, |ptr| { + *next = ptr; + Some(arg) + }) + .unwrap() + .is_null(); + + if was_empty { + self.sem.post(); } } + + fn close(&self) { + self.sem.close(); + } + + #[inline] + unsafe fn pop_all(&self) -> Option> { + if self.sem.acquire_all().is_err() { + return None; + } + + // Use `swap` with `Ordering::Acquire` to read and clear the head pointer. + // `Ordering::Acquire` ensures that all operations before this load (such as the writes to the task list) + // are visible to this thread. Without this, we might see stale data or incomplete updates to the list. + let mut cur = self.head.swap(null_mut(), Ordering::Acquire); + + let mut res = Vec::with_capacity(256); + while !cur.is_null() { + let mut off = 0; + let mut data = null_mut(); + let mut len = 0; + let io_type = (self.init_io_arg)(cur, &mut off, &mut data, &mut len); + + let opcode = match io_type { + AIO_READ => IOCB_CMD_PREAD, + AIO_WRITE => IOCB_CMD_PWRITE, + AIO_FSYNC => IOCB_CMD_FSYNC, + _ => unimplemented!(), + }; + + res.push(iocb { + aio_data: cur as u64, + aio_key: 0, + aio_rw_flags: 0, + aio_lio_opcode: opcode as u16, + aio_reqprio: 0, + aio_fildes: self.io_fd as u32, + aio_buf: data as u64, + aio_nbytes: len as u64, + aio_offset: off as i64, + aio_reserved2: 0, + aio_flags: 0, + aio_resfd: 0, + }); + + cur = *(cur.byte_add(self.next_off) as *mut *mut c_void); + } + + Some(res) + } +} + +impl Drop for TaskList { + fn drop(&mut self) { + assert!(self.head.load(Ordering::Relaxed).is_null()); + } } +unsafe impl Send for TaskList {} +unsafe impl Sync for TaskList {} + struct IoCompletions { completions: Vec, - cb: unsafe extern "C" fn(arg: *mut libc::c_void, res: i64), + io_done: DoneFunc, } unsafe impl Send for IoCompletions {} @@ -124,12 +230,12 @@ unsafe impl Sync for IoCompletions {} unsafe extern "C" fn process_completion(arg: *mut libc::c_void) { let completions = &*(arg as *const IoCompletions); for completion in &completions.completions { - (completions.cb)(completion.data as *mut libc::c_void, completion.res); + (completions.io_done)(completion.data as *mut libc::c_void, completion.res); } } pub(super) struct AioContext { - pub(super) sender: Sender, + pub(super) task_list: Arc, reaper: Option>, submitter: Option>, stop: Arc, @@ -140,21 +246,9 @@ const MAX_EVENTS: usize = 4096; const MAX_IDLE_MILLS: u64 = 10; impl AioContext { - pub(super) fn submit(io_fd: i32, receiver: Receiver, io_ctx: aio_context_t) { - loop { - match receiver.recv() { - Ok(task) => { - let mut iocbs = Vec::with_capacity(MAX_EVENTS); - iocbs.push(task.into_iocb(io_fd)); - - while let Ok(Some(task)) = receiver.try_recv() { - iocbs.push(task.into_iocb(io_fd)); - } - - unsafe { io_submit(io_ctx, &iocbs).unwrap() }; - } - _ => return, - } + pub(super) fn submit(task_list: Arc, io_ctx: aio_context_t) { + while let Some(tasks) = unsafe { task_list.pop_all() 
} { + unsafe { io_submit(io_ctx, &tasks).unwrap() }; } } @@ -162,7 +256,7 @@ impl AioContext { io_ctx: aio_context_t, stop: Arc, handle: Handle, - cb: unsafe extern "C" fn(arg: *mut libc::c_void, res: i64), + io_done: DoneFunc, ) -> Result<(), Error> { while !stop.load(Ordering::Acquire) { let mut ts = timespec { @@ -185,7 +279,10 @@ impl AioContext { if ret > 0 { handle.block_on(async move { unsafe { completions.set_len(ret as usize) }; - let mut completions = IoCompletions { completions, cb }; + let mut completions = IoCompletions { + completions, + io_done, + }; let arg = &mut completions as *mut _ as usize; CoroutineFuture::new(process_completion, arg).await; }); @@ -210,19 +307,22 @@ impl AioContext { pub(super) fn start( io_fd: i32, - cb: unsafe extern "C" fn(arg: *mut libc::c_void, res: i64), + io_done: DoneFunc, + next_off: usize, + init_io_args: InitArgsFunc, ) -> Result { let io_ctx = unsafe { io_setup(256)? }; - let (sender, receiver) = unbounded(); - let submitter = std::thread::spawn(move || Self::submit(io_fd, receiver, io_ctx)); + let task_list = TaskList::new_arc(next_off, init_io_args, io_fd); + let task_list_cloned = task_list.clone(); + let submitter = std::thread::spawn(move || Self::submit(task_list_cloned, io_ctx)); let stop = Arc::new(AtomicBool::new(false)); let stop_cloned = stop.clone(); let handle = Handle::current(); let reaper = std::thread::spawn(move || { - Self::reap(io_ctx, stop_cloned, handle, cb).unwrap(); + Self::reap(io_ctx, stop_cloned, handle, io_done).unwrap(); }); Ok(Self { - sender, + task_list: task_list, reaper: Some(reaper), submitter: Some(submitter), stop, @@ -234,7 +334,7 @@ impl AioContext { impl Drop for AioContext { fn drop(&mut self) { self.stop.store(true, Ordering::Release); - self.sender.close(); + self.task_list.close(); fence(Ordering::SeqCst); self.submitter.take().unwrap().join().unwrap(); self.reaper.take().unwrap().join().unwrap(); diff --git a/src/io/async_io_c.rs b/src/io/async_io_c.rs index 41e9028..955df15 100644 --- a/src/io/async_io_c.rs +++ b/src/io/async_io_c.rs @@ -1,11 +1,16 @@ -use super::async_io::{AioCallback, AioContext, IoContent, IoType}; +use crate::bindings::sys::{aio_done_func_t, init_io_args_func_t}; + +use super::async_io::AioContext; #[allow(clippy::missing_safety_doc)] pub unsafe extern "C" fn register_fd( fd: i32, - cb: Option, + next_off: usize, + io_done: aio_done_func_t, + init_io_args: init_io_args_func_t, ) -> *mut libc::c_void { - let aio_context = Box::new(AioContext::start(fd, cb.unwrap()).unwrap()); + let aio_context = + Box::new(AioContext::start(fd, io_done.unwrap(), next_off, init_io_args.unwrap()).unwrap()); Box::into_raw(aio_context) as *mut libc::c_void } @@ -15,63 +20,7 @@ pub unsafe extern "C" fn unregister_fd(aio_hdl: *mut libc::c_void) { } #[allow(clippy::missing_safety_doc)] -pub unsafe extern "C" fn submit_read( - aio_hdl: *const libc::c_void, - offset: u64, - buf: *mut i8, - size: u64, - arg: *mut libc::c_void, -) { - let aio_hdl = &*(aio_hdl as *const AioContext); - let io_content = IoContent { - data_ptr: buf as usize, - offset, - size, - }; - aio_hdl - .sender - .send(AioCallback { - io_type: IoType::Read, - io_content, - arg, - }) - .unwrap(); -} - -#[allow(clippy::missing_safety_doc)] -pub unsafe extern "C" fn submit_write( - aio_hdl: *const libc::c_void, - offset: u64, - buf: *const i8, - size: u64, - arg: *mut libc::c_void, -) { - let aio_hdl = &*(aio_hdl as *const AioContext); - let io_content = IoContent { - data_ptr: buf as usize, - offset, - size, - }; - aio_hdl - .sender - 
.send(AioCallback { - io_type: IoType::Write, - io_content, - arg, - }) - .unwrap(); -} - -#[allow(clippy::missing_safety_doc)] -pub unsafe extern "C" fn submit_fsync(aio_hdl: *const libc::c_void, arg: *mut libc::c_void) { +pub unsafe extern "C" fn submit_aio(aio_hdl: *const libc::c_void, arg: *mut libc::c_void) { let aio_hdl = &*(aio_hdl as *const AioContext); - let io_content = IoContent::default(); - aio_hdl - .sender - .send(AioCallback { - io_type: IoType::Sync, - io_content, - arg, - }) - .unwrap(); + aio_hdl.task_list.push(arg); } diff --git a/zfs b/zfs index 80708e6..a45b7c9 160000 --- a/zfs +++ b/zfs @@ -1 +1 @@ -Subproject commit 80708e6458f2b7bcadf2e4c5ab71d01d06dd70e1 +Subproject commit a45b7c9439c976ba0411d57377f4605fa4795f67 From 097a4da6226d0f47d22e4ff4f8e2ba046139325f Mon Sep 17 00:00:00 2001 From: sundengyu Date: Mon, 10 Feb 2025 13:39:01 +0800 Subject: [PATCH 2/4] feat(read): add zero copy interfaces for object read Signed-off-by: sundengyu --- src/bin/bench/main.rs | 4 +- src/bindings/async_sys.rs | 22 ++++ src/dataset.rs | 199 +++++++++++++++++++++++++++++++++++++ src/tests/dataset_tests.rs | 58 +++++++++++ zfs | 2 +- 5 files changed, 283 insertions(+), 2 deletions(-) diff --git a/src/bin/bench/main.rs b/src/bin/bench/main.rs index 92d9be4..1440040 100644 --- a/src/bin/bench/main.rs +++ b/src/bin/bench/main.rs @@ -21,7 +21,9 @@ async fn worker(obj: u64, ds: Arc, blksize: u64, file_size: u64, sync: .unwrap(); } BenchOp::Read => { - ds.read_object(&ino_hdl, offset, blksize).await.unwrap(); + ds.read_object_zero_copy(&ino_hdl, offset, blksize) + .await + .unwrap(); } } ino_hdl diff --git a/src/bindings/async_sys.rs b/src/bindings/async_sys.rs index aebc323..362e50d 100644 --- a/src/bindings/async_sys.rs +++ b/src/bindings/async_sys.rs @@ -530,6 +530,28 @@ pub unsafe extern "C" fn libuzfs_read_object_c(arg: *mut c_void) { } } +pub struct ReadObjectZeroCopyArg { + pub ihp: *mut libuzfs_inode_handle_t, + pub offset: u64, + pub size: u64, + + pub err: i32, + pub data: libuzfs_read_buf_t, +} + +unsafe impl Send for ReadObjectZeroCopyArg {} +unsafe impl Sync for ReadObjectZeroCopyArg {} + +pub unsafe extern "C" fn libuzfs_read_object_zero_copy_c(args: *mut c_void) { + let args = &mut *(args as *mut ReadObjectZeroCopyArg); + args.err = libuzfs_object_read_zero_copy(args.ihp, args.offset, args.size, &mut args.data); +} + +pub unsafe extern "C" fn libuzfs_read_buf_rele_c(args: *mut c_void) { + let read_buf = args as *mut libuzfs_read_buf_t; + libuzfs_read_buf_rele(read_buf); +} + pub struct LibuzfsWriteObjectArg { pub ihp: *mut libuzfs_inode_handle_t, pub offset: u64, diff --git a/src/dataset.rs b/src/dataset.rs index 3e1a1a2..75f44c6 100644 --- a/src/dataset.rs +++ b/src/dataset.rs @@ -3,13 +3,23 @@ use std::ffi::CString; use std::io; use std::io::Error; use std::io::ErrorKind; +use std::mem; use std::os::raw::{c_char, c_void}; use std::ptr::null_mut; +use std::ptr::slice_from_raw_parts; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; +use std::sync::Arc; +use std::time::Duration; use cstr_argument::CStrArgument; use io::Result; use once_cell::sync::OnceCell; +use tokio::sync::mpsc::unbounded_channel; +use tokio::sync::mpsc::UnboundedSender; use tokio::sync::Mutex; +use tokio::task::JoinHandle; +use tokio::time::timeout; use crate::bindings::async_sys::*; use crate::bindings::sys::*; @@ -165,11 +175,133 @@ pub enum KvSetOption { NeedLog = 1 << 1, } +struct ReadBuf(libuzfs_read_buf_t); + +impl Default for ReadBuf { + fn default() -> Self { + 
Self(libuzfs_read_buf { + ihp: null_mut(), + lr: null_mut(), + offset: 0, + nread: 0, + dbpp: null_mut(), + num_bufs: 0, + }) + } +} + +impl ReadBuf { + async fn release(mut self) { + let buf_usize = &mut self.0 as *mut _ as usize; + CoroutineFuture::new(libuzfs_read_buf_rele_c, buf_usize).await; + mem::forget(self); + } +} + +impl Drop for ReadBuf { + fn drop(&mut self) { + assert!( + self.0.lr.is_null() && self.0.dbpp.is_null(), + "non-empty read buf {:?} cannot be dropped!", + self.0 + ) + } +} + +unsafe impl Send for ReadBuf {} +unsafe impl Sync for ReadBuf {} + +pub struct ReadBufWrapper { + data: ReadBuf, + sender: UnboundedSender, +} + +impl ReadBufWrapper { + #[inline] + pub fn as_slices<'a>(&'a self) -> Vec<&'a [u8]> { + let len = self.data.0.num_bufs as usize; + let mut slices = Vec::with_capacity(len); + unsafe { libuzfs_read_buf_to_slices(&self.data.0, slices.as_mut_ptr()) }; + unsafe { slices.set_len(len) }; + slices + .into_iter() + .map(|slice| unsafe { &*slice_from_raw_parts(slice.buf as *const u8, slice.len) }) + .collect() + } +} + +impl Drop for ReadBufWrapper { + fn drop(&mut self) { + self.sender.send(std::mem::take(&mut self.data)).unwrap(); + } +} + +struct ReadBufReleaser { + sender: UnboundedSender, + stop: Arc, + handle: Mutex>>, +} + +impl ReadBufReleaser { + fn new() -> Self { + let stop = Arc::new(AtomicBool::new(false)); + + let stop_cloned = stop.clone(); + let (sender, mut receiver) = unbounded_channel::(); + let handle = tokio::spawn(async move { + loop { + let mut bufs = Vec::with_capacity(256); + if let Ok(nrecv) = timeout( + Duration::from_millis(100), + receiver.recv_many(&mut bufs, 256), + ) + .await + { + assert_ne!(nrecv, 0); + for buf in bufs { + buf.release().await; + } + } + + if stop_cloned.load(Ordering::Acquire) { + break; + } + } + }); + + Self { + sender, + stop, + handle: Mutex::new(Some(handle)), + } + } + + fn wrap_read_buf(&self, data: libuzfs_read_buf_t) -> ReadBufWrapper { + ReadBufWrapper { + data: ReadBuf(data), + sender: self.sender.clone(), + } + } + + async fn exit(&self) { + while self.sender.strong_count() > 1 { + tokio::time::sleep(Duration::from_millis(10)).await; + } + + self.stop.fetch_or(true, Ordering::AcqRel); + let mut handle = self.handle.lock().await; + if let Some(task) = handle.take() { + task.await.unwrap(); + } + } +} + pub struct Dataset { dhp: *mut libuzfs_dataset_handle_t, zhp: *mut libuzfs_zpool_handle_t, poolname: CString, metrics: Box, + buf_releaser: ReadBufReleaser, } // metrics @@ -241,6 +373,7 @@ impl Dataset { zhp: arg.zhp, poolname, metrics, + buf_releaser: ReadBufReleaser::new(), }) } } @@ -279,6 +412,8 @@ impl Dataset { } pub async fn close(&self) -> Result<()> { + self.buf_releaser.exit().await; + let mut arg = LibuzfsDatasetFiniArg { dhp: self.dhp, zhp: self.zhp, @@ -590,6 +725,70 @@ impl Dataset { } } + /// Reads an object from storage without copying the data. + /// + /// This function performs a zero-copy read operation, retrieving the specified + /// portion of an object associated with the given inode handle. It directly references + /// the underlying ZFS ARC buffer and includes a range lock, ensuring safe concurrent access. + /// + /// # Constraints + /// - The `size` must not exceed `32 MiB` (`32 << 20` bytes). + /// - `ino_hdl` cannot be released before returned ReadBufWrapper is dropped + /// + /// # Parameters + /// - `ino_hdl`: A reference to the [`InodeHandle`] that identifies the object. + /// - `offset`: The byte offset within the object from where the read should start. 
+ /// - `size`: The number of bytes to read (must be ≤ 32 MiB). + /// + /// # Returns + /// - `Ok(ReadBufWrapper)`: A wrapper containing the read data on success. + /// - `Err(...)`: An error if the read operation fails. + /// + /// # Resource Management + /// - `ReadBufWrapper` holds references to the underlying range lock and ZFS ARC buffer. + /// - It **must be dropped as soon as it is no longer needed** to avoid blocking other operations. + /// + /// # Async Behavior + /// This function is asynchronous and must be awaited. + /// + /// # Errors + /// Returns an error if the read operation encounters issues, such as: + /// - Underlying storage failures. + pub async fn read_object_zero_copy( + &self, + ino_hdl: &InodeHandle, + offset: u64, + size: u64, + ) -> Result { + let _guard = self + .metrics + .record(RequestMethod::ReadObject, size as usize); + let mut arg = ReadObjectZeroCopyArg { + ihp: ino_hdl.ihp, + offset, + size, + err: 0, + data: libuzfs_read_buf { + ihp: null_mut(), + lr: null_mut(), + offset: 0, + nread: 0, + dbpp: null_mut(), + num_bufs: 0, + }, + }; + + let arg_usize = &mut arg as *mut _ as usize; + + CoroutineFuture::new(libuzfs_read_object_zero_copy_c, arg_usize).await; + + if arg.err == 0 { + Ok(self.buf_releaser.wrap_read_buf(arg.data)) + } else { + Err(io::Error::from_raw_os_error(arg.err)) + } + } + pub async fn write_object( &self, ino_hdl: &InodeHandle, diff --git a/src/tests/dataset_tests.rs b/src/tests/dataset_tests.rs index 9f0992a..b025f80 100644 --- a/src/tests/dataset_tests.rs +++ b/src/tests/dataset_tests.rs @@ -16,6 +16,7 @@ use nix::unistd::ForkResult; use petgraph::algo::is_cyclic_directed; use petgraph::prelude::DiGraph; use rand::distributions::Alphanumeric; +use rand::thread_rng; use rand::Rng; use std::collections::HashMap; use std::ffi::CString; @@ -1426,3 +1427,60 @@ async fn dentry_test() { ds.close().await.unwrap(); uzfs_env_fini().await; } + +#[tokio::test(flavor = "multi_thread")] +async fn read_zero_copy_test() { + let dsname = "read_zero_copy_test/ds"; + let uzfs_test_env = UzfsTestEnv::new(100 * 1024 * 1024); + uzfs_env_init().await; + let ds = Arc::new( + Dataset::init( + dsname, + uzfs_test_env.get_dev_path(), + DatasetType::Data, + 0, + false, + ) + .await + .unwrap(), + ); + + for _ in 0..128 { + let nobjs = 128; + let objs = ds.create_objects(nobjs).await.unwrap().0; + let handles: Vec<_> = objs + .into_iter() + .map(|obj| { + let ds = ds.clone(); + tokio::spawn(async move { + let mut ino_hdl = ds.get_inode_handle(obj, u64::MAX, true).await.unwrap(); + + let off = thread_rng().gen_range(0..65536); + let size = thread_rng().gen_range(16384..65536); + let buf = vec![123; size]; + ds.write_object(&ino_hdl, off, false, vec![&buf]) + .await + .unwrap(); + let read_buf = ds + .read_object_zero_copy(&ino_hdl, off, size as u64) + .await + .unwrap(); + let mut data_read = Vec::new(); + for slice in read_buf.as_slices() { + data_read.extend_from_slice(slice); + } + assert_eq!(data_read, buf, "slices: {:?}", read_buf.as_slices()); + + ds.delete_object(&mut ino_hdl).await.unwrap(); + ds.release_inode_handle(&mut ino_hdl).await; + }) + }) + .collect(); + for handle in handles { + handle.await.unwrap(); + } + } + + ds.close().await.unwrap(); + uzfs_env_fini().await; +} diff --git a/zfs b/zfs index a45b7c9..bf8ab7c 160000 --- a/zfs +++ b/zfs @@ -1 +1 @@ -Subproject commit a45b7c9439c976ba0411d57377f4605fa4795f67 +Subproject commit bf8ab7c5ddc718a2eb38873c0e75a5721264a893 From cdd5dc7bb4559395cc38c2a9f3d1f45377baf48d Mon Sep 17 00:00:00 2001 From: 
sundengyu Date: Thu, 13 Feb 2025 17:51:44 +0800 Subject: [PATCH 3/4] feat(object): support append write Signed-off-by: sundengyu --- src/bin/bench/main.rs | 11 +- src/dataset.rs | 12 +- src/tests/dataset_tests.rs | 226 +++++++++++++++++++++++++++++-------- zfs | 2 +- 4 files changed, 197 insertions(+), 54 deletions(-) diff --git a/src/bin/bench/main.rs b/src/bin/bench/main.rs index 1440040..91174e3 100644 --- a/src/bin/bench/main.rs +++ b/src/bin/bench/main.rs @@ -16,9 +16,14 @@ async fn worker(obj: u64, ds: Arc, blksize: u64, file_size: u64, sync: match op { BenchOp::Write => { let data = vec![1; blksize as usize]; - ds.write_object(&ino_hdl, offset, sync, vec![&data]) - .await - .unwrap(); + ds.write_object( + &ino_hdl, + WriteMode::OverwriteFrom(offset), + sync, + vec![&data], + ) + .await + .unwrap(); } BenchOp::Read => { ds.read_object_zero_copy(&ino_hdl, offset, blksize) diff --git a/src/dataset.rs b/src/dataset.rs index 75f44c6..1632323 100644 --- a/src/dataset.rs +++ b/src/dataset.rs @@ -598,6 +598,11 @@ impl Dataset { } } +pub enum WriteMode { + AppendToEnd, + OverwriteFrom(u64), +} + // object functions impl Dataset { pub async fn create_objects(&self, num_objs: usize) -> Result<(Vec, u64)> { @@ -792,7 +797,7 @@ impl Dataset { pub async fn write_object( &self, ino_hdl: &InodeHandle, - offset: u64, + mode: WriteMode, sync: bool, data: Vec<&[u8]>, ) -> Result<()> { @@ -809,7 +814,10 @@ impl Dataset { .collect(); let mut arg = LibuzfsWriteObjectArg { ihp: ino_hdl.ihp, - offset, + offset: match mode { + WriteMode::AppendToEnd => u64::MAX, + WriteMode::OverwriteFrom(off) => off, + }, iovs, sync, err: 0, diff --git a/src/tests/dataset_tests.rs b/src/tests/dataset_tests.rs index b025f80..782186e 100644 --- a/src/tests/dataset_tests.rs +++ b/src/tests/dataset_tests.rs @@ -7,8 +7,10 @@ use crate::DatasetType; use crate::InodeType; use crate::KvSetOption; use crate::UzfsDentry; +use crate::WriteMode; use crate::MAX_RESERVED_SIZE; use dashmap::DashMap; +use futures::future::join_all; use nix::sys::wait::waitpid; use nix::sys::wait::WaitStatus; use nix::unistd::fork; @@ -143,7 +145,7 @@ async fn uzfs_test() { let data = s.as_bytes(); let size = s.len() as u64; - ds.write_object(&rwobj_hdl, 0, true, vec![data]) + ds.write_object(&rwobj_hdl, WriteMode::OverwriteFrom(0), true, vec![data]) .await .unwrap(); assert_eq!(ds.get_object_attr(&rwobj_hdl).await.unwrap().size, size); @@ -222,12 +224,17 @@ async fn uzfs_test() { let size = 1 << 18; let mut data = Vec::::with_capacity(size); data.resize(size, 1); - ds.write_object(&obj_hdl, 0, false, vec![&data]) - .await - .unwrap(); - ds.write_object(&obj_hdl, (size * 2) as u64, false, vec![&data]) + ds.write_object(&obj_hdl, WriteMode::OverwriteFrom(0), false, vec![&data]) .await .unwrap(); + ds.write_object( + &obj_hdl, + WriteMode::OverwriteFrom((size * 2) as u64), + false, + vec![&data], + ) + .await + .unwrap(); ds.wait_synced().await; assert!(!ds .object_has_hole_in_range(&obj_hdl, 0, size as u64) @@ -561,7 +568,12 @@ async fn uzfs_expand_test() { let mut obj_hdl = ds_clone.get_inode_handle(objs[0], gen, true).await.unwrap(); while offset < size { while ds_clone - .write_object(&obj_hdl, offset, false, vec![&buf]) + .write_object( + &obj_hdl, + WriteMode::OverwriteFrom(offset), + false, + vec![&buf], + ) .await .is_err() { @@ -644,7 +656,12 @@ async fn uzfs_rangelock_test() { buf_u16.resize(write_size, my_version); let buf_u8 = unsafe { buf_u16.align_to::().1 }; ds_clone - .write_object(&obj_hdl, offset as u64 * 2, false, vec![buf_u8]) + 
.write_object( + &obj_hdl, + WriteMode::OverwriteFrom(offset as u64 * 2), + false, + vec![buf_u8], + ) .await .unwrap(); } @@ -889,28 +906,28 @@ async fn test_reduce_max(dsname: &str, dev_path: &str) { // original max > blksize of obj0 > reduced max, but is not power of 2 let data0 = vec![1; 3 << 9]; - ds.write_object(&hdl0, 0, false, vec![&data0]) + ds.write_object(&hdl0, WriteMode::OverwriteFrom(0), false, vec![&data0]) .await .unwrap(); let obj_attr0 = ds.get_object_attr(&hdl0).await.unwrap(); assert_eq!(obj_attr0.blksize, data0.len() as u32); // original max > blksize of obj1 > reduced max, is power of 2 let data1 = vec![1; 4 << 9]; - ds.write_object(&hdl1, 0, false, vec![&data1]) + ds.write_object(&hdl1, WriteMode::OverwriteFrom(0), false, vec![&data1]) .await .unwrap(); let blksize1 = ds.get_object_attr(&hdl1).await.unwrap().blksize; assert_eq!(blksize1, data1.len() as u32); // blksize of obj2 > original max > reduced max let data2 = vec![1; 9 << 9]; - ds.write_object(&hdl2, 0, false, vec![&data2]) + ds.write_object(&hdl2, WriteMode::OverwriteFrom(0), false, vec![&data2]) .await .unwrap(); let blksize2 = ds.get_object_attr(&hdl2).await.unwrap().blksize; assert_eq!(blksize2, 4096); // original max > reduced max > blksize of obj3 let data3 = vec![1; 1 << 9]; - ds.write_object(&hdl3, 0, false, vec![&data3]) + ds.write_object(&hdl3, WriteMode::OverwriteFrom(0), false, vec![&data3]) .await .unwrap(); assert_eq!( @@ -930,19 +947,34 @@ async fn test_reduce_max(dsname: &str, dev_path: &str) { let mut hdl1 = ds.get_inode_handle(objs[1], gen, true).await.unwrap(); let mut hdl2 = ds.get_inode_handle(objs[2], gen, true).await.unwrap(); let mut hdl3 = ds.get_inode_handle(objs[3], gen, true).await.unwrap(); - ds.write_object(&hdl0, data0.len() as u64, false, vec![&data0]) - .await - .unwrap(); + ds.write_object( + &hdl0, + WriteMode::OverwriteFrom(data0.len() as u64), + false, + vec![&data0], + ) + .await + .unwrap(); assert_eq!(ds.get_object_attr(&hdl0).await.unwrap().blksize, 2048); - ds.write_object(&hdl1, data1.len() as u64, false, vec![&data1]) - .await - .unwrap(); + ds.write_object( + &hdl1, + WriteMode::OverwriteFrom(data1.len() as u64), + false, + vec![&data1], + ) + .await + .unwrap(); assert_eq!(ds.get_object_attr(&hdl1).await.unwrap().blksize, blksize1); - ds.write_object(&hdl2, data2.len() as u64, false, vec![&data2]) - .await - .unwrap(); + ds.write_object( + &hdl2, + WriteMode::OverwriteFrom(data2.len() as u64), + false, + vec![&data2], + ) + .await + .unwrap(); assert_eq!(ds.get_object_attr(&hdl2).await.unwrap().blksize, blksize2); - ds.write_object(&hdl3, 0, false, vec![&data2]) + ds.write_object(&hdl3, WriteMode::OverwriteFrom(0), false, vec![&data2]) .await .unwrap(); assert_eq!(ds.get_object_attr(&hdl3).await.unwrap().blksize, 1024); @@ -964,19 +996,19 @@ async fn test_increase_max(dsname: &str, dev_path: &str) { let mut hdl2 = ds.get_inode_handle(objs[2], gen, true).await.unwrap(); // blksize of obj0 > increased max > original max let data0 = vec![1; 9 << 9]; - ds.write_object(&hdl0, 0, false, vec![&data0]) + ds.write_object(&hdl0, WriteMode::OverwriteFrom(0), false, vec![&data0]) .await .unwrap(); assert_eq!(ds.get_object_attr(&hdl0).await.unwrap().blksize, 1024); // increased max > blksize of obj1 > original max let data1 = vec![1; 3 << 9]; - ds.write_object(&hdl1, 0, false, vec![&data1]) + ds.write_object(&hdl1, WriteMode::OverwriteFrom(0), false, vec![&data1]) .await .unwrap(); assert_eq!(ds.get_object_attr(&hdl1).await.unwrap().blksize, 1024); // increased max > orignal 
max > blksize of obj2 let data2 = vec![1; 1 << 9]; - ds.write_object(&hdl2, 0, false, vec![&data2]) + ds.write_object(&hdl2, WriteMode::OverwriteFrom(0), false, vec![&data2]) .await .unwrap(); assert_eq!(ds.get_object_attr(&hdl2).await.unwrap().blksize, 512); @@ -991,17 +1023,32 @@ async fn test_increase_max(dsname: &str, dev_path: &str) { let mut hdl0 = ds.get_inode_handle(objs[0], gen, true).await.unwrap(); let mut hdl1 = ds.get_inode_handle(objs[1], gen, true).await.unwrap(); let mut hdl2 = ds.get_inode_handle(objs[2], gen, true).await.unwrap(); - ds.write_object(&hdl0, data0.len() as u64, false, vec![&data0]) - .await - .unwrap(); + ds.write_object( + &hdl0, + WriteMode::OverwriteFrom(data0.len() as u64), + false, + vec![&data0], + ) + .await + .unwrap(); assert_eq!(ds.get_object_attr(&hdl0).await.unwrap().blksize, 1024); - ds.write_object(&hdl1, data1.len() as u64, false, vec![&data1]) - .await - .unwrap(); + ds.write_object( + &hdl1, + WriteMode::OverwriteFrom(data1.len() as u64), + false, + vec![&data1], + ) + .await + .unwrap(); assert_eq!(ds.get_object_attr(&hdl1).await.unwrap().blksize, 1024); - ds.write_object(&hdl2, data2.len() as u64, false, vec![&data0]) - .await - .unwrap(); + ds.write_object( + &hdl2, + WriteMode::OverwriteFrom(data2.len() as u64), + false, + vec![&data0], + ) + .await + .unwrap(); assert_eq!(ds.get_object_attr(&hdl2).await.unwrap().blksize, 4096); ds.release_inode_handle(&mut hdl0).await; ds.release_inode_handle(&mut hdl1).await; @@ -1135,7 +1182,12 @@ fn uzfs_sync_test() { .await .unwrap(); ds_cloned - .write_object(&obj_hdl, offset, sync, vec![&data]) + .write_object( + &obj_hdl, + WriteMode::OverwriteFrom(offset), + sync, + vec![&data], + ) .await .unwrap(); ds_cloned.release_inode_handle(&mut obj_hdl).await; @@ -1202,9 +1254,14 @@ async fn uzfs_write_read_test() { handles.push(tokio::spawn(async move { let data: Vec<_> = (0..blksize).map(|_| rand::thread_rng().gen()).collect(); let mut obj_hdl = ds.get_inode_handle(obj, u64::MAX, true).await.unwrap(); - ds.write_object(&obj_hdl, offset as u64, false, vec![&data]) - .await - .unwrap(); + ds.write_object( + &obj_hdl, + WriteMode::OverwriteFrom(offset as u64), + false, + vec![&data], + ) + .await + .unwrap(); let read = ds .read_object(&obj_hdl, offset as u64, blksize as u64) .await @@ -1266,9 +1323,14 @@ async fn uzfs_truncate_test() { let end_size = if rand::thread_rng().gen_bool(0.5) { total_data[..data.len()].copy_from_slice(&data); total_data[(truncate_size as usize)..].fill(0); - ds.write_object(&mut obj_hdl, 0, false, vec![&data]) - .await - .unwrap(); + ds.write_object( + &mut obj_hdl, + WriteMode::OverwriteFrom(0), + false, + vec![&data], + ) + .await + .unwrap(); ds.truncate_object(&mut obj_hdl, 0, truncate_size) .await .unwrap(); @@ -1279,7 +1341,7 @@ async fn uzfs_truncate_test() { ds.truncate_object(&mut obj_hdl, 0, truncate_size) .await .unwrap(); - ds.write_object(&obj_hdl, 0, false, vec![&data]) + ds.write_object(&obj_hdl, WriteMode::OverwriteFrom(0), false, vec![&data]) .await .unwrap(); std::cmp::max(write_size, truncate_size) @@ -1339,12 +1401,22 @@ async fn next_block_test() { let obj = ds.create_objects(1).await.unwrap().0[0]; let mut ino_hdl = ds.get_inode_handle(obj, u64::MAX, true).await.unwrap(); let data = vec![1; 65536]; - ds.write_object(&ino_hdl, 65536, false, vec![&data]) - .await - .unwrap(); - ds.write_object(&ino_hdl, 262144, false, vec![&data]) - .await - .unwrap(); + ds.write_object( + &ino_hdl, + WriteMode::OverwriteFrom(65536), + false, + vec![&data], + ) + .await + 
.unwrap(); + ds.write_object( + &ino_hdl, + WriteMode::OverwriteFrom(262144), + false, + vec![&data], + ) + .await + .unwrap(); ds.wait_synced().await; let (off, size) = ds.object_next_block(&ino_hdl, 0).await.unwrap().unwrap(); @@ -1428,6 +1500,64 @@ async fn dentry_test() { uzfs_env_fini().await; } +#[tokio::test(flavor = "multi_thread")] +async fn append_write_test() { + let dsname = "append_write_test/ds"; + let uzfs_test_env = UzfsTestEnv::new(100 * 1024 * 1024); + uzfs_env_init().await; + let ds = Arc::new( + Dataset::init( + dsname, + uzfs_test_env.get_dev_path(), + DatasetType::Data, + 0, + false, + ) + .await + .unwrap(), + ); + + let concurrency = 16; + let blksize = 64 << 10; + let ncalls = 32; + + let obj = ds.create_objects(1).await.unwrap().0[0]; + let handles: Vec<_> = (0..concurrency) + .map(|i| { + let ds = ds.clone(); + tokio::spawn(async move { + let data = vec![i as u8; blksize]; + let mut ino_hdl = ds.get_inode_handle(obj, u64::MAX, true).await.unwrap(); + for _ in 0..ncalls { + ds.write_object(&ino_hdl, WriteMode::AppendToEnd, false, vec![&data]) + .await + .unwrap(); + } + ds.release_inode_handle(&mut ino_hdl).await; + }) + }) + .collect(); + + join_all(handles).await; + + let mut ino_hdl = ds.get_inode_handle(obj, u64::MAX, true).await.unwrap(); + let size = ds.get_object_attr(&ino_hdl).await.unwrap().size; + assert_eq!(size, concurrency * blksize as u64 * ncalls); + for i in 0..(concurrency * ncalls) { + let data = ds + .read_object(&ino_hdl, i * blksize as u64, blksize as u64) + .await + .unwrap(); + for byte in &data { + assert_eq!(byte, &data[0]); + } + } + ds.release_inode_handle(&mut ino_hdl).await; + + ds.close().await.unwrap(); + uzfs_env_fini().await; +} + #[tokio::test(flavor = "multi_thread")] async fn read_zero_copy_test() { let dsname = "read_zero_copy_test/ds"; @@ -1458,7 +1588,7 @@ async fn read_zero_copy_test() { let off = thread_rng().gen_range(0..65536); let size = thread_rng().gen_range(16384..65536); let buf = vec![123; size]; - ds.write_object(&ino_hdl, off, false, vec![&buf]) + ds.write_object(&ino_hdl, WriteMode::OverwriteFrom(off), false, vec![&buf]) .await .unwrap(); let read_buf = ds diff --git a/zfs b/zfs index bf8ab7c..9cf7f08 160000 --- a/zfs +++ b/zfs @@ -1 +1 @@ -Subproject commit bf8ab7c5ddc718a2eb38873c0e75a5721264a893 +Subproject commit 9cf7f08d284a6b70b6c2591705b241cf6eb269ad From 53e7ff87a5c4cd2acdc5a08eab27b9c566cb9bc5 Mon Sep 17 00:00:00 2001 From: sundengyu Date: Mon, 17 Feb 2025 14:03:47 +0800 Subject: [PATCH 4/4] feat(aio): run aio reaper in a runtime Signed-off-by: sundengyu --- src/bin/bench/main.rs | 21 ++++++-- src/bindings/async_sys.rs | 4 +- src/context/coroutine_c.rs | 15 +++--- src/dataset.rs | 4 ++ src/io/async_io.rs | 100 ++++++++++++++++--------------------- src/io/async_io_c.rs | 15 +++--- zfs | 2 +- 7 files changed, 85 insertions(+), 76 deletions(-) diff --git a/src/bin/bench/main.rs b/src/bin/bench/main.rs index 91174e3..a93d589 100644 --- a/src/bin/bench/main.rs +++ b/src/bin/bench/main.rs @@ -1,4 +1,5 @@ use std::{sync::Arc, time::Instant}; + use uzfs::*; #[derive(Clone, Copy, Debug)] @@ -26,7 +27,7 @@ async fn worker(obj: u64, ds: Arc, blksize: u64, file_size: u64, sync: .unwrap(); } BenchOp::Read => { - ds.read_object_zero_copy(&ino_hdl, offset, blksize) + ds.read_object(&ino_hdl, offset, blksize) .await .unwrap(); } @@ -62,14 +63,14 @@ async fn bench( println!("{op:?} throughput: {throughput}MB/s"); } -#[tokio::main] -async fn main() { +async fn bench_main() { uzfs_env_init().await; let dev_path = 
std::env::args().nth(1).unwrap(); let sync: bool = std::env::args().nth(2).unwrap().parse().unwrap(); - let concurrency = 48; + let concurrency = 10; let blksize = 1 << 20; - let file_size = 1 << 30; + let file_size = 10 << 30; + config_uzfs(8 << 30, 10, true); let ds = Arc::new( Dataset::init("testzp/ds", &dev_path, DatasetType::Data, 0, false) @@ -80,6 +81,7 @@ async fn main() { let objs = ds.create_objects(concurrency).await.unwrap().0; bench(&objs, ds.clone(), blksize, file_size, sync, BenchOp::Write).await; + ds.wait_synced().await; bench(&objs, ds.clone(), blksize, file_size, sync, BenchOp::Read).await; for obj in objs { @@ -91,3 +93,12 @@ async fn main() { ds.close().await.unwrap(); uzfs_env_fini().await; } + +fn main() { + let rt = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .worker_threads(9) + .build() + .unwrap(); + rt.block_on(bench_main()); +} diff --git a/src/bindings/async_sys.rs b/src/bindings/async_sys.rs index 362e50d..d182927 100644 --- a/src/bindings/async_sys.rs +++ b/src/bindings/async_sys.rs @@ -80,8 +80,8 @@ pub(crate) unsafe fn set_libuzfs_ops(log_func: Option>> = OnceCell::new(); -static BACKGROUND_RT: OnceCell = OnceCell::new(); +static BACKGROUND_RT: OnceCell = OnceCell::new(); -fn enter_background_rt<'a>() -> EnterGuard<'a> { +pub(crate) fn enter_background_rt<'a>() -> tokio::runtime::EnterGuard<'a> { let rt = BACKGROUND_RT.get_or_init(|| { tokio::runtime::Builder::new_multi_thread() .enable_all() diff --git a/src/dataset.rs b/src/dataset.rs index 1632323..7f38fbd 100644 --- a/src/dataset.rs +++ b/src/dataset.rs @@ -248,6 +248,10 @@ impl ReadBufReleaser { let stop_cloned = stop.clone(); let (sender, mut receiver) = unbounded_channel::(); + + #[cfg(test)] + let _guard = crate::context::coroutine_c::enter_background_rt(); + let handle = tokio::spawn(async move { loop { let mut bufs = Vec::with_capacity(256); diff --git a/src/io/async_io.rs b/src/io/async_io.rs index 3d78724..12dd1db 100644 --- a/src/io/async_io.rs +++ b/src/io/async_io.rs @@ -1,6 +1,3 @@ -use super::aio::*; -use crate::context::coroutine::CoroutineFuture; -use libc::{c_char, c_int, c_void, timespec}; use std::{ io::{Error, ErrorKind}, ptr::null_mut, @@ -8,13 +5,18 @@ use std::{ atomic::{fence, AtomicBool, AtomicPtr, Ordering}, Arc, Condvar, Mutex, }, - thread::JoinHandle, }; -use tokio::runtime::Handle; + +use libc::{c_char, c_int, c_void, timespec}; +use tokio::task::yield_now; + +use crate::context::coroutine::CoroutineFuture; + +use super::aio::*; type DoneFunc = unsafe extern "C" fn(arg: *mut libc::c_void, res: i64); type InitArgsFunc = - unsafe extern "C" fn(*mut c_void, *mut u64, *mut *mut c_char, *mut usize) -> c_int; + unsafe extern "C" fn(*mut c_void, *mut u64, *mut *mut c_char, *mut usize, *mut c_int) -> c_int; #[inline] unsafe fn io_setup(nr_events: i64) -> Result { @@ -123,17 +125,15 @@ pub(super) struct TaskList { sem: Semaphore, next_off: usize, init_io_arg: InitArgsFunc, - io_fd: i32, } impl TaskList { - pub(super) fn new_arc(next_off: usize, init_io_arg: InitArgsFunc, io_fd: i32) -> Arc { + pub(super) fn new_arc(next_off: usize, init_io_arg: InitArgsFunc) -> Arc { Arc::new(Self { head: AtomicPtr::new(null_mut()), sem: Semaphore::new(0), next_off, init_io_arg, - io_fd, }) } @@ -179,7 +179,8 @@ impl TaskList { let mut off = 0; let mut data = null_mut(); let mut len = 0; - let io_type = (self.init_io_arg)(cur, &mut off, &mut data, &mut len); + let mut fd = 0; + let io_type = (self.init_io_arg)(cur, &mut off, &mut data, &mut len, &mut fd); let opcode = match io_type 
{
                AIO_READ => IOCB_CMD_PREAD,
@@ -194,7 +195,7 @@ impl TaskList {
             aio_rw_flags: 0,
             aio_lio_opcode: opcode as u16,
             aio_reqprio: 0,
-            aio_fildes: self.io_fd as u32,
+            aio_fildes: fd as u32,
             aio_buf: data as u64,
             aio_nbytes: len as u64,
             aio_offset: off as i64,
@@ -236,8 +237,8 @@ unsafe extern "C" fn process_completion(arg: *mut libc::c_void) {
 
 pub(super) struct AioContext {
     pub(super) task_list: Arc<TaskList>,
-    reaper: Option<JoinHandle<()>>,
-    submitter: Option<JoinHandle<()>>,
+    reaper: tokio::task::JoinHandle<()>,
+    submitter: std::thread::JoinHandle<()>,
     stop: Arc<AtomicBool>,
     io_ctx: aio_context_t,
 }
@@ -252,12 +253,7 @@ impl AioContext {
         }
     }
 
-    pub fn reap(
-        io_ctx: aio_context_t,
-        stop: Arc<AtomicBool>,
-        handle: Handle,
-        io_done: DoneFunc,
-    ) -> Result<(), Error> {
+    pub async fn reap(io_ctx: aio_context_t, stop: Arc<AtomicBool>, io_done: DoneFunc) {
         while !stop.load(Ordering::Acquire) {
             let mut ts = timespec {
                 tv_sec: 0,
@@ -277,67 +273,59 @@ impl AioContext {
             };
 
             if ret > 0 {
-                handle.block_on(async move {
-                    unsafe { completions.set_len(ret as usize) };
-                    let mut completions = IoCompletions {
-                        completions,
-                        io_done,
-                    };
-                    let arg = &mut completions as *mut _ as usize;
-                    CoroutineFuture::new(process_completion, arg).await;
-                });
-                continue;
+                unsafe { completions.set_len(ret as usize) };
+                let mut completions = IoCompletions {
+                    completions,
+                    io_done,
+                };
+                let arg = &mut completions as *mut _ as usize;
+                CoroutineFuture::new(process_completion, arg).await;
+            } else if ret < 0 {
+                let err = Error::last_os_error();
+                if err.kind() != ErrorKind::Interrupted
+                    && err.kind() != ErrorKind::WouldBlock
+                    && err.kind() != ErrorKind::TimedOut
+                {
+                    panic!("unexpected error when reaping completed ios {err}");
+                }
             }
 
-            if ret == 0 {
-                continue;
-            }
-
-            let err = Error::last_os_error();
-            if err.kind() != ErrorKind::Interrupted
-                && err.kind() != ErrorKind::WouldBlock
-                && err.kind() != ErrorKind::TimedOut
-            {
-                return Err(err);
-            }
+            yield_now().await;
         }
-
-        Ok(())
     }
 
     pub(super) fn start(
-        io_fd: i32,
         io_done: DoneFunc,
         next_off: usize,
         init_io_args: InitArgsFunc,
     ) -> Result<Self, Error> {
-        let io_ctx = unsafe { io_setup(256)? };
+        let io_ctx = unsafe { io_setup(MAX_EVENTS as i64)? 
}; + let task_list = TaskList::new_arc(next_off, init_io_args); let task_list_cloned = task_list.clone(); let submitter = std::thread::spawn(move || Self::submit(task_list_cloned, io_ctx)); let stop = Arc::new(AtomicBool::new(false)); let stop_cloned = stop.clone(); - let handle = Handle::current(); - let reaper = std::thread::spawn(move || { - Self::reap(io_ctx, stop_cloned, handle, io_done).unwrap(); - }); + + #[cfg(test)] + let _guard = crate::context::coroutine_c::enter_background_rt(); + + let reaper = tokio::spawn(Self::reap(io_ctx, stop_cloned, io_done)); + Ok(Self { task_list: task_list, - reaper: Some(reaper), - submitter: Some(submitter), + reaper, + submitter, stop, io_ctx, }) } -} -impl Drop for AioContext { - fn drop(&mut self) { + pub(super) async fn exit(self) { self.stop.store(true, Ordering::Release); self.task_list.close(); fence(Ordering::SeqCst); - self.submitter.take().unwrap().join().unwrap(); - self.reaper.take().unwrap().join().unwrap(); + self.submitter.join().unwrap(); + self.reaper.await.unwrap(); unsafe { io_destroy(self.io_ctx).unwrap() } } } diff --git a/src/io/async_io_c.rs b/src/io/async_io_c.rs index 955df15..dfd42db 100644 --- a/src/io/async_io_c.rs +++ b/src/io/async_io_c.rs @@ -1,22 +1,25 @@ -use crate::bindings::sys::{aio_done_func_t, init_io_args_func_t}; +use crate::{ + bindings::sys::{aio_done_func_t, init_io_args_func_t}, + context::coroutine::CoroutineFuture, +}; use super::async_io::AioContext; #[allow(clippy::missing_safety_doc)] -pub unsafe extern "C" fn register_fd( - fd: i32, +pub unsafe extern "C" fn aio_init( next_off: usize, io_done: aio_done_func_t, init_io_args: init_io_args_func_t, ) -> *mut libc::c_void { let aio_context = - Box::new(AioContext::start(fd, io_done.unwrap(), next_off, init_io_args.unwrap()).unwrap()); + Box::new(AioContext::start(io_done.unwrap(), next_off, init_io_args.unwrap()).unwrap()); Box::into_raw(aio_context) as *mut libc::c_void } #[allow(clippy::missing_safety_doc)] -pub unsafe extern "C" fn unregister_fd(aio_hdl: *mut libc::c_void) { - drop(Box::from_raw(aio_hdl as *mut AioContext)); +pub unsafe extern "C" fn aio_fini(aio_hdl: *mut libc::c_void) { + let aio_handle = Box::from_raw(aio_hdl as *mut AioContext); + CoroutineFuture::poll_until_ready(aio_handle.exit()); } #[allow(clippy::missing_safety_doc)] diff --git a/zfs b/zfs index 9cf7f08..ada456c 160000 --- a/zfs +++ b/zfs @@ -1 +1 @@ -Subproject commit 9cf7f08d284a6b70b6c2591705b241cf6eb269ad +Subproject commit ada456c6f1d37e3bc12b893b5ebd5f57d164f959
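
The sketches below model the core mechanisms this series introduces. They are illustrative Rust written against assumed names, not code from the patches. First, a minimal standalone model of patch 1's lockless TaskList: producers push with a Release fetch_update, the consumer drains the whole list with an Acquire swap, and only the push that turns the list non-empty signals the consumer. LocklessList and Node are hypothetical stand-ins for the intrusive C-side task nodes.

use std::ptr::null_mut;
use std::sync::atomic::{AtomicPtr, Ordering};
use std::sync::{Condvar, Mutex};

struct Node {
    payload: u64,
    next: *mut Node,
}

struct LocklessList {
    head: AtomicPtr<Node>,
    ready: Mutex<bool>,
    cond: Condvar,
}

impl LocklessList {
    fn new() -> Self {
        Self {
            head: AtomicPtr::new(null_mut()),
            ready: Mutex::new(false),
            cond: Condvar::new(),
        }
    }

    fn push(&self, payload: u64) {
        let node = Box::into_raw(Box::new(Node { payload, next: null_mut() }));
        // Release publishes the node's fields before the head swap, so the
        // consumer never observes a node with an uninitialized `next`.
        let was_empty = self
            .head
            .fetch_update(Ordering::Release, Ordering::Relaxed, |old| {
                unsafe { (*node).next = old };
                Some(node)
            })
            .unwrap()
            .is_null();
        // Only the push that makes the list non-empty wakes the consumer.
        if was_empty {
            *self.ready.lock().unwrap() = true;
            self.cond.notify_all();
        }
    }

    fn pop_all(&self) -> Vec<u64> {
        let mut ready = self.ready.lock().unwrap();
        while !*ready {
            ready = self.cond.wait(ready).unwrap();
        }
        *ready = false;
        drop(ready);
        // Acquire pairs with the Release in `push`: every node reachable from
        // `head` is fully initialized by the time we walk it (LIFO order).
        let mut cur = self.head.swap(null_mut(), Ordering::Acquire);
        let mut out = Vec::new();
        while !cur.is_null() {
            let node = unsafe { Box::from_raw(cur) };
            out.push(node.payload);
            cur = node.next;
        }
        out
    }
}

fn main() {
    let list = LocklessList::new();
    std::thread::scope(|s| {
        for i in 0..4u64 {
            let list = &list;
            s.spawn(move || list.push(i));
        }
    });
    let mut drained = list.pop_all();
    drained.sort();
    assert_eq!(drained, vec![0, 1, 2, 3]);
}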
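Patch 1's push finds the embedded next pointer inside an opaque C task by adding a byte offset (next_off) to the task pointer. A small sketch of that layout trick, with a hypothetical CTask standing in for the real libuzfs task struct (assumes Rust 1.77+ for offset_of!):

use std::ffi::c_void;
use std::mem::offset_of;
use std::ptr::null_mut;

#[repr(C)]
struct CTask {
    payload: u64,
    next: *mut c_void,
}

fn main() {
    // The C side would hand this offset to aio_init alongside the callbacks.
    let next_off = offset_of!(CTask, next);
    let mut task = CTask { payload: 7, next: null_mut() };
    let arg = &mut task as *mut CTask as *mut c_void;

    unsafe {
        // Mirrors the list's `arg.byte_add(self.next_off) as *mut *mut c_void`.
        let next_slot = arg.byte_add(next_off) as *mut *mut c_void;
        assert!((*next_slot).is_null());
        *next_slot = arg; // write through the slot, as push() does
        assert_eq!(*next_slot, arg);
    }
}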
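Patch 2's ReadBufWrapper cannot release its ARC buffer in Drop (the release must run on a uzfs coroutine, and Drop cannot await), so Drop forwards the raw buffer over an unbounded channel to a background releaser task. A minimal model of that hand-off, assuming a tokio runtime; RawBuf is a stand-in for libuzfs_read_buf_t and release() for the libuzfs_read_buf_rele coroutine call:

use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};

struct RawBuf(u64);

struct BufWrapper {
    buf: Option<RawBuf>,
    sender: UnboundedSender<RawBuf>,
}

impl Drop for BufWrapper {
    fn drop(&mut self) {
        // Defer the actual release to the async releaser task.
        self.sender.send(self.buf.take().unwrap()).unwrap();
    }
}

async fn release(buf: RawBuf) {
    // In the real code this awaits a CoroutineFuture that calls
    // libuzfs_read_buf_rele on the uzfs side.
    println!("released buf {}", buf.0);
}

#[tokio::main]
async fn main() {
    let (sender, mut receiver) = unbounded_channel::<RawBuf>();
    let releaser = tokio::spawn(async move {
        // The real releaser batches with recv_many plus a timeout; plain
        // recv is enough to show the shape.
        while let Some(buf) = receiver.recv().await {
            release(buf).await;
        }
    });

    drop(BufWrapper { buf: Some(RawBuf(1)), sender });
    releaser.await.unwrap(); // channel closes once the last sender is gone
}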
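Patch 3 encodes append as a sentinel rather than a new C entry point: WriteMode::AppendToEnd maps to offset u64::MAX, which the uzfs side (per the patch) treats as "write at the current end of file". The mapping in isolation:

pub enum WriteMode {
    AppendToEnd,
    OverwriteFrom(u64),
}

// Offset handed to LibuzfsWriteObjectArg, as in patch 3's write_object.
fn to_wire_offset(mode: WriteMode) -> u64 {
    match mode {
        WriteMode::AppendToEnd => u64::MAX,
        WriteMode::OverwriteFrom(off) => off,
    }
}

fn main() {
    assert_eq!(to_wire_offset(WriteMode::AppendToEnd), u64::MAX);
    assert_eq!(to_wire_offset(WriteMode::OverwriteFrom(4096)), 4096);
}

This appears to be what append_write_test relies on: EOF is resolved under the object's lock, so concurrent appenders serialize and every blksize-sized chunk ends up written by a single task.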
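Patch 4 moves the reaper off a dedicated thread and onto the runtime: it polls with a zero timeout and yields between polls instead of blocking, so completions are dispatched without Handle::block_on. A model of that loop shape, assuming tokio; poll_events is a placeholder for the io_getevents call:

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tokio::task::yield_now;

fn poll_events() -> usize {
    0 // placeholder: pretend no completions are pending
}

async fn reap(stop: Arc<AtomicBool>) {
    while !stop.load(Ordering::Acquire) {
        let n = poll_events();
        if n > 0 {
            // dispatch the n completions to their callbacks here
        }
        // Never block the worker thread; let other tasks run between polls.
        yield_now().await;
    }
}

#[tokio::main]
async fn main() {
    let stop = Arc::new(AtomicBool::new(false));
    let reaper = tokio::spawn(reap(stop.clone()));
    stop.store(true, Ordering::Release);
    reaper.await.unwrap();
}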