From e757f5e52d4fdf15d48b7122d9dc0934dc99c458 Mon Sep 17 00:00:00 2001 From: Justin Coyne Date: Fri, 1 Dec 2017 10:34:00 -0800 Subject: [PATCH] Create an actor that publishes create messages to Kafka --- Gemfile | 2 ++ Gemfile.lock | 2 ++ README.md | 14 +++++++++++ app/actors/notify_kafka_actor.rb | 32 +++++++++++++++++++++++++ config/environment.rb | 2 ++ config/settings.yml | 5 ++++ spec/actors/notify_kakfka_actor_spec.rb | 9 +++++++ spec/features/create_work_spec.rb | 3 +++ 8 files changed, 69 insertions(+) create mode 100644 app/actors/notify_kafka_actor.rb create mode 100644 spec/actors/notify_kakfka_actor_spec.rb diff --git a/Gemfile b/Gemfile index b5a71e7..21b1b9b 100644 --- a/Gemfile +++ b/Gemfile @@ -33,6 +33,8 @@ gem 'jbuilder', '~> 2.5' # Use Capistrano for deployment # gem 'capistrano-rails', group: :development +gem 'ruby-kafka' + group :production do # For using an SQS based worker beanstalk environment gem 'active_elastic_job' diff --git a/Gemfile.lock b/Gemfile.lock index 100870c..ef38423 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -642,6 +642,7 @@ GEM json multipart-post oauth2 + ruby-kafka (0.5.1) ruby-progressbar (1.9.0) ruby_dep (1.5.0) rubyzip (1.2.1) @@ -781,6 +782,7 @@ DEPENDENCIES rsolr (>= 1.0) rspec-rails rubocop + ruby-kafka sass-rails (~> 5.0) selenium-webdriver solr_wrapper (>= 0.3) diff --git a/README.md b/README.md index 164f959..95a4c22 100644 --- a/README.md +++ b/README.md @@ -23,3 +23,17 @@ include Suri::Druid ``` to the model in order to mint the ID as a DRUID instead of a Fedora UUID. + +# Running Streaming + +See [https://kafka.apache.org/quickstart] + +Start Zookeeper +``` +bin/zookeeper-server-start.sh config/zookeeper.properties +``` + +Start Kafka +``` +bin/kafka-server-start.sh config/server.properties +``` diff --git a/app/actors/notify_kafka_actor.rb b/app/actors/notify_kafka_actor.rb new file mode 100644 index 0000000..3ba1eb5 --- /dev/null +++ b/app/actors/notify_kafka_actor.rb @@ -0,0 +1,32 @@ +# Sends messages to Kafka when a work is created +class NotifyKafkaActor < Hyrax::Actors::AbstractActor + # @param [Hyrax::Actors::Environment] env + # @return [Boolean] true if create was successful + def create(env) + next_actor.create(env) && publish_to_kafka('created', env) + end + + private + + # @param action [String] what action was taken + # @param env [Environment] the environment of the action + def publish_to_kafka(action, env) + kafka.deliver_message(message(action, env), + topic: Settings.kafka.topic) + true + end + + def kafka + Kafka.new(seed_brokers: Settings.kafka.seed_brokers, + client_id: Settings.kafka.client_id) + end + + # @return [String] a JSON encoded message to send to Kafka + def message(action, env) + { action: action, work: work_message(env) }.to_json + end + + def work_message(env) + { id: env.curation_concern.id, title: env.curation_concern.title } + end +end diff --git a/config/environment.rb b/config/environment.rb index 426333b..0bed3fb 100644 --- a/config/environment.rb +++ b/config/environment.rb @@ -3,3 +3,5 @@ # Initialize the Rails application. Rails.application.initialize! + +Hyrax::CurationConcern.actor_factory.use NotifyKafkaActor diff --git a/config/settings.yml b/config/settings.yml index 8c85e13..2f50b0e 100644 --- a/config/settings.yml +++ b/config/settings.yml @@ -3,3 +3,8 @@ suri: username: <%= ENV['SURI_USERNAME'] %> password: <%= ENV['SURI_PASSWORD'] %> use_ssl: true +kafka: + seed_brokers: + - 'localhost:9092' + topic: 'test' + client_id: 'hydrox-app' diff --git a/spec/actors/notify_kakfka_actor_spec.rb b/spec/actors/notify_kakfka_actor_spec.rb new file mode 100644 index 0000000..2cbaad9 --- /dev/null +++ b/spec/actors/notify_kakfka_actor_spec.rb @@ -0,0 +1,9 @@ +require 'rails_helper' + +RSpec.describe NotifyKafkaActor do + describe "the middleware" do + subject { Hyrax::CurationConcern.actor_factory.middlewares } + + it { is_expected.to include described_class } + end +end diff --git a/spec/features/create_work_spec.rb b/spec/features/create_work_spec.rb index 06a3f50..d306ed9 100644 --- a/spec/features/create_work_spec.rb +++ b/spec/features/create_work_spec.rb @@ -16,9 +16,12 @@ let(:permission_template) { Hyrax::PermissionTemplate.find_or_create_by!(admin_set_id: admin_set_id) } let(:workflow) { Sipity::Workflow.create!(active: true, name: 'test-workflow', permission_template: permission_template) } let(:minter) { instance_double(Suri::Minter, mint: 'oo000oo0000')} + let(:kafka) { instance_double(Kafka, deliver_message: nil) } before do allow(Suri::Minter).to receive(:new).and_return(minter) + allow(Kafka).to receive(:new).and_return(kafka) + Sipity::WorkflowAction.create!(name: 'submit', workflow: workflow) # Need to create a single action that can be taken permission_template_access = Hyrax::PermissionTemplateAccess.create!(permission_template_id: permission_template.id, agent_type: 'user', agent_id: user.user_key, access: 'deposit')