diff --git a/lib/rage-rb.rb b/lib/rage-rb.rb index 5f9094c4..59bc7aec 100644 --- a/lib/rage-rb.rb +++ b/lib/rage-rb.rb @@ -65,8 +65,10 @@ def self.patch_active_record_connection_pool if is_connected puts "INFO: Patching ActiveRecord::ConnectionPool" Iodine.on_state(:on_start) do - ActiveRecord::Base.connection_pool.extend(Rage::Ext::ActiveRecord::ConnectionPool) - ActiveRecord::Base.connection_pool.__init_rage_extension + ActiveRecord::Base.connection_handler.connection_pool_list(:all).each do |pool| + pool.extend(Rage::Ext::ActiveRecord::ConnectionPool) + pool.__init_rage_extension + end end else puts "WARNING: DB connection is not established - can't patch ActiveRecord::ConnectionPool" diff --git a/lib/rage/cable/channel.rb b/lib/rage/cable/channel.rb index 5377578c..3947364a 100644 --- a/lib/rage/cable/channel.rb +++ b/lib/rage/cable/channel.rb @@ -135,7 +135,7 @@ def __register_action_proc(action_name) end is_subscribing = action_name == :subscribed - activerecord_loaded = defined?(::ActiveRecord) + should_release_connections = Rage.config.internal.should_manually_release_ar_connections? method_name = class_eval <<~RUBY, __FILE__, __LINE__ + 1 def __run_#{action_name}(data) @@ -163,12 +163,10 @@ def __run_#{action_name}(data) #{periodic_timers_chunk} #{rescue_handlers_chunk} - #{if activerecord_loaded + #{if should_release_connections <<~RUBY ensure - if ActiveRecord::Base.connection_pool.active_connection? - ActiveRecord::Base.connection_handler.clear_active_connections! - end + ActiveRecord::Base.connection_handler.clear_active_connections!(:all) RUBY end} end diff --git a/lib/rage/configuration.rb b/lib/rage/configuration.rb index 50f2c569..c6f6b09c 100644 --- a/lib/rage/configuration.rb +++ b/lib/rage/configuration.rb @@ -130,9 +130,13 @@ # # > Disables the `io_write` hook to fix the ["zero-length iov"](https://bugs.ruby-lang.org/issues/19640) error on Ruby < 3.3. # -# • _RAGE_PATCH_AR_POOL_ +# • _RAGE_DISABLE_AR_POOL_PATCH_ # -# > Enables the `ActiveRecord::ConnectionPool` patch to optimize database connection management. Use it to increase throughput under high load. +# > Disables the `ActiveRecord::ConnectionPool` patch and makes Rage use the original ActiveRecord implementation. +# +# • _RAGE_DISABLE_AR_WEAK_CONNECTIONS_ +# +# > Instructs Rage to not reuse Active Record connections between different fibers. # class Rage::Configuration attr_accessor :logger @@ -264,6 +268,22 @@ class PublicFileServer class Internal attr_accessor :rails_mode + def patch_ar_pool? + !ENV["RAGE_DISABLE_AR_POOL_PATCH"] && !Rage.env.test? + end + + # whether we should manually release AR connections; + # AR 7.2+ uses `with_connection` internaly, so we only need to do this for older versions; + def should_manually_release_ar_connections? + defined?(ActiveRecord) && ActiveRecord.version < Gem::Version.create("7.2.0") + end + + # whether we should manually reconnect closed AR connections; + # AR 7.1+ does this automatically while executing the query; + def should_manually_restore_ar_connections? + defined?(ActiveRecord) && ActiveRecord.version < Gem::Version.create("7.1.0") + end + def inspect "#<#{self.class.name}>" end diff --git a/lib/rage/controller/api.rb b/lib/rage/controller/api.rb index 9894fe1b..31385793 100644 --- a/lib/rage/controller/api.rb +++ b/lib/rage/controller/api.rb @@ -76,8 +76,6 @@ def __register_action(action) "" end - activerecord_loaded = defined?(::ActiveRecord) - wrap_parameters_chunk = if __wrap_parameters_key <<~RUBY wrap_key = self.class.__wrap_parameters_key @@ -95,9 +93,12 @@ def __register_action(action) RUBY end + query_cache_enabled = defined?(::ActiveRecord) + should_release_connections = Rage.config.internal.should_manually_release_ar_connections? + class_eval <<~RUBY, __FILE__, __LINE__ + 1 def __run_#{action} - #{if activerecord_loaded + #{if query_cache_enabled <<~RUBY ActiveRecord::Base.connection_pool.enable_query_cache! RUBY @@ -119,12 +120,15 @@ def __run_#{action} #{rescue_handlers_chunk} ensure - #{if activerecord_loaded + #{if query_cache_enabled <<~RUBY ActiveRecord::Base.connection_pool.disable_query_cache! - if ActiveRecord::Base.connection_pool.active_connection? - ActiveRecord::Base.connection_handler.clear_active_connections! - end + RUBY + end} + + #{if should_release_connections + <<~RUBY + ActiveRecord::Base.connection_handler.clear_active_connections!(:all) RUBY end} diff --git a/lib/rage/ext/active_record/connection_pool.rb b/lib/rage/ext/active_record/connection_pool.rb index cb6ed07e..225cd691 100644 --- a/lib/rage/ext/active_record/connection_pool.rb +++ b/lib/rage/ext/active_record/connection_pool.rb @@ -24,11 +24,30 @@ def to_a end end + # reconnect closed connections on checkout; + # only included with `Rage.config.should_manually_restore_ar_connections?` + module ConnectionWithVerify + def connection + conn = super + + if conn.__needs_reconnect + conn.reconnect! + conn.__needs_reconnect = false + end + + conn + end + end + if Rage.config.internal.should_manually_restore_ar_connections? + prepend ConnectionWithVerify + end + def self.extended(instance) instance.class.alias_method :__checkout__, :checkout instance.class.alias_method :__remove__, :remove ActiveRecord::ConnectionAdapters::AbstractAdapter.attr_accessor(:__idle_since) + ActiveRecord::ConnectionAdapters::AbstractAdapter.attr_accessor(:__needs_reconnect) end def __init_rage_extension @@ -47,7 +66,7 @@ def __init_rage_extension @__checkout_timeout = checkout_timeout # how long a connection can be idle for before disconnecting - @__idle_timeout = reaper.frequency + @__idle_timeout = respond_to?(:db_config) ? db_config.idle_timeout : @idle_timeout # how often should we check for fibers that wait for a connection for too long @__timeout_worker_frequency = 0.5 @@ -65,8 +84,30 @@ def __init_rage_extension end end + # monitor connections health + if Rage.config.internal.should_manually_restore_ar_connections? + Iodine.run_every(1_000) do + i = 0 + while i < @__connections.length + conn = @__connections[i] + + unless conn.__needs_reconnect + needs_reconnect = !conn.active? rescue true + if needs_reconnect + conn.__needs_reconnect = true + conn.disconnect! + end + end + + i += 1 + end + end + end + + @release_connection_channel = "ext:ar-connection-released:#{object_id}" + # resume blocked fibers once connections become available - Iodine.subscribe("ext:ar-connection-released") do + Iodine.subscribe(@release_connection_channel) do if @__blocked.length > 0 && @__connections.length > 0 f, _ = @__blocked.shift f.resume @@ -75,7 +116,7 @@ def __init_rage_extension # unsubscribe on shutdown Iodine.on_state(:on_finish) do - Iodine.unsubscribe("ext:ar-connection-released") + Iodine.unsubscribe(@release_connection_channel) end end @@ -100,7 +141,7 @@ def release_connection(owner = Fiber.current) if (conn = @__in_use.delete(owner)) conn.__idle_since = Process.clock_gettime(Process::CLOCK_MONOTONIC) @__connections << conn - Iodine.publish("ext:ar-connection-released", "", Iodine::PubSub::PROCESS) if @__blocked.length > 0 + Iodine.publish(@release_connection_channel, "", Iodine::PubSub::PROCESS) if @__blocked.length > 0 end conn @@ -108,20 +149,27 @@ def release_connection(owner = Fiber.current) # Recover lost connections for the pool. def reap + crashed_fibers = nil + @__in_use.each do |fiber, conn| unless fiber.alive? if conn.active? conn.reset! - release_connection(fiber) + (crashed_fibers ||= []) << fiber else @__in_use.delete(fiber) conn.disconnect! __remove__(conn) + self.automatic_reconnect = true @__connections += build_new_connections(1) - Iodine.publish("ext:ar-connection-released", "", Iodine::PubSub::PROCESS) if @__blocked.length > 0 + Iodine.publish(@release_connection_channel, "", Iodine::PubSub::PROCESS) if @__blocked.length > 0 end end end + + if crashed_fibers + crashed_fibers.each { |fiber| release_connection(fiber) } + end end # Disconnect all connections that have been idle for at least @@ -135,6 +183,7 @@ def flush(minimum_idle = @__idle_timeout) conn = @__connections[i] if conn.__idle_since && current_time - conn.__idle_since >= minimum_idle conn.__idle_since = nil + conn.__needs_reconnect = true conn.disconnect! end i += 1 @@ -212,11 +261,12 @@ def disconnect(raise_on_acquisition_timeout = true, disconnect_attempts = 0) end # create a new pool + self.automatic_reconnect = true @__connections = build_new_connections # notify blocked fibers that there are new connections available [@__blocked.length, @__connections.length].min.times do - Iodine.publish("ext:ar-connection-released", "", Iodine::PubSub::PROCESS) + Iodine.publish(@release_connection_channel, "", Iodine::PubSub::PROCESS) end end diff --git a/lib/rage/ext/setup.rb b/lib/rage/ext/setup.rb index aeac011b..3aa386b3 100644 --- a/lib/rage/ext/setup.rb +++ b/lib/rage/ext/setup.rb @@ -1,19 +1,45 @@ +if defined?(ActiveRecord) && ActiveRecord.version < Gem::Version.create("6") + fail "Rage is only compatible with Active Record 6+. Detected Active Record version: #{ActiveRecord.version}." +end + # set ActiveSupport isolation level if defined?(ActiveSupport::IsolatedExecutionState) ActiveSupport::IsolatedExecutionState.isolation_level = :fiber end +# patch Active Record 6.0 to accept the role argument +if defined?(ActiveRecord) && ActiveRecord.version < Gem::Version.create("6.1") + %i(active_connections? connection_pool_list clear_active_connections!).each do |m| + ActiveRecord::Base.connection_handler.define_singleton_method(m) do |_ = nil| + super() + end + end +end + # release ActiveRecord connections on yield -if defined?(ActiveRecord) && ActiveRecord.version < Gem::Version.create("7.1.0") - class Fiber - def self.defer - res = Fiber.yield +if defined?(ActiveRecord) && Rage.config.internal.patch_ar_pool? + if ENV["RAGE_DISABLE_AR_WEAK_CONNECTIONS"] + unless Rage.config.internal.should_manually_release_ar_connections? + puts "WARNING: The RAGE_DISABLE_AR_WEAK_CONNECTIONS setting does not have any effect with Active Record 7.2+" + end + elsif Rage.config.internal.should_manually_release_ar_connections? + class Fiber + def self.defer(fileno) + f = Fiber.current + f.__awaited_fileno = fileno - if ActiveRecord::Base.connection_pool.active_connection? - ActiveRecord::Base.connection_handler.clear_active_connections! - end + res = Fiber.yield - res + if ActiveRecord::Base.connection_handler.active_connections?(:all) + Iodine.defer do + if fileno != f.__awaited_fileno || !f.alive? + ActiveRecord::Base.connection_handler.connection_pool_list(:all).each { |pool| pool.release_connection(f) } + end + end + end + + res + end end end end @@ -31,6 +57,6 @@ def connection_cache_key(_) end # patch `ActiveRecord::ConnectionPool` -if defined?(ActiveRecord) && ENV["RAGE_PATCH_AR_POOL"] && !Rage.env.test? +if defined?(ActiveRecord) && Rage.config.internal.patch_ar_pool? Rage.patch_active_record_connection_pool end diff --git a/lib/rage/fiber.rb b/lib/rage/fiber.rb index dda50921..2220bacd 100644 --- a/lib/rage/fiber.rb +++ b/lib/rage/fiber.rb @@ -39,6 +39,43 @@ # Many developers see fibers as "lightweight threads" that should be used in conjunction with fiber pools, the same way we use thread pools for threads.
# Instead, it makes sense to think of fibers as regular Ruby objects. We don't use a pool of arrays when we need to create an array - we create a new object and let Ruby and the GC do their job.
# Same applies to fibers. Feel free to create as many fibers as you need on demand. +# +# ## Active Record Connections +# +# Let's consider the following controller, where we update a record in the database: +# +# ```ruby +# class UsersController < RageController::API +# def update +# User.update!(params[:id], email: params[:email]) +# render status: :ok +# end +# end +# ``` +# +# The `User.update!` call here checks out an Active Record connection, and Rage will automatically check it back in once the action is completed. So far so good! +# +# Let's consider another example: +# +# ```ruby +# require "net/http" +# +# class UsersController < RageController::API +# def update +# User.update!(params[:id], email: params[:email]) # takes 5ms +# Net::HTTP.post_form(URI("https://mailing.service/update"), { user_id: params[:id] }) # takes 50ms +# render status: :ok +# end +# end +# ``` +# +# Here, we've added another step: once the record is updated, we will send a request to update the user's data in the mailing list service. +# +# However, in this case, we want to release the Active Record connection before the action is completed. You can see that we need the connection only for the `User.update!` call. +# The next 50ms the code will spend waiting for the HTTP request to finish, and if we don't release the Active Record connection right away, other fibers won't be able to use it. +# +# Active Record 7.2 handles this case by using [#with_connection](https://api.rubyonrails.org/classes/ActiveRecord/ConnectionAdapters/ConnectionPool.html#method-i-with_connection) internally. +# With older Active Record versions, Rage handles this case on its own by keeping track of blocking calls and releasing Active Record connections between them. class Fiber # @private AWAIT_ERROR_MESSAGE = "err" @@ -81,6 +118,9 @@ def __block_channel(force = false) "block:#{object_id}:#{@__block_channel_i}" end + # @private + attr_accessor :__awaited_fileno + # @private # pause a fiber and resume in the next iteration of the event loop def self.pause @@ -138,7 +178,7 @@ def self.await(fibers) end end - Fiber.yield + Fiber.defer(-1) Iodine.defer { Iodine.unsubscribe("await:#{f.object_id}") } # if num_wait_for is not 0 means we exited prematurely because of an error diff --git a/lib/rage/fiber_scheduler.rb b/lib/rage/fiber_scheduler.rb index b577012d..56b37189 100644 --- a/lib/rage/fiber_scheduler.rb +++ b/lib/rage/fiber_scheduler.rb @@ -14,9 +14,9 @@ def io_wait(io, events, timeout = nil) f = Fiber.current ::Iodine::Scheduler.attach(io.fileno, events, timeout&.ceil || 0) { |err| f.resume(err) } - err = Fiber.defer - if err == Errno::ETIMEDOUT::Errno - 0 + err = Fiber.defer(io.fileno) + if err && err < 0 + err else events end diff --git a/rage.gemspec b/rage.gemspec index e095bb11..fc44c96d 100644 --- a/rage.gemspec +++ b/rage.gemspec @@ -29,7 +29,7 @@ Gem::Specification.new do |spec| spec.add_dependency "thor", "~> 1.0" spec.add_dependency "rack", "~> 2.0" - spec.add_dependency "rage-iodine", "~> 3.0" + spec.add_dependency "rage-iodine", "~> 4.0" spec.add_dependency "zeitwerk", "~> 2.6" spec.add_dependency "rack-test", "~> 2.1" end