Skip to content

Commit

Permalink
Merge pull request #103 from rage-rb/standalone-db
Browse files Browse the repository at this point in the history
DB Pool Updates
  • Loading branch information
rsamoilov authored Sep 13, 2024
2 parents bbd50fa + 24ebe94 commit 06da72c
Show file tree
Hide file tree
Showing 9 changed files with 177 additions and 37 deletions.
6 changes: 4 additions & 2 deletions lib/rage-rb.rb
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,10 @@ def self.patch_active_record_connection_pool
if is_connected
puts "INFO: Patching ActiveRecord::ConnectionPool"
Iodine.on_state(:on_start) do
ActiveRecord::Base.connection_pool.extend(Rage::Ext::ActiveRecord::ConnectionPool)
ActiveRecord::Base.connection_pool.__init_rage_extension
ActiveRecord::Base.connection_handler.connection_pool_list(:all).each do |pool|
pool.extend(Rage::Ext::ActiveRecord::ConnectionPool)
pool.__init_rage_extension
end
end
else
puts "WARNING: DB connection is not established - can't patch ActiveRecord::ConnectionPool"
Expand Down
8 changes: 3 additions & 5 deletions lib/rage/cable/channel.rb
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ def __register_action_proc(action_name)
end

is_subscribing = action_name == :subscribed
activerecord_loaded = defined?(::ActiveRecord)
should_release_connections = Rage.config.internal.should_manually_release_ar_connections?

method_name = class_eval <<~RUBY, __FILE__, __LINE__ + 1
def __run_#{action_name}(data)
Expand Down Expand Up @@ -163,12 +163,10 @@ def __run_#{action_name}(data)
#{periodic_timers_chunk}
#{rescue_handlers_chunk}
#{if activerecord_loaded
#{if should_release_connections
<<~RUBY
ensure
if ActiveRecord::Base.connection_pool.active_connection?
ActiveRecord::Base.connection_handler.clear_active_connections!
end
ActiveRecord::Base.connection_handler.clear_active_connections!(:all)
RUBY
end}
end
Expand Down
24 changes: 22 additions & 2 deletions lib/rage/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,13 @@
#
# > Disables the `io_write` hook to fix the ["zero-length iov"](https://bugs.ruby-lang.org/issues/19640) error on Ruby < 3.3.
#
# • _RAGE_PATCH_AR_POOL_
# • _RAGE_DISABLE_AR_POOL_PATCH_
#
# > Enables the `ActiveRecord::ConnectionPool` patch to optimize database connection management. Use it to increase throughput under high load.
# > Disables the `ActiveRecord::ConnectionPool` patch and makes Rage use the original ActiveRecord implementation.
#
# • _RAGE_DISABLE_AR_WEAK_CONNECTIONS_
#
# > Instructs Rage to not reuse Active Record connections between different fibers.
#
class Rage::Configuration
attr_accessor :logger
Expand Down Expand Up @@ -264,6 +268,22 @@ class PublicFileServer
class Internal
attr_accessor :rails_mode

def patch_ar_pool?
!ENV["RAGE_DISABLE_AR_POOL_PATCH"] && !Rage.env.test?
end

# whether we should manually release AR connections;
# AR 7.2+ uses `with_connection` internaly, so we only need to do this for older versions;
def should_manually_release_ar_connections?
defined?(ActiveRecord) && ActiveRecord.version < Gem::Version.create("7.2.0")
end

# whether we should manually reconnect closed AR connections;
# AR 7.1+ does this automatically while executing the query;
def should_manually_restore_ar_connections?
defined?(ActiveRecord) && ActiveRecord.version < Gem::Version.create("7.1.0")
end

def inspect
"#<#{self.class.name}>"
end
Expand Down
18 changes: 11 additions & 7 deletions lib/rage/controller/api.rb
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,6 @@ def __register_action(action)
""
end

activerecord_loaded = defined?(::ActiveRecord)

wrap_parameters_chunk = if __wrap_parameters_key
<<~RUBY
wrap_key = self.class.__wrap_parameters_key
Expand All @@ -95,9 +93,12 @@ def __register_action(action)
RUBY
end

query_cache_enabled = defined?(::ActiveRecord)
should_release_connections = Rage.config.internal.should_manually_release_ar_connections?

class_eval <<~RUBY, __FILE__, __LINE__ + 1
def __run_#{action}
#{if activerecord_loaded
#{if query_cache_enabled
<<~RUBY
ActiveRecord::Base.connection_pool.enable_query_cache!
RUBY
Expand All @@ -119,12 +120,15 @@ def __run_#{action}
#{rescue_handlers_chunk}
ensure
#{if activerecord_loaded
#{if query_cache_enabled
<<~RUBY
ActiveRecord::Base.connection_pool.disable_query_cache!
if ActiveRecord::Base.connection_pool.active_connection?
ActiveRecord::Base.connection_handler.clear_active_connections!
end
RUBY
end}
#{if should_release_connections
<<~RUBY
ActiveRecord::Base.connection_handler.clear_active_connections!(:all)
RUBY
end}
Expand Down
64 changes: 57 additions & 7 deletions lib/rage/ext/active_record/connection_pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,30 @@ def to_a
end
end

# reconnect closed connections on checkout;
# only included with `Rage.config.should_manually_restore_ar_connections?`
module ConnectionWithVerify
def connection
conn = super

if conn.__needs_reconnect
conn.reconnect!
conn.__needs_reconnect = false
end

conn
end
end
if Rage.config.internal.should_manually_restore_ar_connections?
prepend ConnectionWithVerify
end

def self.extended(instance)
instance.class.alias_method :__checkout__, :checkout
instance.class.alias_method :__remove__, :remove

ActiveRecord::ConnectionAdapters::AbstractAdapter.attr_accessor(:__idle_since)
ActiveRecord::ConnectionAdapters::AbstractAdapter.attr_accessor(:__needs_reconnect)
end

def __init_rage_extension
Expand All @@ -47,7 +66,7 @@ def __init_rage_extension
@__checkout_timeout = checkout_timeout

# how long a connection can be idle for before disconnecting
@__idle_timeout = reaper.frequency
@__idle_timeout = respond_to?(:db_config) ? db_config.idle_timeout : @idle_timeout

# how often should we check for fibers that wait for a connection for too long
@__timeout_worker_frequency = 0.5
Expand All @@ -65,8 +84,30 @@ def __init_rage_extension
end
end

# monitor connections health
if Rage.config.internal.should_manually_restore_ar_connections?
Iodine.run_every(1_000) do
i = 0
while i < @__connections.length
conn = @__connections[i]

unless conn.__needs_reconnect
needs_reconnect = !conn.active? rescue true
if needs_reconnect
conn.__needs_reconnect = true
conn.disconnect!
end
end

i += 1
end
end
end

@release_connection_channel = "ext:ar-connection-released:#{object_id}"

# resume blocked fibers once connections become available
Iodine.subscribe("ext:ar-connection-released") do
Iodine.subscribe(@release_connection_channel) do
if @__blocked.length > 0 && @__connections.length > 0
f, _ = @__blocked.shift
f.resume
Expand All @@ -75,7 +116,7 @@ def __init_rage_extension

# unsubscribe on shutdown
Iodine.on_state(:on_finish) do
Iodine.unsubscribe("ext:ar-connection-released")
Iodine.unsubscribe(@release_connection_channel)
end
end

Expand All @@ -100,28 +141,35 @@ def release_connection(owner = Fiber.current)
if (conn = @__in_use.delete(owner))
conn.__idle_since = Process.clock_gettime(Process::CLOCK_MONOTONIC)
@__connections << conn
Iodine.publish("ext:ar-connection-released", "", Iodine::PubSub::PROCESS) if @__blocked.length > 0
Iodine.publish(@release_connection_channel, "", Iodine::PubSub::PROCESS) if @__blocked.length > 0
end

conn
end

# Recover lost connections for the pool.
def reap
crashed_fibers = nil

@__in_use.each do |fiber, conn|
unless fiber.alive?
if conn.active?
conn.reset!
release_connection(fiber)
(crashed_fibers ||= []) << fiber
else
@__in_use.delete(fiber)
conn.disconnect!
__remove__(conn)
self.automatic_reconnect = true
@__connections += build_new_connections(1)
Iodine.publish("ext:ar-connection-released", "", Iodine::PubSub::PROCESS) if @__blocked.length > 0
Iodine.publish(@release_connection_channel, "", Iodine::PubSub::PROCESS) if @__blocked.length > 0
end
end
end

if crashed_fibers
crashed_fibers.each { |fiber| release_connection(fiber) }
end
end

# Disconnect all connections that have been idle for at least
Expand All @@ -135,6 +183,7 @@ def flush(minimum_idle = @__idle_timeout)
conn = @__connections[i]
if conn.__idle_since && current_time - conn.__idle_since >= minimum_idle
conn.__idle_since = nil
conn.__needs_reconnect = true
conn.disconnect!
end
i += 1
Expand Down Expand Up @@ -212,11 +261,12 @@ def disconnect(raise_on_acquisition_timeout = true, disconnect_attempts = 0)
end

# create a new pool
self.automatic_reconnect = true
@__connections = build_new_connections

# notify blocked fibers that there are new connections available
[@__blocked.length, @__connections.length].min.times do
Iodine.publish("ext:ar-connection-released", "", Iodine::PubSub::PROCESS)
Iodine.publish(@release_connection_channel, "", Iodine::PubSub::PROCESS)
end
end

Expand Down
44 changes: 35 additions & 9 deletions lib/rage/ext/setup.rb
Original file line number Diff line number Diff line change
@@ -1,19 +1,45 @@
if defined?(ActiveRecord) && ActiveRecord.version < Gem::Version.create("6")
fail "Rage is only compatible with Active Record 6+. Detected Active Record version: #{ActiveRecord.version}."
end

# set ActiveSupport isolation level
if defined?(ActiveSupport::IsolatedExecutionState)
ActiveSupport::IsolatedExecutionState.isolation_level = :fiber
end

# patch Active Record 6.0 to accept the role argument
if defined?(ActiveRecord) && ActiveRecord.version < Gem::Version.create("6.1")
%i(active_connections? connection_pool_list clear_active_connections!).each do |m|
ActiveRecord::Base.connection_handler.define_singleton_method(m) do |_ = nil|
super()
end
end
end

# release ActiveRecord connections on yield
if defined?(ActiveRecord) && ActiveRecord.version < Gem::Version.create("7.1.0")
class Fiber
def self.defer
res = Fiber.yield
if defined?(ActiveRecord) && Rage.config.internal.patch_ar_pool?
if ENV["RAGE_DISABLE_AR_WEAK_CONNECTIONS"]
unless Rage.config.internal.should_manually_release_ar_connections?
puts "WARNING: The RAGE_DISABLE_AR_WEAK_CONNECTIONS setting does not have any effect with Active Record 7.2+"
end
elsif Rage.config.internal.should_manually_release_ar_connections?
class Fiber
def self.defer(fileno)
f = Fiber.current
f.__awaited_fileno = fileno

if ActiveRecord::Base.connection_pool.active_connection?
ActiveRecord::Base.connection_handler.clear_active_connections!
end
res = Fiber.yield

res
if ActiveRecord::Base.connection_handler.active_connections?(:all)
Iodine.defer do
if fileno != f.__awaited_fileno || !f.alive?
ActiveRecord::Base.connection_handler.connection_pool_list(:all).each { |pool| pool.release_connection(f) }
end
end
end

res
end
end
end
end
Expand All @@ -31,6 +57,6 @@ def connection_cache_key(_)
end

# patch `ActiveRecord::ConnectionPool`
if defined?(ActiveRecord) && ENV["RAGE_PATCH_AR_POOL"] && !Rage.env.test?
if defined?(ActiveRecord) && Rage.config.internal.patch_ar_pool?
Rage.patch_active_record_connection_pool
end
42 changes: 41 additions & 1 deletion lib/rage/fiber.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,43 @@
# Many developers see fibers as "lightweight threads" that should be used in conjunction with fiber pools, the same way we use thread pools for threads.<br>
# Instead, it makes sense to think of fibers as regular Ruby objects. We don't use a pool of arrays when we need to create an array - we create a new object and let Ruby and the GC do their job.<br>
# Same applies to fibers. Feel free to create as many fibers as you need on demand.
#
# ## Active Record Connections
#
# Let's consider the following controller, where we update a record in the database:
#
# ```ruby
# class UsersController < RageController::API
# def update
# User.update!(params[:id], email: params[:email])
# render status: :ok
# end
# end
# ```
#
# The `User.update!` call here checks out an Active Record connection, and Rage will automatically check it back in once the action is completed. So far so good!
#
# Let's consider another example:
#
# ```ruby
# require "net/http"
#
# class UsersController < RageController::API
# def update
# User.update!(params[:id], email: params[:email]) # takes 5ms
# Net::HTTP.post_form(URI("https://mailing.service/update"), { user_id: params[:id] }) # takes 50ms
# render status: :ok
# end
# end
# ```
#
# Here, we've added another step: once the record is updated, we will send a request to update the user's data in the mailing list service.
#
# However, in this case, we want to release the Active Record connection before the action is completed. You can see that we need the connection only for the `User.update!` call.
# The next 50ms the code will spend waiting for the HTTP request to finish, and if we don't release the Active Record connection right away, other fibers won't be able to use it.
#
# Active Record 7.2 handles this case by using [#with_connection](https://api.rubyonrails.org/classes/ActiveRecord/ConnectionAdapters/ConnectionPool.html#method-i-with_connection) internally.
# With older Active Record versions, Rage handles this case on its own by keeping track of blocking calls and releasing Active Record connections between them.
class Fiber
# @private
AWAIT_ERROR_MESSAGE = "err"
Expand Down Expand Up @@ -81,6 +118,9 @@ def __block_channel(force = false)
"block:#{object_id}:#{@__block_channel_i}"
end

# @private
attr_accessor :__awaited_fileno

# @private
# pause a fiber and resume in the next iteration of the event loop
def self.pause
Expand Down Expand Up @@ -138,7 +178,7 @@ def self.await(fibers)
end
end

Fiber.yield
Fiber.defer(-1)
Iodine.defer { Iodine.unsubscribe("await:#{f.object_id}") }

# if num_wait_for is not 0 means we exited prematurely because of an error
Expand Down
6 changes: 3 additions & 3 deletions lib/rage/fiber_scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ def io_wait(io, events, timeout = nil)
f = Fiber.current
::Iodine::Scheduler.attach(io.fileno, events, timeout&.ceil || 0) { |err| f.resume(err) }

err = Fiber.defer
if err == Errno::ETIMEDOUT::Errno
0
err = Fiber.defer(io.fileno)
if err && err < 0
err
else
events
end
Expand Down
Loading

0 comments on commit 06da72c

Please sign in to comment.