From 9da7fa950df805b4f5ff5ddadbe8cd912bae0db0 Mon Sep 17 00:00:00 2001 From: Brian Balser Date: Mon, 19 Aug 2019 16:46:38 -0400 Subject: [PATCH 1/4] Started working on custom ack hack --- lib/elsa/group/custom_acknowledger.ex | 67 ++++++++++++++++ lib/elsa/group/manager.ex | 69 ++++++++++++++-- mix.lock | 2 +- .../elsa/group/custom_acknowledger_test.exs | 55 +++++++++++++ .../elsa/group/custom_acknowledger_test.exs | 79 +++++++++++++++++++ 5 files changed, 264 insertions(+), 8 deletions(-) create mode 100644 lib/elsa/group/custom_acknowledger.ex create mode 100644 test/integration/elsa/group/custom_acknowledger_test.exs create mode 100644 test/unit/elsa/group/custom_acknowledger_test.exs diff --git a/lib/elsa/group/custom_acknowledger.ex b/lib/elsa/group/custom_acknowledger.ex new file mode 100644 index 0000000..8cb08be --- /dev/null +++ b/lib/elsa/group/custom_acknowledger.ex @@ -0,0 +1,67 @@ +defmodule Elsa.Group.CustomAcknowledger do + use GenServer + + @timeout 5_000 + + def ack(server, member_id, topic, partition, generation_id, offset) do + GenServer.call(server, {:ack, member_id, topic, partition, generation_id, offset}) + end + + def start_link(opts) do + name = Keyword.fetch!(opts, :name) + GenServer.start_link(__MODULE__, opts, name: name) + end + + def init(opts) do + state = %{ + client: Keyword.fetch!(opts, :client), + group: Keyword.fetch!(opts, :group) + } + + {:ok, state, {:continue, :connect}} + end + + def handle_continue(:connect, state) do + with {:ok, {endpoint, conn_config}} <- :brod_client.get_group_coordinator(state.client, state.group), + {:ok, connection} <- :kpro.connect(endpoint, conn_config) do + {:noreply, Map.put(state, :connection, connection)} + else + {:error, reason} -> {:stop, reason, state} + end + end + + def handle_call({:ack, member_id, topic, partition, generation_id, offset}, _from, state) do + request = make_request_body(state, member_id, topic, partition, generation_id, offset) + response = :brod_utils.request_sync(state.connection, request, @timeout) + IO.inspect(response, label: "Response") + + {:reply, :ok, state} + end + + defp make_request_body(state, member_id, topic, partition, generation_id, offset) do + partitions = [ + %{ + partition: partition, + offset: offset + 1, + metadata: "+1/#{:io_lib.format("~p/~p", [node(), self()])}" + } + ] + + topics = [ + %{ + topic: topic, + partitions: partitions + } + ] + + request_body = %{ + group_id: state.group, + generation_id: generation_id, + member_id: member_id, + retention_time: -1, + topics: topics + } + + :brod_kafka_request.offset_commit(state.connection, request_body) + end +end diff --git a/lib/elsa/group/manager.ex b/lib/elsa/group/manager.ex index e8944c3..25b1d40 100644 --- a/lib/elsa/group/manager.ex +++ b/lib/elsa/group/manager.ex @@ -102,7 +102,8 @@ defmodule Elsa.Group.Manager do :handler, :handler_init_args, :workers, - :generation_id + :generation_id, + :custom_acknowledger_pid ] end @@ -114,8 +115,8 @@ defmodule Elsa.Group.Manager do Trigger the assignment of workers to a given topic and partition """ @spec assignments_received(pid(), term(), integer(), [tuple()]) :: :ok - def assignments_received(pid, _group, generation_id, assignments) do - GenServer.call(pid, {:process_assignments, generation_id, assignments}) + def assignments_received(pid, group_member_id, generation_id, assignments) do + GenServer.call(pid, {:process_assignments, group_member_id, generation_id, assignments}) end @doc """ @@ -132,8 +133,23 @@ defmodule Elsa.Group.Manager do """ @spec ack(String.t(), String.t(), integer(), integer(), integer()) :: :ok def ack(name, topic, partition, generation_id, offset) do - group_manager = {:via, Registry, {registry(name), __MODULE__}} - GenServer.cast(group_manager, {:ack, topic, partition, generation_id, offset}) + case custom_ack_hack?(name) do + false -> + group_manager = {:via, Registry, {registry(name), __MODULE__}} + GenServer.cast(group_manager, {:ack, topic, partition, generation_id, offset}) + + true -> + case :ets.lookup(table_name(name), :assignments) do + [{:assignments, member_id, assigned_generation_id}] when assigned_generation_id == generation_id -> + custom_acknowledger = {:via, Registry, {registry(name), Elsa.Group.CustomAcknowledger}} + Elsa.Group.CustomAcknowledger.ack(custom_acknowledger, member_id, topic, partition, generation_id, offset) + + [] -> + Logger.warn("Invalid generation_id, ignoring ack - topic #{topic} parition #{partition} offset #{offset}") + end + + :ok + end end @doc """ @@ -170,6 +186,10 @@ defmodule Elsa.Group.Manager do workers: %{} } + table_name = table_name(state.name) + :ets.new(table_name, [:set, :protected, :named_table]) + :ets.insert(table_name, {:custom_ack_hack, Keyword.get(opts, :custom_ack_hack, false)}) + {:ok, state, {:continue, :start_coordinator}} end @@ -185,18 +205,27 @@ defmodule Elsa.Group.Manager do Registry.put_meta(registry(state.name), :group_coordinator, group_coordinator_pid) - {:noreply, %{state | client_pid: client_pid, group_coordinator_pid: group_coordinator_pid}} + {:noreply, + %{ + state + | client_pid: client_pid, + group_coordinator_pid: group_coordinator_pid, + custom_acknowledger_pid: create_custom_acknowledger(state) + }} catch :exit, reason -> wait_and_stop(reason, state) end - def handle_call({:process_assignments, generation_id, assignments}, _from, state) do + def handle_call({:process_assignments, member_id, generation_id, assignments}, _from, state) do case call_lifecycle_assignment_received(state, assignments, generation_id) do {:error, reason} -> {:stop, reason, {:error, reason}, state} :ok -> + table_name = table_name(state.name) + :ets.insert(table_name, {:assignments, member_id, generation_id}) + new_workers = Enum.reduce(assignments, state.workers, fn assignment, workers -> WorkerManager.start_worker(workers, generation_id, assignment, state) @@ -210,6 +239,7 @@ defmodule Elsa.Group.Manager do Logger.info("Assignments revoked for group #{state.group}") new_workers = WorkerManager.stop_all_workers(state.workers) :ok = apply(state.assignments_revoked_handler, []) + :ets.delete(table_name(state.name), :assignments) {:reply, :ok, %{state | workers: new_workers, generation_id: nil}} end @@ -250,4 +280,29 @@ defmodule Elsa.Group.Manager do Process.sleep(2_000) {:stop, reason, state} end + + defp custom_ack_hack?(name) do + case :ets.lookup(table_name(name), :custom_ack_hack) do + [{:custom_ack_hack, result}] -> result + _ -> false + end + end + + defp create_custom_acknowledger(state) do + case custom_ack_hack?(state.name) do + false -> + nil + + true -> + name = {:via, Registry, {registry(state.name), Elsa.Group.CustomAcknowledger}} + + {:ok, custom_acknowledger_pid} = + Elsa.Group.CustomAcknowledger.start_link(name: name, client: state.name, group: state.group) + custom_acknowledger_pid + end + end + + defp table_name(name) do + :"#{name}_hack_table" + end end diff --git a/mix.lock b/mix.lock index ba14a76..632cce4 100644 --- a/mix.lock +++ b/mix.lock @@ -1,5 +1,5 @@ %{ - "brod": {:hex, :brod, "3.8.0", "069f199ec3dfaade68c73529162f1d21cc58ec492c4535c71e5712b406234075", [:make, :rebar, :rebar3], [{:kafka_protocol, "2.2.9", [hex: :kafka_protocol, repo: "hexpm", optional: false]}, {:supervisor3, "1.1.8", [hex: :supervisor3, repo: "hexpm", optional: false]}], "hexpm"}, + "brod": {:hex, :brod, "3.8.1", "74426e2d27989cd7f973599bc209c899dca8588068d27a2a37ca1b4bf8823c2a", [:make, :rebar, :rebar3], [{:kafka_protocol, "2.2.9", [hex: :kafka_protocol, repo: "hexpm", optional: false]}, {:supervisor3, "1.1.8", [hex: :supervisor3, repo: "hexpm", optional: false]}], "hexpm"}, "checkov": {:hex, :checkov, "0.4.0", "a1974a5885f62f8943a9059f0d5252b222271e0e58dd872ffb60323f2e376433", [:mix], [], "hexpm"}, "crc32cer": {:hex, :crc32cer, "0.1.3", "8984906c4b4fae6aa292c48f286a1c83b19ad44bd102287acb94d696015967ce", [:make, :rebar, :rebar3], [], "hexpm"}, "dialyxir": {:hex, :dialyxir, "1.0.0-rc.6", "78e97d9c0ff1b5521dd68041193891aebebce52fc3b93463c0a6806874557d7d", [:mix], [{:erlex, "~> 0.2.1", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm"}, diff --git a/test/integration/elsa/group/custom_acknowledger_test.exs b/test/integration/elsa/group/custom_acknowledger_test.exs new file mode 100644 index 0000000..9ed1548 --- /dev/null +++ b/test/integration/elsa/group/custom_acknowledger_test.exs @@ -0,0 +1,55 @@ +defmodule Elsa.Group.CustomAcknowledgerTest do + use ExUnit.Case + use Divo + import AsyncAssertion + + defmodule MessageHandler do + use Elsa.Consumer.MessageHandler + + def handle_messages(_messages) do + :ack + end + end + + @endpoints Application.get_env(:elsa, :brokers) + @group "group-1a" + @topic "topic-1a" + + test "custom acknowledger ack over privately managed connection" do + :ok = Elsa.create_topic(@endpoints, @topic) + + {:ok, _elsa_sup_pid} = + Elsa.Group.Supervisor.start_link( + name: :test_custom_acker, + endpoints: @endpoints, + group: @group, + topics: [@topic], + handler: MessageHandler, + custom_ack_hack: true, + config: [ + begin_offset: :earliest + ] + ) + + Elsa.produce(@endpoints, @topic, {"key1", "value1"}, partition: 0) + + Process.sleep(8_000) + + assert_async(fn -> + assert 1 == get_committed_offsets(:test_custom_acker, @group, @topic, 0) + end) + end + + defp get_committed_offsets(client, group, topic, partition) do + {:ok, responses} = :brod.fetch_committed_offsets(client, group) + + case Enum.find(responses, fn %{topic: t} -> topic == t end) do + nil -> + :undefined + + topic -> + partition = Enum.find(topic.partition_responses, fn %{partition: p} -> partition == p end) + partition.offset + end + end +end diff --git a/test/unit/elsa/group/custom_acknowledger_test.exs b/test/unit/elsa/group/custom_acknowledger_test.exs new file mode 100644 index 0000000..c09076a --- /dev/null +++ b/test/unit/elsa/group/custom_acknowledger_test.exs @@ -0,0 +1,79 @@ +defmodule Elsa.Group.CustomAcknowledgerTest do + use ExUnit.Case + use Placebo + import AsyncAssertion + + alias Elsa.Group.CustomAcknowledger + + @client :brod_client + @group "group1" + + describe "ack/6 - happy path" do + setup do + allow :brod_client.get_group_coordinator(any(), any()), + return: {:ok, {:group_coordinator_endpoint, :group_coordinator_config}} + + allow :kpro.connect(any(), any()), return: {:ok, :connection} + allow :brod_kafka_request.offset_commit(any(), any()), return: :offset_commit_kafka_request + allow :brod_utils.request_sync(any(), any(), any()), return: {:ok, :response} + + {:ok, pid} = CustomAcknowledger.start_link(name: __MODULE__, client: @client, group: @group) + on_exit(fn -> wait(pid) end) + + [pid: pid] + end + + test "creates connection to group coordinator" do + assert_async(fn -> + assert_called :brod_client.get_group_coordinator(@client, @group) + assert_called :kpro.connect(:group_coordinator_endpoint, :group_coordinator_config) + end) + end + + test "ack get sent to group coordinator connection", %{pid: pid} do + member_id = :member_id + topic = "topic1" + partition = 0 + generation_id = 7 + offset = 32 + + :ok = CustomAcknowledger.ack(pid, member_id, topic, partition, generation_id, offset) + + assert_called :brod_utils.request_sync(:connection, :offset_commit_kafka_request, 5_000) + end + end + + describe "ack/6 - exception paths" do + setup do + Process.flag(:trap_exit, true) + :ok + end + + test "dies when unable to find group_coordinator" do + allow :brod_client.get_group_coordinator(any(), any()), return: {:error, :something_went_wrong} + + {:ok, pid} = CustomAcknowledger.start_link(name: __MODULE__, client: @client, group: @group) + on_exit(fn -> wait(pid) end) + + assert_receive {:EXIT, ^pid, :something_went_wrong} + end + + test "dies when unable to connection to group coordinator" do + allow :brod_client.get_group_coordinator(any(), any()), + return: {:ok, {:group_coordinator_endpoint, :group_coordinator_config}} + + allow :kpro.connect(any(), any()), return: {:error, :bad_connection} + + {:ok, pid} = CustomAcknowledger.start_link(name: __MODULE__, client: @client, group: @group) + on_exit(fn -> wait(pid) end) + + assert_receive {:EXIT, ^pid, :bad_connection} + end + end + + defp wait(pid) do + ref = Process.monitor(pid) + Process.exit(pid, :normal) + assert_receive {:DOWN, ^ref, _, _, _} + end +end From 523bea3a6c601cd4616005afa17e233d68d8b66c Mon Sep 17 00:00:00 2001 From: Brian Balser Date: Tue, 20 Aug 2019 11:54:22 -0400 Subject: [PATCH 2/4] Parsed error responses from correctly --- lib/elsa/group/custom_acknowledger.ex | 28 ++++++++- lib/elsa/group/manager.ex | 13 ++-- .../elsa/group/custom_acknowledger_test.exs | 63 +++++++++++++++++-- test/unit/elsa/group/lifecycle_hooks_test.exs | 11 +++- 4 files changed, 99 insertions(+), 16 deletions(-) diff --git a/lib/elsa/group/custom_acknowledger.ex b/lib/elsa/group/custom_acknowledger.ex index 8cb08be..1247082 100644 --- a/lib/elsa/group/custom_acknowledger.ex +++ b/lib/elsa/group/custom_acknowledger.ex @@ -32,10 +32,13 @@ defmodule Elsa.Group.CustomAcknowledger do def handle_call({:ack, member_id, topic, partition, generation_id, offset}, _from, state) do request = make_request_body(state, member_id, topic, partition, generation_id, offset) - response = :brod_utils.request_sync(state.connection, request, @timeout) - IO.inspect(response, label: "Response") - {:reply, :ok, state} + with {:ok, response} <- :brod_utils.request_sync(state.connection, request, @timeout), + :ok <- parse_response(response) do + {:reply, :ok, state} + else + {:error, reason} -> {:stop, reason, state} + end end defp make_request_body(state, member_id, topic, partition, generation_id, offset) do @@ -64,4 +67,23 @@ defmodule Elsa.Group.CustomAcknowledger do :brod_kafka_request.offset_commit(state.connection, request_body) end + + defp parse_response(response) do + case parse_offset_commit_response(response) do + [] -> :ok + errors -> {:error, errors} + end + end + + defp parse_offset_commit_response(response) do + response.responses + |> Enum.map(&parse_partition_responses/1) + |> List.flatten() + end + + defp parse_partition_responses(%{topic: topic, partition_responses: responses}) do + responses + |> Enum.filter(fn %{error_code: code} -> code != :no_error end) + |> Enum.map(fn %{error_code: code, partition: partition} -> %{topic: topic, error: code, partition: partition} end) + end end diff --git a/lib/elsa/group/manager.ex b/lib/elsa/group/manager.ex index 25b1d40..010c7b3 100644 --- a/lib/elsa/group/manager.ex +++ b/lib/elsa/group/manager.ex @@ -226,11 +226,7 @@ defmodule Elsa.Group.Manager do table_name = table_name(state.name) :ets.insert(table_name, {:assignments, member_id, generation_id}) - new_workers = - Enum.reduce(assignments, state.workers, fn assignment, workers -> - WorkerManager.start_worker(workers, generation_id, assignment, state) - end) - + new_workers = start_workers(state, generation_id, assignments) {:reply, :ok, %{state | workers: new_workers, generation_id: generation_id}} end end @@ -276,6 +272,12 @@ defmodule Elsa.Group.Manager do end) end + defp start_workers(state, generation_id, assignments) do + Enum.reduce(assignments, state.workers, fn assignment, workers -> + WorkerManager.start_worker(workers, generation_id, assignment, state) + end) + end + defp wait_and_stop(reason, state) do Process.sleep(2_000) {:stop, reason, state} @@ -298,6 +300,7 @@ defmodule Elsa.Group.Manager do {:ok, custom_acknowledger_pid} = Elsa.Group.CustomAcknowledger.start_link(name: name, client: state.name, group: state.group) + custom_acknowledger_pid end end diff --git a/test/unit/elsa/group/custom_acknowledger_test.exs b/test/unit/elsa/group/custom_acknowledger_test.exs index c09076a..e35bbab 100644 --- a/test/unit/elsa/group/custom_acknowledger_test.exs +++ b/test/unit/elsa/group/custom_acknowledger_test.exs @@ -7,6 +7,12 @@ defmodule Elsa.Group.CustomAcknowledgerTest do @client :brod_client @group "group1" + @moduletag :capture_log + + setup do + Process.flag(:trap_exit, true) + :ok + end describe "ack/6 - happy path" do setup do @@ -15,7 +21,7 @@ defmodule Elsa.Group.CustomAcknowledgerTest do allow :kpro.connect(any(), any()), return: {:ok, :connection} allow :brod_kafka_request.offset_commit(any(), any()), return: :offset_commit_kafka_request - allow :brod_utils.request_sync(any(), any(), any()), return: {:ok, :response} + allow :brod_utils.request_sync(any(), any(), any()), return: {:ok, %{responses: []}} {:ok, pid} = CustomAcknowledger.start_link(name: __MODULE__, client: @client, group: @group) on_exit(fn -> wait(pid) end) @@ -44,11 +50,6 @@ defmodule Elsa.Group.CustomAcknowledgerTest do end describe "ack/6 - exception paths" do - setup do - Process.flag(:trap_exit, true) - :ok - end - test "dies when unable to find group_coordinator" do allow :brod_client.get_group_coordinator(any(), any()), return: {:error, :something_went_wrong} @@ -71,6 +72,56 @@ defmodule Elsa.Group.CustomAcknowledgerTest do end end + describe "bad responses from ack/6" do + setup do + allow :brod_client.get_group_coordinator(any(), any()), + return: {:ok, {:group_coordinator_endpoint, :group_coordinator_config}} + + allow :kpro.connect(any(), any()), return: {:ok, :connection} + allow :brod_kafka_request.offset_commit(any(), any()), return: :offset_commit_kafka_request + + {:ok, pid} = CustomAcknowledger.start_link(name: __MODULE__, client: @client, group: @group) + on_exit(fn -> wait(pid) end) + [pid: pid] + end + + test "process dies when error in talking to coordinator", %{pid: pid} do + allow :brod_utils.request_sync(any(), any(), any()), return: {:error, "some reason"} + + try do + CustomAcknowledger.ack(pid, :member_id, "topic1", 0, 7, 32) + flunk("Should have exited custom acknowledger") + catch + :exit, _ -> nil + end + + assert_receive {:EXIT, ^pid, "some reason"} + end + + test "process dies when ack response contains errors", %{pid: pid} do + response = %{ + responses: [ + %{topic: "topic2", partition_responses: [%{error_code: :no_error, partition: 0}]}, + %{ + topic: "topic1", + partition_responses: [%{error_code: :no_error, partition: 0}, %{error_code: :bad_stuff, partition: 1}] + } + ] + } + + allow :brod_utils.request_sync(any(), any(), any()), return: {:ok, response} + + try do + CustomAcknowledger.ack(pid, :member_id, "topic1", 0, 7, 32) + flunk("Should have exited custom acknowledger") + catch + :exit, _ -> nil + end + + assert_receive {:EXIT, ^pid, [%{topic: "topic1", partition: 1, error: :bad_stuff}]} + end + end + defp wait(pid) do ref = Process.monitor(pid) Process.exit(pid, :normal) diff --git a/test/unit/elsa/group/lifecycle_hooks_test.exs b/test/unit/elsa/group/lifecycle_hooks_test.exs index 7e0db60..98c85fb 100644 --- a/test/unit/elsa/group/lifecycle_hooks_test.exs +++ b/test/unit/elsa/group/lifecycle_hooks_test.exs @@ -12,9 +12,12 @@ defmodule Elsa.Group.LifecycleHooksTest do allow WorkerManager.start_worker(any(), any(), any(), any()), return: :workers allow WorkerManager.stop_all_workers(any()), return: :workers + :ets.new(:fake_test_name_hack_table, [:set, :public, :named_table]) + test_pid = self() state = %{ + name: :fake_test_name, workers: :workers, group: "group1", assignment_received_handler: fn group, topic, partition, generation_id -> @@ -38,7 +41,7 @@ defmodule Elsa.Group.LifecycleHooksTest do ] {:reply, :ok, ^state} = - Elsa.Group.Manager.handle_call({:process_assignments, :generation_id, assignments}, self(), state) + Elsa.Group.Manager.handle_call({:process_assignments, :member_id, :generation_id, assignments}, self(), state) assert_received {:assignment_received, "group1", "topic1", 0, :generation_id} assert_received {:assignment_received, "group1", "topic1", 1, :generation_id} @@ -53,7 +56,11 @@ defmodule Elsa.Group.LifecycleHooksTest do ] {:stop, :some_reason, {:error, :some_reason}, ^error_state} = - Elsa.Group.Manager.handle_call({:process_assignments, :generation_id, assignments}, self(), error_state) + Elsa.Group.Manager.handle_call( + {:process_assignments, :member_id, :generation_id, assignments}, + self(), + error_state + ) refute_called WorkerManager.start_worker(any(), any(), any(), any()) end From 20eba320157da2fec581e5bd6d2d155520edebc0 Mon Sep 17 00:00:00 2001 From: Brian Balser Date: Wed, 21 Aug 2019 12:06:25 -0400 Subject: [PATCH 3/4] Added some logging and retry logic around connecting to the group coordinator --- lib/elsa/group/custom_acknowledger.ex | 15 ++++++++++++++- lib/elsa/group/manager.ex | 15 ++++++++++++--- test/unit/elsa/group/custom_acknowledger_test.exs | 12 ++++++++++++ 3 files changed, 38 insertions(+), 4 deletions(-) diff --git a/lib/elsa/group/custom_acknowledger.ex b/lib/elsa/group/custom_acknowledger.ex index 1247082..e66115d 100644 --- a/lib/elsa/group/custom_acknowledger.ex +++ b/lib/elsa/group/custom_acknowledger.ex @@ -1,5 +1,6 @@ defmodule Elsa.Group.CustomAcknowledger do use GenServer + require Logger @timeout 5_000 @@ -24,9 +25,21 @@ defmodule Elsa.Group.CustomAcknowledger do def handle_continue(:connect, state) do with {:ok, {endpoint, conn_config}} <- :brod_client.get_group_coordinator(state.client, state.group), {:ok, connection} <- :kpro.connect(endpoint, conn_config) do + Logger.debug(fn -> "#{__MODULE__}: Coordinator available for group #{state.group} on client #{state.client}" end) {:noreply, Map.put(state, :connection, connection)} else - {:error, reason} -> {:stop, reason, state} + {:error, reason} when is_list(reason) -> + case Keyword.get(reason, :error_code) do + :coordinator_not_available -> + Process.sleep(1_000) + {:noreply, state, {:continue, :connect}} + + _ -> + {:stop, reason, state} + end + + {:error, reason} -> + {:stop, reason, state} end end diff --git a/lib/elsa/group/manager.ex b/lib/elsa/group/manager.ex index 010c7b3..5945b7c 100644 --- a/lib/elsa/group/manager.ex +++ b/lib/elsa/group/manager.ex @@ -144,8 +144,12 @@ defmodule Elsa.Group.Manager do custom_acknowledger = {:via, Registry, {registry(name), Elsa.Group.CustomAcknowledger}} Elsa.Group.CustomAcknowledger.ack(custom_acknowledger, member_id, topic, partition, generation_id, offset) - [] -> - Logger.warn("Invalid generation_id, ignoring ack - topic #{topic} parition #{partition} offset #{offset}") + _ -> + Logger.warn( + "Invalid generation_id(#{generation_id}), ignoring ack - topic #{topic} partition #{partition} offset #{ + offset + }" + ) end :ok @@ -248,7 +252,12 @@ defmodule Elsa.Group.Manager do {:noreply, %{state | workers: new_workers}} false -> - Logger.warn("Invalid generation_id, ignoring ack - topic #{topic} parition #{partition} offset #{offset}") + Logger.warn( + "Invalid generation_id #{state.generation_id} == #{generation_id}, ignoring ack - topic #{topic} partition #{ + partition + } offset #{offset}" + ) + {:noreply, state} end end diff --git a/test/unit/elsa/group/custom_acknowledger_test.exs b/test/unit/elsa/group/custom_acknowledger_test.exs index e35bbab..e212877 100644 --- a/test/unit/elsa/group/custom_acknowledger_test.exs +++ b/test/unit/elsa/group/custom_acknowledger_test.exs @@ -59,6 +59,18 @@ defmodule Elsa.Group.CustomAcknowledgerTest do assert_receive {:EXIT, ^pid, :something_went_wrong} end + test "retries to connect to group coordinator when coordinator_not_available error" do + allow :brod_client.get_group_coordinator(any(), any()), + seq: [{:error, [error_code: :coordinator_not_available]}, {:error, :something_went_wrong}] + + {:ok, pid} = CustomAcknowledger.start_link(name: __MODULE__, client: @client, group: @group) + on_exit(fn -> wait(pid) end) + + assert_receive {:EXIT, ^pid, :something_went_wrong}, 2_000 + + assert_called :brod_client.get_group_coordinator(any(), any()), times(2) + end + test "dies when unable to connection to group coordinator" do allow :brod_client.get_group_coordinator(any(), any()), return: {:ok, {:group_coordinator_endpoint, :group_coordinator_config}} From c31c406b12553a775cf798f9fc2747fc26f209ea Mon Sep 17 00:00:00 2001 From: Brian Balser Date: Wed, 21 Aug 2019 14:06:53 -0400 Subject: [PATCH 4/4] Renamed custom_ack_hack to direct_ack --- ...acknowledger.ex => direct_acknowledger.ex} | 2 +- lib/elsa/group/manager.ex | 32 +++++++++---------- ..._test.exs => direct_acknowledger_test.exs} | 10 +++--- ..._test.exs => direct_acknowledger_test.exs} | 24 +++++++------- test/unit/elsa/group/lifecycle_hooks_test.exs | 2 +- 5 files changed, 35 insertions(+), 35 deletions(-) rename lib/elsa/group/{custom_acknowledger.ex => direct_acknowledger.ex} (98%) rename test/integration/elsa/group/{custom_acknowledger_test.exs => direct_acknowledger_test.exs} (82%) rename test/unit/elsa/group/{custom_acknowledger_test.exs => direct_acknowledger_test.exs} (85%) diff --git a/lib/elsa/group/custom_acknowledger.ex b/lib/elsa/group/direct_acknowledger.ex similarity index 98% rename from lib/elsa/group/custom_acknowledger.ex rename to lib/elsa/group/direct_acknowledger.ex index e66115d..d7e272a 100644 --- a/lib/elsa/group/custom_acknowledger.ex +++ b/lib/elsa/group/direct_acknowledger.ex @@ -1,4 +1,4 @@ -defmodule Elsa.Group.CustomAcknowledger do +defmodule Elsa.Group.DirectAcknowledger do use GenServer require Logger diff --git a/lib/elsa/group/manager.ex b/lib/elsa/group/manager.ex index 5945b7c..fbe4434 100644 --- a/lib/elsa/group/manager.ex +++ b/lib/elsa/group/manager.ex @@ -103,7 +103,7 @@ defmodule Elsa.Group.Manager do :handler_init_args, :workers, :generation_id, - :custom_acknowledger_pid + :direct_acknowledger_pid ] end @@ -133,7 +133,7 @@ defmodule Elsa.Group.Manager do """ @spec ack(String.t(), String.t(), integer(), integer(), integer()) :: :ok def ack(name, topic, partition, generation_id, offset) do - case custom_ack_hack?(name) do + case direct_ack?(name) do false -> group_manager = {:via, Registry, {registry(name), __MODULE__}} GenServer.cast(group_manager, {:ack, topic, partition, generation_id, offset}) @@ -141,8 +141,8 @@ defmodule Elsa.Group.Manager do true -> case :ets.lookup(table_name(name), :assignments) do [{:assignments, member_id, assigned_generation_id}] when assigned_generation_id == generation_id -> - custom_acknowledger = {:via, Registry, {registry(name), Elsa.Group.CustomAcknowledger}} - Elsa.Group.CustomAcknowledger.ack(custom_acknowledger, member_id, topic, partition, generation_id, offset) + direct_acknowledger = {:via, Registry, {registry(name), Elsa.Group.DirectAcknowledger}} + Elsa.Group.DirectAcknowledger.ack(direct_acknowledger, member_id, topic, partition, generation_id, offset) _ -> Logger.warn( @@ -192,7 +192,7 @@ defmodule Elsa.Group.Manager do table_name = table_name(state.name) :ets.new(table_name, [:set, :protected, :named_table]) - :ets.insert(table_name, {:custom_ack_hack, Keyword.get(opts, :custom_ack_hack, false)}) + :ets.insert(table_name, {:direct_ack, Keyword.get(opts, :direct_ack, false)}) {:ok, state, {:continue, :start_coordinator}} end @@ -214,7 +214,7 @@ defmodule Elsa.Group.Manager do state | client_pid: client_pid, group_coordinator_pid: group_coordinator_pid, - custom_acknowledger_pid: create_custom_acknowledger(state) + direct_acknowledger_pid: create_direct_acknowledger(state) }} catch :exit, reason -> @@ -292,29 +292,29 @@ defmodule Elsa.Group.Manager do {:stop, reason, state} end - defp custom_ack_hack?(name) do - case :ets.lookup(table_name(name), :custom_ack_hack) do - [{:custom_ack_hack, result}] -> result + defp direct_ack?(name) do + case :ets.lookup(table_name(name), :direct_ack) do + [{:direct_ack, result}] -> result _ -> false end end - defp create_custom_acknowledger(state) do - case custom_ack_hack?(state.name) do + defp create_direct_acknowledger(state) do + case direct_ack?(state.name) do false -> nil true -> - name = {:via, Registry, {registry(state.name), Elsa.Group.CustomAcknowledger}} + name = {:via, Registry, {registry(state.name), Elsa.Group.DirectAcknowledger}} - {:ok, custom_acknowledger_pid} = - Elsa.Group.CustomAcknowledger.start_link(name: name, client: state.name, group: state.group) + {:ok, direct_acknowledger_pid} = + Elsa.Group.DirectAcknowledger.start_link(name: name, client: state.name, group: state.group) - custom_acknowledger_pid + direct_acknowledger_pid end end defp table_name(name) do - :"#{name}_hack_table" + :"#{name}_elsa_table" end end diff --git a/test/integration/elsa/group/custom_acknowledger_test.exs b/test/integration/elsa/group/direct_acknowledger_test.exs similarity index 82% rename from test/integration/elsa/group/custom_acknowledger_test.exs rename to test/integration/elsa/group/direct_acknowledger_test.exs index 9ed1548..21244a5 100644 --- a/test/integration/elsa/group/custom_acknowledger_test.exs +++ b/test/integration/elsa/group/direct_acknowledger_test.exs @@ -1,4 +1,4 @@ -defmodule Elsa.Group.CustomAcknowledgerTest do +defmodule Elsa.Group.DirectAcknowledgerTest do use ExUnit.Case use Divo import AsyncAssertion @@ -15,17 +15,17 @@ defmodule Elsa.Group.CustomAcknowledgerTest do @group "group-1a" @topic "topic-1a" - test "custom acknowledger ack over privately managed connection" do + test "direct acknowledger ack over privately managed connection" do :ok = Elsa.create_topic(@endpoints, @topic) {:ok, _elsa_sup_pid} = Elsa.Group.Supervisor.start_link( - name: :test_custom_acker, + name: :test_direct_acker, endpoints: @endpoints, group: @group, topics: [@topic], handler: MessageHandler, - custom_ack_hack: true, + direct_ack: true, config: [ begin_offset: :earliest ] @@ -36,7 +36,7 @@ defmodule Elsa.Group.CustomAcknowledgerTest do Process.sleep(8_000) assert_async(fn -> - assert 1 == get_committed_offsets(:test_custom_acker, @group, @topic, 0) + assert 1 == get_committed_offsets(:test_direct_acker, @group, @topic, 0) end) end diff --git a/test/unit/elsa/group/custom_acknowledger_test.exs b/test/unit/elsa/group/direct_acknowledger_test.exs similarity index 85% rename from test/unit/elsa/group/custom_acknowledger_test.exs rename to test/unit/elsa/group/direct_acknowledger_test.exs index e212877..ddb6000 100644 --- a/test/unit/elsa/group/custom_acknowledger_test.exs +++ b/test/unit/elsa/group/direct_acknowledger_test.exs @@ -1,9 +1,9 @@ -defmodule Elsa.Group.CustomAcknowledgerTest do +defmodule Elsa.Group.DirectAcknowledgerTest do use ExUnit.Case use Placebo import AsyncAssertion - alias Elsa.Group.CustomAcknowledger + alias Elsa.Group.DirectAcknowledger @client :brod_client @group "group1" @@ -23,7 +23,7 @@ defmodule Elsa.Group.CustomAcknowledgerTest do allow :brod_kafka_request.offset_commit(any(), any()), return: :offset_commit_kafka_request allow :brod_utils.request_sync(any(), any(), any()), return: {:ok, %{responses: []}} - {:ok, pid} = CustomAcknowledger.start_link(name: __MODULE__, client: @client, group: @group) + {:ok, pid} = DirectAcknowledger.start_link(name: __MODULE__, client: @client, group: @group) on_exit(fn -> wait(pid) end) [pid: pid] @@ -43,7 +43,7 @@ defmodule Elsa.Group.CustomAcknowledgerTest do generation_id = 7 offset = 32 - :ok = CustomAcknowledger.ack(pid, member_id, topic, partition, generation_id, offset) + :ok = DirectAcknowledger.ack(pid, member_id, topic, partition, generation_id, offset) assert_called :brod_utils.request_sync(:connection, :offset_commit_kafka_request, 5_000) end @@ -53,7 +53,7 @@ defmodule Elsa.Group.CustomAcknowledgerTest do test "dies when unable to find group_coordinator" do allow :brod_client.get_group_coordinator(any(), any()), return: {:error, :something_went_wrong} - {:ok, pid} = CustomAcknowledger.start_link(name: __MODULE__, client: @client, group: @group) + {:ok, pid} = DirectAcknowledger.start_link(name: __MODULE__, client: @client, group: @group) on_exit(fn -> wait(pid) end) assert_receive {:EXIT, ^pid, :something_went_wrong} @@ -63,7 +63,7 @@ defmodule Elsa.Group.CustomAcknowledgerTest do allow :brod_client.get_group_coordinator(any(), any()), seq: [{:error, [error_code: :coordinator_not_available]}, {:error, :something_went_wrong}] - {:ok, pid} = CustomAcknowledger.start_link(name: __MODULE__, client: @client, group: @group) + {:ok, pid} = DirectAcknowledger.start_link(name: __MODULE__, client: @client, group: @group) on_exit(fn -> wait(pid) end) assert_receive {:EXIT, ^pid, :something_went_wrong}, 2_000 @@ -77,7 +77,7 @@ defmodule Elsa.Group.CustomAcknowledgerTest do allow :kpro.connect(any(), any()), return: {:error, :bad_connection} - {:ok, pid} = CustomAcknowledger.start_link(name: __MODULE__, client: @client, group: @group) + {:ok, pid} = DirectAcknowledger.start_link(name: __MODULE__, client: @client, group: @group) on_exit(fn -> wait(pid) end) assert_receive {:EXIT, ^pid, :bad_connection} @@ -92,7 +92,7 @@ defmodule Elsa.Group.CustomAcknowledgerTest do allow :kpro.connect(any(), any()), return: {:ok, :connection} allow :brod_kafka_request.offset_commit(any(), any()), return: :offset_commit_kafka_request - {:ok, pid} = CustomAcknowledger.start_link(name: __MODULE__, client: @client, group: @group) + {:ok, pid} = DirectAcknowledger.start_link(name: __MODULE__, client: @client, group: @group) on_exit(fn -> wait(pid) end) [pid: pid] end @@ -101,8 +101,8 @@ defmodule Elsa.Group.CustomAcknowledgerTest do allow :brod_utils.request_sync(any(), any(), any()), return: {:error, "some reason"} try do - CustomAcknowledger.ack(pid, :member_id, "topic1", 0, 7, 32) - flunk("Should have exited custom acknowledger") + DirectAcknowledger.ack(pid, :member_id, "topic1", 0, 7, 32) + flunk("Should have exited direct acknowledger") catch :exit, _ -> nil end @@ -124,8 +124,8 @@ defmodule Elsa.Group.CustomAcknowledgerTest do allow :brod_utils.request_sync(any(), any(), any()), return: {:ok, response} try do - CustomAcknowledger.ack(pid, :member_id, "topic1", 0, 7, 32) - flunk("Should have exited custom acknowledger") + DirectAcknowledger.ack(pid, :member_id, "topic1", 0, 7, 32) + flunk("Should have exited direct acknowledger") catch :exit, _ -> nil end diff --git a/test/unit/elsa/group/lifecycle_hooks_test.exs b/test/unit/elsa/group/lifecycle_hooks_test.exs index 98c85fb..44b32e0 100644 --- a/test/unit/elsa/group/lifecycle_hooks_test.exs +++ b/test/unit/elsa/group/lifecycle_hooks_test.exs @@ -12,7 +12,7 @@ defmodule Elsa.Group.LifecycleHooksTest do allow WorkerManager.start_worker(any(), any(), any(), any()), return: :workers allow WorkerManager.stop_all_workers(any()), return: :workers - :ets.new(:fake_test_name_hack_table, [:set, :public, :named_table]) + :ets.new(:fake_test_name_elsa_table, [:set, :public, :named_table]) test_pid = self()