Skip to content

Commit

Permalink
Rollup merge of rust-lang#94407 - BartMassey-upstream:with-backlog, r…
Browse files Browse the repository at this point in the history
…=joshtriplett

Add `with_backlog` functionality to `TcpListener` and `UnixListener`

Adds

* `std::net::TcpListener::DEFAULT_BACKLOG`
* `std::net::TcpListener::bind_with_backlog`
* `std::os::unix::net::UnixListener::DEFAULT_BACKLOG`
* `std::os::unix::net::UnixListener::bind_with_backlog`
* `std::os::unix::net::UnixListener::bind_addr_with_backlog`

Closes rust-lang#94406
  • Loading branch information
Dylan-DPC authored Feb 27, 2022
2 parents 8622ee2 + b98c009 commit c7844d7
Show file tree
Hide file tree
Showing 5 changed files with 178 additions and 16 deletions.
42 changes: 40 additions & 2 deletions library/std/src/net/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -707,10 +707,48 @@ impl fmt::Debug for TcpStream {
}

impl TcpListener {
/// Default listen backlog.
const DEFAULT_BACKLOG: usize = 128;

/// Creates a new `TcpListener` which will be bound to the specified
/// address.
///
/// The returned listener is ready for accepting connections.
/// The given backlog specifies the maximum number of outstanding
/// connections that will be buffered in the OS waiting to be accepted by
/// [`TcpListener::accept`]. The backlog argument overrides the default
/// value of 128; that default is reasonable for most use cases.
///
/// This function is otherwise [`TcpListener::bind`]: see that
/// documentation for full details of operation.
///
/// # Examples
///
/// Creates a TCP listener bound to `127.0.0.1:80` with a backlog of 1000:
///
/// ```no_run
/// #![feature(bind_with_backlog)]
/// use std::net::TcpListener;
///
/// let listener = TcpListener::bind_with_backlog("127.0.0.1:80", 1000).unwrap();
/// ```
///
/// # Errors
///
/// The specified backlog may be larger than supported by the underlying
/// system. In this case an [`io::Error`] with
/// [`io::ErrorKind::InvalidData`] will be returned.
#[unstable(feature = "bind_with_backlog", issue = "94406")]
pub fn bind_with_backlog<A: ToSocketAddrs>(addr: A, backlog: usize) -> io::Result<TcpListener> {
super::each_addr(addr, move |a| net_imp::TcpListener::bind_with_backlog(a, backlog))
.map(TcpListener)
}

/// Creates a new `TcpListener` which will be bound to the specified
/// address. The returned listener is ready for accepting
/// connections.
///
/// The listener will have a backlog of 128. See the documentation for
/// [`TcpListener::bind_with_backlog`] for further information.
///
/// Binding with a port number of 0 will request that the OS assigns a port
/// to this listener. The port allocated can be queried via the
Expand Down Expand Up @@ -748,7 +786,7 @@ impl TcpListener {
/// ```
#[stable(feature = "rust1", since = "1.0.0")]
pub fn bind<A: ToSocketAddrs>(addr: A) -> io::Result<TcpListener> {
super::each_addr(addr, net_imp::TcpListener::bind).map(TcpListener)
Self::bind_with_backlog(addr, TcpListener::DEFAULT_BACKLOG)
}

/// Returns the local socket address of this listener.
Expand Down
32 changes: 27 additions & 5 deletions library/std/src/net/tcp/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ fn connect_error() {
#[test]
fn listen_localhost() {
let socket_addr = next_test_ip4();
let listener = t!(TcpListener::bind(&socket_addr));
let listener = t!(TcpListener::bind_with_backlog(&socket_addr, 1));

let _t = thread::spawn(move || {
let mut stream = t!(TcpStream::connect(&("localhost", socket_addr.port())));
Expand All @@ -64,7 +64,7 @@ fn listen_localhost() {
#[test]
fn connect_loopback() {
each_ip(&mut |addr| {
let acceptor = t!(TcpListener::bind(&addr));
let acceptor = t!(TcpListener::bind_with_backlog(&addr, 1));

let _t = thread::spawn(move || {
let host = match addr {
Expand All @@ -85,7 +85,7 @@ fn connect_loopback() {
#[test]
fn smoke_test() {
each_ip(&mut |addr| {
let acceptor = t!(TcpListener::bind(&addr));
let acceptor = t!(TcpListener::bind_with_backlog(&addr, 1));

let (tx, rx) = channel();
let _t = thread::spawn(move || {
Expand Down Expand Up @@ -172,11 +172,33 @@ fn multiple_connect_serial() {
})
}

#[test]
fn multiple_connect_serial_with_backlog() {
each_ip(&mut |addr| {
let max = 10;
let acceptor = t!(TcpListener::bind_with_backlog(&addr, max));

let _t = thread::spawn(move || {
for _ in 0..max {
let mut stream = t!(TcpStream::connect(&addr));
t!(stream.write(&[99]));
}
});

for stream in acceptor.incoming().take(max) {
let mut stream = t!(stream);
let mut buf = [0];
t!(stream.read(&mut buf));
assert_eq!(buf[0], 99);
}
})
}

#[test]
fn multiple_connect_interleaved_greedy_schedule() {
const MAX: usize = 10;
each_ip(&mut |addr| {
let acceptor = t!(TcpListener::bind(&addr));
let acceptor = t!(TcpListener::bind_with_backlog(&addr, MAX));

let _t = thread::spawn(move || {
let acceptor = acceptor;
Expand Down Expand Up @@ -213,7 +235,7 @@ fn multiple_connect_interleaved_greedy_schedule() {
fn multiple_connect_interleaved_lazy_schedule() {
const MAX: usize = 10;
each_ip(&mut |addr| {
let acceptor = t!(TcpListener::bind(&addr));
let acceptor = t!(TcpListener::bind_with_backlog(&addr, MAX));

let _t = thread::spawn(move || {
for stream in acceptor.incoming().take(MAX) {
Expand Down
98 changes: 96 additions & 2 deletions library/std/src/os/unix/net/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::sys::cvt;
use crate::sys::net::Socket;
use crate::sys_common::{AsInner, FromInner, IntoInner};
use crate::{fmt, io, mem};
use core::convert::TryInto;

/// A structure representing a Unix domain socket server.
///
Expand Down Expand Up @@ -53,8 +54,14 @@ impl fmt::Debug for UnixListener {
}

impl UnixListener {
/// Default backlog for `bind` and `bind_addr`.
const DEFAULT_BACKLOG: usize = 128;

/// Creates a new `UnixListener` bound to the specified socket.
///
/// The listener will have a backlog of 128. See the documentation for
/// [`UnixListener::bind_with_backlog`] for further information.
///
/// # Examples
///
/// ```no_run
Expand All @@ -70,19 +77,61 @@ impl UnixListener {
/// ```
#[stable(feature = "unix_socket", since = "1.10.0")]
pub fn bind<P: AsRef<Path>>(path: P) -> io::Result<UnixListener> {
UnixListener::bind_with_backlog(path, UnixListener::DEFAULT_BACKLOG)
}

/// Creates a new `UnixListener` bound to the specified socket.
///
/// The given backlog specifies the maximum number of outstanding
/// connections that will be buffered in the OS waiting to be accepted
/// by [`UnixListener::accept`]. The backlog argument overrides the
/// default backlog of 128; that default is reasonable for most use
/// cases.
///
/// This function is otherwise [`UnixListener::bind`]: see that
/// documentation for full details of operation.
///
/// # Examples
///
/// ```no_run
/// #![feature(bind_with_backlog)]
/// use std::os::unix::net::UnixListener;
///
/// let listener = match UnixListener::bind_with_backlog("/path/to/the/socket", 1000) {
/// Ok(sock) => sock,
/// Err(e) => {
/// println!("Couldn't connect: {:?}", e);
/// return
/// }
/// };
/// ```
///
/// # Errors
///
/// The specified backlog may be larger than supported by the underlying
/// system. In this case an [`io::Error`] with
/// [`io::ErrorKind::InvalidData`] will be returned.
#[unstable(feature = "bind_with_backlog", issue = "94406")]
pub fn bind_with_backlog<P: AsRef<Path>>(path: P, backlog: usize) -> io::Result<UnixListener> {
unsafe {
let backlog = backlog
.try_into()
.map_err(|e| crate::io::Error::new(crate::io::ErrorKind::InvalidData, e))?;
let inner = Socket::new_raw(libc::AF_UNIX, libc::SOCK_STREAM)?;
let (addr, len) = sockaddr_un(path.as_ref())?;

cvt(libc::bind(inner.as_inner().as_raw_fd(), &addr as *const _ as *const _, len as _))?;
cvt(libc::listen(inner.as_inner().as_raw_fd(), 128))?;
cvt(libc::listen(inner.as_inner().as_raw_fd(), backlog))?;

Ok(UnixListener(inner))
}
}

/// Creates a new `UnixListener` bound to the specified [`socket address`].
///
/// The listener will have a backlog of 128. See the documentation for
/// [`UnixListener::bind_addr_with_backlog`] for further information.
///
/// [`socket address`]: crate::os::unix::net::SocketAddr
///
/// # Examples
Expand All @@ -107,14 +156,59 @@ impl UnixListener {
/// ```
#[unstable(feature = "unix_socket_abstract", issue = "85410")]
pub fn bind_addr(socket_addr: &SocketAddr) -> io::Result<UnixListener> {
UnixListener::bind_addr_with_backlog(socket_addr, UnixListener::DEFAULT_BACKLOG)
}

/// Creates a new `UnixListener` bound to the specified [`socket address`].
///
/// The given backlog specifies the maximum number of outstanding
/// connections that will be buffered in the OS waiting to be accepted
/// by [`UnixListener::accept`]. The backlog argument overrides the
/// default of 128; that default is reasonable for most use cases.
///
/// This function is otherwise [`UnixListener::bind_addr`]: see that
/// documentation for full details of operation.
///
/// [`socket address`]: crate::os::unix::net::SocketAddr
///
/// # Examples
///
/// ```no_run
/// #![feature(unix_socket_abstract)]
/// #![feature(bind_with_backlog)]
/// use std::os::unix::net::{UnixListener};
///
/// fn main() -> std::io::Result<()> {
/// let listener1 = UnixListener::bind("path/to/socket")?;
/// let addr = listener1.local_addr()?;
///
/// let listener2 = match UnixListener::bind_addr_with_backlog(&addr, 1000) {
/// Ok(sock) => sock,
/// Err(err) => {
/// println!("Couldn't bind: {:?}", err);
/// return Err(err);
/// }
/// };
/// Ok(())
/// }
/// ```
//#[unstable(feature = "unix_socket_abstract", issue = "85410")]
#[unstable(feature = "bind_with_backlog", issue = "94406")]
pub fn bind_addr_with_backlog(
socket_addr: &SocketAddr,
backlog: usize,
) -> io::Result<UnixListener> {
unsafe {
let backlog = backlog
.try_into()
.map_err(|e| crate::io::Error::new(crate::io::ErrorKind::InvalidData, e))?;
let inner = Socket::new_raw(libc::AF_UNIX, libc::SOCK_STREAM)?;
cvt(libc::bind(
inner.as_raw_fd(),
&socket_addr.addr as *const _ as *const _,
socket_addr.len as _,
))?;
cvt(libc::listen(inner.as_raw_fd(), 128))?;
cvt(libc::listen(inner.as_raw_fd(), backlog))?;
Ok(UnixListener(inner))
}
}
Expand Down
10 changes: 5 additions & 5 deletions library/std/src/os/unix/net/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ fn basic() {
let msg1 = b"hello";
let msg2 = b"world!";

let listener = or_panic!(UnixListener::bind(&socket_path));
let listener = or_panic!(UnixListener::bind_with_backlog(&socket_path, 1));
let thread = thread::spawn(move || {
let mut stream = or_panic!(listener.accept()).0;
let mut buf = [0; 5];
Expand Down Expand Up @@ -111,7 +111,7 @@ fn try_clone() {
let msg1 = b"hello";
let msg2 = b"world";

let listener = or_panic!(UnixListener::bind(&socket_path));
let listener = or_panic!(UnixListener::bind_with_backlog(&socket_path, 1));
let thread = thread::spawn(move || {
let mut stream = or_panic!(listener.accept()).0;
or_panic!(stream.write_all(msg1));
Expand All @@ -135,7 +135,7 @@ fn iter() {
let dir = tmpdir();
let socket_path = dir.path().join("sock");

let listener = or_panic!(UnixListener::bind(&socket_path));
let listener = or_panic!(UnixListener::bind_with_backlog(&socket_path, 2));
let thread = thread::spawn(move || {
for stream in listener.incoming().take(2) {
let mut stream = or_panic!(stream);
Expand Down Expand Up @@ -423,7 +423,7 @@ fn test_abstract_stream_connect() {
let msg2 = b"world";

let socket_addr = or_panic!(SocketAddr::from_abstract_namespace(b"namespace"));
let listener = or_panic!(UnixListener::bind_addr(&socket_addr));
let listener = or_panic!(UnixListener::bind_addr_with_backlog(&socket_addr, 1));

let thread = thread::spawn(move || {
let mut stream = or_panic!(listener.accept()).0;
Expand Down Expand Up @@ -451,7 +451,7 @@ fn test_abstract_stream_connect() {
#[test]
fn test_abstract_stream_iter() {
let addr = or_panic!(SocketAddr::from_abstract_namespace(b"hidden"));
let listener = or_panic!(UnixListener::bind_addr(&addr));
let listener = or_panic!(UnixListener::bind_addr_with_backlog(&addr, 2));

let thread = thread::spawn(move || {
for stream in listener.incoming().take(2) {
Expand Down
12 changes: 10 additions & 2 deletions library/std/src/sys_common/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,9 +363,17 @@ pub struct TcpListener {
}

impl TcpListener {
pub fn bind(addr: io::Result<&SocketAddr>) -> io::Result<TcpListener> {
pub fn bind_with_backlog(
addr: io::Result<&SocketAddr>,
backlog: usize,
) -> io::Result<TcpListener> {
let addr = addr?;

// Type-convert the backlog
let backlog = backlog
.try_into()
.map_err(|e| crate::io::Error::new(crate::io::ErrorKind::InvalidData, e))?;

init();

let sock = Socket::new(addr, c::SOCK_STREAM)?;
Expand All @@ -385,7 +393,7 @@ impl TcpListener {
cvt(unsafe { c::bind(sock.as_raw(), addrp, len as _) })?;

// Start listening
cvt(unsafe { c::listen(sock.as_raw(), 128) })?;
cvt(unsafe { c::listen(sock.as_raw(), backlog) })?;
Ok(TcpListener { inner: sock })
}

Expand Down

0 comments on commit c7844d7

Please sign in to comment.