Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

io: avoid ptr->ref->ptr roundtrip in RegistrationSet #6929

Merged
merged 7 commits into from
Nov 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions tokio/src/runtime/io/registration_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,17 +106,20 @@ impl RegistrationSet {

for io in pending {
// safety: the registration is part of our list
unsafe { self.remove(synced, io.as_ref()) }
unsafe { self.remove(synced, &io) }
}

self.num_pending_release.store(0, Release);
}

// This function is marked as unsafe, because the caller must make sure that
// `io` is part of the registration set.
pub(super) unsafe fn remove(&self, synced: &mut Synced, io: &ScheduledIo) {
super::EXPOSE_IO.unexpose_provenance(io);
let _ = synced.registrations.remove(io.into());
pub(super) unsafe fn remove(&self, synced: &mut Synced, io: &Arc<ScheduledIo>) {
// SAFETY: Pointers into an Arc are never null.
let io = unsafe { NonNull::new_unchecked(Arc::as_ptr(io).cast_mut()) };

super::EXPOSE_IO.unexpose_provenance(io.as_ptr());
let _ = synced.registrations.remove(io);
}
}

Expand Down
26 changes: 21 additions & 5 deletions tokio/tests/io_async_fd.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#![warn(rust_2018_idioms)]
#![cfg(all(unix, feature = "full", not(miri)))]
#![cfg(all(unix, feature = "full"))]

use std::os::unix::io::{AsRawFd, RawFd};
use std::sync::{
Expand Down Expand Up @@ -148,6 +148,7 @@ fn drain(mut fd: &FileDescriptor, mut amt: usize) {
}

#[tokio::test]
#[cfg_attr(miri, ignore)] // No F_GETFL for fcntl in miri.
async fn initially_writable() {
let (a, b) = socketpair();

Expand All @@ -166,6 +167,7 @@ async fn initially_writable() {
}

#[tokio::test]
#[cfg_attr(miri, ignore)] // No F_GETFL for fcntl in miri.
async fn reset_readable() {
let (a, mut b) = socketpair();

Expand Down Expand Up @@ -210,6 +212,7 @@ async fn reset_readable() {
}

#[tokio::test]
#[cfg_attr(miri, ignore)] // No F_GETFL for fcntl in miri.
async fn reset_writable() {
let (a, b) = socketpair();

Expand Down Expand Up @@ -247,6 +250,7 @@ impl<T: AsRawFd> AsRawFd for ArcFd<T> {
}

#[tokio::test]
#[cfg_attr(miri, ignore)] // No F_GETFL for fcntl in miri.
async fn drop_closes() {
let (a, mut b) = socketpair();

Expand Down Expand Up @@ -287,6 +291,7 @@ async fn drop_closes() {
}

#[tokio::test]
#[cfg_attr(miri, ignore)] // No F_GETFL for fcntl in miri.
async fn reregister() {
let (a, _b) = socketpair();

Expand All @@ -296,6 +301,7 @@ async fn reregister() {
}

#[tokio::test]
#[cfg_attr(miri, ignore)] // No F_GETFL for fcntl in miri.
async fn try_io() {
let (a, mut b) = socketpair();

Expand Down Expand Up @@ -331,6 +337,7 @@ async fn try_io() {
}

#[tokio::test]
#[cfg_attr(miri, ignore)] // No F_GETFL for fcntl in miri.
async fn multiple_waiters() {
let (a, mut b) = socketpair();
let afd_a = Arc::new(AsyncFd::new(a).unwrap());
Expand Down Expand Up @@ -379,6 +386,7 @@ async fn multiple_waiters() {
}

#[tokio::test]
#[cfg_attr(miri, ignore)] // No F_GETFL for fcntl in miri.
async fn poll_fns() {
let (a, b) = socketpair();
let afd_a = Arc::new(AsyncFd::new(a).unwrap());
Expand Down Expand Up @@ -472,6 +480,7 @@ fn rt() -> tokio::runtime::Runtime {
}

#[test]
#[cfg_attr(miri, ignore)] // No F_GETFL for fcntl in miri.
fn driver_shutdown_wakes_currently_pending() {
let rt = rt();

Expand All @@ -493,6 +502,7 @@ fn driver_shutdown_wakes_currently_pending() {
}

#[test]
#[cfg_attr(miri, ignore)] // No F_GETFL for fcntl in miri.
fn driver_shutdown_wakes_future_pending() {
let rt = rt();

Expand All @@ -508,6 +518,7 @@ fn driver_shutdown_wakes_future_pending() {
}

#[test]
#[cfg_attr(miri, ignore)] // No F_GETFL for fcntl in miri.
fn driver_shutdown_wakes_pending_race() {
// TODO: make this a loom test
for _ in 0..100 {
Expand Down Expand Up @@ -538,6 +549,7 @@ async fn poll_writable<T: AsRawFd>(fd: &AsyncFd<T>) -> std::io::Result<AsyncFdRe
}

#[test]
#[cfg_attr(miri, ignore)] // No F_GETFL for fcntl in miri.
fn driver_shutdown_wakes_currently_pending_polls() {
let rt = rt();

Expand All @@ -560,6 +572,7 @@ fn driver_shutdown_wakes_currently_pending_polls() {
}

#[test]
#[cfg_attr(miri, ignore)] // No F_GETFL for fcntl in miri.
fn driver_shutdown_wakes_poll() {
let rt = rt();

Expand All @@ -576,6 +589,7 @@ fn driver_shutdown_wakes_poll() {
}

#[test]
#[cfg_attr(miri, ignore)] // No F_GETFL for fcntl in miri.
fn driver_shutdown_then_clear_readiness() {
let rt = rt();

Expand All @@ -593,6 +607,7 @@ fn driver_shutdown_then_clear_readiness() {
}

#[test]
#[cfg_attr(miri, ignore)] // No F_GETFL for fcntl in miri.
fn driver_shutdown_wakes_poll_race() {
// TODO: make this a loom test
for _ in 0..100 {
Expand All @@ -615,6 +630,7 @@ fn driver_shutdown_wakes_poll_race() {
}

#[tokio::test]
#[cfg_attr(miri, ignore)] // No socket in miri.
#[cfg(any(target_os = "linux", target_os = "android"))]
async fn priority_event_on_oob_data() {
use std::net::SocketAddr;
Expand Down Expand Up @@ -655,7 +671,7 @@ fn send_oob_data<S: AsRawFd>(stream: &S, data: &[u8]) -> io::Result<usize> {
}

#[tokio::test]
#[cfg_attr(miri, ignore)]
#[cfg_attr(miri, ignore)] // No F_GETFL for fcntl in miri.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be worth implementing F_GETFL in Miri? That doesn't sound like it would be particularly hard.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, I think it's worth it. We just need to make miri remember the flag whenever it is used, then return it when F_GETFL is called.

But looking at the codepath, to properly support the test here, we might also need F_SETFL?

Copy link

@RalfJung RalfJung Jan 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

O_NONBLOCK is the only flag we currently support that could be exposed via that.

And yeah, adding both GET and SET makes sense.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am concerned about the interaction between F_SETFL and anything that involves this.block_thread because F_SETFL can change the status of O_NONBLOCK flag. But I can't verify if the concern is valid until I test it out, we can always get it done first and see if the problem arise.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah we to decide what happens when a previously blocking FD is marked as non-blocking and there are waiting threads. Do they wake up immediately, or at some later point, or never?

@tiif can you file a Miri issue, and note this question there?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

async fn clear_ready_matching_clears_ready() {
use tokio::io::{Interest, Ready};

Expand All @@ -679,7 +695,7 @@ async fn clear_ready_matching_clears_ready() {
}

#[tokio::test]
#[cfg_attr(miri, ignore)]
#[cfg_attr(miri, ignore)] // No F_GETFL for fcntl in miri.
async fn clear_ready_matching_clears_ready_mut() {
use tokio::io::{Interest, Ready};

Expand All @@ -703,8 +719,8 @@ async fn clear_ready_matching_clears_ready_mut() {
}

#[tokio::test]
#[cfg_attr(miri, ignore)] // No socket in miri.
#[cfg(target_os = "linux")]
#[cfg_attr(miri, ignore)]
async fn await_error_readiness_timestamping() {
use std::net::{Ipv4Addr, SocketAddr};

Expand Down Expand Up @@ -760,8 +776,8 @@ fn configure_timestamping_socket(udp_socket: &std::net::UdpSocket) -> std::io::R
}

#[tokio::test]
#[cfg_attr(miri, ignore)] // No F_GETFL for fcntl in miri.
#[cfg(target_os = "linux")]
#[cfg_attr(miri, ignore)]
async fn await_error_readiness_invalid_address() {
use std::net::{Ipv4Addr, SocketAddr};
use tokio::io::{Interest, Ready};
Expand Down
Loading