Skip to content

Commit

Permalink
Ensure we don't hang in Socket#accept due to spurious readiness.
Browse files Browse the repository at this point in the history
It's possible, especially on dual stack, to have issues where a server
may become readable, but by the time accept is called, the connection is
gone. This can cause a deadlock between the semaphore and the accept call,
which can hang indefinitely.
  • Loading branch information
ioquatix committed Feb 25, 2025
1 parent a6b9bd9 commit 0431818
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 14 deletions.
6 changes: 2 additions & 4 deletions examples/limited/config.ru
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ run do |env|
# There is no guarantee that there is a connection or that the connection has a token:
token = limited_semaphore_token(request)

Console.info(self, "Sleeping 10 seconds", token: token)

if env["PATH_INFO"] == "/fast"
if token
# Keeping the connection alive here is problematic because if the next request is slow, it will "block the server" since we have relinquished the token already.
Expand All @@ -30,10 +28,10 @@ run do |env|
end

# Simulated "fast / non-blocking" request:
sleep(1)
sleep(0.01)
else
# Simulated "slow / blocking" request:
sleep(10)
sleep(0.1)
end

[200, {}, ["Hello World"]]
Expand Down
63 changes: 53 additions & 10 deletions examples/limited/limited.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# frozen_string_literal: true

module Limited
# Thread local storage for the semaphore (per-worker):
Thread.attr_accessor :limited_semaphore
Expand All @@ -12,21 +14,32 @@ def self.instance
# Create a new semaphore with the given limit.
def initialize(limit = 1)
@queue = Thread::Queue.new

Console.debug(self, "Initializing queue...", limit: limit)
limit.times{release}
end

# Release the semaphore.
def release
Console.debug(self, "Releasing semaphore...")
@queue.push(true)
end

# Acquire the semaphore. May block until the semaphore is available.
def acquire
Console.debug(self, "Acquiring semaphore...")
@queue.pop

Console.debug(self, "Acquired semaphore...")

return Token.new(self)
end

def try_acquire
if @queue.pop(timeout: 0)
return Token.new(self)
end
end

# A token that can be used to release the semaphore once and once only.
class Token
def initialize(semaphore)
Expand All @@ -44,19 +57,46 @@ def release

# A wrapper implementation for the endpoint that limits the number of connections that can be accepted.
class Wrapper < IO::Endpoint::Wrapper
def socket_accept(server)
# Wait for an inbound connection to be ready to be accepted.
def wait_for_inbound_connection(server)
semaphore = Semaphore.instance

# Wait until there is a connection ready to be accepted:
server.wait_readable
while true
server.wait_readable

# Acquire the semaphore:
Console.info(self, "Acquiring semaphore...")
token = semaphore.acquire
# Acquire the semaphore:
if token = semaphore.try_acquire
return token
end
end
end

# Once the server is readable and we've acquired the token, we can accept the connection (if it's still there).
def socket_accept_nonblock(server, token)
result = server.accept_nonblock

# Accept the connection:
socket, address = super
Console.info(self, "Accepted connection from #{address.inspect}", socket: socket)
success = true
return result
rescue IO::WaitReadable
return nil
ensure
token.release unless success
end

# Accept a connection from the server, limited by the per-worker (thread or process) semaphore.
def socket_accept(server)
while true
if token = wait_for_inbound_connection(server)
# In principle, there is a connection ready to be accepted:
socket, address = socket_accept_nonblock(server, token)

if socket
Console.debug(self, "Accepted connection from #{address.inspect}", socket: socket)
break
end
end
end

# Provide access to the token, so that the connection limit could be released prematurely if it is determined that the request will not overload the server:
socket.define_singleton_method :token do
Expand All @@ -67,11 +107,14 @@ def socket_accept(server)
socket.define_singleton_method :close do
super()
ensure
Console.info(self, "Closing connection from #{address.inspect}", socket: socket)
Console.debug(self, "Releasing connection from #{address.inspect}", socket: socket)
token.release
end

success = true
return socket, address
ensure
token&.release unless success
end
end
end

0 comments on commit 0431818

Please sign in to comment.