diff --git a/Cargo.toml b/Cargo.toml index 85d3fd5c0..b9502400b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,8 +43,9 @@ libc = "0.2.69" [target.'cfg(windows)'.dependencies] miow = "0.3.3" -winapi = { version = "0.3", features = ["winsock2", "mswsock"] } +winapi = { version = "0.3", features = ["winsock2", "mswsock", "impl-default", "errhandlingapi"] } ntapi = "0.3" +slab = "0.4" [dev-dependencies] env_logger = { version = "0.6.2", default-features = false } diff --git a/src/lib.rs b/src/lib.rs index 1853582da..db70b2ed6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -89,6 +89,68 @@ pub mod unix { pub use crate::sys::SourceFd; } +cfg_any_os_util! { +/// Windows-only extensions to the mio crate. +/// +/// Mio on windows is currently implemented with IOCP for a high-performance +/// implementation of asynchronous I/O. On top of the IOCP, a connection to the "Ancillary Function +/// Driver" provides socket services. However, this approach doesn't work for other handle types +/// like files and named pipes. They need to be connected directly to the IO completion port like +/// this is done on unix-like systems with the `SourceFd` type. The purpose of this module is to +/// similarly provide a mechanism for foreign I/O types to get hooked up into the IOCP event loop. +/// +/// This module provides types for interfacing with a custom IOCP handle: +/// +/// * `Binding` - this type is intended to govern binding with mio's `Registry` +/// type. Each I/O object should contain an instance of `Binding` that's +/// interfaced with for the implementation of the `Source` trait. The +/// `register`, `reregister`, and `deregister` methods for the `Source` trait +/// may use methods provided by `Binding` during their operations. +/// +/// Also note that for types which represent streams of bytes the mio +/// interface of *readiness* doesn't map directly to the Windows model of +/// *completion*. This means that types will have to perform internal +/// buffering to ensure that a readiness interface can be provided. +/// +/// * `Overlapped` - this type is intended to be used as the concrete instances +/// of the `OVERLAPPED` type that most win32 methods expect. It's crucial, for +/// safety, that all asynchronous operations are initiated with an instance of +/// `Overlapped` and not another instantiation of `OVERLAPPED`. +/// +/// Mio's `Overlapped` type is created with an object implementing `CompletionHandler` that +/// is notified on completion of the associated asynchronous operation. The provided +/// `OVERLAPPED_ENTRY` type is defined in the `winapi` crate. Whenever a completion is posted to +/// an IOCP object the `OVERLAPPED` that was signaled will be interpreted as +/// `Overlapped` in the mio crate and the `CompletionHandler` will be invoked. +/// Through this object, and through the `OVERLAPPED` pointer, implementations can handle +/// management of I/O events. +/// +/// * `Readiness` - this type is used to indicate whether an IOCP client is ready to start the next +/// IO operation. This is done either by returning a `Readiness` value from the +/// `CompletionHandler` or by using `Binding::inject_event`. +/// +/// * `CompletionHandler` - this trait is implemented by the IOCP client and is used as a callback +/// when an asynchronous operation has finished. Currently it only implements one callback but may +/// be extended in the future. +/// +/// When put together these types enable custom Windows handles to be +/// registered with mio's event loops. The `Binding` type is used to associate +/// handles and the `Overlapped` type is used to execute I/O operations. When +/// the I/O operations are completed an object implementing `CompletionHandler` is called which +/// typically emits a `Readiness` that is then reported to the user who then may start the next +/// asynchronous operation. +#[cfg(windows)] + pub mod windows { + //! Windows only extensions. + pub use crate::sys::{ + Binding, + CompletionCallback, + Overlapped, + Readiness, + }; + } +} + // Enable with `cargo doc --features extra-docs`. #[cfg(feature = "extra-docs")] pub mod features { diff --git a/src/macros/mod.rs b/src/macros/mod.rs index 7db25795d..39c97846c 100644 --- a/src/macros/mod.rs +++ b/src/macros/mod.rs @@ -70,6 +70,17 @@ macro_rules! cfg_udp { } } +/// Feature `os-util` enabled. +macro_rules! cfg_os_util { + ($($item:item)*) => { + $( + #[cfg(feature = "os-util")] + #[cfg_attr(docsrs, doc(cfg(feature = "os-util")))] + $item + )* + } +} + /// Feature `uds` enabled. #[cfg(unix)] macro_rules! cfg_uds { diff --git a/src/sys/mod.rs b/src/sys/mod.rs index 1359181f4..32db52883 100644 --- a/src/sys/mod.rs +++ b/src/sys/mod.rs @@ -80,7 +80,27 @@ cfg_os_poll! { #[cfg(windows)] cfg_os_poll! { mod windows; - pub(crate) use self::windows::*; + pub(crate) use self::windows::{Waker, Event, Events, event, Selector}; + cfg_any_os_util! { + pub use self::windows::{ + Binding, + CompletionCallback, + Overlapped, + Readiness, + }; + } + + cfg_net! { + pub(crate) use self::windows::IoSourceState; + } + + cfg_tcp! { + pub(crate) use self::windows::tcp; + } + + cfg_udp! { + pub(crate) use self::windows::udp; + } } cfg_not_os_poll! { diff --git a/src/sys/windows/afd.rs b/src/sys/windows/afd.rs index 43d443c15..3e89439e1 100644 --- a/src/sys/windows/afd.rs +++ b/src/sys/windows/afd.rs @@ -114,17 +114,17 @@ impl Afd { } cfg_net! { - use miow::iocp::CompletionPort; use ntapi::ntioapi::FILE_OPEN; use ntapi::ntioapi::NtCreateFile; use std::mem::zeroed; use std::os::windows::io::{FromRawHandle, RawHandle}; - use std::sync::atomic::{AtomicUsize, Ordering}; use winapi::shared::ntdef::{OBJECT_ATTRIBUTES, UNICODE_STRING, USHORT, WCHAR}; use winapi::um::handleapi::INVALID_HANDLE_VALUE; use winapi::um::winbase::{SetFileCompletionNotificationModes, FILE_SKIP_SET_EVENT_ON_HANDLE}; use winapi::um::winnt::SYNCHRONIZE; use winapi::um::winnt::{FILE_SHARE_READ, FILE_SHARE_WRITE}; + use super::iocp_handler::IocpHandlerRegistry; + use super::selector::AfdCompletionPortEventHandler; const AFD_HELPER_ATTRIBUTES: OBJECT_ATTRIBUTES = OBJECT_ATTRIBUTES { Length: size_of::() as ULONG, @@ -159,8 +159,6 @@ cfg_net! { 'o' as _ ]; - static NEXT_TOKEN: AtomicUsize = AtomicUsize::new(0); - impl AfdPollInfo { pub fn zeroed() -> AfdPollInfo { unsafe { zeroed() } @@ -169,7 +167,7 @@ cfg_net! { impl Afd { /// Create new Afd instance. - pub fn new(cp: &CompletionPort) -> io::Result { + pub(crate) fn new(cp: &IocpHandlerRegistry, handler: AfdCompletionPortEventHandler) -> io::Result { let mut afd_helper_handle: HANDLE = INVALID_HANDLE_VALUE; let mut iosb = IO_STATUS_BLOCK { u: IO_STATUS_BLOCK_u { Status: 0 }, @@ -196,9 +194,8 @@ cfg_net! { )); } let fd = File::from_raw_handle(afd_helper_handle as RawHandle); - let token = NEXT_TOKEN.fetch_add(1, Ordering::Relaxed) + 1; let afd = Afd { fd }; - cp.add_handle(token, &afd.fd)?; + cp.register_handle(&afd.fd, handler.into())?; match SetFileCompletionNotificationModes( afd_helper_handle, FILE_SKIP_SET_EVENT_ON_HANDLE, diff --git a/src/sys/windows/event.rs b/src/sys/windows/event.rs index b3412551d..9b8b56a70 100644 --- a/src/sys/windows/event.rs +++ b/src/sys/windows/event.rs @@ -5,6 +5,7 @@ use miow::iocp::CompletionStatus; use super::afd; use crate::Token; +#[derive(Debug)] pub struct Event { pub flags: u32, pub data: u64, diff --git a/src/sys/windows/iocp_handler.rs b/src/sys/windows/iocp_handler.rs new file mode 100644 index 000000000..bd6277c7a --- /dev/null +++ b/src/sys/windows/iocp_handler.rs @@ -0,0 +1,182 @@ +use std::{ + sync::{Arc, Mutex}, + time::Duration, + io, + fmt +}; + +use winapi::shared::winerror; +use miow::{ + Overlapped, + iocp::{ + CompletionPort, + CompletionStatus + } +}; + +use slab::Slab; + +use crate::{ + Token, + sys::windows::{ + Event, + afd, + selector::AfdCompletionPortEventHandler, + }, +}; + +#[cfg(feature = "os-util")] +use crate::sys::windows::selector::RawHandleCompletionHandler; + +pub trait IocpHandler: fmt::Debug + Send + Sync + 'static { + fn handle_completion(&mut self, status: &CompletionStatus) -> Option; + fn on_poll_finished(&mut self) { } +} + +#[derive(Debug)] +pub(crate) enum RegisteredHandler { + AfdHandler(AfdCompletionPortEventHandler), + WakerHandler(WakerHandler), + #[cfg(feature = "os-util")] + RawHandleHandler(RawHandleCompletionHandler) +} + +impl From for RegisteredHandler { + fn from(h: AfdCompletionPortEventHandler) -> Self { + RegisteredHandler::AfdHandler(h) + } +} + +impl From for RegisteredHandler { + fn from(h: WakerHandler) -> Self { + RegisteredHandler::WakerHandler(h) + } +} + +#[cfg(feature = "os-util")] +impl From for RegisteredHandler { + fn from(h: RawHandleCompletionHandler) -> Self { + RegisteredHandler::RawHandleHandler(h) + } +} + +impl IocpHandler for RegisteredHandler { + fn handle_completion(&mut self, status: &CompletionStatus) -> Option { + match self { + RegisteredHandler::AfdHandler(handler) => handler.handle_completion(status), + RegisteredHandler::WakerHandler(handler) => handler.handle_completion(status), + #[cfg(feature = "os-util")] + RegisteredHandler::RawHandleHandler(handler) => handler.handle_completion(status), + } + } + + fn on_poll_finished(&mut self) { + match self { + RegisteredHandler::AfdHandler(handler) => handler.on_poll_finished(), + RegisteredHandler::WakerHandler(handler) => handler.on_poll_finished(), + #[cfg(feature = "os-util")] + RegisteredHandler::RawHandleHandler(handler) => handler.on_poll_finished(), + } + } +} + +#[derive(Debug)] +pub struct IocpWaker { + token: usize, + iocp_registry: Arc, +} + +#[derive(Debug)] +pub(crate) struct WakerHandler { + external_token: Token, +} + +impl IocpHandler for WakerHandler { + fn handle_completion(&mut self, _status: &CompletionStatus) -> Option { + Some(Event { + flags: afd::POLL_RECEIVE, + data: self.external_token.0 as u64 + }) + } +} + +impl IocpWaker { + pub fn post(&self, bytes: u32, overlapped: *mut Overlapped) -> io::Result<()> { + self.iocp_registry.cp.post(CompletionStatus::new(bytes, self.token, overlapped)) + } +} + +#[derive(Debug)] +pub struct IocpHandlerRegistry { + cp: CompletionPort, + handlers: Mutex>, +} + +impl IocpHandlerRegistry { + pub fn new() -> io::Result { + CompletionPort::new(0).map(|cp| + Self { + cp, + handlers: Mutex::new(Slab::new()) + }) + } + + pub fn register_waker(self: Arc, token: Token) -> IocpWaker { + let handler = WakerHandler { + external_token: token + }; + let slab_token = self.handlers.lock().unwrap() + .insert(handler.into()); + IocpWaker { + token: slab_token, + iocp_registry: self + } + } + + pub fn handle_pending_events(&self, + statuses: &mut [CompletionStatus], + mut events: Option<&mut Vec>, + timeout: Option) -> io::Result { + let result = match self.cp.get_many(statuses, timeout) { + Ok(iocp_events) => { + let mut num_events = 0; + let mut handlers = self.handlers.lock().unwrap(); + for status in iocp_events { + let key = status.token(); + if let Some(handler) = handlers.get_mut(key) { + if let Some(event) = handler.handle_completion(status) { + if let Some(events) = &mut events { + events.push(event); + } + num_events += 1; + } + } + } + + Ok(num_events) + }, + + Err(ref e) if e.raw_os_error() == Some(winerror::WAIT_TIMEOUT as i32) => Ok(0), + + Err(e) => Err(e) + }; + + for (_, handler) in self.handlers.lock().unwrap().iter_mut() { + handler.on_poll_finished(); + } + + result + } +} + +cfg_any_os_util! { + use std::os::windows::io::AsRawHandle; + + impl IocpHandlerRegistry { + pub(crate) fn register_handle(&self, handle: &T, handler: RegisteredHandler) -> io::Result<()> + where T: AsRawHandle + ?Sized { + let token = self.handlers.lock().unwrap().insert(handler); + self.cp.add_handle(token, handle) + } + } +} diff --git a/src/sys/windows/mod.rs b/src/sys/windows/mod.rs index e1f48038d..a1d4e0710 100644 --- a/src/sys/windows/mod.rs +++ b/src/sys/windows/mod.rs @@ -7,6 +7,12 @@ pub use event::{Event, Events}; mod selector; pub use selector::{Selector, SelectorInner, SockState}; +mod iocp_handler; + +cfg_any_os_util! { + pub use selector::{CompletionCallback, Overlapped, Readiness, Binding}; +} + // Macros must be defined before the modules that use them cfg_net! { /// Helper macro to execute a system call that returns an `io::Result`. diff --git a/src/sys/windows/selector.rs b/src/sys/windows/selector.rs index b1395ac6c..517d13e0b 100644 --- a/src/sys/windows/selector.rs +++ b/src/sys/windows/selector.rs @@ -5,42 +5,45 @@ use crate::sys::event::{ ERROR_FLAGS, READABLE_FLAGS, READ_CLOSED_FLAGS, WRITABLE_FLAGS, WRITE_CLOSED_FLAGS, }; use crate::sys::Events; +use crate::sys::windows::iocp_handler::{ + IocpHandler, IocpHandlerRegistry +}; use crate::Interest; -use miow::iocp::{CompletionPort, CompletionStatus}; -use miow::Overlapped; +use miow::iocp::CompletionStatus; use std::collections::VecDeque; use std::marker::PhantomPinned; use std::os::windows::io::RawSocket; use std::pin::Pin; #[cfg(debug_assertions)] -use std::sync::atomic::AtomicUsize; -use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; use std::time::Duration; use std::{io, ptr}; use winapi::shared::ntdef::NT_SUCCESS; use winapi::shared::ntdef::{HANDLE, PVOID}; use winapi::shared::ntstatus::STATUS_CANCELLED; -use winapi::shared::winerror::{ERROR_INVALID_HANDLE, ERROR_IO_PENDING, WAIT_TIMEOUT}; +use winapi::shared::winerror::{ERROR_INVALID_HANDLE, ERROR_IO_PENDING}; use winapi::um::minwinbase::OVERLAPPED; /// Overlapped value to indicate a `Waker` event. // // Note: this must be null, `SelectorInner::feed_events` depends on it. -pub const WAKER_OVERLAPPED: *mut Overlapped = ptr::null_mut(); +pub const WAKER_OVERLAPPED: *mut miow::Overlapped = ptr::null_mut(); #[derive(Debug)] struct AfdGroup { - cp: Arc, + cp: Arc, afd_group: Mutex>>, + completion_handler: AfdCompletionPortEventHandler, } impl AfdGroup { - pub fn new(cp: Arc) -> AfdGroup { + pub fn new(cp: Arc, completion_handler: AfdCompletionPortEventHandler) -> AfdGroup { AfdGroup { afd_group: Mutex::new(Vec::new()), cp, + completion_handler } } @@ -50,6 +53,12 @@ impl AfdGroup { } } +impl Drop for AfdGroup { + fn drop(&mut self) { + self.release_unused_afd(); + } +} + cfg_net! { const POLL_GROUP__MAX_GROUP_SIZE: usize = 32; @@ -75,7 +84,7 @@ cfg_net! { } fn _alloc_afd_group(&self, afd_group: &mut Vec>) -> io::Result<()> { - let afd = Afd::new(&self.cp)?; + let afd = Afd::new(&self.cp, self.completion_handler.clone())?; let arc = Arc::new(afd); afd_group.push(arc); Ok(()) @@ -113,6 +122,37 @@ pub struct SockState { pinned: PhantomPinned, } +#[derive(Debug)] +pub(crate) struct AfdCompletionPortEventHandler { + update_queue: Vec>>>, + selector_update_queue: Arc>>>>>, +} + +impl Clone for AfdCompletionPortEventHandler { + fn clone(&self) -> Self { + Self { + update_queue: Vec::new(), + selector_update_queue: Arc::clone(&self.selector_update_queue) + } + } +} + +impl IocpHandler for AfdCompletionPortEventHandler { + fn handle_completion(&mut self, status: &CompletionStatus) -> Option { + let sock_state = from_overlapped(status.overlapped()); + let mut sock_guard = sock_state.lock().unwrap(); + let event = sock_guard.feed_event(); + if !sock_guard.is_pending_deletion() { + self.update_queue.push(sock_state.clone()); + } + event + } + + fn on_poll_finished(&mut self) { + self.selector_update_queue.lock().unwrap().extend(self.update_queue.drain(..)); + } +} + impl SockState { fn update(&mut self, self_arc: &Pin>>) -> io::Result<()> { assert!(!self.delete_pending); @@ -328,7 +368,7 @@ fn from_overlapped(ptr: *mut OVERLAPPED) -> Pin>> { #[cfg(debug_assertions)] static NEXT_ID: AtomicUsize = AtomicUsize::new(0); -/// Windows implementaion of `sys::Selector` +/// Windows implementation of `sys::Selector` /// /// Edge-triggered event notification is simulated by resetting internal event flag of each socket state `SockState` /// and setting all events back by intercepting all requests that could cause `io::ErrorKind::WouldBlock` happening. @@ -371,14 +411,233 @@ impl Selector { self.inner.select(events, timeout) } - pub(super) fn clone_port(&self) -> Arc { + pub(super) fn clone_port(&self) -> Arc { self.inner.cp.clone() } } +cfg_os_util! { + use std::fmt; + use std::cell::UnsafeCell; + use std::os::windows::io::AsRawHandle; + use std::ops::BitOr; + use std::num::NonZeroU32; + + use winapi::um::minwinbase::OVERLAPPED_ENTRY; + + use crate::poll; + use crate::{Registry, Token}; + + const OVERLAPPED_CANARY: u32 = 0x5e219f3a; + + /// Holds a set of actions that the sending IOCP client is ready to do. + /// + /// These can be combined into one `Readiness` instance by using the bitwise or operator '|'. + /// However, there is no "not ready" state, i.e. and empty `Readiness`. To indicate the absence of + /// any ready operation, use Option and return a None value. + #[derive(Debug, Clone, Copy, Eq, PartialEq)] + pub struct Readiness { + flags: NonZeroU32, + } + + impl Readiness { + /// Readiness that indicates readiness to read + pub const READ: Readiness = Readiness { flags: unsafe { NonZeroU32::new_unchecked(0b_0000_0001) } }; + /// Readiness that indicates readiness to write + pub const WRITE: Readiness = Readiness { flags: unsafe { NonZeroU32::new_unchecked(0b_0000_0010) } }; + + /// Checks whether the provided readiness is fulfilled + pub fn is_set(&self, readiness: Readiness) -> bool { + (self.flags.get() & readiness.flags.get()) == readiness.flags.get() + } + + pub(crate) fn into_event(self, token: Token) -> Event { + let readable = if self.is_set(Self::READ) { afd::POLL_RECEIVE } else { 0 }; + let writable = if self.is_set(Self::WRITE) { afd::POLL_SEND } else { 0 }; + let flags = readable | writable; + debug_assert!(flags != 0); + Event { + data: token.0 as u64, + flags + } + } + } + + impl BitOr for Readiness { + type Output = Readiness; + + fn bitor(self, rhs: Self) -> Self::Output { + // Since neither self nor rhs can be zero (assuming absence of transmuted Readinesses), it's + // safe to assume that the result is also nonzero. + let flags = unsafe { + NonZeroU32::new_unchecked(self.flags.get() | rhs.flags.get()) + }; + Readiness { flags } + } + } + + /// Callback trait for completed IOCP operations. + /// + /// This trait may be implemented to handle completed IOCP operations. The implementing object may + /// signal the readiness for certain operations to the upper layer by returning a `Readiness` value + /// that is then converted into `mio::Event` and delivered to the user. + /// + /// This trait is implemented for closures as well. + pub trait CompletionCallback { + /// Called on completion of the associated operation. + /// + /// The callback will be called once the operation that it is bound to via a call to + /// `Overlapped::new` has been completed and the IOCP is signalled. By returning a `Readiness` + /// value, the callback may deliver a `mio::Event` to the user. + fn complete_operation(&mut self, entry: &OVERLAPPED_ENTRY) -> Option; + } + + impl CompletionCallback for F where F: FnMut(&OVERLAPPED_ENTRY) -> Option { + fn complete_operation(&mut self, entry: &OVERLAPPED_ENTRY) -> Option { + (*self)(entry) + } + } + + #[derive(Debug)] + pub(crate) struct RawHandleCompletionHandler { + token: Token + } + + impl IocpHandler for RawHandleCompletionHandler { + fn handle_completion(&mut self, status: &CompletionStatus) -> Option { + let overlapped = status.overlapped() as *mut Overlapped; + unsafe { + debug_assert!((*overlapped).canary == OVERLAPPED_CANARY); + (*overlapped).callback.complete_operation(status.entry()) + .map(|r| r.into_event(self.token)) + } + } + } + + /// A `Binding` is embedded in all I/O objects associated with a `Registry`. + /// + /// Each registration keeps track of which selector the I/O object is + /// associated with, ensuring that implementations of `Source` can be + /// conformant for the various methods on Windows. + /// + /// If you're working with custom IOCP-enabled objects then you'll want to + /// ensure that one of these instances is stored in your object and used in the + /// implementation of `Source`. + /// + /// For more information about how to use this see the `windows` module + /// documentation in this crate. + #[derive(Debug, Clone)] + pub struct Binding { + selector: Arc, + token: Token + } + + impl Binding { + /// Creates a new blank binding ready to be inserted into an I/O + /// object. + /// + /// Won't actually do anything until `register_handle` has been called during a call to + /// `Registry::register`. + pub fn new(registry: &Registry, token: Token) -> Self { + let selector = poll::selector(registry); + Binding { selector: Arc::clone(&selector.inner), token } + } + + /// Register a raw windows handle with the IOCP event loop. + /// + /// The underlying resource needs to support overlapped operations. Thus, files and similar + /// entities need to be opened in overlapped mode. + /// + /// While it is safely possible to call this function more than once it is discouraged to do + /// so as the handler will use the same token again when generating events. This makes it hard + /// for the user of the registered `Source`. + pub fn register_handle(&self, handle: &H) -> io::Result<()> where H: AsRawHandle + ?Sized { + let handler = RawHandleCompletionHandler { + token: self.token + }; + self.selector.cp.register_handle(handle, handler.into()) + } + + /// Inject an event to the event loop. + /// + /// In case it is required to generate an event even though there was no callback due to a + /// completed IO operation, this method can be used to mark the associated token as ready, e.g. + /// in case it is necessary to signal an initial readiness before any IO operation has been + /// scheduled. + pub fn inject_event(&self, readiness: Readiness) { + let event = readiness.into_event(self.token); + self.selector.injected_events.lock().unwrap().push(event) + } + + /// Check whether the provided registry is the same as the one used during creation of the + /// binding. + /// + /// IOCP driven sources cannot unregister from their completion port before they get closed. + /// Therefore, implementations of reregister and deregister should check whether they have been + /// called on the same `Registry` using this method to report errors in case this constraint + /// has been violated. + pub fn is_same_registry(&self, registry: &Registry) -> bool { + Arc::ptr_eq(&self.selector, &poll::selector(registry).inner) + } + } + + /// A wrapper around an internal instance over `miow::Overlapped` which is in + /// turn a wrapper around the Windows type `OVERLAPPED`. + /// + /// This type is required to be used for all IOCP operations on handles that are + /// registered with an event loop. The event loop will receive notifications + /// over `OVERLAPPED` pointers that have completed, and it will cast that + /// pointer to a pointer to this structure and invoke the associated callback. + #[repr(C)] + pub struct Overlapped { + inner: UnsafeCell, + #[cfg(debug_assertions)] + canary: u32, + callback: Box, + } + + impl fmt::Debug for Overlapped { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Overlapped").finish() + } + } + + impl Overlapped { + /// Creates a new `Overlapped` which will invoke the provided `cb` callback + /// whenever it's triggered. + /// + /// The returned `Overlapped` must be used as the `OVERLAPPED` passed to all + /// I/O operations that are registered with mio's event loop. When the I/O + /// operation associated with an `OVERLAPPED` pointer completes the event + /// loop will invoke the object provided as `callback`. + pub fn new(callback: C) -> Self { + Self { + inner: UnsafeCell::new(miow::Overlapped::zero()), + #[cfg(debug_assertions)] + canary: OVERLAPPED_CANARY, + callback: Box::new(callback) as Box, + } + } + + /// Get the underlying `Overlapped` instance as a raw pointer. + /// + /// This can be useful when only a shared borrow is held and the overlapped + /// pointer needs to be passed down to winapi. + pub fn as_mut_ptr(&self) -> *mut OVERLAPPED { + unsafe { + (*self.inner.get()).raw() + } + } + } + + // Overlapped's APIs are marked as unsafe as they must be used with caution to ensure thread + // safety. The structure itself is safe to send across threads. + unsafe impl Send for Overlapped {} + unsafe impl Sync for Overlapped {} +} + cfg_net! { use super::InternalState; - use crate::Token; impl Selector { pub(super) fn register( @@ -408,8 +667,9 @@ cfg_net! { #[derive(Debug)] pub struct SelectorInner { - cp: Arc, - update_queue: Mutex>>>>, + cp: Arc, + update_queue: Arc>>>>>, + injected_events: Mutex>, afd_group: AfdGroup, is_polling: AtomicBool, } @@ -419,14 +679,20 @@ unsafe impl Sync for SelectorInner {} impl SelectorInner { pub fn new() -> io::Result { - CompletionPort::new(0).map(|cp| { + IocpHandlerRegistry::new().map(|cp| { let cp = Arc::new(cp); let cp_afd = Arc::clone(&cp); + let update_queue = Arc::new(Mutex::new(VecDeque::new())); + let completion_handler = AfdCompletionPortEventHandler { + selector_update_queue: Arc::clone(&update_queue), + update_queue: Vec::new() + }; SelectorInner { cp, - update_queue: Mutex::new(VecDeque::new()), - afd_group: AfdGroup::new(cp_afd), + update_queue, + injected_events: Mutex::default(), + afd_group: AfdGroup::new(cp_afd, completion_handler), is_polling: AtomicBool::new(false), } }) @@ -462,14 +728,26 @@ impl SelectorInner { unsafe { self.update_sockets_events() }?; - let result = self.cp.get_many(statuses, timeout); + events.extend(self.injected_events.lock().unwrap().drain(..)); + // since events was empty before this call, the number of events is the number of injected + // events + let num_drained_events = events.len(); - self.is_polling.store(false, Ordering::Relaxed); + if num_drained_events == 0 { + let result = self.cp.handle_pending_events(statuses, Some(events), timeout); - match result { - Ok(iocp_events) => Ok(unsafe { self.feed_events(events, iocp_events) }), - Err(ref e) if e.raw_os_error() == Some(WAIT_TIMEOUT as i32) => Ok(0), - Err(e) => Err(e), + self.is_polling.store(false, Ordering::Relaxed); + + match result { + Ok(num_events) => { + self.afd_group.release_unused_afd(); + Ok(num_events + num_drained_events) + }, + Err(e) => Err(e), + } + } else { + self.is_polling.store(false, Ordering::Relaxed); + Ok(num_drained_events) } } @@ -488,43 +766,6 @@ impl SelectorInner { self.afd_group.release_unused_afd(); Ok(()) } - - // It returns processed count of iocp_events rather than the events itself. - unsafe fn feed_events( - &self, - events: &mut Vec, - iocp_events: &[CompletionStatus], - ) -> usize { - let mut n = 0; - let mut update_queue = self.update_queue.lock().unwrap(); - for iocp_event in iocp_events.iter() { - if iocp_event.overlapped().is_null() { - // `Waker` event, we'll add a readable event to match the other platforms. - events.push(Event { - flags: afd::POLL_RECEIVE, - data: iocp_event.token() as u64, - }); - n += 1; - continue; - } - - let sock_state = from_overlapped(iocp_event.overlapped()); - let mut sock_guard = sock_state.lock().unwrap(); - match sock_guard.feed_event() { - Some(e) => { - events.push(e); - n += 1; - } - None => {} - } - - if !sock_guard.is_pending_deletion() { - update_queue.push_back(sock_state.clone()); - } - } - self.afd_group.release_unused_afd(); - n - } } cfg_net! { @@ -686,35 +927,23 @@ cfg_net! { impl Drop for SelectorInner { fn drop(&mut self) { loop { - let events_num: usize; let mut statuses: [CompletionStatus; 1024] = [CompletionStatus::zero(); 1024]; let result = self .cp - .get_many(&mut statuses, Some(std::time::Duration::from_millis(0))); + .handle_pending_events(&mut statuses, None, Some(Duration::from_millis(0))); match result { - Ok(iocp_events) => { - events_num = iocp_events.iter().len(); - for iocp_event in iocp_events.iter() { - if !iocp_event.overlapped().is_null() { - // drain sock state to release memory of Arc reference - let _sock_state = from_overlapped(iocp_event.overlapped()); - } - } - } + Ok(events_num) if events_num == 0 => { + break + }, + + Ok(_) => (), Err(_) => { break; } } - - if events_num == 0 { - // continue looping until all completion statuses have been drained - break; - } } - - self.afd_group.release_unused_afd(); } } diff --git a/src/sys/windows/waker.rs b/src/sys/windows/waker.rs index 1b21cf148..0d97bc839 100644 --- a/src/sys/windows/waker.rs +++ b/src/sys/windows/waker.rs @@ -1,28 +1,24 @@ use crate::sys::windows::selector::WAKER_OVERLAPPED; use crate::sys::windows::Selector; +use crate::sys::windows::iocp_handler::IocpWaker; use crate::Token; -use miow::iocp::{CompletionPort, CompletionStatus}; use std::io; -use std::sync::Arc; #[derive(Debug)] pub struct Waker { - token: Token, - port: Arc, + cp_waker: IocpWaker, } impl Waker { pub fn new(selector: &Selector, token: Token) -> io::Result { - Ok(Waker { - token, - port: selector.clone_port(), - }) + let cp_registry = selector.clone_port(); + let cp_waker = cp_registry.register_waker(token); + Ok(Waker { cp_waker }) } pub fn wake(&self) -> io::Result<()> { // Keep NULL as Overlapped value to notify waking. - let status = CompletionStatus::new(0, self.token.0, WAKER_OVERLAPPED); - self.port.post(status) + self.cp_waker.post(0, WAKER_OVERLAPPED) } } diff --git a/tests/win_rawhandlesrc.rs b/tests/win_rawhandlesrc.rs new file mode 100644 index 000000000..65ed99e68 --- /dev/null +++ b/tests/win_rawhandlesrc.rs @@ -0,0 +1,123 @@ +#![cfg(all(windows, feature = "os-util"))] + +use mio::windows; +use mio::windows::Readiness; +use mio::{event::Source, Events, Interest, Poll, Registry, Token}; +use std::default::Default; +use std::ffi::OsStr; +use std::fs::File; +use std::io::{self, Seek, SeekFrom, Write}; +use std::iter; +use std::os::windows::ffi::OsStrExt; +use std::os::windows::io::{AsRawHandle, FromRawHandle, RawHandle}; +use std::ptr; +use winapi::shared::winerror::ERROR_IO_PENDING; +use winapi::um::minwinbase::OVERLAPPED_ENTRY; +use winapi::um::{ + errhandlingapi::GetLastError, + fileapi::{CreateFileW, WriteFile, CREATE_ALWAYS}, + handleapi::INVALID_HANDLE_VALUE, + minwinbase::OVERLAPPED, + winbase::FILE_FLAG_OVERLAPPED, + winnt::GENERIC_WRITE, +}; + +struct AsyncFile { + file: File, + binding: Option, +} + +impl Source for AsyncFile { + fn register( + &mut self, + registry: &Registry, + token: Token, + _interests: Interest, + ) -> io::Result<()> { + let binding = windows::Binding::new(registry, token); + self.binding = Some(binding); + self.binding.as_ref().unwrap().register_handle(&self.file) + } + + fn reregister( + &mut self, + _registry: &Registry, + _token: Token, + _interests: Interest, + ) -> io::Result<()> { + unimplemented!() + } + + fn deregister(&mut self, _registry: &Registry) -> io::Result<()> { + unimplemented!() + } +} + +impl AsRawHandle for AsyncFile { + fn as_raw_handle(&self) -> RawHandle { + self.file.as_raw_handle() + } +} + +#[test] +fn register_custom_iocp_handler() { + let mut poll = Poll::new().unwrap(); + + let path: &OsStr = "C:\\temp\\test.txt".as_ref(); + let path_u16: Vec = path.encode_wide().chain(iter::once(0)).collect(); + let mut file = unsafe { + let handle = CreateFileW( + path_u16.as_ptr(), + GENERIC_WRITE, + 0, + ptr::null_mut(), + CREATE_ALWAYS, + FILE_FLAG_OVERLAPPED, + ptr::null_mut(), + ); + if handle == INVALID_HANDLE_VALUE { + panic!("Unable to open file!"); + } + AsyncFile { + file: File::from_raw_handle(handle), + binding: None, + } + }; + + poll.registry() + .register(&mut file, Token(0), Interest::WRITABLE) + .unwrap(); + + let mut buffer: Vec<_> = iter::successors(Some(15u8), |p| Some(p.wrapping_add(2))) + .take(1024) + .collect(); + loop { + unsafe { + let mut overlapped = + windows::Overlapped::new(move |_: &OVERLAPPED_ENTRY| Some(Readiness::WRITE)); + if WriteFile( + file.as_raw_handle(), + buffer.as_mut_ptr() as *mut _, + buffer.len() as u32, + ptr::null_mut(), + &mut overlapped as *mut windows::Overlapped as *mut OVERLAPPED, + ) == 0 + { + match GetLastError() { + ERROR_IO_PENDING => { + let mut events = Events::with_capacity(16); + poll.poll(&mut events, None).unwrap(); + let mut event_iter = events.iter(); + let event = event_iter.next().unwrap(); + assert_eq!(0, event.token().0); + assert!(event.is_writable()); + assert!(event_iter.next().is_none()); + break; + } + + e => panic!("Error during file write operation: {}", e), + } + } + } + } +}