From 933d36fff0c9e5a6a838764f9c1c8b63b5b3fc0d Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Tue, 25 Feb 2025 15:50:29 +1300 Subject: [PATCH] Ensure we don't hang in `Socket#accept` due to spurious readiness. It's possible, especially on dual stack, to have issues where a server may become readable, but by the time accept is called, the connection is gone. This can cause a deadlock between the semaphore and the accept call, which can hang indefinitely. --- examples/limited/config.ru | 6 ++-- examples/limited/limited.rb | 62 +++++++++++++++++++++++++++++++------ 2 files changed, 54 insertions(+), 14 deletions(-) diff --git a/examples/limited/config.ru b/examples/limited/config.ru index 6918253..32d6cea 100755 --- a/examples/limited/config.ru +++ b/examples/limited/config.ru @@ -20,8 +20,6 @@ run do |env| # There is no guarantee that there is a connection or that the connection has a token: token = limited_semaphore_token(request) - Console.info(self, "Sleeping 10 seconds", token: token) - if env["PATH_INFO"] == "/fast" if token # Keeping the connection alive here is problematic because if the next request is slow, it will "block the server" since we have relinquished the token already. @@ -30,10 +28,10 @@ run do |env| end # Simulated "fast / non-blocking" request: - sleep(1) + sleep(0.01) else # Simulated "slow / blocking" request: - sleep(10) + sleep(0.1) end [200, {}, ["Hello World"]] diff --git a/examples/limited/limited.rb b/examples/limited/limited.rb index 9a9dc5a..8bb6aef 100644 --- a/examples/limited/limited.rb +++ b/examples/limited/limited.rb @@ -1,3 +1,5 @@ +# frozen_string_literal: true + module Limited # Thread local storage for the semaphore (per-worker): Thread.attr_accessor :limited_semaphore @@ -12,21 +14,33 @@ def self.instance # Create a new semaphore with the given limit. def initialize(limit = 1) @queue = Thread::Queue.new + + Console.info(self, "Initializing queue...", limit: limit) limit.times{release} end # Release the semaphore. def release + Console.info(self, "Releasing semaphore...") @queue.push(true) end # Acquire the semaphore. May block until the semaphore is available. def acquire + Console.info(self, "Acquiring semaphore...") @queue.pop return Token.new(self) end + def try_acquire + if @queue.empty? + return nil + else + return acquire + end + end + # A token that can be used to release the semaphore once and once only. class Token def initialize(semaphore) @@ -44,19 +58,44 @@ def release # A wrapper implementation for the endpoint that limits the number of connections that can be accepted. class Wrapper < IO::Endpoint::Wrapper - def socket_accept(server) + # Wait for an inbound connection to be ready to be accepted. + def wait_for_inbound_connection(server) semaphore = Semaphore.instance # Wait until there is a connection ready to be accepted: - server.wait_readable - - # Acquire the semaphore: - Console.info(self, "Acquiring semaphore...") - token = semaphore.acquire + while true + server.wait_readable - # Accept the connection: - socket, address = super - Console.info(self, "Accepted connection from #{address.inspect}", socket: socket) + # Acquire the semaphore: + Console.debug(self, "Acquiring semaphore...") + if token = semaphore.try_acquire + Console.debug(self, "Acquired semaphore...") + return token + end + end + end + + # Once the server is readable and we've acquired the token, we can accept the connection (if it's still there). + def socket_accept_nonblock(server, token) + return server.accept_nonblock + rescue IO::WaitReadable, Errno::EINTR + token.release + return nil + end + + # Accept a connection from the server, limited by the per-worker (thread or process) semaphore. + def socket_accept(server) + while true + if token = wait_for_inbound_connection(server) + # In principle, there is a connection ready to be accepted: + socket, address = socket_accept_nonblock(server, token) + + if socket + Console.debug(self, "Accepted connection from #{address.inspect}", socket: socket) + break + end + end + end # Provide access to the token, so that the connection limit could be released prematurely if it is determined that the request will not overload the server: socket.define_singleton_method :token do @@ -67,11 +106,14 @@ def socket_accept(server) socket.define_singleton_method :close do super() ensure - Console.info(self, "Closing connection from #{address.inspect}", socket: socket) + Console.debug(self, "Releasing connection from #{address.inspect}", socket: socket) token.release end + success = true return socket, address + ensure + token&.release unless success end end end