Skip to content

Commit

Permalink
Merge pull request #24 from bbalser/async_ack
Browse files Browse the repository at this point in the history
Async ack
  • Loading branch information
Brian Balser authored Jul 3, 2019
2 parents c6a2a9d + 89269a6 commit 286d813
Show file tree
Hide file tree
Showing 7 changed files with 161 additions and 19 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ by adding `elsa` to your list of dependencies in `mix.exs`:
```elixir
def deps do
[
{:elsa, "~> 0.5.1"}
{:elsa, "~> 0.6.0"}
]
end
```
Expand Down
27 changes: 23 additions & 4 deletions lib/elsa/consumer/message_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ defmodule Elsa.Consumer.MessageHandler do
processes.
"""
@callback init(term()) :: {:ok, term()}
@callback handle_messages(term(), term()) :: {:ack, term()} | {:no_ack, term()}
@callback handle_messages(term()) :: :ack | :no_ack
@callback handle_messages(term(), term()) :: {:ack, term()} | {:ack, term(), term()} | {:no_ack, term()}
@callback handle_messages(term()) :: :ack | {:ack, term()} | :no_ack

@doc """
Defines the macro for implementing the message handler behaviour
Expand All @@ -24,14 +24,33 @@ defmodule Elsa.Consumer.MessageHandler do
end

def handle_messages(messages, state) do
result = handle_messages(messages)
{result, state}
case handle_messages(messages) do
:ack -> {:ack, state}
{:ack, offset} -> {:ack, offset, state}
:no_ack -> {:no_ack, state}
end
end

def handle_messages(messages) do
:ack
end

def topic() do
Process.get(:elsa_topic)
end

def partition() do
Process.get(:elsa_partition)
end

def generation_id() do
Process.get(:elsa_generation_id)
end

def name() do
Process.get(:elsa_name)
end

defoverridable Elsa.Consumer.MessageHandler
end
end
Expand Down
9 changes: 9 additions & 0 deletions lib/elsa/group/manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,14 @@ defmodule Elsa.Group.Manager do
GenServer.cast(group_manager, {:ack, topic, partition, generation_id, offset})
end

@doc """
Trigger acknowldgement of processed messages back to the cluster.
"""
@spec ack(String.t(), %{topic: String.t(), partition: integer(), generation_id: integer(), offset: integer()}) :: :ok
def ack(name, %{topic: topic, partition: partition, generation_id: generation_id, offset: offset}) do
ack(name, topic, partition, generation_id, offset)
end

@doc """
Start the group manager process and register a name with the process registry.
"""
Expand Down Expand Up @@ -126,6 +134,7 @@ defmodule Elsa.Group.Manager do
case assignment_generation_id == generation_id do
true ->
:ok = :brod_group_coordinator.ack(state.group_coordinator_pid, generation_id, topic, partition, offset)
:ok = :brod.consume_ack(state.name, topic, partition, offset)
new_workers = WorkerManager.update_offset(state.workers, topic, partition, offset)
{:noreply, %{state | workers: new_workers}}

Expand Down
34 changes: 23 additions & 11 deletions lib/elsa/group/worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ defmodule Elsa.Group.Worker do
config: Keyword.fetch!(init_args, :config)
}

Process.put(:elsa_name, state.name)
Process.put(:elsa_topic, state.topic)
Process.put(:elsa_partition, state.partition)
Process.put(:elsa_generation_id, state.generation_id)

Registry.register(registry(state.name), :"worker_#{state.topic}_#{state.partition}", nil)

{:ok, handler_state} = state.handler.init(state.handler_init_args)
Expand All @@ -84,10 +89,19 @@ defmodule Elsa.Group.Worker do
end

def handle_info({_consumer_pid, kafka_message_set(topic: topic, partition: partition, messages: messages)}, state) do
{:ack, new_handler_state} = send_messages_to_handler(topic, partition, messages, state)
offset = ack_messages(topic, partition, messages, state)

{:noreply, %{state | offset: offset, handler_state: new_handler_state}}
case send_messages_to_handler(topic, partition, messages, state) do
{:ack, new_handler_state} ->
offset = messages |> List.last() |> kafka_message(:offset)
ack_messages(topic, partition, offset, state)
{:noreply, %{state | offset: offset, handler_state: new_handler_state}}

{:ack, offset, new_handler_state} ->
ack_messages(topic, partition, offset, state)
{:noreply, %{state | offset: offset, handler_state: new_handler_state}}

{:no_ack, new_handler_state} ->
{:noreply, %{state | handler_state: new_handler_state}}
end
end

def handle_call(:unsubscribe, _from, state) do
Expand All @@ -98,26 +112,24 @@ defmodule Elsa.Group.Worker do

defp send_messages_to_handler(topic, partition, messages, state) do
messages
|> Enum.map(&transform_message(topic, partition, &1))
|> Enum.map(&transform_message(topic, partition, state.generation_id, &1))
|> state.handler.handle_messages(state.handler_state)
end

defp ack_messages(topic, partition, messages, state) do
offset = messages |> List.last() |> kafka_message(:offset)

defp ack_messages(topic, partition, offset, state) do
Elsa.Group.Manager.ack(state.name, topic, partition, state.generation_id, offset)
:ok = :brod.consume_ack(state.name, topic, partition, offset)

offset
end

defp transform_message(topic, partition, kafka_message(offset: offset, key: key, value: value)) do
defp transform_message(topic, partition, generation_id, kafka_message(offset: offset, key: key, value: value)) do
%{
topic: topic,
partition: partition,
offset: offset,
key: key,
value: value
value: value,
generation_id: generation_id
}
end

Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Elsa.MixProject do
def project do
[
app: :elsa,
version: "0.5.1",
version: "0.6.0",
elixir: "~> 1.8",
start_permanent: Mix.env() == :prod,
description: description(),
Expand Down
5 changes: 3 additions & 2 deletions test/integration/elsa/group/consumer_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ defmodule Elsa.Group.ConsumerTest do
send_messages(["message2"])

assert_async 20, 500, fn ->
assert [%{topic: "elsa-topic", partition: 0, key: "", value: "message2"}] ==
Agent.get(:test_message_store, fn s -> s end)
messages = Agent.get(:test_message_store, fn s -> s end)
assert 1 == length(messages)
assert match?(%{topic: "elsa-topic", partition: 0, key: "", value: "message2"}, List.first(messages))
end

Supervisor.stop(pid)
Expand Down
101 changes: 101 additions & 0 deletions test/unit/elsa/group/worker_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
defmodule Elsa.Group.WorkerTest do
use ExUnit.Case
use Placebo
import Elsa.Group.Worker, only: [kafka_message_set: 1, kafka_message: 1]

describe "handle_info/2" do
setup do
Registry.start_link(keys: :unique, name: Elsa.Group.Supervisor.registry(:test_name))
allow Elsa.Group.Manager.ack(any(), any(), any(), any(), any()), return: :ok
allow :brod.subscribe(any(), any(), any(), any(), any()), return: {:ok, self()}
allow :brod.consume_ack(any(), any(), any(), any()), return: :ok
:ok
end

test "handler can specifiy offset to ack" do
init_args = [
name: :test_name,
topic: "test-topic",
partition: 0,
generation_id: 5,
begin_offset: 13,
handler: Elsa.Group.WorkerTest.Handler1,
handler_init_args: [],
config: []
]

Elsa.Group.Worker.start_link(init_args)

messages =
kafka_message_set(
topic: "test-topic",
partition: 0,
messages: [
kafka_message(offset: 13, key: "key1", value: "value1"),
kafka_message(offset: 14, key: "key2", value: "value2")
]
)

Elsa.Group.Worker.handle_info({:some_pid, messages}, create_state(init_args))

assert_called Elsa.Group.Manager.ack(:test_name, "test-topic", 0, 5, 13)
end

test "handler can say no_ack" do
init_args = [
name: :test_name,
topic: "test-topic",
partition: 0,
generation_id: 5,
begin_offset: 13,
handler: Elsa.Group.WorkerTest.NoAck,
handler_init_args: [],
config: []
]

Elsa.Group.Worker.start_link(init_args)

messages =
kafka_message_set(
topic: "test-topic",
partition: 0,
messages: [
kafka_message(offset: 13, key: "key1", value: "value1"),
kafka_message(offset: 14, key: "key2", value: "value2")
]
)

Elsa.Group.Worker.handle_info({:some_pid, messages}, create_state(init_args))

refute_called Elsa.Group.Manager.ack(:test_name, "test-topic", 0, any(), any())
refute_called :brod.consume_ack(:test_name, "test-topic", 0, any())
end
end

defp create_state(init_args) do
state =
init_args
|> Enum.into(%{})
|> Map.delete(:begin_offset)
|> Map.put(:offset, 13)

struct(Elsa.Group.Worker.State, state)
end
end

defmodule Elsa.Group.WorkerTest.Handler1 do
use Elsa.Consumer.MessageHandler

def handle_messages(messages) do
offset = messages |> List.first() |> Map.get(:offset)
{:ack, offset}
end
end

defmodule Elsa.Group.WorkerTest.NoAck do
use Elsa.Consumer.MessageHandler

def handle_messages(_messages) do
:no_ack
end
end

0 comments on commit 286d813

Please sign in to comment.