From acfa2e46f7acf2abdc2fb8c23c9061e45395998d Mon Sep 17 00:00:00 2001 From: Markus Hosch Date: Mon, 10 Aug 2020 14:09:29 +0200 Subject: [PATCH 1/2] Add API to hook into Windows IOCP loop An API similar to the one of mio 0.6 is now available so that custom objects can be added to the IOCP that is used to implement Poll. --- Cargo.toml | 3 +- src/lib.rs | 62 +++++ src/macros/mod.rs | 11 + src/sys/mod.rs | 22 +- src/sys/windows/afd.rs | 11 +- src/sys/windows/event.rs | 1 + src/sys/windows/iocp_handler.rs | 182 +++++++++++++++ src/sys/windows/mod.rs | 6 + src/sys/windows/selector.rs | 385 +++++++++++++++++++++++++------- src/sys/windows/waker.rs | 16 +- 10 files changed, 602 insertions(+), 97 deletions(-) create mode 100644 src/sys/windows/iocp_handler.rs diff --git a/Cargo.toml b/Cargo.toml index 85d3fd5c0..b9502400b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,8 +43,9 @@ libc = "0.2.69" [target.'cfg(windows)'.dependencies] miow = "0.3.3" -winapi = { version = "0.3", features = ["winsock2", "mswsock"] } +winapi = { version = "0.3", features = ["winsock2", "mswsock", "impl-default", "errhandlingapi"] } ntapi = "0.3" +slab = "0.4" [dev-dependencies] env_logger = { version = "0.6.2", default-features = false } diff --git a/src/lib.rs b/src/lib.rs index 1853582da..db70b2ed6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -89,6 +89,68 @@ pub mod unix { pub use crate::sys::SourceFd; } +cfg_any_os_util! { +/// Windows-only extensions to the mio crate. +/// +/// Mio on windows is currently implemented with IOCP for a high-performance +/// implementation of asynchronous I/O. On top of the IOCP, a connection to the "Ancillary Function +/// Driver" provides socket services. However, this approach doesn't work for other handle types +/// like files and named pipes. They need to be connected directly to the IO completion port like +/// this is done on unix-like systems with the `SourceFd` type. The purpose of this module is to +/// similarly provide a mechanism for foreign I/O types to get hooked up into the IOCP event loop. +/// +/// This module provides types for interfacing with a custom IOCP handle: +/// +/// * `Binding` - this type is intended to govern binding with mio's `Registry` +/// type. Each I/O object should contain an instance of `Binding` that's +/// interfaced with for the implementation of the `Source` trait. The +/// `register`, `reregister`, and `deregister` methods for the `Source` trait +/// may use methods provided by `Binding` during their operations. +/// +/// Also note that for types which represent streams of bytes the mio +/// interface of *readiness* doesn't map directly to the Windows model of +/// *completion*. This means that types will have to perform internal +/// buffering to ensure that a readiness interface can be provided. +/// +/// * `Overlapped` - this type is intended to be used as the concrete instances +/// of the `OVERLAPPED` type that most win32 methods expect. It's crucial, for +/// safety, that all asynchronous operations are initiated with an instance of +/// `Overlapped` and not another instantiation of `OVERLAPPED`. +/// +/// Mio's `Overlapped` type is created with an object implementing `CompletionHandler` that +/// is notified on completion of the associated asynchronous operation. The provided +/// `OVERLAPPED_ENTRY` type is defined in the `winapi` crate. Whenever a completion is posted to +/// an IOCP object the `OVERLAPPED` that was signaled will be interpreted as +/// `Overlapped` in the mio crate and the `CompletionHandler` will be invoked. +/// Through this object, and through the `OVERLAPPED` pointer, implementations can handle +/// management of I/O events. +/// +/// * `Readiness` - this type is used to indicate whether an IOCP client is ready to start the next +/// IO operation. This is done either by returning a `Readiness` value from the +/// `CompletionHandler` or by using `Binding::inject_event`. +/// +/// * `CompletionHandler` - this trait is implemented by the IOCP client and is used as a callback +/// when an asynchronous operation has finished. Currently it only implements one callback but may +/// be extended in the future. +/// +/// When put together these types enable custom Windows handles to be +/// registered with mio's event loops. The `Binding` type is used to associate +/// handles and the `Overlapped` type is used to execute I/O operations. When +/// the I/O operations are completed an object implementing `CompletionHandler` is called which +/// typically emits a `Readiness` that is then reported to the user who then may start the next +/// asynchronous operation. +#[cfg(windows)] + pub mod windows { + //! Windows only extensions. + pub use crate::sys::{ + Binding, + CompletionCallback, + Overlapped, + Readiness, + }; + } +} + // Enable with `cargo doc --features extra-docs`. #[cfg(feature = "extra-docs")] pub mod features { diff --git a/src/macros/mod.rs b/src/macros/mod.rs index 7db25795d..39c97846c 100644 --- a/src/macros/mod.rs +++ b/src/macros/mod.rs @@ -70,6 +70,17 @@ macro_rules! cfg_udp { } } +/// Feature `os-util` enabled. +macro_rules! cfg_os_util { + ($($item:item)*) => { + $( + #[cfg(feature = "os-util")] + #[cfg_attr(docsrs, doc(cfg(feature = "os-util")))] + $item + )* + } +} + /// Feature `uds` enabled. #[cfg(unix)] macro_rules! cfg_uds { diff --git a/src/sys/mod.rs b/src/sys/mod.rs index 1359181f4..32db52883 100644 --- a/src/sys/mod.rs +++ b/src/sys/mod.rs @@ -80,7 +80,27 @@ cfg_os_poll! { #[cfg(windows)] cfg_os_poll! { mod windows; - pub(crate) use self::windows::*; + pub(crate) use self::windows::{Waker, Event, Events, event, Selector}; + cfg_any_os_util! { + pub use self::windows::{ + Binding, + CompletionCallback, + Overlapped, + Readiness, + }; + } + + cfg_net! { + pub(crate) use self::windows::IoSourceState; + } + + cfg_tcp! { + pub(crate) use self::windows::tcp; + } + + cfg_udp! { + pub(crate) use self::windows::udp; + } } cfg_not_os_poll! { diff --git a/src/sys/windows/afd.rs b/src/sys/windows/afd.rs index 43d443c15..3e89439e1 100644 --- a/src/sys/windows/afd.rs +++ b/src/sys/windows/afd.rs @@ -114,17 +114,17 @@ impl Afd { } cfg_net! { - use miow::iocp::CompletionPort; use ntapi::ntioapi::FILE_OPEN; use ntapi::ntioapi::NtCreateFile; use std::mem::zeroed; use std::os::windows::io::{FromRawHandle, RawHandle}; - use std::sync::atomic::{AtomicUsize, Ordering}; use winapi::shared::ntdef::{OBJECT_ATTRIBUTES, UNICODE_STRING, USHORT, WCHAR}; use winapi::um::handleapi::INVALID_HANDLE_VALUE; use winapi::um::winbase::{SetFileCompletionNotificationModes, FILE_SKIP_SET_EVENT_ON_HANDLE}; use winapi::um::winnt::SYNCHRONIZE; use winapi::um::winnt::{FILE_SHARE_READ, FILE_SHARE_WRITE}; + use super::iocp_handler::IocpHandlerRegistry; + use super::selector::AfdCompletionPortEventHandler; const AFD_HELPER_ATTRIBUTES: OBJECT_ATTRIBUTES = OBJECT_ATTRIBUTES { Length: size_of::() as ULONG, @@ -159,8 +159,6 @@ cfg_net! { 'o' as _ ]; - static NEXT_TOKEN: AtomicUsize = AtomicUsize::new(0); - impl AfdPollInfo { pub fn zeroed() -> AfdPollInfo { unsafe { zeroed() } @@ -169,7 +167,7 @@ cfg_net! { impl Afd { /// Create new Afd instance. - pub fn new(cp: &CompletionPort) -> io::Result { + pub(crate) fn new(cp: &IocpHandlerRegistry, handler: AfdCompletionPortEventHandler) -> io::Result { let mut afd_helper_handle: HANDLE = INVALID_HANDLE_VALUE; let mut iosb = IO_STATUS_BLOCK { u: IO_STATUS_BLOCK_u { Status: 0 }, @@ -196,9 +194,8 @@ cfg_net! { )); } let fd = File::from_raw_handle(afd_helper_handle as RawHandle); - let token = NEXT_TOKEN.fetch_add(1, Ordering::Relaxed) + 1; let afd = Afd { fd }; - cp.add_handle(token, &afd.fd)?; + cp.register_handle(&afd.fd, handler.into())?; match SetFileCompletionNotificationModes( afd_helper_handle, FILE_SKIP_SET_EVENT_ON_HANDLE, diff --git a/src/sys/windows/event.rs b/src/sys/windows/event.rs index b3412551d..9b8b56a70 100644 --- a/src/sys/windows/event.rs +++ b/src/sys/windows/event.rs @@ -5,6 +5,7 @@ use miow::iocp::CompletionStatus; use super::afd; use crate::Token; +#[derive(Debug)] pub struct Event { pub flags: u32, pub data: u64, diff --git a/src/sys/windows/iocp_handler.rs b/src/sys/windows/iocp_handler.rs new file mode 100644 index 000000000..bd6277c7a --- /dev/null +++ b/src/sys/windows/iocp_handler.rs @@ -0,0 +1,182 @@ +use std::{ + sync::{Arc, Mutex}, + time::Duration, + io, + fmt +}; + +use winapi::shared::winerror; +use miow::{ + Overlapped, + iocp::{ + CompletionPort, + CompletionStatus + } +}; + +use slab::Slab; + +use crate::{ + Token, + sys::windows::{ + Event, + afd, + selector::AfdCompletionPortEventHandler, + }, +}; + +#[cfg(feature = "os-util")] +use crate::sys::windows::selector::RawHandleCompletionHandler; + +pub trait IocpHandler: fmt::Debug + Send + Sync + 'static { + fn handle_completion(&mut self, status: &CompletionStatus) -> Option; + fn on_poll_finished(&mut self) { } +} + +#[derive(Debug)] +pub(crate) enum RegisteredHandler { + AfdHandler(AfdCompletionPortEventHandler), + WakerHandler(WakerHandler), + #[cfg(feature = "os-util")] + RawHandleHandler(RawHandleCompletionHandler) +} + +impl From for RegisteredHandler { + fn from(h: AfdCompletionPortEventHandler) -> Self { + RegisteredHandler::AfdHandler(h) + } +} + +impl From for RegisteredHandler { + fn from(h: WakerHandler) -> Self { + RegisteredHandler::WakerHandler(h) + } +} + +#[cfg(feature = "os-util")] +impl From for RegisteredHandler { + fn from(h: RawHandleCompletionHandler) -> Self { + RegisteredHandler::RawHandleHandler(h) + } +} + +impl IocpHandler for RegisteredHandler { + fn handle_completion(&mut self, status: &CompletionStatus) -> Option { + match self { + RegisteredHandler::AfdHandler(handler) => handler.handle_completion(status), + RegisteredHandler::WakerHandler(handler) => handler.handle_completion(status), + #[cfg(feature = "os-util")] + RegisteredHandler::RawHandleHandler(handler) => handler.handle_completion(status), + } + } + + fn on_poll_finished(&mut self) { + match self { + RegisteredHandler::AfdHandler(handler) => handler.on_poll_finished(), + RegisteredHandler::WakerHandler(handler) => handler.on_poll_finished(), + #[cfg(feature = "os-util")] + RegisteredHandler::RawHandleHandler(handler) => handler.on_poll_finished(), + } + } +} + +#[derive(Debug)] +pub struct IocpWaker { + token: usize, + iocp_registry: Arc, +} + +#[derive(Debug)] +pub(crate) struct WakerHandler { + external_token: Token, +} + +impl IocpHandler for WakerHandler { + fn handle_completion(&mut self, _status: &CompletionStatus) -> Option { + Some(Event { + flags: afd::POLL_RECEIVE, + data: self.external_token.0 as u64 + }) + } +} + +impl IocpWaker { + pub fn post(&self, bytes: u32, overlapped: *mut Overlapped) -> io::Result<()> { + self.iocp_registry.cp.post(CompletionStatus::new(bytes, self.token, overlapped)) + } +} + +#[derive(Debug)] +pub struct IocpHandlerRegistry { + cp: CompletionPort, + handlers: Mutex>, +} + +impl IocpHandlerRegistry { + pub fn new() -> io::Result { + CompletionPort::new(0).map(|cp| + Self { + cp, + handlers: Mutex::new(Slab::new()) + }) + } + + pub fn register_waker(self: Arc, token: Token) -> IocpWaker { + let handler = WakerHandler { + external_token: token + }; + let slab_token = self.handlers.lock().unwrap() + .insert(handler.into()); + IocpWaker { + token: slab_token, + iocp_registry: self + } + } + + pub fn handle_pending_events(&self, + statuses: &mut [CompletionStatus], + mut events: Option<&mut Vec>, + timeout: Option) -> io::Result { + let result = match self.cp.get_many(statuses, timeout) { + Ok(iocp_events) => { + let mut num_events = 0; + let mut handlers = self.handlers.lock().unwrap(); + for status in iocp_events { + let key = status.token(); + if let Some(handler) = handlers.get_mut(key) { + if let Some(event) = handler.handle_completion(status) { + if let Some(events) = &mut events { + events.push(event); + } + num_events += 1; + } + } + } + + Ok(num_events) + }, + + Err(ref e) if e.raw_os_error() == Some(winerror::WAIT_TIMEOUT as i32) => Ok(0), + + Err(e) => Err(e) + }; + + for (_, handler) in self.handlers.lock().unwrap().iter_mut() { + handler.on_poll_finished(); + } + + result + } +} + +cfg_any_os_util! { + use std::os::windows::io::AsRawHandle; + + impl IocpHandlerRegistry { + pub(crate) fn register_handle(&self, handle: &T, handler: RegisteredHandler) -> io::Result<()> + where T: AsRawHandle + ?Sized { + let token = self.handlers.lock().unwrap().insert(handler); + self.cp.add_handle(token, handle) + } + } +} diff --git a/src/sys/windows/mod.rs b/src/sys/windows/mod.rs index e1f48038d..a1d4e0710 100644 --- a/src/sys/windows/mod.rs +++ b/src/sys/windows/mod.rs @@ -7,6 +7,12 @@ pub use event::{Event, Events}; mod selector; pub use selector::{Selector, SelectorInner, SockState}; +mod iocp_handler; + +cfg_any_os_util! { + pub use selector::{CompletionCallback, Overlapped, Readiness, Binding}; +} + // Macros must be defined before the modules that use them cfg_net! { /// Helper macro to execute a system call that returns an `io::Result`. diff --git a/src/sys/windows/selector.rs b/src/sys/windows/selector.rs index b1395ac6c..517d13e0b 100644 --- a/src/sys/windows/selector.rs +++ b/src/sys/windows/selector.rs @@ -5,42 +5,45 @@ use crate::sys::event::{ ERROR_FLAGS, READABLE_FLAGS, READ_CLOSED_FLAGS, WRITABLE_FLAGS, WRITE_CLOSED_FLAGS, }; use crate::sys::Events; +use crate::sys::windows::iocp_handler::{ + IocpHandler, IocpHandlerRegistry +}; use crate::Interest; -use miow::iocp::{CompletionPort, CompletionStatus}; -use miow::Overlapped; +use miow::iocp::CompletionStatus; use std::collections::VecDeque; use std::marker::PhantomPinned; use std::os::windows::io::RawSocket; use std::pin::Pin; #[cfg(debug_assertions)] -use std::sync::atomic::AtomicUsize; -use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; use std::time::Duration; use std::{io, ptr}; use winapi::shared::ntdef::NT_SUCCESS; use winapi::shared::ntdef::{HANDLE, PVOID}; use winapi::shared::ntstatus::STATUS_CANCELLED; -use winapi::shared::winerror::{ERROR_INVALID_HANDLE, ERROR_IO_PENDING, WAIT_TIMEOUT}; +use winapi::shared::winerror::{ERROR_INVALID_HANDLE, ERROR_IO_PENDING}; use winapi::um::minwinbase::OVERLAPPED; /// Overlapped value to indicate a `Waker` event. // // Note: this must be null, `SelectorInner::feed_events` depends on it. -pub const WAKER_OVERLAPPED: *mut Overlapped = ptr::null_mut(); +pub const WAKER_OVERLAPPED: *mut miow::Overlapped = ptr::null_mut(); #[derive(Debug)] struct AfdGroup { - cp: Arc, + cp: Arc, afd_group: Mutex>>, + completion_handler: AfdCompletionPortEventHandler, } impl AfdGroup { - pub fn new(cp: Arc) -> AfdGroup { + pub fn new(cp: Arc, completion_handler: AfdCompletionPortEventHandler) -> AfdGroup { AfdGroup { afd_group: Mutex::new(Vec::new()), cp, + completion_handler } } @@ -50,6 +53,12 @@ impl AfdGroup { } } +impl Drop for AfdGroup { + fn drop(&mut self) { + self.release_unused_afd(); + } +} + cfg_net! { const POLL_GROUP__MAX_GROUP_SIZE: usize = 32; @@ -75,7 +84,7 @@ cfg_net! { } fn _alloc_afd_group(&self, afd_group: &mut Vec>) -> io::Result<()> { - let afd = Afd::new(&self.cp)?; + let afd = Afd::new(&self.cp, self.completion_handler.clone())?; let arc = Arc::new(afd); afd_group.push(arc); Ok(()) @@ -113,6 +122,37 @@ pub struct SockState { pinned: PhantomPinned, } +#[derive(Debug)] +pub(crate) struct AfdCompletionPortEventHandler { + update_queue: Vec>>>, + selector_update_queue: Arc>>>>>, +} + +impl Clone for AfdCompletionPortEventHandler { + fn clone(&self) -> Self { + Self { + update_queue: Vec::new(), + selector_update_queue: Arc::clone(&self.selector_update_queue) + } + } +} + +impl IocpHandler for AfdCompletionPortEventHandler { + fn handle_completion(&mut self, status: &CompletionStatus) -> Option { + let sock_state = from_overlapped(status.overlapped()); + let mut sock_guard = sock_state.lock().unwrap(); + let event = sock_guard.feed_event(); + if !sock_guard.is_pending_deletion() { + self.update_queue.push(sock_state.clone()); + } + event + } + + fn on_poll_finished(&mut self) { + self.selector_update_queue.lock().unwrap().extend(self.update_queue.drain(..)); + } +} + impl SockState { fn update(&mut self, self_arc: &Pin>>) -> io::Result<()> { assert!(!self.delete_pending); @@ -328,7 +368,7 @@ fn from_overlapped(ptr: *mut OVERLAPPED) -> Pin>> { #[cfg(debug_assertions)] static NEXT_ID: AtomicUsize = AtomicUsize::new(0); -/// Windows implementaion of `sys::Selector` +/// Windows implementation of `sys::Selector` /// /// Edge-triggered event notification is simulated by resetting internal event flag of each socket state `SockState` /// and setting all events back by intercepting all requests that could cause `io::ErrorKind::WouldBlock` happening. @@ -371,14 +411,233 @@ impl Selector { self.inner.select(events, timeout) } - pub(super) fn clone_port(&self) -> Arc { + pub(super) fn clone_port(&self) -> Arc { self.inner.cp.clone() } } +cfg_os_util! { + use std::fmt; + use std::cell::UnsafeCell; + use std::os::windows::io::AsRawHandle; + use std::ops::BitOr; + use std::num::NonZeroU32; + + use winapi::um::minwinbase::OVERLAPPED_ENTRY; + + use crate::poll; + use crate::{Registry, Token}; + + const OVERLAPPED_CANARY: u32 = 0x5e219f3a; + + /// Holds a set of actions that the sending IOCP client is ready to do. + /// + /// These can be combined into one `Readiness` instance by using the bitwise or operator '|'. + /// However, there is no "not ready" state, i.e. and empty `Readiness`. To indicate the absence of + /// any ready operation, use Option and return a None value. + #[derive(Debug, Clone, Copy, Eq, PartialEq)] + pub struct Readiness { + flags: NonZeroU32, + } + + impl Readiness { + /// Readiness that indicates readiness to read + pub const READ: Readiness = Readiness { flags: unsafe { NonZeroU32::new_unchecked(0b_0000_0001) } }; + /// Readiness that indicates readiness to write + pub const WRITE: Readiness = Readiness { flags: unsafe { NonZeroU32::new_unchecked(0b_0000_0010) } }; + + /// Checks whether the provided readiness is fulfilled + pub fn is_set(&self, readiness: Readiness) -> bool { + (self.flags.get() & readiness.flags.get()) == readiness.flags.get() + } + + pub(crate) fn into_event(self, token: Token) -> Event { + let readable = if self.is_set(Self::READ) { afd::POLL_RECEIVE } else { 0 }; + let writable = if self.is_set(Self::WRITE) { afd::POLL_SEND } else { 0 }; + let flags = readable | writable; + debug_assert!(flags != 0); + Event { + data: token.0 as u64, + flags + } + } + } + + impl BitOr for Readiness { + type Output = Readiness; + + fn bitor(self, rhs: Self) -> Self::Output { + // Since neither self nor rhs can be zero (assuming absence of transmuted Readinesses), it's + // safe to assume that the result is also nonzero. + let flags = unsafe { + NonZeroU32::new_unchecked(self.flags.get() | rhs.flags.get()) + }; + Readiness { flags } + } + } + + /// Callback trait for completed IOCP operations. + /// + /// This trait may be implemented to handle completed IOCP operations. The implementing object may + /// signal the readiness for certain operations to the upper layer by returning a `Readiness` value + /// that is then converted into `mio::Event` and delivered to the user. + /// + /// This trait is implemented for closures as well. + pub trait CompletionCallback { + /// Called on completion of the associated operation. + /// + /// The callback will be called once the operation that it is bound to via a call to + /// `Overlapped::new` has been completed and the IOCP is signalled. By returning a `Readiness` + /// value, the callback may deliver a `mio::Event` to the user. + fn complete_operation(&mut self, entry: &OVERLAPPED_ENTRY) -> Option; + } + + impl CompletionCallback for F where F: FnMut(&OVERLAPPED_ENTRY) -> Option { + fn complete_operation(&mut self, entry: &OVERLAPPED_ENTRY) -> Option { + (*self)(entry) + } + } + + #[derive(Debug)] + pub(crate) struct RawHandleCompletionHandler { + token: Token + } + + impl IocpHandler for RawHandleCompletionHandler { + fn handle_completion(&mut self, status: &CompletionStatus) -> Option { + let overlapped = status.overlapped() as *mut Overlapped; + unsafe { + debug_assert!((*overlapped).canary == OVERLAPPED_CANARY); + (*overlapped).callback.complete_operation(status.entry()) + .map(|r| r.into_event(self.token)) + } + } + } + + /// A `Binding` is embedded in all I/O objects associated with a `Registry`. + /// + /// Each registration keeps track of which selector the I/O object is + /// associated with, ensuring that implementations of `Source` can be + /// conformant for the various methods on Windows. + /// + /// If you're working with custom IOCP-enabled objects then you'll want to + /// ensure that one of these instances is stored in your object and used in the + /// implementation of `Source`. + /// + /// For more information about how to use this see the `windows` module + /// documentation in this crate. + #[derive(Debug, Clone)] + pub struct Binding { + selector: Arc, + token: Token + } + + impl Binding { + /// Creates a new blank binding ready to be inserted into an I/O + /// object. + /// + /// Won't actually do anything until `register_handle` has been called during a call to + /// `Registry::register`. + pub fn new(registry: &Registry, token: Token) -> Self { + let selector = poll::selector(registry); + Binding { selector: Arc::clone(&selector.inner), token } + } + + /// Register a raw windows handle with the IOCP event loop. + /// + /// The underlying resource needs to support overlapped operations. Thus, files and similar + /// entities need to be opened in overlapped mode. + /// + /// While it is safely possible to call this function more than once it is discouraged to do + /// so as the handler will use the same token again when generating events. This makes it hard + /// for the user of the registered `Source`. + pub fn register_handle(&self, handle: &H) -> io::Result<()> where H: AsRawHandle + ?Sized { + let handler = RawHandleCompletionHandler { + token: self.token + }; + self.selector.cp.register_handle(handle, handler.into()) + } + + /// Inject an event to the event loop. + /// + /// In case it is required to generate an event even though there was no callback due to a + /// completed IO operation, this method can be used to mark the associated token as ready, e.g. + /// in case it is necessary to signal an initial readiness before any IO operation has been + /// scheduled. + pub fn inject_event(&self, readiness: Readiness) { + let event = readiness.into_event(self.token); + self.selector.injected_events.lock().unwrap().push(event) + } + + /// Check whether the provided registry is the same as the one used during creation of the + /// binding. + /// + /// IOCP driven sources cannot unregister from their completion port before they get closed. + /// Therefore, implementations of reregister and deregister should check whether they have been + /// called on the same `Registry` using this method to report errors in case this constraint + /// has been violated. + pub fn is_same_registry(&self, registry: &Registry) -> bool { + Arc::ptr_eq(&self.selector, &poll::selector(registry).inner) + } + } + + /// A wrapper around an internal instance over `miow::Overlapped` which is in + /// turn a wrapper around the Windows type `OVERLAPPED`. + /// + /// This type is required to be used for all IOCP operations on handles that are + /// registered with an event loop. The event loop will receive notifications + /// over `OVERLAPPED` pointers that have completed, and it will cast that + /// pointer to a pointer to this structure and invoke the associated callback. + #[repr(C)] + pub struct Overlapped { + inner: UnsafeCell, + #[cfg(debug_assertions)] + canary: u32, + callback: Box, + } + + impl fmt::Debug for Overlapped { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Overlapped").finish() + } + } + + impl Overlapped { + /// Creates a new `Overlapped` which will invoke the provided `cb` callback + /// whenever it's triggered. + /// + /// The returned `Overlapped` must be used as the `OVERLAPPED` passed to all + /// I/O operations that are registered with mio's event loop. When the I/O + /// operation associated with an `OVERLAPPED` pointer completes the event + /// loop will invoke the object provided as `callback`. + pub fn new(callback: C) -> Self { + Self { + inner: UnsafeCell::new(miow::Overlapped::zero()), + #[cfg(debug_assertions)] + canary: OVERLAPPED_CANARY, + callback: Box::new(callback) as Box, + } + } + + /// Get the underlying `Overlapped` instance as a raw pointer. + /// + /// This can be useful when only a shared borrow is held and the overlapped + /// pointer needs to be passed down to winapi. + pub fn as_mut_ptr(&self) -> *mut OVERLAPPED { + unsafe { + (*self.inner.get()).raw() + } + } + } + + // Overlapped's APIs are marked as unsafe as they must be used with caution to ensure thread + // safety. The structure itself is safe to send across threads. + unsafe impl Send for Overlapped {} + unsafe impl Sync for Overlapped {} +} + cfg_net! { use super::InternalState; - use crate::Token; impl Selector { pub(super) fn register( @@ -408,8 +667,9 @@ cfg_net! { #[derive(Debug)] pub struct SelectorInner { - cp: Arc, - update_queue: Mutex>>>>, + cp: Arc, + update_queue: Arc>>>>>, + injected_events: Mutex>, afd_group: AfdGroup, is_polling: AtomicBool, } @@ -419,14 +679,20 @@ unsafe impl Sync for SelectorInner {} impl SelectorInner { pub fn new() -> io::Result { - CompletionPort::new(0).map(|cp| { + IocpHandlerRegistry::new().map(|cp| { let cp = Arc::new(cp); let cp_afd = Arc::clone(&cp); + let update_queue = Arc::new(Mutex::new(VecDeque::new())); + let completion_handler = AfdCompletionPortEventHandler { + selector_update_queue: Arc::clone(&update_queue), + update_queue: Vec::new() + }; SelectorInner { cp, - update_queue: Mutex::new(VecDeque::new()), - afd_group: AfdGroup::new(cp_afd), + update_queue, + injected_events: Mutex::default(), + afd_group: AfdGroup::new(cp_afd, completion_handler), is_polling: AtomicBool::new(false), } }) @@ -462,14 +728,26 @@ impl SelectorInner { unsafe { self.update_sockets_events() }?; - let result = self.cp.get_many(statuses, timeout); + events.extend(self.injected_events.lock().unwrap().drain(..)); + // since events was empty before this call, the number of events is the number of injected + // events + let num_drained_events = events.len(); - self.is_polling.store(false, Ordering::Relaxed); + if num_drained_events == 0 { + let result = self.cp.handle_pending_events(statuses, Some(events), timeout); - match result { - Ok(iocp_events) => Ok(unsafe { self.feed_events(events, iocp_events) }), - Err(ref e) if e.raw_os_error() == Some(WAIT_TIMEOUT as i32) => Ok(0), - Err(e) => Err(e), + self.is_polling.store(false, Ordering::Relaxed); + + match result { + Ok(num_events) => { + self.afd_group.release_unused_afd(); + Ok(num_events + num_drained_events) + }, + Err(e) => Err(e), + } + } else { + self.is_polling.store(false, Ordering::Relaxed); + Ok(num_drained_events) } } @@ -488,43 +766,6 @@ impl SelectorInner { self.afd_group.release_unused_afd(); Ok(()) } - - // It returns processed count of iocp_events rather than the events itself. - unsafe fn feed_events( - &self, - events: &mut Vec, - iocp_events: &[CompletionStatus], - ) -> usize { - let mut n = 0; - let mut update_queue = self.update_queue.lock().unwrap(); - for iocp_event in iocp_events.iter() { - if iocp_event.overlapped().is_null() { - // `Waker` event, we'll add a readable event to match the other platforms. - events.push(Event { - flags: afd::POLL_RECEIVE, - data: iocp_event.token() as u64, - }); - n += 1; - continue; - } - - let sock_state = from_overlapped(iocp_event.overlapped()); - let mut sock_guard = sock_state.lock().unwrap(); - match sock_guard.feed_event() { - Some(e) => { - events.push(e); - n += 1; - } - None => {} - } - - if !sock_guard.is_pending_deletion() { - update_queue.push_back(sock_state.clone()); - } - } - self.afd_group.release_unused_afd(); - n - } } cfg_net! { @@ -686,35 +927,23 @@ cfg_net! { impl Drop for SelectorInner { fn drop(&mut self) { loop { - let events_num: usize; let mut statuses: [CompletionStatus; 1024] = [CompletionStatus::zero(); 1024]; let result = self .cp - .get_many(&mut statuses, Some(std::time::Duration::from_millis(0))); + .handle_pending_events(&mut statuses, None, Some(Duration::from_millis(0))); match result { - Ok(iocp_events) => { - events_num = iocp_events.iter().len(); - for iocp_event in iocp_events.iter() { - if !iocp_event.overlapped().is_null() { - // drain sock state to release memory of Arc reference - let _sock_state = from_overlapped(iocp_event.overlapped()); - } - } - } + Ok(events_num) if events_num == 0 => { + break + }, + + Ok(_) => (), Err(_) => { break; } } - - if events_num == 0 { - // continue looping until all completion statuses have been drained - break; - } } - - self.afd_group.release_unused_afd(); } } diff --git a/src/sys/windows/waker.rs b/src/sys/windows/waker.rs index 1b21cf148..0d97bc839 100644 --- a/src/sys/windows/waker.rs +++ b/src/sys/windows/waker.rs @@ -1,28 +1,24 @@ use crate::sys::windows::selector::WAKER_OVERLAPPED; use crate::sys::windows::Selector; +use crate::sys::windows::iocp_handler::IocpWaker; use crate::Token; -use miow::iocp::{CompletionPort, CompletionStatus}; use std::io; -use std::sync::Arc; #[derive(Debug)] pub struct Waker { - token: Token, - port: Arc, + cp_waker: IocpWaker, } impl Waker { pub fn new(selector: &Selector, token: Token) -> io::Result { - Ok(Waker { - token, - port: selector.clone_port(), - }) + let cp_registry = selector.clone_port(); + let cp_waker = cp_registry.register_waker(token); + Ok(Waker { cp_waker }) } pub fn wake(&self) -> io::Result<()> { // Keep NULL as Overlapped value to notify waking. - let status = CompletionStatus::new(0, self.token.0, WAKER_OVERLAPPED); - self.port.post(status) + self.cp_waker.post(0, WAKER_OVERLAPPED) } } From bccc605b94b37e6ffabe757672e03eff02221847 Mon Sep 17 00:00:00 2001 From: Markus Hosch Date: Tue, 11 Aug 2020 08:08:30 +0200 Subject: [PATCH 2/2] Add preliminary test for the new old Windows IOCP API This is a first test to play with the Windows IOCP handler API. --- tests/win_rawhandlesrc.rs | 123 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 123 insertions(+) create mode 100644 tests/win_rawhandlesrc.rs diff --git a/tests/win_rawhandlesrc.rs b/tests/win_rawhandlesrc.rs new file mode 100644 index 000000000..65ed99e68 --- /dev/null +++ b/tests/win_rawhandlesrc.rs @@ -0,0 +1,123 @@ +#![cfg(all(windows, feature = "os-util"))] + +use mio::windows; +use mio::windows::Readiness; +use mio::{event::Source, Events, Interest, Poll, Registry, Token}; +use std::default::Default; +use std::ffi::OsStr; +use std::fs::File; +use std::io::{self, Seek, SeekFrom, Write}; +use std::iter; +use std::os::windows::ffi::OsStrExt; +use std::os::windows::io::{AsRawHandle, FromRawHandle, RawHandle}; +use std::ptr; +use winapi::shared::winerror::ERROR_IO_PENDING; +use winapi::um::minwinbase::OVERLAPPED_ENTRY; +use winapi::um::{ + errhandlingapi::GetLastError, + fileapi::{CreateFileW, WriteFile, CREATE_ALWAYS}, + handleapi::INVALID_HANDLE_VALUE, + minwinbase::OVERLAPPED, + winbase::FILE_FLAG_OVERLAPPED, + winnt::GENERIC_WRITE, +}; + +struct AsyncFile { + file: File, + binding: Option, +} + +impl Source for AsyncFile { + fn register( + &mut self, + registry: &Registry, + token: Token, + _interests: Interest, + ) -> io::Result<()> { + let binding = windows::Binding::new(registry, token); + self.binding = Some(binding); + self.binding.as_ref().unwrap().register_handle(&self.file) + } + + fn reregister( + &mut self, + _registry: &Registry, + _token: Token, + _interests: Interest, + ) -> io::Result<()> { + unimplemented!() + } + + fn deregister(&mut self, _registry: &Registry) -> io::Result<()> { + unimplemented!() + } +} + +impl AsRawHandle for AsyncFile { + fn as_raw_handle(&self) -> RawHandle { + self.file.as_raw_handle() + } +} + +#[test] +fn register_custom_iocp_handler() { + let mut poll = Poll::new().unwrap(); + + let path: &OsStr = "C:\\temp\\test.txt".as_ref(); + let path_u16: Vec = path.encode_wide().chain(iter::once(0)).collect(); + let mut file = unsafe { + let handle = CreateFileW( + path_u16.as_ptr(), + GENERIC_WRITE, + 0, + ptr::null_mut(), + CREATE_ALWAYS, + FILE_FLAG_OVERLAPPED, + ptr::null_mut(), + ); + if handle == INVALID_HANDLE_VALUE { + panic!("Unable to open file!"); + } + AsyncFile { + file: File::from_raw_handle(handle), + binding: None, + } + }; + + poll.registry() + .register(&mut file, Token(0), Interest::WRITABLE) + .unwrap(); + + let mut buffer: Vec<_> = iter::successors(Some(15u8), |p| Some(p.wrapping_add(2))) + .take(1024) + .collect(); + loop { + unsafe { + let mut overlapped = + windows::Overlapped::new(move |_: &OVERLAPPED_ENTRY| Some(Readiness::WRITE)); + if WriteFile( + file.as_raw_handle(), + buffer.as_mut_ptr() as *mut _, + buffer.len() as u32, + ptr::null_mut(), + &mut overlapped as *mut windows::Overlapped as *mut OVERLAPPED, + ) == 0 + { + match GetLastError() { + ERROR_IO_PENDING => { + let mut events = Events::with_capacity(16); + poll.poll(&mut events, None).unwrap(); + let mut event_iter = events.iter(); + let event = event_iter.next().unwrap(); + assert_eq!(0, event.token().0); + assert!(event.is_writable()); + assert!(event_iter.next().is_none()); + break; + } + + e => panic!("Error during file write operation: {}", e), + } + } + } + } +}