Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Asynchronous pruning for RubyThreadPoolExecutor #1082

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions lib/concurrent-ruby/concurrent/collection/ruby_timeout_queue.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
module Concurrent
module Collection
# @!visibility private
# @!macro ruby_timeout_queue
class RubyTimeoutQueue < ::Queue
def initialize(*args)
if RUBY_VERSION >= '3.2'
raise "#{self.class.name} is not needed on Ruby 3.2 or later, use ::Queue instead"
end

super(*args)

@mutex = Mutex.new
@cond_var = ConditionVariable.new
end

def push(obj)
@mutex.synchronize do
super(obj)
@cond_var.signal
end
end
alias_method :enq, :push
alias_method :<<, :push

def pop(non_block = false, timeout: nil)
if non_block && timeout
raise ArgumentError, "can't set a timeout if non_block is enabled"
end

if non_block
super(true)
elsif @mutex.synchronize { empty? && timed_out?(timeout) { @cond_var.wait(@mutex, timeout) } }
nil
else
super(false)
end
end
alias_method :deq, :pop
alias_method :shift, :pop

private

def timed_out?(timeout)
return unless timeout

# https://github.com/ruby/ruby/pull/4256
if RUBY_VERSION >= '3.1'
yield.nil?
else
deadline = Concurrent.monotonic_time + timeout
yield
Concurrent.monotonic_time >= deadline
end
end
end
end
end
18 changes: 18 additions & 0 deletions lib/concurrent-ruby/concurrent/collection/timeout_queue.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
module Concurrent
module Collection
# @!visibility private
# @!macro internal_implementation_note
TimeoutQueueImplementation = if RUBY_VERSION >= '3.2'
::Queue
else
require 'concurrent/collection/ruby_timeout_queue'
RubyTimeoutQueue
end
private_constant :TimeoutQueueImplementation

# @!visibility private
# @!macro timeout_queue
class TimeoutQueue < TimeoutQueueImplementation
end
end
end
14 changes: 0 additions & 14 deletions lib/concurrent-ruby/concurrent/executor/fixed_thread_pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -75,17 +75,6 @@ module Concurrent
# @return [Integer] Number of tasks that may be enqueued before reaching `max_queue` and rejecting
# new tasks. A value of -1 indicates that the queue may grow without bound.

# @!macro thread_pool_executor_method_prune_pool
# Prune the thread pool of unneeded threads
#
# What is being pruned is controlled by the min_threads and idletime
# parameters passed at pool creation time
#
# This is a no-op on some pool implementation (e.g. the Java one). The Ruby
# pool will auto-prune each time a new job is posted. You will need to call
# this method explicitly in case your application post jobs in bursts (a
# lot of jobs and then nothing for long periods)

# @!macro thread_pool_executor_public_api
#
# @!macro abstract_executor_service_public_api
Expand Down Expand Up @@ -122,9 +111,6 @@ module Concurrent
#
# @!method can_overflow?
# @!macro executor_service_method_can_overflow_question
#
# @!method prune_pool
# @!macro thread_pool_executor_method_prune_pool



Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,6 @@ def running?
super && !@executor.isTerminating
end

# @!macro thread_pool_executor_method_prune_pool
def prune_pool
end

private

def ns_initialize(opts)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
require 'concurrent/concern/logging'
require 'concurrent/executor/ruby_executor_service'
require 'concurrent/utility/monotonic_time'
require 'concurrent/collection/timeout_queue'

module Concurrent

Expand Down Expand Up @@ -95,8 +96,16 @@ def remaining_capacity
end

# @!visibility private
def remove_busy_worker(worker)
synchronize { ns_remove_busy_worker worker }
def prunable_capacity
synchronize { ns_prunable_capacity }
end

# @!visibility private
def remove_worker(worker)
synchronize do
ns_remove_ready_worker worker
ns_remove_busy_worker worker
end
end

# @!visibility private
Expand All @@ -114,11 +123,6 @@ def worker_task_completed
synchronize { @completed_task_count += 1 }
end

# @!macro thread_pool_executor_method_prune_pool
def prune_pool
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This public API no longer makes sense now that the pruning is automatic. It seems it was only added because of the Ruby thread pool's synchronous pruning, which the Java version didn't suffer from, hence why it's a no-op there.

synchronize { ns_prune_pool }
end

private

# @!visibility private
Expand Down Expand Up @@ -146,9 +150,6 @@ def ns_initialize(opts)
@largest_length = 0
@workers_counter = 0
@ruby_pid = $$ # detects if Ruby has forked

@gc_interval = opts.fetch(:gc_interval, @idletime / 2.0).to_i # undocumented
@next_gc_time = Concurrent.monotonic_time + @gc_interval
end

# @!visibility private
Expand All @@ -162,12 +163,10 @@ def ns_execute(*args, &task)

if ns_assign_worker(*args, &task) || ns_enqueue(*args, &task)
@scheduled_task_count += 1
nil
else
return fallback_action(*args, &task)
fallback_action(*args, &task)
end

ns_prune_pool if @next_gc_time < Concurrent.monotonic_time
nil
end

# @!visibility private
Expand Down Expand Up @@ -218,7 +217,7 @@ def ns_assign_worker(*args, &task)
# @!visibility private
def ns_enqueue(*args, &task)
return false if @synchronous

if !ns_limited_queue? || @queue.size < @max_queue
@queue << [task, args]
true
Expand Down Expand Up @@ -265,7 +264,7 @@ def ns_ready_worker(worker, last_message, success = true)
end
end

# removes a worker which is not in not tracked in @ready
# removes a worker which is not tracked in @ready
#
# @!visibility private
def ns_remove_busy_worker(worker)
Expand All @@ -274,23 +273,19 @@ def ns_remove_busy_worker(worker)
true
end

# try oldest worker if it is idle for enough time, it's returned back at the start
#
# @!visibility private
def ns_prune_pool
now = Concurrent.monotonic_time
stopped_workers = 0
while !@ready.empty? && (@pool.size - stopped_workers > @min_length)
worker, last_message = @ready.first
if now - last_message > self.idletime
stopped_workers += 1
@ready.shift
worker << :stop
else break
end
def ns_remove_ready_worker(worker)
if index = @ready.index { |rw, _| rw == worker }
@ready.delete_at(index)
end
true
end

@next_gc_time = Concurrent.monotonic_time + @gc_interval
def ns_prunable_capacity
if running?
[@pool.size - @min_length, @ready.size].min
else
@pool.size
end
end

def ns_reset_if_forked
Expand All @@ -312,7 +307,7 @@ class Worker

def initialize(pool, id)
# instance variables accessed only under pool's lock so no need to sync here again
@queue = Queue.new
@queue = Collection::TimeoutQueue.new
@pool = pool
@thread = create_worker @queue, pool, pool.idletime

Expand All @@ -338,17 +333,26 @@ def kill
def create_worker(queue, pool, idletime)
Thread.new(queue, pool, idletime) do |my_queue, my_pool, my_idletime|
catch(:stop) do
loop do
prunable = true

case message = my_queue.pop
loop do
timeout = prunable && my_pool.running? ? my_idletime : nil
case message = my_queue.pop(timeout: timeout)
when nil
if my_pool.prunable_capacity.positive?
my_pool.remove_worker(self)
throw :stop
end

prunable = false
when :stop
my_pool.remove_busy_worker(self)
my_pool.remove_worker(self)
throw :stop

else
task, args = message
run_task my_pool, task, args
my_pool.ready_worker(self, Concurrent.monotonic_time)
prunable = true
end
end
end
Expand Down
30 changes: 15 additions & 15 deletions spec/concurrent/executor/cached_thread_pool_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -152,15 +152,13 @@ module Concurrent

context 'garbage collection' do

subject { described_class.new(idletime: 0.1, max_threads: 2, gc_interval: 0) }
subject { described_class.new(idletime: 0.1, max_threads: 2) }

it 'removes from pool any thread that has been idle too long' do
latch = Concurrent::CountDownLatch.new(4)
4.times { subject.post { sleep 0.1; latch.count_down } }
sleep 0.4
expect(latch.wait(1)).to be true
sleep 0.2
subject.post {}
sleep 0.2
expect(subject.length).to be < 4
end

Expand Down Expand Up @@ -197,25 +195,27 @@ module Concurrent
expect(subject.length).to be >= 5
3.times { subject << proc { sleep(1) } }
sleep(0.1)
expect(subject.length).to be >= 5
expect(subject.length).to be >= 3
end
end
end

context 'stress' do
configurations = [
{ min_threads: 2,
max_threads: ThreadPoolExecutor::DEFAULT_MAX_POOL_SIZE,
idletime: 0.1, # 1 minute
max_queue: 0, # unlimited
{
min_threads: 2,
max_threads: ThreadPoolExecutor::DEFAULT_MAX_POOL_SIZE,
idletime: 60, # 1 minute
max_queue: 0, # unlimited
fallback_policy: :caller_runs, # shouldn't matter -- 0 max queue
gc_interval: 0.1 },
{ min_threads: 2,
max_threads: 4,
idletime: 0.1, # 1 minute
max_queue: 0, # unlimited
},
{
min_threads: 2,
max_threads: 4,
idletime: 60, # 1 minute
max_queue: 0, # unlimited
fallback_policy: :caller_runs, # shouldn't matter -- 0 max queue
gc_interval: 0.1 }
}
]

configurations.each do |config|
Expand Down
7 changes: 0 additions & 7 deletions spec/concurrent/executor/java_thread_pool_executor_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,6 @@ module Concurrent

it_should_behave_like :thread_pool_executor

context :prune do
it "is a no-op, pruning is handled by the JVM" do
executor = JavaThreadPoolExecutor.new
executor.prune_pool
end
end

context '#overload_policy' do

specify ':abort maps to AbortPolicy' do
Expand Down
Loading