diff --git a/README.md b/README.md index 8701f74..d90a77c 100644 --- a/README.md +++ b/README.md @@ -19,7 +19,7 @@ by adding `elsa` to your list of dependencies in `mix.exs`: ```elixir def deps do [ - {:elsa, "~> 0.5.1"} + {:elsa, "~> 0.6.0"} ] end ``` diff --git a/lib/elsa/consumer/message_handler.ex b/lib/elsa/consumer/message_handler.ex index 216e661..aedcf14 100644 --- a/lib/elsa/consumer/message_handler.ex +++ b/lib/elsa/consumer/message_handler.ex @@ -5,8 +5,8 @@ defmodule Elsa.Consumer.MessageHandler do processes. """ @callback init(term()) :: {:ok, term()} - @callback handle_messages(term(), term()) :: {:ack, term()} | {:no_ack, term()} - @callback handle_messages(term()) :: :ack | :no_ack + @callback handle_messages(term(), term()) :: {:ack, term()} | {:ack, term(), term()} | {:no_ack, term()} + @callback handle_messages(term()) :: :ack | {:ack, term()} | :no_ack @doc """ Defines the macro for implementing the message handler behaviour @@ -24,14 +24,33 @@ defmodule Elsa.Consumer.MessageHandler do end def handle_messages(messages, state) do - result = handle_messages(messages) - {result, state} + case handle_messages(messages) do + :ack -> {:ack, state} + {:ack, offset} -> {:ack, offset, state} + :no_ack -> {:no_ack, state} + end end def handle_messages(messages) do :ack end + def topic() do + Process.get(:elsa_topic) + end + + def partition() do + Process.get(:elsa_partition) + end + + def generation_id() do + Process.get(:elsa_generation_id) + end + + def name() do + Process.get(:elsa_name) + end + defoverridable Elsa.Consumer.MessageHandler end end diff --git a/lib/elsa/group/manager.ex b/lib/elsa/group/manager.ex index 2a2ec70..2d74562 100644 --- a/lib/elsa/group/manager.ex +++ b/lib/elsa/group/manager.ex @@ -62,6 +62,14 @@ defmodule Elsa.Group.Manager do GenServer.cast(group_manager, {:ack, topic, partition, generation_id, offset}) end + @doc """ + Trigger acknowldgement of processed messages back to the cluster. + """ + @spec ack(String.t(), %{topic: String.t(), partition: integer(), generation_id: integer(), offset: integer()}) :: :ok + def ack(name, %{topic: topic, partition: partition, generation_id: generation_id, offset: offset}) do + ack(name, topic, partition, generation_id, offset) + end + @doc """ Start the group manager process and register a name with the process registry. """ @@ -126,6 +134,7 @@ defmodule Elsa.Group.Manager do case assignment_generation_id == generation_id do true -> :ok = :brod_group_coordinator.ack(state.group_coordinator_pid, generation_id, topic, partition, offset) + :ok = :brod.consume_ack(state.name, topic, partition, offset) new_workers = WorkerManager.update_offset(state.workers, topic, partition, offset) {:noreply, %{state | workers: new_workers}} diff --git a/lib/elsa/group/worker.ex b/lib/elsa/group/worker.ex index 2a285f2..a095274 100644 --- a/lib/elsa/group/worker.ex +++ b/lib/elsa/group/worker.ex @@ -65,6 +65,11 @@ defmodule Elsa.Group.Worker do config: Keyword.fetch!(init_args, :config) } + Process.put(:elsa_name, state.name) + Process.put(:elsa_topic, state.topic) + Process.put(:elsa_partition, state.partition) + Process.put(:elsa_generation_id, state.generation_id) + Registry.register(registry(state.name), :"worker_#{state.topic}_#{state.partition}", nil) {:ok, handler_state} = state.handler.init(state.handler_init_args) @@ -84,10 +89,19 @@ defmodule Elsa.Group.Worker do end def handle_info({_consumer_pid, kafka_message_set(topic: topic, partition: partition, messages: messages)}, state) do - {:ack, new_handler_state} = send_messages_to_handler(topic, partition, messages, state) - offset = ack_messages(topic, partition, messages, state) - - {:noreply, %{state | offset: offset, handler_state: new_handler_state}} + case send_messages_to_handler(topic, partition, messages, state) do + {:ack, new_handler_state} -> + offset = messages |> List.last() |> kafka_message(:offset) + ack_messages(topic, partition, offset, state) + {:noreply, %{state | offset: offset, handler_state: new_handler_state}} + + {:ack, offset, new_handler_state} -> + ack_messages(topic, partition, offset, state) + {:noreply, %{state | offset: offset, handler_state: new_handler_state}} + + {:no_ack, new_handler_state} -> + {:noreply, %{state | handler_state: new_handler_state}} + end end def handle_call(:unsubscribe, _from, state) do @@ -98,26 +112,24 @@ defmodule Elsa.Group.Worker do defp send_messages_to_handler(topic, partition, messages, state) do messages - |> Enum.map(&transform_message(topic, partition, &1)) + |> Enum.map(&transform_message(topic, partition, state.generation_id, &1)) |> state.handler.handle_messages(state.handler_state) end - defp ack_messages(topic, partition, messages, state) do - offset = messages |> List.last() |> kafka_message(:offset) - + defp ack_messages(topic, partition, offset, state) do Elsa.Group.Manager.ack(state.name, topic, partition, state.generation_id, offset) - :ok = :brod.consume_ack(state.name, topic, partition, offset) offset end - defp transform_message(topic, partition, kafka_message(offset: offset, key: key, value: value)) do + defp transform_message(topic, partition, generation_id, kafka_message(offset: offset, key: key, value: value)) do %{ topic: topic, partition: partition, offset: offset, key: key, - value: value + value: value, + generation_id: generation_id } end diff --git a/mix.exs b/mix.exs index 1c958ce..091d5d1 100644 --- a/mix.exs +++ b/mix.exs @@ -4,7 +4,7 @@ defmodule Elsa.MixProject do def project do [ app: :elsa, - version: "0.5.1", + version: "0.6.0", elixir: "~> 1.8", start_permanent: Mix.env() == :prod, description: description(), diff --git a/test/integration/elsa/group/consumer_test.exs b/test/integration/elsa/group/consumer_test.exs index 08c2ebf..5f52af7 100644 --- a/test/integration/elsa/group/consumer_test.exs +++ b/test/integration/elsa/group/consumer_test.exs @@ -41,8 +41,9 @@ defmodule Elsa.Group.ConsumerTest do send_messages(["message2"]) assert_async 20, 500, fn -> - assert [%{topic: "elsa-topic", partition: 0, key: "", value: "message2"}] == - Agent.get(:test_message_store, fn s -> s end) + messages = Agent.get(:test_message_store, fn s -> s end) + assert 1 == length(messages) + assert match?(%{topic: "elsa-topic", partition: 0, key: "", value: "message2"}, List.first(messages)) end Supervisor.stop(pid) diff --git a/test/unit/elsa/group/worker_test.exs b/test/unit/elsa/group/worker_test.exs new file mode 100644 index 0000000..c8c71e5 --- /dev/null +++ b/test/unit/elsa/group/worker_test.exs @@ -0,0 +1,101 @@ +defmodule Elsa.Group.WorkerTest do + use ExUnit.Case + use Placebo + import Elsa.Group.Worker, only: [kafka_message_set: 1, kafka_message: 1] + + describe "handle_info/2" do + setup do + Registry.start_link(keys: :unique, name: Elsa.Group.Supervisor.registry(:test_name)) + allow Elsa.Group.Manager.ack(any(), any(), any(), any(), any()), return: :ok + allow :brod.subscribe(any(), any(), any(), any(), any()), return: {:ok, self()} + allow :brod.consume_ack(any(), any(), any(), any()), return: :ok + :ok + end + + test "handler can specifiy offset to ack" do + init_args = [ + name: :test_name, + topic: "test-topic", + partition: 0, + generation_id: 5, + begin_offset: 13, + handler: Elsa.Group.WorkerTest.Handler1, + handler_init_args: [], + config: [] + ] + + Elsa.Group.Worker.start_link(init_args) + + messages = + kafka_message_set( + topic: "test-topic", + partition: 0, + messages: [ + kafka_message(offset: 13, key: "key1", value: "value1"), + kafka_message(offset: 14, key: "key2", value: "value2") + ] + ) + + Elsa.Group.Worker.handle_info({:some_pid, messages}, create_state(init_args)) + + assert_called Elsa.Group.Manager.ack(:test_name, "test-topic", 0, 5, 13) + end + + test "handler can say no_ack" do + init_args = [ + name: :test_name, + topic: "test-topic", + partition: 0, + generation_id: 5, + begin_offset: 13, + handler: Elsa.Group.WorkerTest.NoAck, + handler_init_args: [], + config: [] + ] + + Elsa.Group.Worker.start_link(init_args) + + messages = + kafka_message_set( + topic: "test-topic", + partition: 0, + messages: [ + kafka_message(offset: 13, key: "key1", value: "value1"), + kafka_message(offset: 14, key: "key2", value: "value2") + ] + ) + + Elsa.Group.Worker.handle_info({:some_pid, messages}, create_state(init_args)) + + refute_called Elsa.Group.Manager.ack(:test_name, "test-topic", 0, any(), any()) + refute_called :brod.consume_ack(:test_name, "test-topic", 0, any()) + end + end + + defp create_state(init_args) do + state = + init_args + |> Enum.into(%{}) + |> Map.delete(:begin_offset) + |> Map.put(:offset, 13) + + struct(Elsa.Group.Worker.State, state) + end +end + +defmodule Elsa.Group.WorkerTest.Handler1 do + use Elsa.Consumer.MessageHandler + + def handle_messages(messages) do + offset = messages |> List.first() |> Map.get(:offset) + {:ack, offset} + end +end + +defmodule Elsa.Group.WorkerTest.NoAck do + use Elsa.Consumer.MessageHandler + + def handle_messages(_messages) do + :no_ack + end +end