diff --git a/lib/elsa/group/direct_acknowledger.ex b/lib/elsa/group/direct_acknowledger.ex new file mode 100644 index 0000000..d7e272a --- /dev/null +++ b/lib/elsa/group/direct_acknowledger.ex @@ -0,0 +1,102 @@ +defmodule Elsa.Group.DirectAcknowledger do + use GenServer + require Logger + + @timeout 5_000 + + def ack(server, member_id, topic, partition, generation_id, offset) do + GenServer.call(server, {:ack, member_id, topic, partition, generation_id, offset}) + end + + def start_link(opts) do + name = Keyword.fetch!(opts, :name) + GenServer.start_link(__MODULE__, opts, name: name) + end + + def init(opts) do + state = %{ + client: Keyword.fetch!(opts, :client), + group: Keyword.fetch!(opts, :group) + } + + {:ok, state, {:continue, :connect}} + end + + def handle_continue(:connect, state) do + with {:ok, {endpoint, conn_config}} <- :brod_client.get_group_coordinator(state.client, state.group), + {:ok, connection} <- :kpro.connect(endpoint, conn_config) do + Logger.debug(fn -> "#{__MODULE__}: Coordinator available for group #{state.group} on client #{state.client}" end) + {:noreply, Map.put(state, :connection, connection)} + else + {:error, reason} when is_list(reason) -> + case Keyword.get(reason, :error_code) do + :coordinator_not_available -> + Process.sleep(1_000) + {:noreply, state, {:continue, :connect}} + + _ -> + {:stop, reason, state} + end + + {:error, reason} -> + {:stop, reason, state} + end + end + + def handle_call({:ack, member_id, topic, partition, generation_id, offset}, _from, state) do + request = make_request_body(state, member_id, topic, partition, generation_id, offset) + + with {:ok, response} <- :brod_utils.request_sync(state.connection, request, @timeout), + :ok <- parse_response(response) do + {:reply, :ok, state} + else + {:error, reason} -> {:stop, reason, state} + end + end + + defp make_request_body(state, member_id, topic, partition, generation_id, offset) do + partitions = [ + %{ + partition: partition, + offset: offset + 1, + metadata: "+1/#{:io_lib.format("~p/~p", [node(), self()])}" + } + ] + + topics = [ + %{ + topic: topic, + partitions: partitions + } + ] + + request_body = %{ + group_id: state.group, + generation_id: generation_id, + member_id: member_id, + retention_time: -1, + topics: topics + } + + :brod_kafka_request.offset_commit(state.connection, request_body) + end + + defp parse_response(response) do + case parse_offset_commit_response(response) do + [] -> :ok + errors -> {:error, errors} + end + end + + defp parse_offset_commit_response(response) do + response.responses + |> Enum.map(&parse_partition_responses/1) + |> List.flatten() + end + + defp parse_partition_responses(%{topic: topic, partition_responses: responses}) do + responses + |> Enum.filter(fn %{error_code: code} -> code != :no_error end) + |> Enum.map(fn %{error_code: code, partition: partition} -> %{topic: topic, error: code, partition: partition} end) + end +end diff --git a/lib/elsa/group/manager.ex b/lib/elsa/group/manager.ex index e8944c3..fbe4434 100644 --- a/lib/elsa/group/manager.ex +++ b/lib/elsa/group/manager.ex @@ -102,7 +102,8 @@ defmodule Elsa.Group.Manager do :handler, :handler_init_args, :workers, - :generation_id + :generation_id, + :direct_acknowledger_pid ] end @@ -114,8 +115,8 @@ defmodule Elsa.Group.Manager do Trigger the assignment of workers to a given topic and partition """ @spec assignments_received(pid(), term(), integer(), [tuple()]) :: :ok - def assignments_received(pid, _group, generation_id, assignments) do - GenServer.call(pid, {:process_assignments, generation_id, assignments}) + def assignments_received(pid, group_member_id, generation_id, assignments) do + GenServer.call(pid, {:process_assignments, group_member_id, generation_id, assignments}) end @doc """ @@ -132,8 +133,27 @@ defmodule Elsa.Group.Manager do """ @spec ack(String.t(), String.t(), integer(), integer(), integer()) :: :ok def ack(name, topic, partition, generation_id, offset) do - group_manager = {:via, Registry, {registry(name), __MODULE__}} - GenServer.cast(group_manager, {:ack, topic, partition, generation_id, offset}) + case direct_ack?(name) do + false -> + group_manager = {:via, Registry, {registry(name), __MODULE__}} + GenServer.cast(group_manager, {:ack, topic, partition, generation_id, offset}) + + true -> + case :ets.lookup(table_name(name), :assignments) do + [{:assignments, member_id, assigned_generation_id}] when assigned_generation_id == generation_id -> + direct_acknowledger = {:via, Registry, {registry(name), Elsa.Group.DirectAcknowledger}} + Elsa.Group.DirectAcknowledger.ack(direct_acknowledger, member_id, topic, partition, generation_id, offset) + + _ -> + Logger.warn( + "Invalid generation_id(#{generation_id}), ignoring ack - topic #{topic} partition #{partition} offset #{ + offset + }" + ) + end + + :ok + end end @doc """ @@ -170,6 +190,10 @@ defmodule Elsa.Group.Manager do workers: %{} } + table_name = table_name(state.name) + :ets.new(table_name, [:set, :protected, :named_table]) + :ets.insert(table_name, {:direct_ack, Keyword.get(opts, :direct_ack, false)}) + {:ok, state, {:continue, :start_coordinator}} end @@ -185,23 +209,28 @@ defmodule Elsa.Group.Manager do Registry.put_meta(registry(state.name), :group_coordinator, group_coordinator_pid) - {:noreply, %{state | client_pid: client_pid, group_coordinator_pid: group_coordinator_pid}} + {:noreply, + %{ + state + | client_pid: client_pid, + group_coordinator_pid: group_coordinator_pid, + direct_acknowledger_pid: create_direct_acknowledger(state) + }} catch :exit, reason -> wait_and_stop(reason, state) end - def handle_call({:process_assignments, generation_id, assignments}, _from, state) do + def handle_call({:process_assignments, member_id, generation_id, assignments}, _from, state) do case call_lifecycle_assignment_received(state, assignments, generation_id) do {:error, reason} -> {:stop, reason, {:error, reason}, state} :ok -> - new_workers = - Enum.reduce(assignments, state.workers, fn assignment, workers -> - WorkerManager.start_worker(workers, generation_id, assignment, state) - end) + table_name = table_name(state.name) + :ets.insert(table_name, {:assignments, member_id, generation_id}) + new_workers = start_workers(state, generation_id, assignments) {:reply, :ok, %{state | workers: new_workers, generation_id: generation_id}} end end @@ -210,6 +239,7 @@ defmodule Elsa.Group.Manager do Logger.info("Assignments revoked for group #{state.group}") new_workers = WorkerManager.stop_all_workers(state.workers) :ok = apply(state.assignments_revoked_handler, []) + :ets.delete(table_name(state.name), :assignments) {:reply, :ok, %{state | workers: new_workers, generation_id: nil}} end @@ -222,7 +252,12 @@ defmodule Elsa.Group.Manager do {:noreply, %{state | workers: new_workers}} false -> - Logger.warn("Invalid generation_id, ignoring ack - topic #{topic} parition #{partition} offset #{offset}") + Logger.warn( + "Invalid generation_id #{state.generation_id} == #{generation_id}, ignoring ack - topic #{topic} partition #{ + partition + } offset #{offset}" + ) + {:noreply, state} end end @@ -246,8 +281,40 @@ defmodule Elsa.Group.Manager do end) end + defp start_workers(state, generation_id, assignments) do + Enum.reduce(assignments, state.workers, fn assignment, workers -> + WorkerManager.start_worker(workers, generation_id, assignment, state) + end) + end + defp wait_and_stop(reason, state) do Process.sleep(2_000) {:stop, reason, state} end + + defp direct_ack?(name) do + case :ets.lookup(table_name(name), :direct_ack) do + [{:direct_ack, result}] -> result + _ -> false + end + end + + defp create_direct_acknowledger(state) do + case direct_ack?(state.name) do + false -> + nil + + true -> + name = {:via, Registry, {registry(state.name), Elsa.Group.DirectAcknowledger}} + + {:ok, direct_acknowledger_pid} = + Elsa.Group.DirectAcknowledger.start_link(name: name, client: state.name, group: state.group) + + direct_acknowledger_pid + end + end + + defp table_name(name) do + :"#{name}_elsa_table" + end end diff --git a/mix.lock b/mix.lock index ba14a76..632cce4 100644 --- a/mix.lock +++ b/mix.lock @@ -1,5 +1,5 @@ %{ - "brod": {:hex, :brod, "3.8.0", "069f199ec3dfaade68c73529162f1d21cc58ec492c4535c71e5712b406234075", [:make, :rebar, :rebar3], [{:kafka_protocol, "2.2.9", [hex: :kafka_protocol, repo: "hexpm", optional: false]}, {:supervisor3, "1.1.8", [hex: :supervisor3, repo: "hexpm", optional: false]}], "hexpm"}, + "brod": {:hex, :brod, "3.8.1", "74426e2d27989cd7f973599bc209c899dca8588068d27a2a37ca1b4bf8823c2a", [:make, :rebar, :rebar3], [{:kafka_protocol, "2.2.9", [hex: :kafka_protocol, repo: "hexpm", optional: false]}, {:supervisor3, "1.1.8", [hex: :supervisor3, repo: "hexpm", optional: false]}], "hexpm"}, "checkov": {:hex, :checkov, "0.4.0", "a1974a5885f62f8943a9059f0d5252b222271e0e58dd872ffb60323f2e376433", [:mix], [], "hexpm"}, "crc32cer": {:hex, :crc32cer, "0.1.3", "8984906c4b4fae6aa292c48f286a1c83b19ad44bd102287acb94d696015967ce", [:make, :rebar, :rebar3], [], "hexpm"}, "dialyxir": {:hex, :dialyxir, "1.0.0-rc.6", "78e97d9c0ff1b5521dd68041193891aebebce52fc3b93463c0a6806874557d7d", [:mix], [{:erlex, "~> 0.2.1", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm"}, diff --git a/test/integration/elsa/group/direct_acknowledger_test.exs b/test/integration/elsa/group/direct_acknowledger_test.exs new file mode 100644 index 0000000..21244a5 --- /dev/null +++ b/test/integration/elsa/group/direct_acknowledger_test.exs @@ -0,0 +1,55 @@ +defmodule Elsa.Group.DirectAcknowledgerTest do + use ExUnit.Case + use Divo + import AsyncAssertion + + defmodule MessageHandler do + use Elsa.Consumer.MessageHandler + + def handle_messages(_messages) do + :ack + end + end + + @endpoints Application.get_env(:elsa, :brokers) + @group "group-1a" + @topic "topic-1a" + + test "direct acknowledger ack over privately managed connection" do + :ok = Elsa.create_topic(@endpoints, @topic) + + {:ok, _elsa_sup_pid} = + Elsa.Group.Supervisor.start_link( + name: :test_direct_acker, + endpoints: @endpoints, + group: @group, + topics: [@topic], + handler: MessageHandler, + direct_ack: true, + config: [ + begin_offset: :earliest + ] + ) + + Elsa.produce(@endpoints, @topic, {"key1", "value1"}, partition: 0) + + Process.sleep(8_000) + + assert_async(fn -> + assert 1 == get_committed_offsets(:test_direct_acker, @group, @topic, 0) + end) + end + + defp get_committed_offsets(client, group, topic, partition) do + {:ok, responses} = :brod.fetch_committed_offsets(client, group) + + case Enum.find(responses, fn %{topic: t} -> topic == t end) do + nil -> + :undefined + + topic -> + partition = Enum.find(topic.partition_responses, fn %{partition: p} -> partition == p end) + partition.offset + end + end +end diff --git a/test/unit/elsa/group/direct_acknowledger_test.exs b/test/unit/elsa/group/direct_acknowledger_test.exs new file mode 100644 index 0000000..ddb6000 --- /dev/null +++ b/test/unit/elsa/group/direct_acknowledger_test.exs @@ -0,0 +1,142 @@ +defmodule Elsa.Group.DirectAcknowledgerTest do + use ExUnit.Case + use Placebo + import AsyncAssertion + + alias Elsa.Group.DirectAcknowledger + + @client :brod_client + @group "group1" + @moduletag :capture_log + + setup do + Process.flag(:trap_exit, true) + :ok + end + + describe "ack/6 - happy path" do + setup do + allow :brod_client.get_group_coordinator(any(), any()), + return: {:ok, {:group_coordinator_endpoint, :group_coordinator_config}} + + allow :kpro.connect(any(), any()), return: {:ok, :connection} + allow :brod_kafka_request.offset_commit(any(), any()), return: :offset_commit_kafka_request + allow :brod_utils.request_sync(any(), any(), any()), return: {:ok, %{responses: []}} + + {:ok, pid} = DirectAcknowledger.start_link(name: __MODULE__, client: @client, group: @group) + on_exit(fn -> wait(pid) end) + + [pid: pid] + end + + test "creates connection to group coordinator" do + assert_async(fn -> + assert_called :brod_client.get_group_coordinator(@client, @group) + assert_called :kpro.connect(:group_coordinator_endpoint, :group_coordinator_config) + end) + end + + test "ack get sent to group coordinator connection", %{pid: pid} do + member_id = :member_id + topic = "topic1" + partition = 0 + generation_id = 7 + offset = 32 + + :ok = DirectAcknowledger.ack(pid, member_id, topic, partition, generation_id, offset) + + assert_called :brod_utils.request_sync(:connection, :offset_commit_kafka_request, 5_000) + end + end + + describe "ack/6 - exception paths" do + test "dies when unable to find group_coordinator" do + allow :brod_client.get_group_coordinator(any(), any()), return: {:error, :something_went_wrong} + + {:ok, pid} = DirectAcknowledger.start_link(name: __MODULE__, client: @client, group: @group) + on_exit(fn -> wait(pid) end) + + assert_receive {:EXIT, ^pid, :something_went_wrong} + end + + test "retries to connect to group coordinator when coordinator_not_available error" do + allow :brod_client.get_group_coordinator(any(), any()), + seq: [{:error, [error_code: :coordinator_not_available]}, {:error, :something_went_wrong}] + + {:ok, pid} = DirectAcknowledger.start_link(name: __MODULE__, client: @client, group: @group) + on_exit(fn -> wait(pid) end) + + assert_receive {:EXIT, ^pid, :something_went_wrong}, 2_000 + + assert_called :brod_client.get_group_coordinator(any(), any()), times(2) + end + + test "dies when unable to connection to group coordinator" do + allow :brod_client.get_group_coordinator(any(), any()), + return: {:ok, {:group_coordinator_endpoint, :group_coordinator_config}} + + allow :kpro.connect(any(), any()), return: {:error, :bad_connection} + + {:ok, pid} = DirectAcknowledger.start_link(name: __MODULE__, client: @client, group: @group) + on_exit(fn -> wait(pid) end) + + assert_receive {:EXIT, ^pid, :bad_connection} + end + end + + describe "bad responses from ack/6" do + setup do + allow :brod_client.get_group_coordinator(any(), any()), + return: {:ok, {:group_coordinator_endpoint, :group_coordinator_config}} + + allow :kpro.connect(any(), any()), return: {:ok, :connection} + allow :brod_kafka_request.offset_commit(any(), any()), return: :offset_commit_kafka_request + + {:ok, pid} = DirectAcknowledger.start_link(name: __MODULE__, client: @client, group: @group) + on_exit(fn -> wait(pid) end) + [pid: pid] + end + + test "process dies when error in talking to coordinator", %{pid: pid} do + allow :brod_utils.request_sync(any(), any(), any()), return: {:error, "some reason"} + + try do + DirectAcknowledger.ack(pid, :member_id, "topic1", 0, 7, 32) + flunk("Should have exited direct acknowledger") + catch + :exit, _ -> nil + end + + assert_receive {:EXIT, ^pid, "some reason"} + end + + test "process dies when ack response contains errors", %{pid: pid} do + response = %{ + responses: [ + %{topic: "topic2", partition_responses: [%{error_code: :no_error, partition: 0}]}, + %{ + topic: "topic1", + partition_responses: [%{error_code: :no_error, partition: 0}, %{error_code: :bad_stuff, partition: 1}] + } + ] + } + + allow :brod_utils.request_sync(any(), any(), any()), return: {:ok, response} + + try do + DirectAcknowledger.ack(pid, :member_id, "topic1", 0, 7, 32) + flunk("Should have exited direct acknowledger") + catch + :exit, _ -> nil + end + + assert_receive {:EXIT, ^pid, [%{topic: "topic1", partition: 1, error: :bad_stuff}]} + end + end + + defp wait(pid) do + ref = Process.monitor(pid) + Process.exit(pid, :normal) + assert_receive {:DOWN, ^ref, _, _, _} + end +end diff --git a/test/unit/elsa/group/lifecycle_hooks_test.exs b/test/unit/elsa/group/lifecycle_hooks_test.exs index 7e0db60..44b32e0 100644 --- a/test/unit/elsa/group/lifecycle_hooks_test.exs +++ b/test/unit/elsa/group/lifecycle_hooks_test.exs @@ -12,9 +12,12 @@ defmodule Elsa.Group.LifecycleHooksTest do allow WorkerManager.start_worker(any(), any(), any(), any()), return: :workers allow WorkerManager.stop_all_workers(any()), return: :workers + :ets.new(:fake_test_name_elsa_table, [:set, :public, :named_table]) + test_pid = self() state = %{ + name: :fake_test_name, workers: :workers, group: "group1", assignment_received_handler: fn group, topic, partition, generation_id -> @@ -38,7 +41,7 @@ defmodule Elsa.Group.LifecycleHooksTest do ] {:reply, :ok, ^state} = - Elsa.Group.Manager.handle_call({:process_assignments, :generation_id, assignments}, self(), state) + Elsa.Group.Manager.handle_call({:process_assignments, :member_id, :generation_id, assignments}, self(), state) assert_received {:assignment_received, "group1", "topic1", 0, :generation_id} assert_received {:assignment_received, "group1", "topic1", 1, :generation_id} @@ -53,7 +56,11 @@ defmodule Elsa.Group.LifecycleHooksTest do ] {:stop, :some_reason, {:error, :some_reason}, ^error_state} = - Elsa.Group.Manager.handle_call({:process_assignments, :generation_id, assignments}, self(), error_state) + Elsa.Group.Manager.handle_call( + {:process_assignments, :member_id, :generation_id, assignments}, + self(), + error_state + ) refute_called WorkerManager.start_worker(any(), any(), any(), any()) end