-
-
Notifications
You must be signed in to change notification settings - Fork 84
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Limit inbound connections. #271
Merged
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
#!/usr/bin/env falcon --verbose serve -c | ||
# frozen_string_literal: true | ||
|
||
def limited_semaphore_token(request) | ||
if request.respond_to?(:connection) | ||
io = request.connection.stream.io | ||
|
||
if io.respond_to?(:token) | ||
return io.token | ||
end | ||
end | ||
|
||
return nil | ||
end | ||
|
||
run do |env| | ||
# This is not part of the rack specification, but is available when running under Falcon. | ||
request = env["protocol.http.request"] | ||
|
||
# There is no guarantee that there is a connection or that the connection has a token: | ||
token = limited_semaphore_token(request) | ||
|
||
Console.info(self, "Sleeping 10 seconds", token: token) | ||
|
||
if env["PATH_INFO"] == "/fast" | ||
if token | ||
# Keeping the connection alive here is problematic because if the next request is slow, it will "block the server" since we have relinquished the token already. | ||
token.release | ||
request.connection.persistent = false | ||
end | ||
|
||
# Simulated "fast / non-blocking" request: | ||
sleep(1) | ||
else | ||
# Simulated "slow / blocking" request: | ||
sleep(10) | ||
end | ||
|
||
[200, {}, ["Hello World"]] | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
#!/usr/bin/env falcon-host | ||
# frozen_string_literal: true | ||
|
||
# Released under the MIT License. | ||
# Copyright, 2019-2024, by Samuel Williams. | ||
|
||
require "falcon/environment/rack" | ||
require_relative "limited" | ||
|
||
service "limited.localhost" do | ||
include Falcon::Environment::Rack | ||
|
||
scheme "http" | ||
protocol {Async::HTTP::Protocol::HTTP} | ||
|
||
# Extend the endpoint options to include the (connection) limited wrapper. | ||
endpoint_options do | ||
super().merge(wrapper: Limited::Wrapper.new) | ||
end | ||
|
||
count 2 | ||
|
||
url "http://localhost:8080" | ||
|
||
endpoint do | ||
::Async::HTTP::Endpoint.parse(url).with(**endpoint_options) | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,77 @@ | ||
module Limited | ||
# Thread local storage for the semaphore (per-worker): | ||
Thread.attr_accessor :limited_semaphore | ||
|
||
# We use a thread-safe semaphore to limit the number of connections that can be accepted at once. | ||
class Semaphore | ||
# Get the semaphore for the current thread. | ||
def self.instance | ||
Thread.current.limited_semaphore ||= new | ||
end | ||
|
||
# Create a new semaphore with the given limit. | ||
def initialize(limit = 1) | ||
@queue = Thread::Queue.new | ||
limit.times{release} | ||
end | ||
|
||
# Release the semaphore. | ||
def release | ||
@queue.push(true) | ||
end | ||
|
||
# Acquire the semaphore. May block until the semaphore is available. | ||
def acquire | ||
@queue.pop | ||
|
||
return Token.new(self) | ||
end | ||
|
||
# A token that can be used to release the semaphore once and once only. | ||
class Token | ||
def initialize(semaphore) | ||
@semaphore = semaphore | ||
end | ||
|
||
def release | ||
if semaphore = @semaphore | ||
@semaphore = nil | ||
semaphore.release | ||
end | ||
end | ||
end | ||
end | ||
|
||
# A wrapper implementation for the endpoint that limits the number of connections that can be accepted. | ||
class Wrapper < IO::Endpoint::Wrapper | ||
def socket_accept(server) | ||
semaphore = Semaphore.instance | ||
|
||
# Wait until there is a connection ready to be accepted: | ||
server.wait_readable | ||
|
||
# Acquire the semaphore: | ||
Console.info(self, "Acquiring semaphore...") | ||
token = semaphore.acquire | ||
|
||
# Accept the connection: | ||
socket, address = super | ||
Console.info(self, "Accepted connection from #{address.inspect}", socket: socket) | ||
|
||
# Provide access to the token, so that the connection limit could be released prematurely if it is determined that the request will not overload the server: | ||
socket.define_singleton_method :token do | ||
token | ||
end | ||
|
||
# Provide a way to release the semaphore when the connection is closed: | ||
socket.define_singleton_method :close do | ||
super() | ||
ensure | ||
Console.info(self, "Closing connection from #{address.inspect}", socket: socket) | ||
token.release | ||
end | ||
|
||
return socket, address | ||
end | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,100 @@ | ||
# Limited Example | ||
|
||
This example shows how to limit the number of connections to a server. It takes advantage of `IO::Endpoint`'s wrapper to inject the necessary logic. More specifically, we do the following: | ||
|
||
1. Instead of `accept`ing a connection in a loop directly, we call `server.wait_readable` to wait for a connection to be available. | ||
2. We then try to acquire a semaphore token. If we can't, we wait for one to be available. | ||
3. Once we have a token, we accept the connection and process it. | ||
4. Once the connection is closed, we release the token. | ||
|
||
This way, we can limit the number of connections to a server. | ||
|
||
## Usage | ||
|
||
Start the server: | ||
|
||
```console | ||
> bundle exec falcon host falcon.rb | ||
0.0s info: Falcon::Command::Host [oid=0x4c8] [ec=0x4d0] [pid=99469] [2025-02-11 17:53:59 +1300] | ||
| Falcon Host v0.49.0 taking flight! | ||
| - Configuration: falcon.rb | ||
| - To terminate: Ctrl-C or kill 99469 | ||
| - To reload: kill -HUP 99469 | ||
0.03s info: Async::Container::Notify::Console [oid=0x4d8] [ec=0x4d0] [pid=99469] [2025-02-11 17:53:59 +1300] | ||
| {status: "Initializing..."} | ||
0.04s info: Falcon::Service::Server [oid=0x4e8] [ec=0x4d0] [pid=99469] [2025-02-11 17:53:59 +1300] | ||
| Starting limited.localhost on #<Async::HTTP::Endpoint http://localhost:8080/ {reuse_address: true, timeout: nil, wrapper: #<Limited::Wrapper:0x000000011f5dfc30>}> | ||
0.04s info: Async::Service::Controller [oid=0x4f0] [ec=0x4d0] [pid=99469] [2025-02-11 17:53:59 +1300] | ||
| Controller starting... | ||
0.04s info: Async::Container::Notify::Console [oid=0x4d8] [ec=0x4d0] [pid=99469] [2025-02-11 17:53:59 +1300] | ||
| {ready: true} | ||
0.04s info: Async::Service::Controller [oid=0x4f0] [ec=0x4d0] [pid=99469] [2025-02-11 17:53:59 +1300] | ||
| Controller started... | ||
``` | ||
|
||
Then, you can connect to it using `curl -v http://localhost:8080`. The default example includes two workers with a limit of one connection per worker. | ||
|
||
```console | ||
> curl -v http://localhost:8080 | ||
* Host localhost:8080 was resolved. | ||
* IPv6: ::1 | ||
* IPv4: 127.0.0.1 | ||
* Trying [::1]:8080... | ||
* Connected to localhost (::1) port 8080 | ||
* using HTTP/1.x | ||
> GET / HTTP/1.1 | ||
> Host: localhost:8080 | ||
> User-Agent: curl/8.10.1 | ||
> Accept: */* | ||
> | ||
* Request completely sent off | ||
< HTTP/1.1 200 OK | ||
< vary: accept-encoding | ||
< content-length: 11 | ||
< | ||
* Connection #0 to host localhost left intact | ||
Hello World | ||
``` | ||
|
||
There is also a fast path which simulates requests that may not count towards the connection limit: | ||
|
||
```console | ||
> curl -v http://localhost:8080/fast http://localhost:8080/fast | ||
* Host localhost:8080 was resolved. | ||
* IPv6: ::1 | ||
* IPv4: 127.0.0.1 | ||
* Trying [::1]:8080... | ||
* Connected to localhost (::1) port 8080 | ||
* using HTTP/1.x | ||
> GET /fast HTTP/1.1 | ||
> Host: localhost:8080 | ||
> User-Agent: curl/8.10.1 | ||
> Accept: */* | ||
> | ||
* Request completely sent off | ||
< HTTP/1.1 200 OK | ||
< vary: accept-encoding | ||
< connection: close | ||
< content-length: 11 | ||
< | ||
* shutting down connection #0 | ||
Hello World* Hostname localhost was found in DNS cache | ||
* Trying [::1]:8080... | ||
* Connected to localhost (::1) port 8080 | ||
* using HTTP/1.x | ||
> GET /fast HTTP/1.1 | ||
> Host: localhost:8080 | ||
> User-Agent: curl/8.10.1 | ||
> Accept: */* | ||
> | ||
* Request completely sent off | ||
< HTTP/1.1 200 OK | ||
< vary: accept-encoding | ||
< connection: close | ||
< content-length: 11 | ||
< | ||
* shutting down connection #1 | ||
Hello World | ||
``` | ||
|
||
Note that we use `connection: close` because we are using the fast path. This is to ensure that the connection is closed immediately after the response is sent such that a subsequent "slow" request won't double up. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be in a loop? For the case when it's readable, but the semaphore can't be acquire.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, that's an interesting point. I actually don't know the answer off the top of my head, I'll have to experiment and think about it a bit.