Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add with_backlog functionality to TcpListener and UnixListener #94407

Closed
wants to merge 7 commits into from
47 changes: 45 additions & 2 deletions library/std/src/net/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -707,10 +707,53 @@ impl fmt::Debug for TcpStream {
}

impl TcpListener {
/// Default "backlog" for [`TcpListener::bind`]. See
/// [`TcpListener::bind_with_backlog`] for an explanation of backlog
/// values.
#[unstable(feature = "bind_with_backlog", issue = "94406")]
pub const DEFAULT_BACKLOG: usize = 128;

Copy link
Member

@joshtriplett joshtriplett Feb 26, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// Default "backlog" for [`TcpListener::bind`]. See
/// [`TcpListener::bind_with_backlog`] for an explanation of backlog
/// values.
#[unstable(feature = "bind_with_backlog", issue = "94406")]
pub const DEFAULT_BACKLOG: usize = 128;

I think, rather than exposing this backlog as a constant, we can just document it. If people want to override it they can call bind_with_backlog; if they want to use the default they can call bind; if they want to have a default value for some configurable backlog parameter, they can determine an appropriate default themselves with our documentation for inspiration.

That said, this should absolutely be a const in this module, I just don't think it should be a pub const.

/// Creates a new `TcpListener` which will be bound to the specified
/// address.
///
/// The returned listener is ready for accepting connections.
/// The given backlog specifies the maximum number of outstanding
/// connections that will be buffered in the OS waiting to be accepted by
/// [`TcpListener::accept`]. The backlog argument overrides the default
/// specified by [`TcpListener::DEFAULT_BACKLOG`]; that default is
/// reasonable for most use cases.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// [`TcpListener::accept`]. The backlog argument overrides the default
/// specified by [`TcpListener::DEFAULT_BACKLOG`]; that default is
/// reasonable for most use cases.
/// [`TcpListener::accept`]. The backlog argument overrides the default
/// backlog used by [`TcpListener::bind`]; that default works reasonably
/// for most use cases.

///
/// This function is otherwise [`TcpListener::bind`]: see that
/// documentation for full details of operation.
///
/// # Examples
///
/// Creates a TCP listener bound to `127.0.0.1:80` with a backlog of 1000:
///
/// ```no_run
/// #![feature(bind_with_backlog)]
/// use std::net::TcpListener;
///
/// let listener = TcpListener::bind_with_backlog("127.0.0.1:80", 1000).unwrap();
/// ```
///
/// # Errors
///
/// The specified backlog may be larger than supported by the underlying
/// system. In this case an [`io::Error`] with
/// [`io::ErrorKind::InvalidData`] will be returned.
#[unstable(feature = "bind_with_backlog", issue = "94406")]
pub fn bind_with_backlog<A: ToSocketAddrs>(addr: A, backlog: usize) -> io::Result<TcpListener> {
super::each_addr(addr, move |a| net_imp::TcpListener::bind_with_backlog(a, backlog))
.map(TcpListener)
}

/// Creates a new `TcpListener` which will be bound to the specified
/// address. The returned listener is ready for accepting
/// connections.
///
/// The listener will have a backlog given by
/// [`TcpListener::DEFAULT_BACKLOG`]. See the documentation for
/// [`TcpListener::bind_with_backlog`] for further information.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// The listener will have a backlog given by
/// [`TcpListener::DEFAULT_BACKLOG`]. See the documentation for
/// [`TcpListener::bind_with_backlog`] for further information.
/// The listener will have a backlog of 128. See the documentation
/// of [`TcpListener::bind_with_backlog`] for further information.

///
/// Binding with a port number of 0 will request that the OS assigns a port
/// to this listener. The port allocated can be queried via the
Expand Down Expand Up @@ -748,7 +791,7 @@ impl TcpListener {
/// ```
#[stable(feature = "rust1", since = "1.0.0")]
pub fn bind<A: ToSocketAddrs>(addr: A) -> io::Result<TcpListener> {
super::each_addr(addr, net_imp::TcpListener::bind).map(TcpListener)
Self::bind_with_backlog(addr, TcpListener::DEFAULT_BACKLOG)
}

/// Returns the local socket address of this listener.
Expand Down
32 changes: 27 additions & 5 deletions library/std/src/net/tcp/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ fn connect_error() {
#[test]
fn listen_localhost() {
let socket_addr = next_test_ip4();
let listener = t!(TcpListener::bind(&socket_addr));
let listener = t!(TcpListener::bind_with_backlog(&socket_addr, 1));

let _t = thread::spawn(move || {
let mut stream = t!(TcpStream::connect(&("localhost", socket_addr.port())));
Expand All @@ -64,7 +64,7 @@ fn listen_localhost() {
#[test]
fn connect_loopback() {
each_ip(&mut |addr| {
let acceptor = t!(TcpListener::bind(&addr));
let acceptor = t!(TcpListener::bind_with_backlog(&addr, 1));

let _t = thread::spawn(move || {
let host = match addr {
Expand All @@ -85,7 +85,7 @@ fn connect_loopback() {
#[test]
fn smoke_test() {
each_ip(&mut |addr| {
let acceptor = t!(TcpListener::bind(&addr));
let acceptor = t!(TcpListener::bind_with_backlog(&addr, 1));

let (tx, rx) = channel();
let _t = thread::spawn(move || {
Expand Down Expand Up @@ -172,11 +172,33 @@ fn multiple_connect_serial() {
})
}

#[test]
fn multiple_connect_serial_with_backlog() {
each_ip(&mut |addr| {
let max = 10;
let acceptor = t!(TcpListener::bind_with_backlog(&addr, max));

let _t = thread::spawn(move || {
for _ in 0..max {
let mut stream = t!(TcpStream::connect(&addr));
t!(stream.write(&[99]));
}
});

for stream in acceptor.incoming().take(max) {
let mut stream = t!(stream);
let mut buf = [0];
t!(stream.read(&mut buf));
assert_eq!(buf[0], 99);
}
})
}

#[test]
fn multiple_connect_interleaved_greedy_schedule() {
const MAX: usize = 10;
each_ip(&mut |addr| {
let acceptor = t!(TcpListener::bind(&addr));
let acceptor = t!(TcpListener::bind_with_backlog(&addr, MAX));

let _t = thread::spawn(move || {
let acceptor = acceptor;
Expand Down Expand Up @@ -213,7 +235,7 @@ fn multiple_connect_interleaved_greedy_schedule() {
fn multiple_connect_interleaved_lazy_schedule() {
const MAX: usize = 10;
each_ip(&mut |addr| {
let acceptor = t!(TcpListener::bind(&addr));
let acceptor = t!(TcpListener::bind_with_backlog(&addr, MAX));

let _t = thread::spawn(move || {
for stream in acceptor.incoming().take(MAX) {
Expand Down
100 changes: 98 additions & 2 deletions library/std/src/os/unix/net/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::sys::cvt;
use crate::sys::net::Socket;
use crate::sys_common::{AsInner, FromInner, IntoInner};
use crate::{fmt, io, mem};
use core::convert::TryInto;

/// A structure representing a Unix domain socket server.
///
Expand Down Expand Up @@ -53,8 +54,18 @@ impl fmt::Debug for UnixListener {
}

impl UnixListener {
/// Default "backlog" for [`UnixListener::bind`] and
/// [`UnixListener::bind_addr`]. See [`UnixListener::bind_with_backlog`]
/// for an explanation of backlog values.
#[unstable(feature = "bind_with_backlog", issue = "94406")]
pub const DEFAULT_BACKLOG: usize = 128;

/// Creates a new `UnixListener` bound to the specified socket.
///
/// The listener will have a backlog given by
/// [`UnixListener::DEFAULT_BACKLOG`]. See the documentation for
/// [`UnixListener::bind_with_backlog`] for further information.
///
/// # Examples
///
/// ```no_run
Expand All @@ -70,12 +81,51 @@ impl UnixListener {
/// ```
#[stable(feature = "unix_socket", since = "1.10.0")]
pub fn bind<P: AsRef<Path>>(path: P) -> io::Result<UnixListener> {
UnixListener::bind_with_backlog(path, UnixListener::DEFAULT_BACKLOG)
}

/// Creates a new `UnixListener` bound to the specified socket.
///
/// The given backlog specifies the maximum number of outstanding
/// connections that will be buffered in the OS waiting to be accepted by
/// [`UnixListener::accept`]. The backlog argument overrides the default
/// specified by [`UnixListener::DEFAULT_BACKLOG`]; that default is
/// reasonable for most use cases.
///
/// This function is otherwise [`UnixListener::bind`]: see that
/// documentation for full details of operation.
///
/// # Examples
///
/// ```no_run
/// #![feature(bind_with_backlog)]
/// use std::os::unix::net::UnixListener;
///
/// let listener = match UnixListener::bind_with_backlog("/path/to/the/socket", 1000) {
/// Ok(sock) => sock,
/// Err(e) => {
/// println!("Couldn't connect: {:?}", e);
/// return
/// }
/// };
/// ```
///
/// # Errors
///
/// The specified backlog may be larger than supported by the underlying
/// system. In this case an [`io::Error`] with
/// [`io::ErrorKind::InvalidData`] will be returned.
#[unstable(feature = "bind_with_backlog", issue = "94406")]
pub fn bind_with_backlog<P: AsRef<Path>>(path: P, backlog: usize) -> io::Result<UnixListener> {
unsafe {
let backlog = backlog
.try_into()
.map_err(|e| crate::io::Error::new(crate::io::ErrorKind::InvalidData, e))?;
let inner = Socket::new_raw(libc::AF_UNIX, libc::SOCK_STREAM)?;
let (addr, len) = sockaddr_un(path.as_ref())?;

cvt(libc::bind(inner.as_inner().as_raw_fd(), &addr as *const _ as *const _, len as _))?;
cvt(libc::listen(inner.as_inner().as_raw_fd(), 128))?;
cvt(libc::listen(inner.as_inner().as_raw_fd(), backlog))?;

Ok(UnixListener(inner))
}
Expand Down Expand Up @@ -107,14 +157,60 @@ impl UnixListener {
/// ```
#[unstable(feature = "unix_socket_abstract", issue = "85410")]
pub fn bind_addr(socket_addr: &SocketAddr) -> io::Result<UnixListener> {
UnixListener::bind_addr_with_backlog(socket_addr, UnixListener::DEFAULT_BACKLOG)
}

/// Creates a new `UnixListener` bound to the specified [`socket address`].
///
/// The given backlog specifies the maximum number of outstanding
/// connections that will be buffered in the OS waiting to be accepted by
/// [`UnixListener::accept`]. The backlog argument overrides the default
/// specified by [`UnixListener::DEFAULT_BACKLOG`]; that default is
/// reasonable for most use cases.
///
/// This function is otherwise [`UnixListener::bind_addr`]: see that
/// documentation for full details of operation.
///
/// [`socket address`]: crate::os::unix::net::SocketAddr
///
/// # Examples
///
/// ```no_run
/// #![feature(unix_socket_abstract)]
/// #![feature(bind_with_backlog)]
/// use std::os::unix::net::{UnixListener};
///
/// fn main() -> std::io::Result<()> {
/// let listener1 = UnixListener::bind("path/to/socket")?;
/// let addr = listener1.local_addr()?;
///
/// let listener2 = match UnixListener::bind_addr_with_backlog(&addr, 1000) {
/// Ok(sock) => sock,
/// Err(err) => {
/// println!("Couldn't bind: {:?}", err);
/// return Err(err);
/// }
/// };
/// Ok(())
/// }
/// ```
//#[unstable(feature = "unix_socket_abstract", issue = "85410")]
#[unstable(feature = "bind_with_backlog", issue = "94406")]
pub fn bind_addr_with_backlog(
socket_addr: &SocketAddr,
backlog: usize,
) -> io::Result<UnixListener> {
unsafe {
let backlog = backlog
.try_into()
.map_err(|e| crate::io::Error::new(crate::io::ErrorKind::InvalidData, e))?;
let inner = Socket::new_raw(libc::AF_UNIX, libc::SOCK_STREAM)?;
cvt(libc::bind(
inner.as_raw_fd(),
&socket_addr.addr as *const _ as *const _,
socket_addr.len as _,
))?;
cvt(libc::listen(inner.as_raw_fd(), 128))?;
cvt(libc::listen(inner.as_raw_fd(), backlog))?;
Ok(UnixListener(inner))
}
}
Expand Down
10 changes: 5 additions & 5 deletions library/std/src/os/unix/net/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ fn basic() {
let msg1 = b"hello";
let msg2 = b"world!";

let listener = or_panic!(UnixListener::bind(&socket_path));
let listener = or_panic!(UnixListener::bind_with_backlog(&socket_path, 1));
let thread = thread::spawn(move || {
let mut stream = or_panic!(listener.accept()).0;
let mut buf = [0; 5];
Expand Down Expand Up @@ -111,7 +111,7 @@ fn try_clone() {
let msg1 = b"hello";
let msg2 = b"world";

let listener = or_panic!(UnixListener::bind(&socket_path));
let listener = or_panic!(UnixListener::bind_with_backlog(&socket_path, 1));
let thread = thread::spawn(move || {
let mut stream = or_panic!(listener.accept()).0;
or_panic!(stream.write_all(msg1));
Expand All @@ -135,7 +135,7 @@ fn iter() {
let dir = tmpdir();
let socket_path = dir.path().join("sock");

let listener = or_panic!(UnixListener::bind(&socket_path));
let listener = or_panic!(UnixListener::bind_with_backlog(&socket_path, 2));
let thread = thread::spawn(move || {
for stream in listener.incoming().take(2) {
let mut stream = or_panic!(stream);
Expand Down Expand Up @@ -423,7 +423,7 @@ fn test_abstract_stream_connect() {
let msg2 = b"world";

let socket_addr = or_panic!(SocketAddr::from_abstract_namespace(b"namespace"));
let listener = or_panic!(UnixListener::bind_addr(&socket_addr));
let listener = or_panic!(UnixListener::bind_addr_with_backlog(&socket_addr, 1));

let thread = thread::spawn(move || {
let mut stream = or_panic!(listener.accept()).0;
Expand Down Expand Up @@ -451,7 +451,7 @@ fn test_abstract_stream_connect() {
#[test]
fn test_abstract_stream_iter() {
let addr = or_panic!(SocketAddr::from_abstract_namespace(b"hidden"));
let listener = or_panic!(UnixListener::bind_addr(&addr));
let listener = or_panic!(UnixListener::bind_addr_with_backlog(&addr, 2));

let thread = thread::spawn(move || {
for stream in listener.incoming().take(2) {
Expand Down
12 changes: 10 additions & 2 deletions library/std/src/sys_common/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,9 +363,17 @@ pub struct TcpListener {
}

impl TcpListener {
pub fn bind(addr: io::Result<&SocketAddr>) -> io::Result<TcpListener> {
pub fn bind_with_backlog(
addr: io::Result<&SocketAddr>,
backlog: usize,
) -> io::Result<TcpListener> {
let addr = addr?;

// Type-convert the backlog
let backlog = backlog
.try_into()
.map_err(|e| crate::io::Error::new(crate::io::ErrorKind::InvalidData, e))?;

init();

let sock = Socket::new(addr, c::SOCK_STREAM)?;
Expand All @@ -385,7 +393,7 @@ impl TcpListener {
cvt(unsafe { c::bind(sock.as_raw(), addrp, len as _) })?;

// Start listening
cvt(unsafe { c::listen(sock.as_raw(), 128) })?;
cvt(unsafe { c::listen(sock.as_raw(), backlog) })?;
Ok(TcpListener { inner: sock })
}

Expand Down