From eff271daf64a1a04d27a06f2fc8dda424da682b6 Mon Sep 17 00:00:00 2001 From: vaughanbrittonsage Date: Sun, 25 Jun 2017 17:26:35 +0100 Subject: [PATCH 01/12] feature_decouple_dependencies --- eventq_aws/eventq_aws.gemspec | 1 - eventq_base/Gemfile | 1 + eventq_base/eventq_base.gemspec | 6 ------ eventq_base/lib/eventq_base.rb | 5 +---- eventq_base/lib/eventq_base/configuration.rb | 2 +- eventq_base/lib/eventq_base/nonce_manager.rb | 1 + .../lib/eventq_base/serialization_providers.rb | 3 +++ .../binary_serialization_provider.rb | 15 +++++++++++++++ .../json_serialization_provider.rb | 3 +++ .../oj_serialization_provider.rb | 1 + .../sha256_signature_provider.rb | 5 ++--- eventq_base/lib/eventq_base/version.rb | 2 +- 12 files changed, 29 insertions(+), 16 deletions(-) create mode 100644 eventq_base/lib/eventq_base/serialization_providers/binary_serialization_provider.rb diff --git a/eventq_aws/eventq_aws.gemspec b/eventq_aws/eventq_aws.gemspec index 97ff527..4e17446 100644 --- a/eventq_aws/eventq_aws.gemspec +++ b/eventq_aws/eventq_aws.gemspec @@ -26,5 +26,4 @@ Gem::Specification.new do |spec| spec.add_dependency 'aws-sdk-core' spec.add_dependency 'eventq_base' - spec.add_dependency 'hash_kit' end diff --git a/eventq_base/Gemfile b/eventq_base/Gemfile index 1192b22..bf87e43 100644 --- a/eventq_base/Gemfile +++ b/eventq_base/Gemfile @@ -8,3 +8,4 @@ gem 'oj', '2.15.0' gem 'json', '1.8.3' gem 'rspec' gem 'redlock' +gem 'pry' diff --git a/eventq_base/eventq_base.gemspec b/eventq_base/eventq_base.gemspec index c9b31bf..bf85ad3 100644 --- a/eventq_base/eventq_base.gemspec +++ b/eventq_base/eventq_base.gemspec @@ -22,12 +22,6 @@ Gem::Specification.new do |spec| spec.add_development_dependency "bundler", "~> 1.11" spec.add_development_dependency "rake", "~> 10.0" spec.add_development_dependency "rspec" - spec.add_development_dependency "pry" spec.add_dependency 'class_kit' - spec.add_dependency 'hash_kit' - spec.add_dependency 'json_kit' - spec.add_dependency 'oj' - spec.add_dependency 'redlock' - spec.add_dependency 'openssl' end diff --git a/eventq_base/lib/eventq_base.rb b/eventq_base/lib/eventq_base.rb index bb9c796..1c96498 100644 --- a/eventq_base/lib/eventq_base.rb +++ b/eventq_base/lib/eventq_base.rb @@ -1,9 +1,6 @@ require 'securerandom' -require 'class_kit' -require 'hash_kit' -require 'json_kit' -require 'oj' require 'redlock' +require 'class_kit' require_relative 'eventq_base/version' require_relative 'eventq_base/eventq_logger' diff --git a/eventq_base/lib/eventq_base/configuration.rb b/eventq_base/lib/eventq_base/configuration.rb index 3bd50e9..b46460d 100644 --- a/eventq_base/lib/eventq_base/configuration.rb +++ b/eventq_base/lib/eventq_base/configuration.rb @@ -6,7 +6,7 @@ def self.serialization_provider=(value) end def self.serialization_provider - @serialization_provider ||= EventQ::SerializationProviders::OJ_PROVIDER + @serialization_provider ||= EventQ::SerializationProviders::BINARY_PROVIDER end def self.signature_provider=(value) diff --git a/eventq_base/lib/eventq_base/nonce_manager.rb b/eventq_base/lib/eventq_base/nonce_manager.rb index 800e934..a2ee309 100644 --- a/eventq_base/lib/eventq_base/nonce_manager.rb +++ b/eventq_base/lib/eventq_base/nonce_manager.rb @@ -24,6 +24,7 @@ def self.is_allowed?(nonce) return true end + require 'redlock' lock = Redlock::Client.new([ @server_url ]).lock(nonce, @timeout) if lock == false EventQ.log(:info, "[#{self.class}] - Message has already been processed: #{nonce}") diff --git a/eventq_base/lib/eventq_base/serialization_providers.rb b/eventq_base/lib/eventq_base/serialization_providers.rb index d2e6081..678ec98 100644 --- a/eventq_base/lib/eventq_base/serialization_providers.rb +++ b/eventq_base/lib/eventq_base/serialization_providers.rb @@ -1,17 +1,20 @@ require_relative 'serialization_providers/json_serialization_provider' require_relative 'serialization_providers/oj_serialization_provider' +require_relative 'serialization_providers/binary_serialization_provider' module EventQ module SerializationProviders OJ_PROVIDER = 'oj'.freeze JSON_PROVIDER = 'json'.freeze + BINARY_PROVIDER = 'binary'.freeze class Manager def initialize @providers = {} @providers[OJ_PROVIDER] = EventQ::SerializationProviders::OjSerializationProvider @providers[JSON_PROVIDER] = EventQ::SerializationProviders::JsonSerializationProvider + @providers[BINARY_PROVIDER] = EventQ::SerializationProviders::BinarySerializationProvider end def get_provider(provider_type) diff --git a/eventq_base/lib/eventq_base/serialization_providers/binary_serialization_provider.rb b/eventq_base/lib/eventq_base/serialization_providers/binary_serialization_provider.rb new file mode 100644 index 0000000..f379b49 --- /dev/null +++ b/eventq_base/lib/eventq_base/serialization_providers/binary_serialization_provider.rb @@ -0,0 +1,15 @@ +module EventQ + module SerializationProviders + class BinarySerializationProvider + + def serialize(object) + Marshal::dump(object) + end + + def deserialize(msg) + Marshal::load(msg) + end + + end + end +end diff --git a/eventq_base/lib/eventq_base/serialization_providers/json_serialization_provider.rb b/eventq_base/lib/eventq_base/serialization_providers/json_serialization_provider.rb index 0b338f6..5415a93 100644 --- a/eventq_base/lib/eventq_base/serialization_providers/json_serialization_provider.rb +++ b/eventq_base/lib/eventq_base/serialization_providers/json_serialization_provider.rb @@ -3,6 +3,9 @@ module SerializationProviders class JsonSerializationProvider def initialize + require 'json' + require 'class_kit' + require 'hash_kit' @class_kit_helper = ClassKit::Helper.new @hash_helper = HashKit::Helper.new end diff --git a/eventq_base/lib/eventq_base/serialization_providers/oj_serialization_provider.rb b/eventq_base/lib/eventq_base/serialization_providers/oj_serialization_provider.rb index 413d19b..ed1d58d 100644 --- a/eventq_base/lib/eventq_base/serialization_providers/oj_serialization_provider.rb +++ b/eventq_base/lib/eventq_base/serialization_providers/oj_serialization_provider.rb @@ -3,6 +3,7 @@ module SerializationProviders class OjSerializationProvider def initialize + require 'oj' @json_serializer = EventQ::SerializationProviders::JsonSerializationProvider.new end diff --git a/eventq_base/lib/eventq_base/signature_providers/sha256_signature_provider.rb b/eventq_base/lib/eventq_base/signature_providers/sha256_signature_provider.rb index bdaedac..27913a8 100644 --- a/eventq_base/lib/eventq_base/signature_providers/sha256_signature_provider.rb +++ b/eventq_base/lib/eventq_base/signature_providers/sha256_signature_provider.rb @@ -1,11 +1,10 @@ -require 'openssl' -require 'base64' - module EventQ module SignatureProviders class Sha256SignatureProvider def initialize + require 'openssl' + require 'base64' @serializer = serialization_provider_manager.get_provider(EventQ::Configuration.serialization_provider) end diff --git a/eventq_base/lib/eventq_base/version.rb b/eventq_base/lib/eventq_base/version.rb index 02217cd..311b4b6 100644 --- a/eventq_base/lib/eventq_base/version.rb +++ b/eventq_base/lib/eventq_base/version.rb @@ -1,3 +1,3 @@ module EventqBase - VERSION = "1.12.0" + VERSION = "1.13.0" end From 3d9159e231313166dad06ab63296367447f4f9bc Mon Sep 17 00:00:00 2001 From: vaughanbrittonsage Date: Sun, 25 Jun 2017 17:32:51 +0100 Subject: [PATCH 02/12] added spec for BinarySerializationprovider --- .../binary_serialization_provider_spec.rb | 51 +++++++++++++++++++ 1 file changed, 51 insertions(+) create mode 100644 eventq_base/spec/eventq_base/serialization_providers/binary_serialization_provider_spec.rb diff --git a/eventq_base/spec/eventq_base/serialization_providers/binary_serialization_provider_spec.rb b/eventq_base/spec/eventq_base/serialization_providers/binary_serialization_provider_spec.rb new file mode 100644 index 0000000..a0f9ca6 --- /dev/null +++ b/eventq_base/spec/eventq_base/serialization_providers/binary_serialization_provider_spec.rb @@ -0,0 +1,51 @@ +require 'spec_helper' + +class TestClass + attr_accessor :name + attr_accessor :age +end + +RSpec.describe EventQ::SerializationProviders::BinarySerializationProvider do + + let(:object) do + TestClass.new.tap do |e| + e.name = 'joe' + e.age = 33 + end + end + + describe '#serialize' do + context 'when passing a general object' do + specify do + expect(subject.serialize(object)).to eq Marshal::dump(object) + end + end + + context 'when passing a hash' do + let(:object) { { name: 'Jane Doe', age: 34 } } + + specify do + expect(subject.serialize(object)).to eq Marshal::dump(object) + end + end + end + describe '#deserialize' do + context 'when receiving a general object' do + + let(:serialized) { Marshal::dump(object) } + + specify do + expect(subject.deserialize(serialized)).to be_a(TestClass) + end + end + + context 'when receiving a hash' do + let(:object) { { name: 'Jane Doe', age: 34 } } + let(:serialized) { Marshal::dump(object) } + + specify do + expect(subject.deserialize(serialized)).to eq object + end + end + end +end From 649ca7134ba5c8ac9047ea8609ffc2630f77d06a Mon Sep 17 00:00:00 2001 From: vaughanbrittonsage Date: Sun, 25 Jun 2017 17:52:50 +0100 Subject: [PATCH 03/12] updated eventq_rabbitmq to use new eventq_base gem and implement message content_type --- eventq_base/lib/eventq_base/configuration.rb | 2 +- eventq_base/lib/eventq_base/message_args.rb | 4 +++- eventq_base/lib/eventq_base/queue_message.rb | 1 + eventq_base/lib/eventq_base/version.rb | 2 +- eventq_rabbitmq/Gemfile | 4 +++- eventq_rabbitmq/eventq_rabbitmq.gemspec | 10 ++++------ .../lib/eventq_rabbitmq/rabbitmq_eventq_client.rb | 1 + .../lib/eventq_rabbitmq/rabbitmq_queue_worker.rb | 5 ++++- eventq_rabbitmq/lib/eventq_rabbitmq/version.rb | 2 +- .../eventq_rabbitmq/rabbitmq_eventq_client_spec.rb | 2 ++ .../spec/eventq_rabbitmq/rabbitmq_queue_worker_spec.rb | 1 + 11 files changed, 22 insertions(+), 12 deletions(-) diff --git a/eventq_base/lib/eventq_base/configuration.rb b/eventq_base/lib/eventq_base/configuration.rb index b46460d..3bd50e9 100644 --- a/eventq_base/lib/eventq_base/configuration.rb +++ b/eventq_base/lib/eventq_base/configuration.rb @@ -6,7 +6,7 @@ def self.serialization_provider=(value) end def self.serialization_provider - @serialization_provider ||= EventQ::SerializationProviders::BINARY_PROVIDER + @serialization_provider ||= EventQ::SerializationProviders::OJ_PROVIDER end def self.signature_provider=(value) diff --git a/eventq_base/lib/eventq_base/message_args.rb b/eventq_base/lib/eventq_base/message_args.rb index 7775c9d..2d5d14c 100644 --- a/eventq_base/lib/eventq_base/message_args.rb +++ b/eventq_base/lib/eventq_base/message_args.rb @@ -1,17 +1,19 @@ module EventQ class MessageArgs attr_reader :type + attr_reader :content_type attr_reader :retry_attempts attr_accessor :abort attr_accessor :drop attr_reader :context - def initialize(type, retry_attempts, context = {}) + def initialize(type:, retry_attempts:, context: {}, content_type:) @type = type @retry_attempts = retry_attempts @abort = false @drop = false @context = context + @content_type = content_type end end end \ No newline at end of file diff --git a/eventq_base/lib/eventq_base/queue_message.rb b/eventq_base/lib/eventq_base/queue_message.rb index f5ae49b..aa1682d 100644 --- a/eventq_base/lib/eventq_base/queue_message.rb +++ b/eventq_base/lib/eventq_base/queue_message.rb @@ -6,6 +6,7 @@ class QueueMessage attr_accessor_type :retry_attempts, type: Integer attr_accessor_type :type, type: String attr_accessor_type :content + attr_accessor_type :content_type, type: String attr_accessor_type :created, type: Time attr_accessor_type :signature, type: String attr_accessor_type :context, type: Hash diff --git a/eventq_base/lib/eventq_base/version.rb b/eventq_base/lib/eventq_base/version.rb index 311b4b6..792e07b 100644 --- a/eventq_base/lib/eventq_base/version.rb +++ b/eventq_base/lib/eventq_base/version.rb @@ -1,3 +1,3 @@ module EventqBase - VERSION = "1.13.0" + VERSION = "1.14.0" end diff --git a/eventq_rabbitmq/Gemfile b/eventq_rabbitmq/Gemfile index ce8948b..2eb2f3d 100644 --- a/eventq_rabbitmq/Gemfile +++ b/eventq_rabbitmq/Gemfile @@ -5,4 +5,6 @@ gemspec gem 'oj', '2.15.0' gem 'json', '1.8.3' -gem 'json_kit' +gem 'redlock' +gem 'openssl' +gem 'pry' diff --git a/eventq_rabbitmq/eventq_rabbitmq.gemspec b/eventq_rabbitmq/eventq_rabbitmq.gemspec index 68942b8..3d661c0 100644 --- a/eventq_rabbitmq/eventq_rabbitmq.gemspec +++ b/eventq_rabbitmq/eventq_rabbitmq.gemspec @@ -19,12 +19,10 @@ Gem::Specification.new do |spec| spec.executables = spec.files.grep(%r{^exe/}) { |f| File.basename(f) } spec.require_paths = ["lib"] - spec.add_development_dependency "bundler", "~> 1.11" - spec.add_development_dependency "rake", "~> 10.0" - spec.add_development_dependency "rspec" - spec.add_development_dependency "pry" + spec.add_development_dependency 'bundler', '~> 1.11' + spec.add_development_dependency 'rake', '~> 10.0' + spec.add_development_dependency 'rspec' spec.add_dependency 'bunny' - spec.add_dependency 'eventq_base' - spec.add_dependency 'hash_kit' + spec.add_dependency 'eventq_base', '~> 1.14' end diff --git a/eventq_rabbitmq/lib/eventq_rabbitmq/rabbitmq_eventq_client.rb b/eventq_rabbitmq/lib/eventq_rabbitmq/rabbitmq_eventq_client.rb index beb5889..7b602c1 100644 --- a/eventq_rabbitmq/lib/eventq_rabbitmq/rabbitmq_eventq_client.rb +++ b/eventq_rabbitmq/lib/eventq_rabbitmq/rabbitmq_eventq_client.rb @@ -110,6 +110,7 @@ def serialized_message(event_type, event, context) qm.content = event qm.type = event_type qm.context = context + qm.content_type = event.class.to_s if EventQ::Configuration.signature_secret != nil provider = @signature_manager.get_provider(EventQ::Configuration.signature_provider) diff --git a/eventq_rabbitmq/lib/eventq_rabbitmq/rabbitmq_queue_worker.rb b/eventq_rabbitmq/lib/eventq_rabbitmq/rabbitmq_queue_worker.rb index 1246665..a92e9db 100644 --- a/eventq_rabbitmq/lib/eventq_rabbitmq/rabbitmq_queue_worker.rb +++ b/eventq_rabbitmq/lib/eventq_rabbitmq/rabbitmq_queue_worker.rb @@ -340,7 +340,10 @@ def process_message(payload, queue, channel, retry_exchange, delivery_info, bloc @signature_provider_manager.validate_signature(message: message, queue: queue) - message_args = EventQ::MessageArgs.new(message.type, message.retry_attempts, message.context) + message_args = EventQ::MessageArgs.new(type: message.type, + retry_attempts: message.retry_attempts, + context: message.context, + content_type: message.content_type) if(!EventQ::NonceManager.is_allowed?(message.id)) EventQ.logger.info("[#{self.class}] - Duplicate Message received. Dropping message.") diff --git a/eventq_rabbitmq/lib/eventq_rabbitmq/version.rb b/eventq_rabbitmq/lib/eventq_rabbitmq/version.rb index 2a876af..190f3c5 100644 --- a/eventq_rabbitmq/lib/eventq_rabbitmq/version.rb +++ b/eventq_rabbitmq/lib/eventq_rabbitmq/version.rb @@ -1,3 +1,3 @@ module EventqRabbitmq - VERSION = "1.15.0" + VERSION = "1.16.0" end diff --git a/eventq_rabbitmq/spec/eventq_rabbitmq/rabbitmq_eventq_client_spec.rb b/eventq_rabbitmq/spec/eventq_rabbitmq/rabbitmq_eventq_client_spec.rb index 3151129..50d8dfb 100644 --- a/eventq_rabbitmq/spec/eventq_rabbitmq/rabbitmq_eventq_client_spec.rb +++ b/eventq_rabbitmq/spec/eventq_rabbitmq/rabbitmq_eventq_client_spec.rb @@ -50,6 +50,7 @@ def receive_message(queue) expect(qm).to_not be_nil expect(qm.content).to eq(message) + expect(qm.content_type).to eq message.class.to_s expect(qm.context).to eq message_context end end @@ -70,6 +71,7 @@ def receive_message(queue) expect(qm).to_not be_nil expect(qm.content).to eq(message) + expect(qm.content_type).to eq message.class.to_s expect(qm.context).to eq message_context end end diff --git a/eventq_rabbitmq/spec/eventq_rabbitmq/rabbitmq_queue_worker_spec.rb b/eventq_rabbitmq/spec/eventq_rabbitmq/rabbitmq_queue_worker_spec.rb index d9e0d26..22e8332 100644 --- a/eventq_rabbitmq/spec/eventq_rabbitmq/rabbitmq_queue_worker_spec.rb +++ b/eventq_rabbitmq/spec/eventq_rabbitmq/rabbitmq_queue_worker_spec.rb @@ -246,6 +246,7 @@ subject.start(subscriber_queue, {:sleep => 1, client: client}) do |event, args| expect(event).to eq(message) expect(args.type).to eq(event_type) + expect(args.content_type).to eq message.class.to_s expect(args.context).to eq message_context received = true puts "Message Received: #{event}" From b02e458acfdc83e1758e8cb5e7f40f5423cab477 Mon Sep 17 00:00:00 2001 From: vaughanbrittonsage Date: Sun, 25 Jun 2017 17:59:55 +0100 Subject: [PATCH 04/12] implemented eventq_base changes for message content type --- eventq_aws/Gemfile | 2 ++ eventq_aws/eventq_aws.gemspec | 2 +- eventq_aws/lib/eventq_aws/aws_eventq_client.rb | 1 + eventq_aws/lib/eventq_aws/aws_queue_worker.rb | 5 ++++- eventq_aws/lib/eventq_aws/version.rb | 2 +- 5 files changed, 9 insertions(+), 3 deletions(-) diff --git a/eventq_aws/Gemfile b/eventq_aws/Gemfile index 1bb2743..adc6d12 100644 --- a/eventq_aws/Gemfile +++ b/eventq_aws/Gemfile @@ -6,3 +6,5 @@ gemspec gem 'oj', '2.15.0' gem 'json', '1.8.3' gem 'openssl', '2.0.4' +gem 'pry' +gem 'redlock' diff --git a/eventq_aws/eventq_aws.gemspec b/eventq_aws/eventq_aws.gemspec index 4e17446..01bb624 100644 --- a/eventq_aws/eventq_aws.gemspec +++ b/eventq_aws/eventq_aws.gemspec @@ -25,5 +25,5 @@ Gem::Specification.new do |spec| spec.add_development_dependency "pry" spec.add_dependency 'aws-sdk-core' - spec.add_dependency 'eventq_base' + spec.add_dependency 'eventq_base', '~> 1.14' end diff --git a/eventq_aws/lib/eventq_aws/aws_eventq_client.rb b/eventq_aws/lib/eventq_aws/aws_eventq_client.rb index a986aff..c784368 100644 --- a/eventq_aws/lib/eventq_aws/aws_eventq_client.rb +++ b/eventq_aws/lib/eventq_aws/aws_eventq_client.rb @@ -86,6 +86,7 @@ def with_prepared_message(event_type, event, context) qm.content = event qm.type = event_type qm.context = context + qm.content_type = event.class.to_s if EventQ::Configuration.signature_secret != nil provider = @signature_manager.get_provider(EventQ::Configuration.signature_provider) diff --git a/eventq_aws/lib/eventq_aws/aws_queue_worker.rb b/eventq_aws/lib/eventq_aws/aws_queue_worker.rb index df34a23..a4b7ab0 100644 --- a/eventq_aws/lib/eventq_aws/aws_queue_worker.rb +++ b/eventq_aws/lib/eventq_aws/aws_queue_worker.rb @@ -250,7 +250,10 @@ def process_message(response, client, queue, q, block) payload = JSON.load(msg.body) message = deserialize_message(payload[MESSAGE]) - message_args = EventQ::MessageArgs.new(message.type, retry_attempts, message.context) + message_args = EventQ::MessageArgs.new(type: message.type, + retry_attempts: retry_attempts, + context: message.context, + content_type: message.content_type) EventQ.logger.info("[#{self.class}] - Message received. Retry Attempts: #{retry_attempts}") diff --git a/eventq_aws/lib/eventq_aws/version.rb b/eventq_aws/lib/eventq_aws/version.rb index 7f54990..66a0f07 100644 --- a/eventq_aws/lib/eventq_aws/version.rb +++ b/eventq_aws/lib/eventq_aws/version.rb @@ -1,5 +1,5 @@ module EventQ module Amazon - VERSION = "1.12.0" + VERSION = "1.13.0" end end From 53b3f40574c78b54df750990596d7e615b31f496 Mon Sep 17 00:00:00 2001 From: vaughanbrittonsage Date: Sun, 25 Jun 2017 18:03:36 +0100 Subject: [PATCH 05/12] update gemspec to remove pry dependency --- eventq_aws/eventq_aws.gemspec | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/eventq_aws/eventq_aws.gemspec b/eventq_aws/eventq_aws.gemspec index 01bb624..117152b 100644 --- a/eventq_aws/eventq_aws.gemspec +++ b/eventq_aws/eventq_aws.gemspec @@ -19,10 +19,9 @@ Gem::Specification.new do |spec| spec.executables = spec.files.grep(%r{^exe/}) { |f| File.basename(f) } spec.require_paths = ["lib"] - spec.add_development_dependency "bundler", "~> 1.11" - spec.add_development_dependency "rake", "~> 10.0" - spec.add_development_dependency "rspec" - spec.add_development_dependency "pry" + spec.add_development_dependency 'bundler', '~> 1.11' + spec.add_development_dependency 'rake', '~> 10.0' + spec.add_development_dependency 'rspec' spec.add_dependency 'aws-sdk-core' spec.add_dependency 'eventq_base', '~> 1.14' From 1e3539bec019c09da4801716a024b235b00d99e8 Mon Sep 17 00:00:00 2001 From: vaughanbrittonsage Date: Sun, 25 Jun 2017 18:19:56 +0100 Subject: [PATCH 06/12] resolved issue with oj & json being required when not used --- .../serialization_providers/json_serialization_provider.rb | 2 +- .../serialization_providers/oj_serialization_provider.rb | 3 ++- eventq_base/lib/eventq_base/version.rb | 2 +- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/eventq_base/lib/eventq_base/serialization_providers/json_serialization_provider.rb b/eventq_base/lib/eventq_base/serialization_providers/json_serialization_provider.rb index 5415a93..7634808 100644 --- a/eventq_base/lib/eventq_base/serialization_providers/json_serialization_provider.rb +++ b/eventq_base/lib/eventq_base/serialization_providers/json_serialization_provider.rb @@ -3,7 +3,6 @@ module SerializationProviders class JsonSerializationProvider def initialize - require 'json' require 'class_kit' require 'hash_kit' @class_kit_helper = ClassKit::Helper.new @@ -11,6 +10,7 @@ def initialize end def serialize(object) + require 'json' JSON.dump(object_to_hash(object)) end diff --git a/eventq_base/lib/eventq_base/serialization_providers/oj_serialization_provider.rb b/eventq_base/lib/eventq_base/serialization_providers/oj_serialization_provider.rb index ed1d58d..23d8281 100644 --- a/eventq_base/lib/eventq_base/serialization_providers/oj_serialization_provider.rb +++ b/eventq_base/lib/eventq_base/serialization_providers/oj_serialization_provider.rb @@ -3,15 +3,16 @@ module SerializationProviders class OjSerializationProvider def initialize - require 'oj' @json_serializer = EventQ::SerializationProviders::JsonSerializationProvider.new end def serialize(object) + require 'oj' return Oj.dump(object, mode: :object) end def deserialize(json) + require 'oj' begin return Oj.load(json) rescue Oj::ParseError diff --git a/eventq_base/lib/eventq_base/version.rb b/eventq_base/lib/eventq_base/version.rb index 792e07b..353077b 100644 --- a/eventq_base/lib/eventq_base/version.rb +++ b/eventq_base/lib/eventq_base/version.rb @@ -1,3 +1,3 @@ module EventqBase - VERSION = "1.14.0" + VERSION = "1.14.1" end From 9a3a54b35bdc67e10cf7175a459c1517522e192e Mon Sep 17 00:00:00 2001 From: vaughanbrittonsage Date: Sun, 25 Jun 2017 18:42:27 +0100 Subject: [PATCH 07/12] bugfix to remove oj require --- eventq_rabbitmq/lib/eventq_rabbitmq.rb | 1 - eventq_rabbitmq/lib/eventq_rabbitmq/version.rb | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/eventq_rabbitmq/lib/eventq_rabbitmq.rb b/eventq_rabbitmq/lib/eventq_rabbitmq.rb index 38db791..ae6a676 100644 --- a/eventq_rabbitmq/lib/eventq_rabbitmq.rb +++ b/eventq_rabbitmq/lib/eventq_rabbitmq.rb @@ -1,6 +1,5 @@ require 'eventq_base' require 'bunny' -require 'oj' require 'hash_kit' require_relative '../lib/eventq_rabbitmq/version' require_relative '../lib/eventq_rabbitmq/rabbitmq_queue_client' diff --git a/eventq_rabbitmq/lib/eventq_rabbitmq/version.rb b/eventq_rabbitmq/lib/eventq_rabbitmq/version.rb index 190f3c5..f061992 100644 --- a/eventq_rabbitmq/lib/eventq_rabbitmq/version.rb +++ b/eventq_rabbitmq/lib/eventq_rabbitmq/version.rb @@ -1,3 +1,3 @@ module EventqRabbitmq - VERSION = "1.16.0" + VERSION = "1.16.1" end From eda320dc351a78c476901e0377c034dde6a2e5d2 Mon Sep 17 00:00:00 2001 From: vaughanbrittonsage Date: Mon, 26 Jun 2017 12:06:34 +0100 Subject: [PATCH 08/12] added jruby support --- eventq_base/Gemfile | 13 +++++++++++-- eventq_base/eventq_base.gemspec | 10 ++++++++++ eventq_base/lib/eventq_base.rb | 2 ++ eventq_base/lib/eventq_base/configuration.rb | 6 +++++- .../lib/eventq_base/serialization_providers.rb | 8 ++++++-- .../json_serialization_provider.rb | 1 - .../oj_serialization_provider.rb | 3 +-- eventq_base/lib/eventq_base/version.rb | 2 +- eventq_base/script/docker-compose.yml | 12 +++++++++++- eventq_base/script/test.sh | 3 ++- .../json_serialization_provider_spec.rb | 10 ++++++---- 11 files changed, 55 insertions(+), 15 deletions(-) diff --git a/eventq_base/Gemfile b/eventq_base/Gemfile index bf87e43..4e5e45b 100644 --- a/eventq_base/Gemfile +++ b/eventq_base/Gemfile @@ -4,8 +4,17 @@ source 'https://rubygems.org' gemspec gem 'rake', '10.5.0' -gem 'oj', '2.15.0' gem 'json', '1.8.3' gem 'rspec' gem 'redlock' -gem 'pry' + +platforms :ruby do + gem 'oj', '2.15.0' + gem 'openssl', '2.0.3' + gem 'pry' +end + +platforms :jruby do + gem 'pry-debugger-jruby' + gem 'jruby-openssl' +end diff --git a/eventq_base/eventq_base.gemspec b/eventq_base/eventq_base.gemspec index bf85ad3..3360e4c 100644 --- a/eventq_base/eventq_base.gemspec +++ b/eventq_base/eventq_base.gemspec @@ -24,4 +24,14 @@ Gem::Specification.new do |spec| spec.add_development_dependency "rspec" spec.add_dependency 'class_kit' + spec.add_dependency 'redlock' + + if RUBY_PLATFORM =~ /java/ + spec.platform = 'java' + spec.add_dependency('jruby-openssl') + else + spec.add_dependency('oj') + spec.add_dependency('openssl') + end + end diff --git a/eventq_base/lib/eventq_base.rb b/eventq_base/lib/eventq_base.rb index 1c96498..be78f3b 100644 --- a/eventq_base/lib/eventq_base.rb +++ b/eventq_base/lib/eventq_base.rb @@ -1,6 +1,8 @@ require 'securerandom' require 'redlock' require 'class_kit' +require 'hash_kit' + require_relative 'eventq_base/version' require_relative 'eventq_base/eventq_logger' diff --git a/eventq_base/lib/eventq_base/configuration.rb b/eventq_base/lib/eventq_base/configuration.rb index 3bd50e9..b6f2a07 100644 --- a/eventq_base/lib/eventq_base/configuration.rb +++ b/eventq_base/lib/eventq_base/configuration.rb @@ -6,7 +6,11 @@ def self.serialization_provider=(value) end def self.serialization_provider - @serialization_provider ||= EventQ::SerializationProviders::OJ_PROVIDER + if RUBY_PLATFORM =~ /java/ + @serialization_provider ||= EventQ::SerializationProviders::JSON_PROVIDER + else + @serialization_provider ||= EventQ::SerializationProviders::OJ_PROVIDER + end end def self.signature_provider=(value) diff --git a/eventq_base/lib/eventq_base/serialization_providers.rb b/eventq_base/lib/eventq_base/serialization_providers.rb index 678ec98..4736a0a 100644 --- a/eventq_base/lib/eventq_base/serialization_providers.rb +++ b/eventq_base/lib/eventq_base/serialization_providers.rb @@ -1,5 +1,7 @@ require_relative 'serialization_providers/json_serialization_provider' -require_relative 'serialization_providers/oj_serialization_provider' +unless RUBY_PLATFORM =~ /java/ + require_relative 'serialization_providers/oj_serialization_provider' +end require_relative 'serialization_providers/binary_serialization_provider' module EventQ @@ -12,7 +14,9 @@ module SerializationProviders class Manager def initialize @providers = {} - @providers[OJ_PROVIDER] = EventQ::SerializationProviders::OjSerializationProvider + unless RUBY_PLATFORM =~ /java/ + @providers[OJ_PROVIDER] = EventQ::SerializationProviders::OjSerializationProvider + end @providers[JSON_PROVIDER] = EventQ::SerializationProviders::JsonSerializationProvider @providers[BINARY_PROVIDER] = EventQ::SerializationProviders::BinarySerializationProvider end diff --git a/eventq_base/lib/eventq_base/serialization_providers/json_serialization_provider.rb b/eventq_base/lib/eventq_base/serialization_providers/json_serialization_provider.rb index 7634808..4c2f67f 100644 --- a/eventq_base/lib/eventq_base/serialization_providers/json_serialization_provider.rb +++ b/eventq_base/lib/eventq_base/serialization_providers/json_serialization_provider.rb @@ -10,7 +10,6 @@ def initialize end def serialize(object) - require 'json' JSON.dump(object_to_hash(object)) end diff --git a/eventq_base/lib/eventq_base/serialization_providers/oj_serialization_provider.rb b/eventq_base/lib/eventq_base/serialization_providers/oj_serialization_provider.rb index 23d8281..ed1d58d 100644 --- a/eventq_base/lib/eventq_base/serialization_providers/oj_serialization_provider.rb +++ b/eventq_base/lib/eventq_base/serialization_providers/oj_serialization_provider.rb @@ -3,16 +3,15 @@ module SerializationProviders class OjSerializationProvider def initialize + require 'oj' @json_serializer = EventQ::SerializationProviders::JsonSerializationProvider.new end def serialize(object) - require 'oj' return Oj.dump(object, mode: :object) end def deserialize(json) - require 'oj' begin return Oj.load(json) rescue Oj::ParseError diff --git a/eventq_base/lib/eventq_base/version.rb b/eventq_base/lib/eventq_base/version.rb index 353077b..3d1c679 100644 --- a/eventq_base/lib/eventq_base/version.rb +++ b/eventq_base/lib/eventq_base/version.rb @@ -1,3 +1,3 @@ module EventqBase - VERSION = "1.14.1" + VERSION = "1.15.0" end diff --git a/eventq_base/script/docker-compose.yml b/eventq_base/script/docker-compose.yml index 773c555..efa6fb5 100644 --- a/eventq_base/script/docker-compose.yml +++ b/eventq_base/script/docker-compose.yml @@ -1,6 +1,16 @@ testrunner: image: eventq/base - container_name: gem_test_runner + container_name: testrunner + command: bash -c "./scripts/container_loop.sh" + links: + - redis + volumes: + - ./container_loop.sh:/scripts/container_loop.sh + - ../:/gem_src + +testrunner2: + image: 522104923602.dkr.ecr.eu-west-1.amazonaws.com/sageone/jruby:20170623 + container_name: testrunner_jruby command: bash -c "./scripts/container_loop.sh" links: - redis diff --git a/eventq_base/script/test.sh b/eventq_base/script/test.sh index 3f5396d..e77bf3e 100755 --- a/eventq_base/script/test.sh +++ b/eventq_base/script/test.sh @@ -3,4 +3,5 @@ echo start rspec tests docker-compose up -d -docker exec -it gem_test_runner bash -c "cd gem_src && bundle install && bundle exec rspec $*" +docker exec -it testrunner bash -c "cd gem_src && bundle install && bundle exec rspec $*" \ +&& docker exec -it testrunner_jruby bash -c "cd gem_src && rm -rf Gemfile.lock && jruby -S bundle install && jruby -S rspec $*" diff --git a/eventq_base/spec/eventq_base/serialization_providers/json_serialization_provider_spec.rb b/eventq_base/spec/eventq_base/serialization_providers/json_serialization_provider_spec.rb index dc9328f..8553fe1 100644 --- a/eventq_base/spec/eventq_base/serialization_providers/json_serialization_provider_spec.rb +++ b/eventq_base/spec/eventq_base/serialization_providers/json_serialization_provider_spec.rb @@ -5,6 +5,7 @@ context 'when passing a general object' do let(:object_class) do Class.new do + attr_reader :name, :age def initialize @name = 'John Doe' @age = 33 @@ -12,19 +13,20 @@ def initialize end end let(:object) { object_class.new } - let(:expected) { { name: 'John Doe', age: 33 }.to_json } specify do - expect(subject.serialize(object)).to eql expected + json = subject.serialize(object) + obj = JSON.load(json) + expect(obj['name']).to eql object.name + expect(obj['age']).to eql object.age end end context 'when passing a hash' do let(:object) { { name: 'Jane Doe', age: 34 } } - let(:expected) { object.to_json } specify do - expect(subject.serialize(object)).to eql expected + expect(subject.serialize(object)).to eql JSON.dump(object) end end end From 57c2f2a10ed47df21928751fda33969c47712a79 Mon Sep 17 00:00:00 2001 From: vaughanbrittonsage Date: Mon, 26 Jun 2017 16:20:57 +0100 Subject: [PATCH 09/12] wip, rabbitmq jruby --- eventq_rabbitmq/Gemfile | 14 ++- eventq_rabbitmq/eventq_rabbitmq.gemspec | 10 +- eventq_rabbitmq/lib/eventq_rabbitmq.rb | 7 +- .../eventq_rabbitmq/rabbitmq_eventq_client.rb | 2 +- .../eventq_rabbitmq/rabbitmq_queue_client.rb | 9 +- .../eventq_rabbitmq/rabbitmq_queue_manager.rb | 16 ++++ .../eventq_rabbitmq/rabbitmq_queue_worker.rb | 16 ++-- .../lib/eventq_rabbitmq/version.rb | 2 +- eventq_rabbitmq/script/docker-compose.yml | 13 ++- eventq_rabbitmq/script/test.sh | 3 +- .../rabbitmq_eventq_client_spec.rb | 32 +++++-- .../rabbitmq_queue_client_spec.rb | 53 ++++++++--- .../rabbitmq_queue_worker_spec.rb | 94 +++++++++---------- eventq_rabbitmq/spec/spec_helper.rb | 3 +- 14 files changed, 181 insertions(+), 93 deletions(-) diff --git a/eventq_rabbitmq/Gemfile b/eventq_rabbitmq/Gemfile index 2eb2f3d..251cf1a 100644 --- a/eventq_rabbitmq/Gemfile +++ b/eventq_rabbitmq/Gemfile @@ -3,8 +3,16 @@ source 'http://rubygems.org' # Specify your gem's dependencies in eventq_rabbitmq.gemspec gemspec -gem 'oj', '2.15.0' gem 'json', '1.8.3' gem 'redlock' -gem 'openssl' -gem 'pry' + +platforms :ruby do + gem 'openssl' + gem 'oj', '2.15.0' + gem 'pry' +end + +platforms :jruby do + gem 'pry-debugger-jruby' + gem 'jruby-openssl' +end diff --git a/eventq_rabbitmq/eventq_rabbitmq.gemspec b/eventq_rabbitmq/eventq_rabbitmq.gemspec index 3d661c0..ff6a7b6 100644 --- a/eventq_rabbitmq/eventq_rabbitmq.gemspec +++ b/eventq_rabbitmq/eventq_rabbitmq.gemspec @@ -23,6 +23,12 @@ Gem::Specification.new do |spec| spec.add_development_dependency 'rake', '~> 10.0' spec.add_development_dependency 'rspec' - spec.add_dependency 'bunny' - spec.add_dependency 'eventq_base', '~> 1.14' + spec.add_dependency 'eventq_base', '~> 1.15' + + if RUBY_PLATFORM =~ /java/ + spec.platform = 'java' + spec.add_dependency 'march_hare' + else + spec.add_dependency 'bunny' + end end diff --git a/eventq_rabbitmq/lib/eventq_rabbitmq.rb b/eventq_rabbitmq/lib/eventq_rabbitmq.rb index ae6a676..86200d9 100644 --- a/eventq_rabbitmq/lib/eventq_rabbitmq.rb +++ b/eventq_rabbitmq/lib/eventq_rabbitmq.rb @@ -1,5 +1,10 @@ require 'eventq_base' -require 'bunny' +if RUBY_PLATFORM =~ /java/ + require 'march_hare' +else + require 'bunny' +end + require 'hash_kit' require_relative '../lib/eventq_rabbitmq/version' require_relative '../lib/eventq_rabbitmq/rabbitmq_queue_client' diff --git a/eventq_rabbitmq/lib/eventq_rabbitmq/rabbitmq_eventq_client.rb b/eventq_rabbitmq/lib/eventq_rabbitmq/rabbitmq_eventq_client.rb index 7b602c1..a237232 100644 --- a/eventq_rabbitmq/lib/eventq_rabbitmq/rabbitmq_eventq_client.rb +++ b/eventq_rabbitmq/lib/eventq_rabbitmq/rabbitmq_eventq_client.rb @@ -98,7 +98,7 @@ def with_connection yield(channel) ensure - channel&.close + channel&.close if channel.open? connection.close end diff --git a/eventq_rabbitmq/lib/eventq_rabbitmq/rabbitmq_queue_client.rb b/eventq_rabbitmq/lib/eventq_rabbitmq/rabbitmq_queue_client.rb index ba0679b..8f5cd1a 100644 --- a/eventq_rabbitmq/lib/eventq_rabbitmq/rabbitmq_queue_client.rb +++ b/eventq_rabbitmq/lib/eventq_rabbitmq/rabbitmq_queue_client.rb @@ -30,7 +30,7 @@ def connection_options :pass => @password, :ssl => @ssl, :read_timeout => 4, - :heartbeat => 20, + :heartbeat => 8, :continuation_timeout => 5000, :automatically_recover => true, :network_recovery_interval => 1, @@ -39,7 +39,12 @@ def connection_options end def get_connection - conn = Bunny.new(connection_options) + if RUBY_PLATFORM =~ /java/ + conn = MarchHare.connect(connection_options) + else + conn = Bunny.new(connection_options) + end + conn.start return conn end diff --git a/eventq_rabbitmq/lib/eventq_rabbitmq/rabbitmq_queue_manager.rb b/eventq_rabbitmq/lib/eventq_rabbitmq/rabbitmq_queue_manager.rb index 0d6cd9b..a99f128 100644 --- a/eventq_rabbitmq/lib/eventq_rabbitmq/rabbitmq_queue_manager.rb +++ b/eventq_rabbitmq/lib/eventq_rabbitmq/rabbitmq_queue_manager.rb @@ -32,6 +32,22 @@ def get_queue(channel, queue) return q end + def pop_message(queue:) + if RUBY_PLATFORM =~ /java/ + headers, payload = queue.pop({ :ack => true, :block => true }) + if headers == nil + return [nil,nil] + end + [headers.delivery_tag, payload] + else + headers, properties, payload = queue.pop({ :manual_ack => true, :block => true }) + if headers == nil + return [nil,nil] + end + [headers.delivery_tag, payload] + end + end + def get_queue_exchange(channel, queue) _exchange_name = EventQ.create_exchange_name(queue.name) channel.direct("#{_exchange_name}.ex") diff --git a/eventq_rabbitmq/lib/eventq_rabbitmq/rabbitmq_queue_worker.rb b/eventq_rabbitmq/lib/eventq_rabbitmq/rabbitmq_queue_worker.rb index a92e9db..6ae43cb 100644 --- a/eventq_rabbitmq/lib/eventq_rabbitmq/rabbitmq_queue_worker.rb +++ b/eventq_rabbitmq/lib/eventq_rabbitmq/rabbitmq_queue_worker.rb @@ -104,7 +104,7 @@ def start_process(options, queue, block) call_on_error_block(error: e) end - if channel != nil && channel.status != :closed + if channel != nil && channel.open? channel.close end @@ -167,7 +167,7 @@ def thread_process_iteration(channel, manager, queue, block) received = false begin - delivery_info, properties, payload = q.pop(:manual_ack => true, :block => true) + delivery_info, payload = manager.pop_message(queue: q) #check that message was received if payload != nil @@ -254,11 +254,11 @@ def call_on_retry_block(message) end end - def reject_message(channel, message, delivery_info, retry_exchange, queue, abort) + def reject_message(channel, message, delivery_tag, retry_exchange, queue, abort) EventQ.logger.info("[#{self.class}] - Message rejected removing from queue.") #reject the message to remove from queue - channel.reject(delivery_info.delivery_tag, false) + channel.reject(delivery_tag, false) #check if the message retry limit has been exceeded if message.retry_attempts >= queue.max_retry_attempts @@ -331,7 +331,7 @@ def configure(queue, options = {}) private - def process_message(payload, queue, channel, retry_exchange, delivery_info, block) + def process_message(payload, queue, channel, retry_exchange, delivery_tag, block) abort = false error = false message = deserialize_message(payload) @@ -347,7 +347,7 @@ def process_message(payload, queue, channel, retry_exchange, delivery_info, bloc if(!EventQ::NonceManager.is_allowed?(message.id)) EventQ.logger.info("[#{self.class}] - Duplicate Message received. Dropping message.") - channel.acknowledge(delivery_info.delivery_tag, false) + channel.acknowledge(delivery_tag, false) return false end @@ -360,7 +360,7 @@ def process_message(payload, queue, channel, retry_exchange, delivery_info, bloc EventQ.logger.info("[#{self.class}] - Message aborted.") else #accept the message as processed - channel.acknowledge(delivery_info.delivery_tag, false) + channel.acknowledge(delivery_tag, false) EventQ.logger.info("[#{self.class}] - Message acknowledged.") end @@ -372,7 +372,7 @@ def process_message(payload, queue, channel, retry_exchange, delivery_info, bloc if error || abort EventQ::NonceManager.failed(message.id) - reject_message(channel, message, delivery_info, retry_exchange, queue, abort) + reject_message(channel, message, delivery_tag, retry_exchange, queue, abort) else EventQ::NonceManager.complete(message.id) end diff --git a/eventq_rabbitmq/lib/eventq_rabbitmq/version.rb b/eventq_rabbitmq/lib/eventq_rabbitmq/version.rb index f061992..834d25c 100644 --- a/eventq_rabbitmq/lib/eventq_rabbitmq/version.rb +++ b/eventq_rabbitmq/lib/eventq_rabbitmq/version.rb @@ -1,3 +1,3 @@ module EventqRabbitmq - VERSION = "1.16.1" + VERSION = "1.17.0" end diff --git a/eventq_rabbitmq/script/docker-compose.yml b/eventq_rabbitmq/script/docker-compose.yml index 782aeb7..ee86474 100644 --- a/eventq_rabbitmq/script/docker-compose.yml +++ b/eventq_rabbitmq/script/docker-compose.yml @@ -4,7 +4,18 @@ rabbitmq: testrunner: image: eventq/rabbitmq - container_name: gem_test_runner + container_name: testrunner + command: bash -c "./scripts/container_loop.sh" + links: + - rabbitmq + - redis + volumes: + - ./container_loop.sh:/scripts/container_loop.sh + - ../.:/gem_src + +testrunner2: + image: 522104923602.dkr.ecr.eu-west-1.amazonaws.com/sageone/jruby:20170623 + container_name: testrunner_jruby command: bash -c "./scripts/container_loop.sh" links: - rabbitmq diff --git a/eventq_rabbitmq/script/test.sh b/eventq_rabbitmq/script/test.sh index 333405f..90e9379 100755 --- a/eventq_rabbitmq/script/test.sh +++ b/eventq_rabbitmq/script/test.sh @@ -3,4 +3,5 @@ echo start rspec tests docker-compose up -d -docker exec -it gem_test_runner bash -c "cd gem_src && sleep 4 && bundle install && bundle exec rspec $*" +docker exec -it testrunner bash -c "cd gem_src && sleep 4 && bundle install && bundle exec rspec $*" \ +&& docker exec -it testrunner_jruby bash -c "cd gem_src && rm -rf Gemfile.lock && jruby -S bundle install && jruby -S rspec $*" diff --git a/eventq_rabbitmq/spec/eventq_rabbitmq/rabbitmq_eventq_client_spec.rb b/eventq_rabbitmq/spec/eventq_rabbitmq/rabbitmq_eventq_client_spec.rb index 50d8dfb..da774a5 100644 --- a/eventq_rabbitmq/spec/eventq_rabbitmq/rabbitmq_eventq_client_spec.rb +++ b/eventq_rabbitmq/spec/eventq_rabbitmq/rabbitmq_eventq_client_spec.rb @@ -14,7 +14,9 @@ let(:event_type) { 'test_event1' } let(:message) { 'Hello World' } - let(:message_context) { { foo: 'bar' } } + let(:message_context) { { 'foo' => 'bar' } } + + let(:class_kit) { ClassKit::Helper.new } subject do EventQ::RabbitMq::EventQClient.new({client: client, subscription_manager: subscription_manager}) @@ -27,13 +29,25 @@ end def receive_message(queue) - _delivery_info, _properties, payload = queue.pop - qm = Oj.load(payload.to_s) - puts "[QUEUE] - received message: #{qm&.content.inspect}" + delivery_tag, payload = queue_manager.pop_message(queue: queue) + if payload == nil + return nil + end + if RUBY_PLATFORM =~ /java/ + hash = JSON.load(payload.to_s) + if hash == nil + return nil + end + qm = class_kit.from_hash(hash: hash, klass: EventQ::QueueMessage) + else + qm = Oj.load(payload.to_s) + if qm == nil + return nil + end + end + + puts "[QUEUE] - received message: #{qm.content}" qm - rescue Timeout::Error - puts 'Failed due to connection timeout.' - nil end describe '#publish' do @@ -114,7 +128,7 @@ def receive_message(queue) expect(qm).to be_nil puts '[QUEUE] waiting for message...' - sleep 2.2 + sleep 2.5 qm = receive_message(queue) expect(qm).to_not be_nil @@ -199,7 +213,7 @@ def receive_message(queue) end after do - channel.close + channel.close if channel.open? connection.close end end diff --git a/eventq_rabbitmq/spec/eventq_rabbitmq/rabbitmq_queue_client_spec.rb b/eventq_rabbitmq/spec/eventq_rabbitmq/rabbitmq_queue_client_spec.rb index 63e140d..83309cb 100644 --- a/eventq_rabbitmq/spec/eventq_rabbitmq/rabbitmq_queue_client_spec.rb +++ b/eventq_rabbitmq/spec/eventq_rabbitmq/rabbitmq_queue_client_spec.rb @@ -10,6 +10,8 @@ it 'should use dead-letter exchange' do + manager = EventQ::RabbitMq::QueueManager.new + x = channel.fanout("amq.fanout") dlx = channel.fanout("bunny.examples.dlx.exchange") q = channel.queue("subscriber", :durable => true, :arguments => {"x-dead-letter-exchange" => dlx.name}).bind(x, :routing_key => 'post') @@ -19,13 +21,18 @@ x.publish("", :routing_key => 'post') sleep 0.2 - delivery_info, _, _ = q.pop(:manual_ack => true) + delivery_tag, payload = manager.pop_message(queue: q) puts "#{dlq.message_count} messages dead lettered so far" expect(dlq.message_count).to eq(0) puts "Rejecting a message" - channel.nack(delivery_info.delivery_tag, false) + channel.nack(delivery_tag) + sleep 0.2 + + channel = connection.create_channel + dlx = channel.fanout("bunny.examples.dlx.exchange") + dlq = channel.queue("subscriber_retry", :exclusive => true).bind(dlx) puts "#{dlq.message_count} messages dead lettered so far" expect(dlq.message_count).to eq(1) @@ -35,36 +42,40 @@ it 'should use a delay queue correctly' do - retry_exchange = channel.fanout('retry.exchange') - subscriber_exchange = channel.fanout('subscriber.exchange') + retry_exchange = channel.fanout(SecureRandom.uuid) + subscriber_exchange = channel.fanout(SecureRandom.uuid) retry_queue_def = EventQ::Queue.new - retry_queue_def.name = 'retry.queue' + retry_queue_def.name = SecureRandom.uuid queue_manager = EventQ::RabbitMq::QueueManager.new retry_queue = channel.queue(retry_queue_def.name, :arguments => { "x-dead-letter-exchange" => subscriber_exchange.name, "x-message-ttl" => 600 }).bind(retry_exchange) - subscriber_queue = channel.queue('subscriber.queue').bind(subscriber_exchange) + subscriber_queue_name = SecureRandom.uuid + subscriber_queue = channel.queue(subscriber_queue_name).bind(subscriber_exchange) message = 'Hello World' retry_exchange.publish(message) - delivery_info, properties, payload = retry_queue.pop(:manual_ack => true) + delivery_tag, payload = queue_manager.pop_message(queue: retry_queue) expect(payload).to eq(message) - channel.close + retry_queue.purge + + retry_exchange.publish(message) channel = connection.create_channel - subscriber_queue = channel.queue('subscriber.queue') + subscriber_queue = channel.queue(subscriber_queue_name).bind(subscriber_exchange) sleep(1) - delivery_info, properties, payload = subscriber_queue.pop(:manual_ack => true) + delivery_tag, payload = queue_manager.pop_message(queue: subscriber_queue) + expect(payload).to eq(message) - channel.acknowledge(delivery_info.delivery_tag, false) + channel.acknowledge(delivery_tag, false) end @@ -88,14 +99,28 @@ sleep(2) - delivery_info, properties, payload = queue.pop(:manual_ack => true) + delivery_tag, payload = qm.pop_message(queue: queue) expect(payload).to eq(message) - channel.acknowledge(delivery_info.delivery_tag, false) + channel.acknowledge(delivery_tag, false) + + end + + it 'should deliver a message from a queue' do + manager = EventQ::RabbitMq::QueueManager.new + queue = channel.queue(SecureRandom.uuid, :durable => true) + exchange = channel.fanout(SecureRandom.uuid) + queue.bind(exchange) + + exchange.publish('Hello World') + sleep 0.5 + + delivery_tag, payload = manager.pop_message(queue: queue) + expect(payload).to eq 'Hello World' end after do - channel.close + channel.close if channel.open? connection.close end diff --git a/eventq_rabbitmq/spec/eventq_rabbitmq/rabbitmq_queue_worker_spec.rb b/eventq_rabbitmq/spec/eventq_rabbitmq/rabbitmq_queue_worker_spec.rb index 22e8332..d341dc1 100644 --- a/eventq_rabbitmq/spec/eventq_rabbitmq/rabbitmq_queue_worker_spec.rb +++ b/eventq_rabbitmq/spec/eventq_rabbitmq/rabbitmq_queue_worker_spec.rb @@ -9,50 +9,52 @@ let(:channel) { connection.create_channel } after do - channel.close + channel.close if channel.open? connection.close end describe '#deserialize_message' do - context 'when serialization provider is OJ_PROVIDER' do - before do - EventQ::Configuration.serialization_provider = EventQ::SerializationProviders::OJ_PROVIDER - end - context 'when payload is for a known type' do - let(:a) do - A.new.tap do |a| - a.text = 'ABC' - end + unless RUBY_PLATFORM =~ /java/ + context 'when serialization provider is OJ_PROVIDER' do + before do + EventQ::Configuration.serialization_provider = EventQ::SerializationProviders::OJ_PROVIDER end - let(:payload) { Oj.dump(a) } - it 'should deserialize the message into an object of the known type' do - message = subject.deserialize_message(payload) - expect(message).to be_a(A) - expect(message.text).to eq('ABC') - end - end - context 'when payload is for an unknown type' do - let(:a) do - A.new.tap do |a| - a.text = 'ABC' + context 'when payload is for a known type' do + let(:a) do + A.new.tap do |a| + a.text = 'ABC' + end end - end - let(:payload) do - string = Oj.dump(a) - JSON.load(string.sub('"^o":"A"', '"^o":"B"')) - end - let(:message) do - EventQ::QueueMessage.new.tap do |m| - m.content = payload + let(:payload) { Oj.dump(a) } + it 'should deserialize the message into an object of the known type' do + message = subject.deserialize_message(payload) + expect(message).to be_a(A) + expect(message.text).to eq('ABC') end end - let(:json) do - Oj.dump(message) - end - it 'should deserialize the message into a Hash' do - message = subject.deserialize_message(json) - expect(message.content).to be_a(Hash) - expect(message.content[:text]).to eq('ABC') + context 'when payload is for an unknown type' do + let(:a) do + A.new.tap do |a| + a.text = 'ABC' + end + end + let(:payload) do + string = Oj.dump(a) + JSON.load(string.sub('"^o":"A"', '"^o":"B"')) + end + let(:message) do + EventQ::QueueMessage.new.tap do |m| + m.content = payload + end + end + let(:json) do + Oj.dump(message) + end + it 'should deserialize the message into a Hash' do + message = subject.deserialize_message(json) + expect(message.content).to be_a(Hash) + expect(message.content[:text]).to eq('ABC') + end end end end @@ -74,8 +76,10 @@ expect(message.content).to be_a(Hash) expect(message.content[:text]).to eq('ABC') end - after do - EventQ::Configuration.serialization_provider = EventQ::SerializationProviders::OJ_PROVIDER + unless RUBY_PLATFORM =~ /java/ + after do + EventQ::Configuration.serialization_provider = EventQ::SerializationProviders::OJ_PROVIDER + end end end end @@ -236,30 +240,24 @@ subscription_manager.subscribe(event_type, subscriber_queue) message = 'Hello World' - message_context = { foo: 'bar' } + message_context = { 'foo' => 'bar' } eqclient = EventQ::RabbitMq::EventQClient.new({client: client, subscription_manager: subscription_manager}) eqclient.raise_event(event_type, message, message_context) - received = false - - subject.start(subscriber_queue, {:sleep => 1, client: client}) do |event, args| + subject.start(subscriber_queue, {:sleep => 1, client: client, thread_count: 1 }) do |event, args| expect(event).to eq(message) expect(args.type).to eq(event_type) expect(args.content_type).to eq message.class.to_s expect(args.context).to eq message_context - received = true puts "Message Received: #{event}" end - sleep(0.5) - - expect(received).to eq(true) + sleep(1) subject.stop expect(subject.is_running).to eq(false) - end context 'when queue requires a signature' do @@ -292,7 +290,7 @@ puts "Message Received: #{event}" end - sleep(0.5) + sleep(1) expect(received).to eq(true) diff --git a/eventq_rabbitmq/spec/spec_helper.rb b/eventq_rabbitmq/spec/spec_helper.rb index 4bad25a..466ddbf 100644 --- a/eventq_rabbitmq/spec/spec_helper.rb +++ b/eventq_rabbitmq/spec/spec_helper.rb @@ -1,5 +1,5 @@ -require 'bunny' require 'eventq_base' + require 'pry' require_relative '../lib/eventq_rabbitmq' @@ -18,7 +18,6 @@ mocks.verify_partial_doubles = true end - config.filter_run :focus config.run_all_when_everything_filtered = true config.disable_monkey_patching! From 84a780739ee71941cccd1fd890bdccc0597214a5 Mon Sep 17 00:00:00 2001 From: vaughanbrittonsage Date: Mon, 26 Jun 2017 19:48:43 +0100 Subject: [PATCH 10/12] wip jruby rabbitmq worker --- .../lib/eventq_rabbitmq/rabbitmq_queue_worker.rb | 8 +++++--- eventq_rabbitmq/script/test.sh | 6 ++++-- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/eventq_rabbitmq/lib/eventq_rabbitmq/rabbitmq_queue_worker.rb b/eventq_rabbitmq/lib/eventq_rabbitmq/rabbitmq_queue_worker.rb index 6ae43cb..01690f3 100644 --- a/eventq_rabbitmq/lib/eventq_rabbitmq/rabbitmq_queue_worker.rb +++ b/eventq_rabbitmq/lib/eventq_rabbitmq/rabbitmq_queue_worker.rb @@ -87,7 +87,7 @@ def start_process(options, queue, block) while true do #check if the worker is still allowed to run and break out of thread loop if not - if !@is_running + unless running? break end @@ -192,9 +192,11 @@ def thread_process_iteration(channel, manager, queue, block) def stop puts "[#{self.class}] - Stopping..." @is_running = false - @threads.each { |thr| thr.join } + Thread.list.each do |thread| + thread.exit unless thread == Thread.current + end if @connection != nil - @connection.close + @connection.close if @connection.open? end return true end diff --git a/eventq_rabbitmq/script/test.sh b/eventq_rabbitmq/script/test.sh index 90e9379..f20f03e 100755 --- a/eventq_rabbitmq/script/test.sh +++ b/eventq_rabbitmq/script/test.sh @@ -3,5 +3,7 @@ echo start rspec tests docker-compose up -d -docker exec -it testrunner bash -c "cd gem_src && sleep 4 && bundle install && bundle exec rspec $*" \ -&& docker exec -it testrunner_jruby bash -c "cd gem_src && rm -rf Gemfile.lock && jruby -S bundle install && jruby -S rspec $*" +#docker exec -it testrunner bash -c "cd gem_src && sleep 4 && bundle install && bundle exec rspec $*" \ +#&& docker exec -it testrunner_jruby bash -c "cd gem_src && rm -rf Gemfile.lock && jruby -S bundle install && jruby -S rspec $*" + +docker exec -it testrunner_jruby bash -c "cd gem_src && rm -rf Gemfile.lock && jruby -S bundle install && jruby -S rspec $*" \ No newline at end of file From 90b85bd389ee4eb5307a2ce206fd01f2784a94e8 Mon Sep 17 00:00:00 2001 From: vaughanbrittonsage Date: Tue, 27 Jun 2017 09:51:15 +0100 Subject: [PATCH 11/12] resolved issue with jruby rabbitmq worker specs --- eventq_rabbitmq/.rspec | 2 +- eventq_rabbitmq/lib/eventq_rabbitmq.rb | 8 +- .../jruby/rabbitmq_queue_worker.rb | 373 ++++++++++++++++++ .../eventq_rabbitmq/rabbitmq_queue_worker.rb | 10 +- eventq_rabbitmq/script/test.sh | 6 +- .../rabbitmq_eventq_client_spec.rb | 18 +- .../rabbitmq_queue_client_spec.rb | 8 +- .../rabbitmq_queue_worker_spec.rb | 20 +- eventq_rabbitmq/spec/spec_helper.rb | 2 + 9 files changed, 418 insertions(+), 29 deletions(-) create mode 100644 eventq_rabbitmq/lib/eventq_rabbitmq/jruby/rabbitmq_queue_worker.rb diff --git a/eventq_rabbitmq/.rspec b/eventq_rabbitmq/.rspec index 7e3eb5c..72cf692 100644 --- a/eventq_rabbitmq/.rspec +++ b/eventq_rabbitmq/.rspec @@ -1,3 +1,3 @@ --color --require spec_helper ---format doc +--format progress diff --git a/eventq_rabbitmq/lib/eventq_rabbitmq.rb b/eventq_rabbitmq/lib/eventq_rabbitmq.rb index 86200d9..6cf5ee1 100644 --- a/eventq_rabbitmq/lib/eventq_rabbitmq.rb +++ b/eventq_rabbitmq/lib/eventq_rabbitmq.rb @@ -9,7 +9,13 @@ require_relative '../lib/eventq_rabbitmq/version' require_relative '../lib/eventq_rabbitmq/rabbitmq_queue_client' require_relative '../lib/eventq_rabbitmq/rabbitmq_queue_manager' -require_relative '../lib/eventq_rabbitmq/rabbitmq_queue_worker' + +if RUBY_PLATFORM =~ /java/ + require_relative '../lib/eventq_rabbitmq/jruby/rabbitmq_queue_worker' +else + require_relative '../lib/eventq_rabbitmq/rabbitmq_queue_worker' +end + require_relative '../lib/eventq_rabbitmq/rabbitmq_subscription_manager' require_relative '../lib/eventq_rabbitmq/rabbitmq_eventq_client' require_relative '../lib/eventq_rabbitmq/default_queue' diff --git a/eventq_rabbitmq/lib/eventq_rabbitmq/jruby/rabbitmq_queue_worker.rb b/eventq_rabbitmq/lib/eventq_rabbitmq/jruby/rabbitmq_queue_worker.rb new file mode 100644 index 0000000..fe20f28 --- /dev/null +++ b/eventq_rabbitmq/lib/eventq_rabbitmq/jruby/rabbitmq_queue_worker.rb @@ -0,0 +1,373 @@ +require 'java' +java_import java.util.concurrent.Executors +module EventQ + module RabbitMq + class QueueWorker + include EventQ::WorkerId + + attr_accessor :is_running + + def initialize + @threads = [] + @forks = [] + @is_running = false + + @retry_exceeded_block = nil + @on_retry_block = nil + @on_error_block = nil + @hash_helper = HashKit::Helper.new + @serialization_provider_manager = EventQ::SerializationProviders::Manager.new + @signature_provider_manager = EventQ::SignatureProviders::Manager.new + @last_gc_flush = Time.now + @gc_flush_interval = 10 + end + + def start(queue, options = {}, &block) + + EventQ.logger.info("[#{self.class}] - Preparing to start listening for messages.") + + configure(queue, options) + + raise "[#{self.class}] - Worker is already running." if running? + + if options[:client] == nil + raise "[#{self.class}] - :client (QueueClient) must be specified." + end + + EventQ.logger.info("[#{self.class}] - Listening for messages.") + EventQ.logger.debug do + "[#{self.class} #start] - Listening for messages on queue: #{EventQ.create_queue_name(queue.name)}" + end + + start_process(options, queue, block) + + end + + def start_process(options, queue, block) + + @is_running = true + + %w'INT TERM'.each do |sig| + Signal.trap(sig) { + stop + exit + } + end + + if !options.key?(:durable) + options[:durable] = true + end + + client = options[:client] + manager = EventQ::RabbitMq::QueueManager.new + manager.durable = options[:durable] + @connection = client.get_connection + + @executor = java.util.concurrent.Executors::newFixedThreadPool @thread_count + + #loop through each thread count + @thread_count.times do + + @executor.execute do + + #begin the queue loop for this thread + while true do + + #check if the worker is still allowed to run and break out of thread loop if not + unless running? + break + end + + if @executor.is_shutdown + break + end + + has_received_message = false + + begin + + channel = @connection.create_channel + + has_received_message = thread_process_iteration(channel, manager, queue, block) + + rescue => e + EventQ.logger.error("An unhandled error occurred. Error: #{e} | Backtrace: #{e.backtrace}") + call_on_error_block(error: e) + end + + if channel != nil && channel.open? + channel.close + end + + gc_flush + + if !has_received_message + EventQ.logger.debug { "[#{self.class}] - No message received." } + if @sleep > 0 + EventQ.logger.debug { "[#{self.class}] - Sleeping for #{@sleep} seconds" } + sleep(@sleep) + end + end + + end + + end + + end + + if options.key?(:wait) && options[:wait] == true + while running? do end + @connection.close if @connection.open? + end + + return true + + end + + def call_on_error_block(error:, message: nil) + if @on_error_block + EventQ.logger.debug { "[#{self.class}] - Executing on_error block." } + begin + @on_error_block.call(error, message) + rescue => e + EventQ.logger.error("[#{self.class}] - An error occurred executing the on_error block. Error: #{e}") + end + else + EventQ.logger.debug { "[#{self.class}] - No on_error block specified to execute." } + end + end + + def gc_flush + if Time.now - last_gc_flush > @gc_flush_interval + GC.start + @last_gc_flush = Time.now + end + end + + def last_gc_flush + @last_gc_flush + end + + def thread_process_iteration(channel, manager, queue, block) + + #get the queue + q = manager.get_queue(channel, queue) + retry_exchange = manager.get_retry_exchange(channel, queue) + + received = false + + begin + delivery_info, payload = manager.pop_message(queue: q) + + #check that message was received + if payload != nil + received = true + begin + tag_processing_thread + process_message(payload, queue, channel, retry_exchange, delivery_info, block) + ensure + untag_processing_thread + end + + end + + rescue => e + EventQ.logger.error("[#{self.class}] - An error occurred attempting to process a message. Error: #{e} | Backtrace: #{e.backtrace}") + call_on_error_block(error: e) + end + + return received + end + + def stop + EventQ.logger.info { "[#{self.class}] - Stopping..." } + @is_running = false + @executor.shutdown + if @connection != nil + @connection.close if @connection.open? + end + return true + end + + def on_retry_exceeded(&block) + @retry_exceeded_block = block + return nil + end + + def on_retry(&block) + @on_retry_block = block + return nil + end + + def on_error(&block) + @on_error_block = block + return nil + end + + def running? + return @is_running + end + + def deserialize_message(payload) + provider = @serialization_provider_manager.get_provider(EventQ::Configuration.serialization_provider) + return provider.deserialize(payload) + end + + def serialize_message(msg) + provider = @serialization_provider_manager.get_provider(EventQ::Configuration.serialization_provider) + return provider.serialize(msg) + end + + def call_on_retry_exceeded_block(message) + if @retry_exceeded_block != nil + EventQ.logger.debug { "[#{self.class}] - Executing on_retry_exceeded block." } + begin + @retry_exceeded_block.call(message) + rescue => e + EventQ.logger.error("[#{self.class}] - An error occurred executing the on_retry_exceeded block. Error: #{e}") + end + else + EventQ.logger.debug { "[#{self.class}] - No on_retry_exceeded block specified." } + end + end + + def call_on_retry_block(message) + if @on_retry_block + EventQ.logger.debug { "[#{self.class}] - Executing on_retry block." } + begin + @on_retry_block.call(message, abort) + rescue => e + EventQ.logger.error("[#{self.class}] - An error occurred executing the on_retry block. Error: #{e}") + end + else + EventQ.logger.debug { "[#{self.class}] - No on_retry block specified." } + end + end + + def reject_message(channel, message, delivery_tag, retry_exchange, queue, abort) + + EventQ.logger.info("[#{self.class}] - Message rejected removing from queue.") + #reject the message to remove from queue + channel.reject(delivery_tag, false) + + #check if the message retry limit has been exceeded + if message.retry_attempts >= queue.max_retry_attempts + + EventQ.logger.info("[#{self.class}] - Message retry attempt limit exceeded. Msg: #{serialize_message(message)}") + + call_on_retry_exceeded_block(message) + + #check if the message is allowed to be retried + elsif queue.allow_retry + + EventQ.logger.debug { "[#{self.class}] - Incrementing retry attempts count." } + message.retry_attempts += 1 + + if queue.allow_retry_back_off == true + EventQ.logger.debug { "[#{self.class}] - Calculating message back off retry delay. Attempts: #{message.retry_attempts} * Retry Delay: #{queue.retry_delay}" } + message_ttl = message.retry_attempts * queue.retry_delay + if (message.retry_attempts * queue.retry_delay) > queue.max_retry_delay + EventQ.logger.debug { "[#{self.class}] - Max message back off retry delay reached." } + message_ttl = queue.max_retry_delay + end + else + EventQ.logger.debug { "[#{self.class}] - Setting fixed retry delay for message." } + message_ttl = queue.retry_delay + end + + EventQ.logger.debug { "[#{self.class}] - Sending message for retry. Message TTL: #{message_ttl}" } + retry_exchange.publish(serialize_message(message), :expiration => message_ttl) + EventQ.logger.debug { "[#{self.class}] - Published message to retry exchange." } + + call_on_retry_block(message) + + end + + return true + + end + + def configure(queue, options = {}) + + @queue = queue + + #default thread count + @thread_count = 4 + if options.key?(:thread_count) + @thread_count = options[:thread_count] + end + + #default sleep time in seconds + @sleep = 15 + if options.key?(:sleep) + @sleep = options[:sleep] + end + + @fork_count = 1 + if options.key?(:fork_count) + @fork_count = options[:fork_count] + end + + @gc_flush_interval = 10 + if options.key?(:gc_flush_interval) + @gc_flush_interval = options[:gc_flush_interval] + end + + EventQ.logger.info("[#{self.class}] - Configuring. Process Count: #{@fork_count} | Thread Count: #{@thread_count} | Interval Sleep: #{@sleep}.") + + return true + + end + + private + + def process_message(payload, queue, channel, retry_exchange, delivery_tag, block) + abort = false + error = false + message = deserialize_message(payload) + + EventQ.logger.info("[#{self.class}] - Message received. Retry Attempts: #{message.retry_attempts}") + + @signature_provider_manager.validate_signature(message: message, queue: queue) + + message_args = EventQ::MessageArgs.new(type: message.type, + retry_attempts: message.retry_attempts, + context: message.context, + content_type: message.content_type) + + if(!EventQ::NonceManager.is_allowed?(message.id)) + EventQ.logger.info("[#{self.class}] - Duplicate Message received. Dropping message.") + channel.acknowledge(delivery_tag, false) + return false + end + + #begin worker block for queue message + begin + block.call(message.content, message_args) + + if message_args.abort == true + abort = true + EventQ.logger.info("[#{self.class}] - Message aborted.") + else + #accept the message as processed + channel.acknowledge(delivery_tag, false) + EventQ.logger.info("[#{self.class}] - Message acknowledged.") + end + + rescue => e + EventQ.logger.error("[#{self.class}] - An unhandled error happened attempting to process a queue message. Error: #{e} | Backtrace: #{e.backtrace}") + error = true + call_on_error_block(error: e, message: message) + end + + if error || abort + EventQ::NonceManager.failed(message.id) + reject_message(channel, message, delivery_tag, retry_exchange, queue, abort) + else + EventQ::NonceManager.complete(message.id) + end + end + end + end +end + diff --git a/eventq_rabbitmq/lib/eventq_rabbitmq/rabbitmq_queue_worker.rb b/eventq_rabbitmq/lib/eventq_rabbitmq/rabbitmq_queue_worker.rb index 01690f3..784a8d0 100644 --- a/eventq_rabbitmq/lib/eventq_rabbitmq/rabbitmq_queue_worker.rb +++ b/eventq_rabbitmq/lib/eventq_rabbitmq/rabbitmq_queue_worker.rb @@ -127,7 +127,7 @@ def start_process(options, queue, block) if options.key?(:wait) && options[:wait] == true @threads.each { |thr| thr.join } - @connection.close + @connection.close if @connection.open? end return true @@ -190,13 +190,17 @@ def thread_process_iteration(channel, manager, queue, block) end def stop - puts "[#{self.class}] - Stopping..." + EventQ.logger.info { "[#{self.class}] - Stopping..." } @is_running = false Thread.list.each do |thread| thread.exit unless thread == Thread.current end if @connection != nil - @connection.close if @connection.open? + begin + @connection.close if @connection.open? + rescue Timeout::Error + EventQ.logger.error { 'Timeout occurred closing connection.' } + end end return true end diff --git a/eventq_rabbitmq/script/test.sh b/eventq_rabbitmq/script/test.sh index f20f03e..1f19546 100755 --- a/eventq_rabbitmq/script/test.sh +++ b/eventq_rabbitmq/script/test.sh @@ -3,7 +3,5 @@ echo start rspec tests docker-compose up -d -#docker exec -it testrunner bash -c "cd gem_src && sleep 4 && bundle install && bundle exec rspec $*" \ -#&& docker exec -it testrunner_jruby bash -c "cd gem_src && rm -rf Gemfile.lock && jruby -S bundle install && jruby -S rspec $*" - -docker exec -it testrunner_jruby bash -c "cd gem_src && rm -rf Gemfile.lock && jruby -S bundle install && jruby -S rspec $*" \ No newline at end of file +docker exec -it testrunner bash -c "cd gem_src && sleep 4 && bundle install && bundle exec rspec $*" \ +&& docker exec -it testrunner_jruby bash -c "cd gem_src && rm -rf Gemfile.lock && jruby -S bundle install && jruby -S rspec $*" \ No newline at end of file diff --git a/eventq_rabbitmq/spec/eventq_rabbitmq/rabbitmq_eventq_client_spec.rb b/eventq_rabbitmq/spec/eventq_rabbitmq/rabbitmq_eventq_client_spec.rb index da774a5..3591035 100644 --- a/eventq_rabbitmq/spec/eventq_rabbitmq/rabbitmq_eventq_client_spec.rb +++ b/eventq_rabbitmq/spec/eventq_rabbitmq/rabbitmq_eventq_client_spec.rb @@ -46,7 +46,7 @@ def receive_message(queue) end end - puts "[QUEUE] - received message: #{qm.content}" + EventQ.logger.debug { "[QUEUE] - received message: #{qm.content}" } qm end @@ -58,7 +58,7 @@ def receive_message(queue) queue = queue_manager.get_queue(channel, subscriber_queue) - puts '[QUEUE] waiting for message...' + EventQ.logger.debug { '[QUEUE] waiting for message...' } qm = receive_message(queue) @@ -79,7 +79,7 @@ def receive_message(queue) queue = queue_manager.get_queue(channel, subscriber_queue) - puts '[QUEUE] waiting for message...' + EventQ.logger.debug { '[QUEUE] waiting for message...' } qm = receive_message(queue) @@ -122,12 +122,12 @@ def receive_message(queue) queue = channel.queue(queue_name, durable: queue_manager.durable) - puts '[QUEUE] waiting for message... (but there should be none yet)' + EventQ.logger.debug { '[QUEUE] waiting for message... (but there should be none yet)' } qm = receive_message(queue) expect(qm).to be_nil - puts '[QUEUE] waiting for message...' + EventQ.logger.debug { '[QUEUE] waiting for message...' } sleep 2.5 qm = receive_message(queue) @@ -145,12 +145,12 @@ def receive_message(queue) queue = channel.queue(queue_name, durable: queue_manager.durable) - puts '[QUEUE] waiting for message... (but there should be none yet)' + EventQ.logger.debug { '[QUEUE] waiting for message... (but there should be none yet)' } qm = receive_message(queue) expect(qm).to be_nil - puts '[QUEUE] waiting for message...' + EventQ.logger.debug { '[QUEUE] waiting for message...' } sleep 2.2 qm = receive_message(queue) @@ -159,12 +159,12 @@ def receive_message(queue) # check for other message - puts '[QUEUE] waiting for other message... (but there should be none yet)' + EventQ.logger.debug { '[QUEUE] waiting for other message... (but there should be none yet)' } qm = receive_message(queue) expect(qm).to be_nil - puts '[QUEUE] waiting for other message...' + EventQ.logger.debug { '[QUEUE] waiting for other message...' } sleep 2 qm = receive_message(queue) diff --git a/eventq_rabbitmq/spec/eventq_rabbitmq/rabbitmq_queue_client_spec.rb b/eventq_rabbitmq/spec/eventq_rabbitmq/rabbitmq_queue_client_spec.rb index 83309cb..30b7485 100644 --- a/eventq_rabbitmq/spec/eventq_rabbitmq/rabbitmq_queue_client_spec.rb +++ b/eventq_rabbitmq/spec/eventq_rabbitmq/rabbitmq_queue_client_spec.rb @@ -22,10 +22,10 @@ sleep 0.2 delivery_tag, payload = manager.pop_message(queue: q) - puts "#{dlq.message_count} messages dead lettered so far" + EventQ.logger.debug { "#{dlq.message_count} messages dead lettered so far" } expect(dlq.message_count).to eq(0) - puts "Rejecting a message" + EventQ.logger.debug { "Rejecting a message" } channel.nack(delivery_tag) sleep 0.2 @@ -33,11 +33,11 @@ channel = connection.create_channel dlx = channel.fanout("bunny.examples.dlx.exchange") dlq = channel.queue("subscriber_retry", :exclusive => true).bind(dlx) - puts "#{dlq.message_count} messages dead lettered so far" + EventQ.logger.debug { "#{dlq.message_count} messages dead lettered so far" } expect(dlq.message_count).to eq(1) dlx.delete - puts "Disconnecting..." + EventQ.logger.debug { "Disconnecting..." } end it 'should use a delay queue correctly' do diff --git a/eventq_rabbitmq/spec/eventq_rabbitmq/rabbitmq_queue_worker_spec.rb b/eventq_rabbitmq/spec/eventq_rabbitmq/rabbitmq_queue_worker_spec.rb index d341dc1..a7126fb 100644 --- a/eventq_rabbitmq/spec/eventq_rabbitmq/rabbitmq_queue_worker_spec.rb +++ b/eventq_rabbitmq/spec/eventq_rabbitmq/rabbitmq_queue_worker_spec.rb @@ -9,8 +9,12 @@ let(:channel) { connection.create_channel } after do + begin channel.close if channel.open? - connection.close + connection.close if connection.open? + rescue => e + EventQ.logger.error { "Timeout error occurred closing connection. Error: #{e}" } + end end describe '#deserialize_message' do @@ -143,6 +147,8 @@ sleep(2) + subject.stop + expect(received_count).to eq 1 end @@ -250,7 +256,7 @@ expect(args.type).to eq(event_type) expect(args.content_type).to eq message.class.to_s expect(args.context).to eq message_context - puts "Message Received: #{event}" + EventQ.logger.debug { "Message Received: #{event}" } end sleep(1) @@ -287,7 +293,7 @@ expect(event).to eq(message) expect(args.type).to eq(event_type) received = true - puts "Message Received: #{event}" + EventQ.logger.debug { "Message Received: #{event}" } end sleep(1) @@ -326,7 +332,7 @@ expect(event).to eq(message) expect(args.type).to eq(event_type) received = true - puts "Message Received: #{event}" + EventQ.logger.debug { "Message Received: #{event}" } end sleep(0.5) @@ -375,10 +381,10 @@ expect(args.type).to eq(event_type) mutex.synchronize do - puts "Message Received: #{event}" + EventQ.logger.debug { "Message Received: #{event}" } message_count += 1 add_to_received_list(received_messages) - puts 'message processed.' + EventQ.logger.debug { 'message processed.' } sleep 0.2 end end @@ -610,7 +616,7 @@ class A def add_to_received_list(received_messages) thread_name = Thread.current.object_id - puts "[THREAD] #{thread_name}" + EventQ.logger.debug { "[THREAD] #{thread_name}" } thread = received_messages.select { |i| i[:thread] == thread_name } if thread.length > 0 diff --git a/eventq_rabbitmq/spec/spec_helper.rb b/eventq_rabbitmq/spec/spec_helper.rb index 466ddbf..4ee4d6b 100644 --- a/eventq_rabbitmq/spec/spec_helper.rb +++ b/eventq_rabbitmq/spec/spec_helper.rb @@ -4,6 +4,8 @@ require_relative '../lib/eventq_rabbitmq' +EventQ.logger.level = Logger::ERROR + RSpec.configure do |config| config.before(:each) do From 149ca4d0f04cf40317e0750a37c33a94dc3e6382 Mon Sep 17 00:00:00 2001 From: vaughanbrittonsage Date: Tue, 27 Jun 2017 11:13:10 +0100 Subject: [PATCH 12/12] implemented jruby update for eventq_aws --- eventq_aws/.rspec | 1 + eventq_aws/Gemfile | 15 +- eventq_aws/eventq_aws.gemspec | 6 +- eventq_aws/lib/eventq_aws.rb | 7 +- .../lib/eventq_aws/jruby/aws_queue_worker.rb | 370 ++++++++++++++++++ eventq_aws/lib/eventq_aws/version.rb | 2 +- eventq_aws/script/docker-compose.yml | 14 +- eventq_aws/script/test.sh | 3 +- eventq_aws/spec/aws_queue_worker_spec.rb | 78 ++-- .../integration/aws_eventq_client_spec.rb | 37 +- .../spec/integration/aws_queue_worker_spec.rb | 20 +- eventq_aws/spec/spec_helper.rb | 5 +- .../jruby/rabbitmq_queue_worker.rb | 10 +- 13 files changed, 487 insertions(+), 81 deletions(-) create mode 100644 eventq_aws/lib/eventq_aws/jruby/aws_queue_worker.rb diff --git a/eventq_aws/.rspec b/eventq_aws/.rspec index 83e16f8..72cf692 100644 --- a/eventq_aws/.rspec +++ b/eventq_aws/.rspec @@ -1,2 +1,3 @@ --color --require spec_helper +--format progress diff --git a/eventq_aws/Gemfile b/eventq_aws/Gemfile index adc6d12..4424708 100644 --- a/eventq_aws/Gemfile +++ b/eventq_aws/Gemfile @@ -3,8 +3,17 @@ source 'https://rubygems.org' # Specify your gem's dependencies in eventq_aws.gemspec gemspec -gem 'oj', '2.15.0' + gem 'json', '1.8.3' -gem 'openssl', '2.0.4' -gem 'pry' gem 'redlock' + +platforms :ruby do + gem 'oj', '2.15.0' + gem 'pry' + gem 'openssl', '2.0.4' +end + +platforms :jruby do + gem 'pry-debugger-jruby' + gem 'jruby-openssl' +end diff --git a/eventq_aws/eventq_aws.gemspec b/eventq_aws/eventq_aws.gemspec index 117152b..acf8a06 100644 --- a/eventq_aws/eventq_aws.gemspec +++ b/eventq_aws/eventq_aws.gemspec @@ -24,5 +24,9 @@ Gem::Specification.new do |spec| spec.add_development_dependency 'rspec' spec.add_dependency 'aws-sdk-core' - spec.add_dependency 'eventq_base', '~> 1.14' + spec.add_dependency 'eventq_base', '~> 1.15' + + if RUBY_PLATFORM =~ /java/ + spec.platform = 'java' + end end diff --git a/eventq_aws/lib/eventq_aws.rb b/eventq_aws/lib/eventq_aws.rb index 530a27b..f709531 100644 --- a/eventq_aws/lib/eventq_aws.rb +++ b/eventq_aws/lib/eventq_aws.rb @@ -5,10 +5,15 @@ require 'eventq_aws/aws_eventq_client' require 'eventq_aws/aws_queue_client' require 'eventq_aws/aws_queue_manager' -require 'eventq_aws/aws_queue_worker' require 'eventq_aws/aws_subscription_manager' require_relative 'eventq_aws/aws_status_checker' +if RUBY_PLATFORM =~ /java/ + require 'eventq_aws/jruby/aws_queue_worker' +else + require 'eventq_aws/aws_queue_worker' +end + module EventQ def self.namespace @namespace diff --git a/eventq_aws/lib/eventq_aws/jruby/aws_queue_worker.rb b/eventq_aws/lib/eventq_aws/jruby/aws_queue_worker.rb new file mode 100644 index 0000000..12d5ab4 --- /dev/null +++ b/eventq_aws/lib/eventq_aws/jruby/aws_queue_worker.rb @@ -0,0 +1,370 @@ +require 'java' +java_import java.util.concurrent.Executors +module EventQ + module Amazon + class QueueWorker + include EventQ::WorkerId + + APPROXIMATE_RECEIVE_COUNT = 'ApproximateReceiveCount'.freeze + MESSAGE = 'Message'.freeze + + attr_accessor :is_running + + def initialize + @is_running = false + + @on_retry_exceeded_block = nil + @on_retry_block = nil + @on_error_block = nil + + @hash_helper = HashKit::Helper.new + @serialization_provider_manager = EventQ::SerializationProviders::Manager.new + @signature_provider_manager = EventQ::SignatureProviders::Manager.new + + @last_gc_flush = Time.now + @gc_flush_interval = 10 + + @queue_poll_wait = 10 + end + + def start(queue, options = {}, &block) + + EventQ.logger.info("[#{self.class}] - Preparing to start listening for messages.") + + configure(queue, options) + + if options[:client] == nil + raise "[#{self.class}] - :client (QueueClient) must be specified." + end + + raise "[#{self.class}] - Worker is already running." if running? + + client = options[:client] + EventQ.logger.debug do + "[#{self.class} #start] - Listening for messages on queue: #{queue.name}, Queue Url: #{client.get_queue_url(queue)}, Queue arn: #{client.get_queue_arn(queue)}" + end + + EventQ.logger.info("[#{self.class}] - Listening for messages.") + + start_process(options, queue, block) + + return true + end + + def start_process(options, queue, block) + + %w'INT TERM'.each do |sig| + Signal.trap(sig) { + stop + exit + } + end + + @is_running = true + + @executor = java.util.concurrent.Executors::newFixedThreadPool @thread_count + + #loop through each thread count + @thread_count.times do + + @executor.execute do + + client = options[:client] + manager = EventQ::Amazon::QueueManager.new({ client: client }) + + #begin the queue loop for this thread + while true do + + #check if the worker is still allowed to run and break out of thread loop if not + unless running? + break + end + + if @executor.is_shutdown + break + end + + has_message_received = thread_process_iteration(client, manager, queue, block) + + gc_flush + + if !has_message_received + EventQ.logger.debug { "[#{self.class}] - No message received." } + if @sleep > 0 + EventQ.logger.debug { "[#{self.class}] - Sleeping for #{@sleep} seconds" } + sleep(@sleep) + end + end + + end + + end + + end + + if options.key?(:wait) && options[:wait] == true + while running? do end + end + + end + + def gc_flush + if Time.now - last_gc_flush > @gc_flush_interval + GC.start + @last_gc_flush = Time.now + end + end + + def last_gc_flush + @last_gc_flush + end + + def thread_process_iteration(client, manager, queue, block) + #get the queue + q = manager.get_queue(queue) + + received = false + + begin + + #request a message from the queue + response = client.sqs.receive_message({ + queue_url: q, + max_number_of_messages: 1, + wait_time_seconds: @queue_poll_wait, + attribute_names: [APPROXIMATE_RECEIVE_COUNT] + }) + + #check that a message was received + if response.messages.length > 0 + received = true + begin + tag_processing_thread + process_message(response, client, queue, q, block) + ensure + untag_processing_thread + end + + end + + rescue => e + EventQ.log(:error, "[#{self.class}] - An unhandled error occurred. Error: #{e} | Backtrace: #{e.backtrace}") + call_on_error_block(error: e) + end + + return received + end + + def call_on_error_block(error:, message: nil) + if @on_error_block + EventQ.logger.debug { "[#{self.class}] - Executing on_error block." } + begin + @on_error_block.call(error, message) + rescue => e + EventQ.logger.error("[#{self.class}] - An error occurred executing the on_error block. Error: #{e}") + end + else + EventQ.logger.debug { "[#{self.class}] - No on_error block specified to execute." } + end + end + + def call_on_retry_exceeded_block(message) + if @on_retry_exceeded_block != nil + EventQ.logger.debug { "[#{self.class}] - Executing on_retry_exceeded block." } + begin + @on_retry_exceeded_block.call(message) + rescue => e + EventQ.logger.error("[#{self.class}] - An error occurred executing the on_retry_exceeded block. Error: #{e}") + end + else + EventQ.logger.debug { "[#{self.class}] - No on_retry_exceeded block specified." } + end + end + + def call_on_retry_block(message) + if @on_retry_block + EventQ.logger.debug { "[#{self.class}] - Executing on_retry block." } + begin + @on_retry_block.call(message, abort) + rescue => e + EventQ.logger.error("[#{self.class}] - An error occurred executing the on_retry block. Error: #{e}") + end + else + EventQ.logger.debug { "[#{self.class}] - No on_retry block specified." } + end + end + + def stop + EventQ.logger.info("[#{self.class}] - Stopping.") + @is_running = false + @executor.shutdown + return true + end + + def on_retry_exceeded(&block) + @retry_exceeded_block = block + end + + def on_retry(&block) + @on_retry_block = block + return nil + end + + def on_error(&block) + @on_error_block = block + return nil + end + + def running? + return @is_running + end + + def deserialize_message(payload) + provider = @serialization_provider_manager.get_provider(EventQ::Configuration.serialization_provider) + return provider.deserialize(payload) + end + + def serialize_message(msg) + provider = @serialization_provider_manager.get_provider(EventQ::Configuration.serialization_provider) + return provider.serialize(msg) + end + + private + + def process_message(response, client, queue, q, block) + msg = response.messages[0] + retry_attempts = msg.attributes[APPROXIMATE_RECEIVE_COUNT].to_i - 1 + + #deserialize the message payload + payload = JSON.load(msg.body) + message = deserialize_message(payload[MESSAGE]) + + message_args = EventQ::MessageArgs.new(type: message.type, + retry_attempts: retry_attempts, + context: message.context, + content_type: message.content_type) + + EventQ.logger.info("[#{self.class}] - Message received. Retry Attempts: #{retry_attempts}") + + @signature_provider_manager.validate_signature(message: message, queue: queue) + + if(!EventQ::NonceManager.is_allowed?(message.id)) + EventQ.logger.info("[#{self.class}] - Duplicate Message received. Dropping message.") + client.sqs.delete_message({ queue_url: q, receipt_handle: msg.receipt_handle }) + return false + end + + #begin worker block for queue message + begin + + block.call(message.content, message_args) + + if message_args.abort == true + EventQ.logger.info("[#{self.class}] - Message aborted.") + else + #accept the message as processed + client.sqs.delete_message({ queue_url: q, receipt_handle: msg.receipt_handle }) + EventQ.logger.info("[#{self.class}] - Message acknowledged.") + end + + rescue => e + EventQ.logger.error("[#{self.class}] - An unhandled error happened while attempting to process a queue message. Error: #{e} | Backtrace: #{e.backtrace}") + error = true + call_on_error_block(error: e, message: message) + end + + if message_args.abort || error + EventQ::NonceManager.failed(message.id) + reject_message(queue, client, msg, q, retry_attempts, message, message_args.abort) + else + EventQ::NonceManager.complete(message.id) + end + + return true + end + + def reject_message(queue, client, msg, q, retry_attempts, message, abort) + + if !queue.allow_retry || retry_attempts >= queue.max_retry_attempts + + EventQ.logger.info("[#{self.class}] - Message rejected removing from queue. Message: #{serialize_message(message)}") + + #remove the message from the queue so that it does not get retried again + client.sqs.delete_message({ queue_url: q, receipt_handle: msg.receipt_handle }) + + if retry_attempts >= queue.max_retry_attempts + + EventQ.logger.info("[#{self.class}] - Message retry attempt limit exceeded.") + call_on_retry_exceeded_block(message) + + end + + elsif queue.allow_retry + + retry_attempts += 1 + + EventQ.logger.info("[#{self.class}] - Message rejected requesting retry. Attempts: #{retry_attempts}") + + if queue.allow_retry_back_off == true + EventQ.logger.debug { "[#{self.class}] - Calculating message back off retry delay. Attempts: #{retry_attempts} * Delay: #{queue.retry_delay}" } + visibility_timeout = (queue.retry_delay * retry_attempts) / 1000 + if visibility_timeout > (queue.max_retry_delay / 1000) + EventQ.logger.debug { "[#{self.class}] - Max message back off retry delay reached." } + visibility_timeout = queue.max_retry_delay / 1000 + end + else + EventQ.logger.debug { "[#{self.class}] - Setting fixed retry delay for message." } + visibility_timeout = queue.retry_delay / 1000 + end + + if visibility_timeout > 43200 + EventQ.logger.debug { "[#{self.class}] - AWS max visibility timeout of 12 hours has been exceeded. Setting message retry delay to 12 hours." } + visibility_timeout = 43200 + end + + EventQ.logger.debug { "[#{self.class}] - Sending message for retry. Message TTL: #{visibility_timeout}" } + client.sqs.change_message_visibility({ + queue_url: q, # required + receipt_handle: msg.receipt_handle, # required + visibility_timeout: visibility_timeout.to_s, # required + }) + + call_on_retry_block(message) + + end + + end + + def configure(queue, options = {}) + + @queue = queue + + #default thread count + @thread_count = 5 + if options.key?(:thread_count) + @thread_count = options[:thread_count] + end + + #default sleep time in seconds + @sleep = 5 + if options.key?(:sleep) + @sleep = options[:sleep] + end + + if options.key?(:gc_flush_interval) + @gc_flush_interval = options[:gc_flush_interval] + end + + if options.key?(:queue_poll_wait) + @queue_poll_wait = options[:queue_poll_wait] + end + + EventQ.logger.info("[#{self.class}] - Configuring. Thread Count: #{@thread_count} | Interval Sleep: #{@sleep} | GC Flush Interval: #{@gc_flush_interval} | Queue Poll Wait: #{@queue_poll_wait}.") + + return true + + end + + end + end +end diff --git a/eventq_aws/lib/eventq_aws/version.rb b/eventq_aws/lib/eventq_aws/version.rb index 66a0f07..e239e43 100644 --- a/eventq_aws/lib/eventq_aws/version.rb +++ b/eventq_aws/lib/eventq_aws/version.rb @@ -1,5 +1,5 @@ module EventQ module Amazon - VERSION = "1.13.0" + VERSION = "1.14.0" end end diff --git a/eventq_aws/script/docker-compose.yml b/eventq_aws/script/docker-compose.yml index 401bf35..d85fc71 100644 --- a/eventq_aws/script/docker-compose.yml +++ b/eventq_aws/script/docker-compose.yml @@ -1,6 +1,18 @@ testrunner: image: eventq/aws - container_name: gem_test_runner + container_name: testrunner + command: bash -c "./scripts/container_loop.sh" + links: + - redis + volumes: + - ./container_loop.sh:/scripts/container_loop.sh + - ../:/gem_src + env_file: + - ../../.aws.env + +testrunner_jruby: + image: 522104923602.dkr.ecr.eu-west-1.amazonaws.com/sageone/jruby:20170623 + container_name: testrunner_jruby command: bash -c "./scripts/container_loop.sh" links: - redis diff --git a/eventq_aws/script/test.sh b/eventq_aws/script/test.sh index 3f5396d..c1d4b8d 100755 --- a/eventq_aws/script/test.sh +++ b/eventq_aws/script/test.sh @@ -3,4 +3,5 @@ echo start rspec tests docker-compose up -d -docker exec -it gem_test_runner bash -c "cd gem_src && bundle install && bundle exec rspec $*" +docker exec -it testrunner bash -c "cd gem_src && bundle install && bundle exec rspec $*" \ +&& docker exec -it testrunner_jruby bash -c "cd gem_src && rm -rf Gemfile.lock && jruby -S bundle install && jruby -S rspec $*" \ No newline at end of file diff --git a/eventq_aws/spec/aws_queue_worker_spec.rb b/eventq_aws/spec/aws_queue_worker_spec.rb index 9a447af..fe35a2f 100644 --- a/eventq_aws/spec/aws_queue_worker_spec.rb +++ b/eventq_aws/spec/aws_queue_worker_spec.rb @@ -3,50 +3,52 @@ RSpec.describe EventQ::Amazon::QueueWorker do describe '#deserialize_message' do - context 'when serialization provider is OJ_PROVIDER' do - before do - EventQ::Configuration.serialization_provider = EventQ::SerializationProviders::OJ_PROVIDER - end + unless RUBY_PLATFORM =~ /java/ + context 'when serialization provider is OJ_PROVIDER' do + before do + EventQ::Configuration.serialization_provider = EventQ::SerializationProviders::OJ_PROVIDER + end - context 'when payload is for a known type' do - let(:a) do - A.new.tap do |a| - a.text = 'ABC' + context 'when payload is for a known type' do + let(:a) do + A.new.tap do |a| + a.text = 'ABC' + end end - end - let(:payload) { Oj.dump(a) } + let(:payload) { Oj.dump(a) } - it 'should deserialize the message into an object of the known type' do - message = subject.deserialize_message(payload) - expect(message).to be_a(A) - expect(message.text).to eq('ABC') + it 'should deserialize the message into an object of the known type' do + message = subject.deserialize_message(payload) + expect(message).to be_a(A) + expect(message.text).to eq('ABC') + end end - end - context 'when payload is for an unknown type' do - let(:a) do - A.new.tap do |a| - a.text = 'ABC' + context 'when payload is for an unknown type' do + let(:a) do + A.new.tap do |a| + a.text = 'ABC' + end end - end - let(:payload) do - string = Oj.dump(a) - JSON.load(string.sub('"^o":"A"', '"^o":"B"')) - end - let(:message) do - EventQ::QueueMessage.new.tap do |m| - m.content = payload + let(:payload) do + string = Oj.dump(a) + JSON.load(string.sub('"^o":"A"', '"^o":"B"')) + end + let(:message) do + EventQ::QueueMessage.new.tap do |m| + m.content = payload + end + end + let(:json) do + Oj.dump(message) end - end - let(:json) do - Oj.dump(message) - end - it 'should deserialize the message into a Hash' do - message = subject.deserialize_message(json) - expect(message.content).to be_a(Hash) - expect(message.content[:text]).to eq('ABC') + it 'should deserialize the message into a Hash' do + message = subject.deserialize_message(json) + expect(message.content).to be_a(Hash) + expect(message.content[:text]).to eq('ABC') + end end end end @@ -71,8 +73,10 @@ expect(message.content).to be_a(Hash) expect(message.content[:text]).to eq('ABC') end - after do - EventQ::Configuration.serialization_provider = EventQ::SerializationProviders::OJ_PROVIDER + unless RUBY_PLATFORM =~ /java/ + after do + EventQ::Configuration.serialization_provider = EventQ::SerializationProviders::OJ_PROVIDER + end end end end diff --git a/eventq_aws/spec/integration/aws_eventq_client_spec.rb b/eventq_aws/spec/integration/aws_eventq_client_spec.rb index ea1aaf5..838e5a9 100644 --- a/eventq_aws/spec/integration/aws_eventq_client_spec.rb +++ b/eventq_aws/spec/integration/aws_eventq_client_spec.rb @@ -24,23 +24,25 @@ end end + let(:class_kit) { ClassKit::Helper.new } + let(:event_type) { 'test_queue1_event1' } let(:message) { 'Hello World' } - let(:message_context) { { foo: 'bar' } } + let(:message_context) { { 'foo' => 'bar' } } describe '#publish' do it 'should raise an event object and be broadcast to a subscriber queue' do subscription_manager.subscribe(event_type, subscriber_queue) id = eventq_client.publish(topic: event_type, event: message, context: message_context) - puts "Message ID: #{id}" + EventQ.logger.debug { "Message ID: #{id}" } #sleep for 2 seconds to allow the aws message to be sent to the topic and broadcast to subscribers sleep(1) q = queue_manager.get_queue(subscriber_queue) - puts '[QUEUE] waiting for message...' + EventQ.logger.debug { '[QUEUE] waiting for message...' } #request a message from the queue response = queue_client.sqs.receive_message({ @@ -53,9 +55,10 @@ expect(response.messages.length).to eq(1) msg = response.messages[0] - msg_body = Oj.load(msg.body) - payload = Oj.load(msg_body["Message"]) - puts "[QUEUE] - received message: #{payload}" + msg_body = JSON.load(msg.body) + payload_hash = JSON.load(msg_body["Message"]) + payload = class_kit.from_hash(hash: payload_hash, klass: EventQ::QueueMessage) + EventQ.logger.debug { "[QUEUE] - received message: #{payload}" } #remove the message from the queue so that it does not get retried queue_client.sqs.delete_message({ queue_url: q, receipt_handle: msg.receipt_handle }) @@ -74,14 +77,14 @@ subscription_manager.subscribe(event_type, subscriber_queue) id = eventq_client.raise_event(event_type, message, message_context) - puts "Message ID: #{id}" + EventQ.logger.debug { "Message ID: #{id}" } #sleep for 2 seconds to allow the aws message to be sent to the topic and broadcast to subscribers sleep(1) q = queue_manager.get_queue(subscriber_queue) - puts '[QUEUE] waiting for message...' + EventQ.logger.debug { '[QUEUE] waiting for message...' } #request a message from the queue response = queue_client.sqs.receive_message({ @@ -94,9 +97,10 @@ expect(response.messages.length).to eq(1) msg = response.messages[0] - msg_body = Oj.load(msg.body) - payload = Oj.load(msg_body["Message"]) - puts "[QUEUE] - received message: #{payload}" + msg_body = JSON.load(msg.body) + payload_hash = JSON.load(msg_body["Message"]) + payload = class_kit.from_hash(hash: payload_hash, klass: EventQ::QueueMessage) + EventQ.logger.debug { "[QUEUE] - received message: #{payload}" } #remove the message from the queue so that it does not get retried queue_client.sqs.delete_message({ queue_url: q, receipt_handle: msg.receipt_handle }) @@ -139,9 +143,9 @@ queue_client.sqs.purge_queue(queue_url: queue_client.get_queue_url(queue)) id = eventq_client.raise_event_in_queue(event_type, message, queue, delay_seconds) - puts "Message ID: #{id}" + EventQ.logger.debug { "Message ID: #{id}" } - puts '[QUEUE] waiting for message...' + EventQ.logger.debug { '[QUEUE] waiting for message...' } #request a message from the queue queue_url = queue_client.get_queue_url(queue) @@ -166,14 +170,15 @@ expect(response.messages.length).to eq(1) msg = response.messages[0] - msg_body = Oj.load(msg.body) + payload_hash = JSON.load(msg.body) + payload = class_kit.from_hash(hash: payload_hash, klass: EventQ::QueueMessage) - puts "[QUEUE] - received message: #{msg_body}" + EventQ.logger.debug { "[QUEUE] - received message: #{msg_body}" } #remove the message from the queue so that it does not get retried queue_client.sqs.delete_message(queue_url: queue_url, receipt_handle: msg.receipt_handle) - expect(msg_body.content).to eq(message) + expect(payload.content).to eq(message) end end end diff --git a/eventq_aws/spec/integration/aws_queue_worker_spec.rb b/eventq_aws/spec/integration/aws_queue_worker_spec.rb index 950cf79..5ffd94f 100644 --- a/eventq_aws/spec/integration/aws_queue_worker_spec.rb +++ b/eventq_aws/spec/integration/aws_queue_worker_spec.rb @@ -27,13 +27,14 @@ let(:event_type) { 'queue_worker_event1' } let(:event_type2) { 'queue_worker_event2' } let(:message) { 'Hello World' } - let(:message_context) { { foo: 'bar' } } + let(:message_context) { { 'foo' => 'bar' } } it 'should receive an event from the subscriber queue' do subscription_manager.subscribe(event_type, subscriber_queue) eventq_client.raise_event(event_type, message, message_context) received = false + context = nil #wait 1 second to allow the message to be sent and broadcast to the queue sleep(1) @@ -41,9 +42,9 @@ subject.start(subscriber_queue, {:sleep => 1, :thread_count => 1, client: queue_client }) do |event, args| expect(event).to eq(message) expect(args).to be_a(EventQ::MessageArgs) - expect(args.context).to eq message_context + context = message_context received = true - puts "Message Received: #{event}" + EventQ.logger.debug { "Message Received: #{event}" } end sleep(2) @@ -51,6 +52,7 @@ subject.stop expect(received).to eq(true) + expect(context).to eq message_context expect(subject.is_running).to eq(false) end @@ -77,7 +79,7 @@ expect(event).to eq(message) expect(args).to be_a(EventQ::MessageArgs) received = true - puts "Message Received: #{event}" + EventQ.logger.debug { "Message Received: #{event}" } end sleep(2) @@ -108,7 +110,7 @@ expect(event).to eq(message) expect(args).to be_a(EventQ::MessageArgs) received = true - puts "Message Received: #{event}" + EventQ.logger.debug { "Message Received: #{event}" } end sleep(2) @@ -143,7 +145,7 @@ received = true received_count += 1 received_attribute = args.retry_attempts - puts "Message Received: #{event}" + EventQ.logger.debug { "Message Received: #{event}" } if received_count != 2 args.abort = true end @@ -178,10 +180,10 @@ expect(args).to be_a(EventQ::MessageArgs) mutex.synchronize do - puts "Message Received: #{event}" + EventQ.logger.debug { "Message Received: #{event}" } message_count += 1 add_to_received_list(received_messages) - puts 'message processed.' + EventQ.logger.debug { 'message processed.' } end end @@ -250,7 +252,7 @@ def add_to_received_list(received_messages) thread_name = Thread.current.object_id - puts "[THREAD] #{thread_name}" + EventQ.logger.debug { "[THREAD] #{thread_name}" } thread = received_messages.detect { |i| i[:thread] == thread_name } if thread != nil diff --git a/eventq_aws/spec/spec_helper.rb b/eventq_aws/spec/spec_helper.rb index 7d2dabf..1c8dc1c 100644 --- a/eventq_aws/spec/spec_helper.rb +++ b/eventq_aws/spec/spec_helper.rb @@ -1,8 +1,7 @@ -require 'pry' -require 'oj' -require 'json' require_relative '../lib/eventq_aws' +EventQ.logger.level = Logger::ERROR + module EventQ def self.AWS_ACCOUNT_NUMBER ENV.fetch('AWS_ACCOUNT_NUMBER') diff --git a/eventq_rabbitmq/lib/eventq_rabbitmq/jruby/rabbitmq_queue_worker.rb b/eventq_rabbitmq/lib/eventq_rabbitmq/jruby/rabbitmq_queue_worker.rb index fe20f28..287cb35 100644 --- a/eventq_rabbitmq/lib/eventq_rabbitmq/jruby/rabbitmq_queue_worker.rb +++ b/eventq_rabbitmq/lib/eventq_rabbitmq/jruby/rabbitmq_queue_worker.rb @@ -8,8 +8,6 @@ class QueueWorker attr_accessor :is_running def initialize - @threads = [] - @forks = [] @is_running = false @retry_exceeded_block = nil @@ -41,6 +39,7 @@ def start(queue, options = {}, &block) start_process(options, queue, block) + return true end def start_process(options, queue, block) @@ -303,17 +302,12 @@ def configure(queue, options = {}) @sleep = options[:sleep] end - @fork_count = 1 - if options.key?(:fork_count) - @fork_count = options[:fork_count] - end - @gc_flush_interval = 10 if options.key?(:gc_flush_interval) @gc_flush_interval = options[:gc_flush_interval] end - EventQ.logger.info("[#{self.class}] - Configuring. Process Count: #{@fork_count} | Thread Count: #{@thread_count} | Interval Sleep: #{@sleep}.") + EventQ.logger.info("[#{self.class}] - Configuring. Thread Count: #{@thread_count} | Interval Sleep: #{@sleep}.") return true