diff --git a/lib/active_job/queue_adapters/amazon_sqs_adapter.rb b/lib/active_job/queue_adapters/amazon_sqs_adapter.rb index 19d3ea57..c5f0698c 100644 --- a/lib/active_job/queue_adapters/amazon_sqs_adapter.rb +++ b/lib/active_job/queue_adapters/amazon_sqs_adapter.rb @@ -14,6 +14,27 @@ def enqueue_at(job, timestamp) _enqueue(job, nil, delay_seconds: delay) end + def enqueue_all(jobs) + jobs.group_by(&:queue_name).each do |queue_name, same_queue_jobs| + queue_url = Aws::Rails::SqsActiveJob.config.queue_url_for(queue_name) + base_send_message_opts = { queue_url: queue_url } + + same_queue_jobs.each_slice(10) do |chunk| + entries = chunk.map do |job| + entry = Params.new(job, nil).entry + entry[:id] = job.job_id + entry[:delay_seconds] = Params.assured_delay_seconds(job.scheduled_at) if job.scheduled_at + entry + end + + send_message_opts = base_send_message_opts.deep_dup + send_message_opts[:entries] = entries + + Aws::Rails::SqsActiveJob.config.client.send_message_batch(send_message_opts) + end + end + end + private def _enqueue(job, body = nil, send_message_opts = {}) diff --git a/test/active_job/queue_adapters/amazon_sqs_adapter_test.rb b/test/active_job/queue_adapters/amazon_sqs_adapter_test.rb index b72ab5f5..86363b7e 100644 --- a/test/active_job/queue_adapters/amazon_sqs_adapter_test.rb +++ b/test/active_job/queue_adapters/amazon_sqs_adapter_test.rb @@ -188,6 +188,32 @@ module QueueAdapters end.to raise_error ArgumentError end end + + if Gem::Version.new(Rails::VERSION::STRING) >= Gem::Version.new('7.1.0') + describe 'with multiple jobs' do + it do + expect(client).to receive(:send_message_batch).with( + { + queue_url: 'https://queue-url', + entries: [ + { + id: instance_of(String), + message_body: instance_of(String), + message_attributes: instance_of(Hash) + }, + { + id: instance_of(String), + message_body: instance_of(String), + message_attributes: instance_of(Hash) + } + ] + } + ).once + + ActiveJob.perform_all_later(TestJob.new('test'), TestJob.new('test')) + end + end + end end end end