Skip to content
This repository has been archived by the owner on Jun 2, 2020. It is now read-only.

Commit

Permalink
Create an actor that publishes create messages to Kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
jcoyne committed Dec 4, 2017
1 parent b38ba3b commit a2bb2d8
Show file tree
Hide file tree
Showing 8 changed files with 69 additions and 0 deletions.
2 changes: 2 additions & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ gem 'jbuilder', '~> 2.5'
# Use Capistrano for deployment
# gem 'capistrano-rails', group: :development

gem 'ruby-kafka'

group :production do
# For using an SQS based worker beanstalk environment
gem 'active_elastic_job'
Expand Down
2 changes: 2 additions & 0 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -642,6 +642,7 @@ GEM
json
multipart-post
oauth2
ruby-kafka (0.5.1)
ruby-progressbar (1.9.0)
ruby_dep (1.5.0)
rubyzip (1.2.1)
Expand Down Expand Up @@ -781,6 +782,7 @@ DEPENDENCIES
rsolr (>= 1.0)
rspec-rails
rubocop
ruby-kafka
sass-rails (~> 5.0)
selenium-webdriver
solr_wrapper (>= 0.3)
Expand Down
14 changes: 14 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,17 @@ include Suri::Druid
```

to the model in order to mint the ID as a DRUID instead of a Fedora UUID.

# Running Streaming

See [https://kafka.apache.org/quickstart]

Start Zookeeper
```
bin/zookeeper-server-start.sh config/zookeeper.properties
```

Start Kafka
```
bin/kafka-server-start.sh config/server.properties
```
32 changes: 32 additions & 0 deletions app/actors/notify_kafka_actor.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Sends messages to Kafka when a work is created
class NotifyKafkaActor < Hyrax::Actors::AbstractActor
# @param [Hyrax::Actors::Environment] env
# @return [Boolean] true if create was successful
def create(env)
next_actor.create(env) && publish_to_kafka('created', env)
end

private

# @param action [String] what action was taken
# @param env [Environment] the environment of the action
def publish_to_kafka(action, env)
kafka.deliver_message(message(action, env),
topic: Settings.kafka.topic)
true
end

def kafka
Kafka.new(seed_brokers: Settings.kafka.seed_brokers,
client_id: Settings.kafka.client_id)
end

# @return [String] a JSON encoded message to send to Kafka
def message(action, env)
{ action: action, work: work_message(env) }.to_json
end

def work_message(env)
{ id: env.curation_concern.id, title: env.curation_concern.title }
end
end
2 changes: 2 additions & 0 deletions config/environment.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,5 @@

# Initialize the Rails application.
Rails.application.initialize!

Hyrax::CurationConcern.actor_factory.use NotifyKafkaActor
5 changes: 5 additions & 0 deletions config/settings.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,8 @@ suri:
username: <%= ENV['SURI_USERNAME'] %>
password: <%= ENV['SURI_PASSWORD'] %>
use_ssl: true
kafka:
seed_brokers:
- 'localhost:9092'
topic: 'test'
client_id: 'hydrox-app'
9 changes: 9 additions & 0 deletions spec/actors/notify_kakfka_actor_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
require 'rails_helper'

RSpec.describe NotifyKafkaActor do
describe "the middleware" do
subject { Hyrax::CurationConcern.actor_factory.middlewares }

it { is_expected.to include described_class }
end
end
3 changes: 3 additions & 0 deletions spec/features/create_work_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@
let(:permission_template) { Hyrax::PermissionTemplate.find_or_create_by!(admin_set_id: admin_set_id) }
let(:workflow) { Sipity::Workflow.create!(active: true, name: 'test-workflow', permission_template: permission_template) }
let(:minter) { instance_double(Suri::Minter, mint: 'oo000oo0000')}
let(:kafka) { instance_double(Kafka::Client, deliver_message: nil) }

before do
allow(Suri::Minter).to receive(:new).and_return(minter)
allow(Kafka).to receive(:new).and_return(kafka)

Sipity::WorkflowAction.create!(name: 'submit', workflow: workflow) # Need to create a single action that can be taken

permission_template_access = Hyrax::PermissionTemplateAccess.create!(permission_template_id: permission_template.id, agent_type: 'user', agent_id: user.user_key, access: 'deposit')
Expand Down

0 comments on commit a2bb2d8

Please sign in to comment.