Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Limit inbound connections. #271

Merged
merged 1 commit into from
Feb 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions examples/limited/config.ru
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
#!/usr/bin/env falcon --verbose serve -c
# frozen_string_literal: true

def limited_semaphore_token(request)
if request.respond_to?(:connection)
io = request.connection.stream.io

if io.respond_to?(:token)
return io.token
end
end

return nil
end

run do |env|
# This is not part of the rack specification, but is available when running under Falcon.
request = env["protocol.http.request"]

# There is no guarantee that there is a connection or that the connection has a token:
token = limited_semaphore_token(request)

Console.info(self, "Sleeping 10 seconds", token: token)

if env["PATH_INFO"] == "/fast"
if token
# Keeping the connection alive here is problematic because if the next request is slow, it will "block the server" since we have relinquished the token already.
token.release
request.connection.persistent = false
end

# Simulated "fast / non-blocking" request:
sleep(1)
else
# Simulated "slow / blocking" request:
sleep(10)
end

[200, {}, ["Hello World"]]
end
28 changes: 28 additions & 0 deletions examples/limited/falcon.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#!/usr/bin/env falcon-host
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2019-2024, by Samuel Williams.

require "falcon/environment/rack"
require_relative "limited"

service "limited.localhost" do
include Falcon::Environment::Rack

scheme "http"
protocol {Async::HTTP::Protocol::HTTP}

# Extend the endpoint options to include the (connection) limited wrapper.
endpoint_options do
super().merge(wrapper: Limited::Wrapper.new)
end

count 2

url "http://localhost:8080"

endpoint do
::Async::HTTP::Endpoint.parse(url).with(**endpoint_options)
end
end
77 changes: 77 additions & 0 deletions examples/limited/limited.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
module Limited
# Thread local storage for the semaphore (per-worker):
Thread.attr_accessor :limited_semaphore

# We use a thread-safe semaphore to limit the number of connections that can be accepted at once.
class Semaphore
# Get the semaphore for the current thread.
def self.instance
Thread.current.limited_semaphore ||= new
end

# Create a new semaphore with the given limit.
def initialize(limit = 1)
@queue = Thread::Queue.new
limit.times{release}
end

# Release the semaphore.
def release
@queue.push(true)
end

# Acquire the semaphore. May block until the semaphore is available.
def acquire
@queue.pop

return Token.new(self)
end

# A token that can be used to release the semaphore once and once only.
class Token
def initialize(semaphore)
@semaphore = semaphore
end

def release
if semaphore = @semaphore
@semaphore = nil
semaphore.release
end
end
end
end

# A wrapper implementation for the endpoint that limits the number of connections that can be accepted.
class Wrapper < IO::Endpoint::Wrapper
def socket_accept(server)
semaphore = Semaphore.instance

# Wait until there is a connection ready to be accepted:
server.wait_readable

# Acquire the semaphore:
Console.info(self, "Acquiring semaphore...")
token = semaphore.acquire
Comment on lines +51 to +55
Copy link

@macournoyer macournoyer Feb 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be in a loop? For the case when it's readable, but the semaphore can't be acquire.

Suggested change
server.wait_readable
# Acquire the semaphore:
Console.info(self, "Acquiring semaphore...")
token = semaphore.acquire
token = loop do
server.wait_readable
# Acquire the semaphore:
Console.info(self, "Acquiring semaphore...")
break semaphore.try_acquire # acts like a acquire_nonblock, returns nil if not ready
end

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, that's an interesting point. I actually don't know the answer off the top of my head, I'll have to experiment and think about it a bit.


# Accept the connection:
socket, address = super
Console.info(self, "Accepted connection from #{address.inspect}", socket: socket)

# Provide access to the token, so that the connection limit could be released prematurely if it is determined that the request will not overload the server:
socket.define_singleton_method :token do
token
end

# Provide a way to release the semaphore when the connection is closed:
socket.define_singleton_method :close do
super()
ensure
Console.info(self, "Closing connection from #{address.inspect}", socket: socket)
token.release
end

return socket, address
end
end
end
100 changes: 100 additions & 0 deletions examples/limited/readme.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
# Limited Example

This example shows how to limit the number of connections to a server. It takes advantage of `IO::Endpoint`'s wrapper to inject the necessary logic. More specifically, we do the following:

1. Instead of `accept`ing a connection in a loop directly, we call `server.wait_readable` to wait for a connection to be available.
2. We then try to acquire a semaphore token. If we can't, we wait for one to be available.
3. Once we have a token, we accept the connection and process it.
4. Once the connection is closed, we release the token.

This way, we can limit the number of connections to a server.

## Usage

Start the server:

```console
> bundle exec falcon host falcon.rb
0.0s info: Falcon::Command::Host [oid=0x4c8] [ec=0x4d0] [pid=99469] [2025-02-11 17:53:59 +1300]
| Falcon Host v0.49.0 taking flight!
| - Configuration: falcon.rb
| - To terminate: Ctrl-C or kill 99469
| - To reload: kill -HUP 99469
0.03s info: Async::Container::Notify::Console [oid=0x4d8] [ec=0x4d0] [pid=99469] [2025-02-11 17:53:59 +1300]
| {status: "Initializing..."}
0.04s info: Falcon::Service::Server [oid=0x4e8] [ec=0x4d0] [pid=99469] [2025-02-11 17:53:59 +1300]
| Starting limited.localhost on #<Async::HTTP::Endpoint http://localhost:8080/ {reuse_address: true, timeout: nil, wrapper: #<Limited::Wrapper:0x000000011f5dfc30>}>
0.04s info: Async::Service::Controller [oid=0x4f0] [ec=0x4d0] [pid=99469] [2025-02-11 17:53:59 +1300]
| Controller starting...
0.04s info: Async::Container::Notify::Console [oid=0x4d8] [ec=0x4d0] [pid=99469] [2025-02-11 17:53:59 +1300]
| {ready: true}
0.04s info: Async::Service::Controller [oid=0x4f0] [ec=0x4d0] [pid=99469] [2025-02-11 17:53:59 +1300]
| Controller started...
```

Then, you can connect to it using `curl -v http://localhost:8080`. The default example includes two workers with a limit of one connection per worker.

```console
> curl -v http://localhost:8080
* Host localhost:8080 was resolved.
* IPv6: ::1
* IPv4: 127.0.0.1
* Trying [::1]:8080...
* Connected to localhost (::1) port 8080
* using HTTP/1.x
> GET / HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/8.10.1
> Accept: */*
>
* Request completely sent off
< HTTP/1.1 200 OK
< vary: accept-encoding
< content-length: 11
<
* Connection #0 to host localhost left intact
Hello World
```

There is also a fast path which simulates requests that may not count towards the connection limit:

```console
> curl -v http://localhost:8080/fast http://localhost:8080/fast
* Host localhost:8080 was resolved.
* IPv6: ::1
* IPv4: 127.0.0.1
* Trying [::1]:8080...
* Connected to localhost (::1) port 8080
* using HTTP/1.x
> GET /fast HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/8.10.1
> Accept: */*
>
* Request completely sent off
< HTTP/1.1 200 OK
< vary: accept-encoding
< connection: close
< content-length: 11
<
* shutting down connection #0
Hello World* Hostname localhost was found in DNS cache
* Trying [::1]:8080...
* Connected to localhost (::1) port 8080
* using HTTP/1.x
> GET /fast HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/8.10.1
> Accept: */*
>
* Request completely sent off
< HTTP/1.1 200 OK
< vary: accept-encoding
< connection: close
< content-length: 11
<
* shutting down connection #1
Hello World
```

Note that we use `connection: close` because we are using the fast path. This is to ensure that the connection is closed immediately after the response is sent such that a subsequent "slow" request won't double up.
18 changes: 13 additions & 5 deletions lib/falcon/environment/server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,11 @@ def count

# Options to use when creating the container.
def container_options
{restart: true, count: self.count, health_check_timeout: 30}.compact
{
restart: true,
count: self.count,
health_check_timeout: 30,
}.compact
end

# The host that this server will receive connections for.
Expand All @@ -45,13 +49,17 @@ def timeout
nil
end

def endpoint_options
{
reuse_address: true,
timeout: timeout,
}
end

# The upstream endpoint that will handle incoming requests.
# @returns [Async::HTTP::Endpoint]
def endpoint
::Async::HTTP::Endpoint.parse(url).with(
reuse_address: true,
timeout: timeout,
)
::Async::HTTP::Endpoint.parse(url).with(**endpoint_options)
end

def verbose
Expand Down
Loading