Skip to content

Commit

Permalink
Renamed custom_ack_hack to direct_ack
Browse files Browse the repository at this point in the history
  • Loading branch information
Brian Balser committed Aug 21, 2019
1 parent 20eba32 commit c31c406
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 35 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
defmodule Elsa.Group.CustomAcknowledger do
defmodule Elsa.Group.DirectAcknowledger do
use GenServer
require Logger

Expand Down
32 changes: 16 additions & 16 deletions lib/elsa/group/manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ defmodule Elsa.Group.Manager do
:handler_init_args,
:workers,
:generation_id,
:custom_acknowledger_pid
:direct_acknowledger_pid
]
end

Expand Down Expand Up @@ -133,16 +133,16 @@ defmodule Elsa.Group.Manager do
"""
@spec ack(String.t(), String.t(), integer(), integer(), integer()) :: :ok
def ack(name, topic, partition, generation_id, offset) do
case custom_ack_hack?(name) do
case direct_ack?(name) do
false ->
group_manager = {:via, Registry, {registry(name), __MODULE__}}
GenServer.cast(group_manager, {:ack, topic, partition, generation_id, offset})

true ->
case :ets.lookup(table_name(name), :assignments) do
[{:assignments, member_id, assigned_generation_id}] when assigned_generation_id == generation_id ->
custom_acknowledger = {:via, Registry, {registry(name), Elsa.Group.CustomAcknowledger}}
Elsa.Group.CustomAcknowledger.ack(custom_acknowledger, member_id, topic, partition, generation_id, offset)
direct_acknowledger = {:via, Registry, {registry(name), Elsa.Group.DirectAcknowledger}}
Elsa.Group.DirectAcknowledger.ack(direct_acknowledger, member_id, topic, partition, generation_id, offset)

_ ->
Logger.warn(
Expand Down Expand Up @@ -192,7 +192,7 @@ defmodule Elsa.Group.Manager do

table_name = table_name(state.name)
:ets.new(table_name, [:set, :protected, :named_table])
:ets.insert(table_name, {:custom_ack_hack, Keyword.get(opts, :custom_ack_hack, false)})
:ets.insert(table_name, {:direct_ack, Keyword.get(opts, :direct_ack, false)})

{:ok, state, {:continue, :start_coordinator}}
end
Expand All @@ -214,7 +214,7 @@ defmodule Elsa.Group.Manager do
state
| client_pid: client_pid,
group_coordinator_pid: group_coordinator_pid,
custom_acknowledger_pid: create_custom_acknowledger(state)
direct_acknowledger_pid: create_direct_acknowledger(state)
}}
catch
:exit, reason ->
Expand Down Expand Up @@ -292,29 +292,29 @@ defmodule Elsa.Group.Manager do
{:stop, reason, state}
end

defp custom_ack_hack?(name) do
case :ets.lookup(table_name(name), :custom_ack_hack) do
[{:custom_ack_hack, result}] -> result
defp direct_ack?(name) do
case :ets.lookup(table_name(name), :direct_ack) do
[{:direct_ack, result}] -> result
_ -> false
end
end

defp create_custom_acknowledger(state) do
case custom_ack_hack?(state.name) do
defp create_direct_acknowledger(state) do
case direct_ack?(state.name) do
false ->
nil

true ->
name = {:via, Registry, {registry(state.name), Elsa.Group.CustomAcknowledger}}
name = {:via, Registry, {registry(state.name), Elsa.Group.DirectAcknowledger}}

{:ok, custom_acknowledger_pid} =
Elsa.Group.CustomAcknowledger.start_link(name: name, client: state.name, group: state.group)
{:ok, direct_acknowledger_pid} =
Elsa.Group.DirectAcknowledger.start_link(name: name, client: state.name, group: state.group)

custom_acknowledger_pid
direct_acknowledger_pid
end
end

defp table_name(name) do
:"#{name}_hack_table"
:"#{name}_elsa_table"
end
end
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
defmodule Elsa.Group.CustomAcknowledgerTest do
defmodule Elsa.Group.DirectAcknowledgerTest do
use ExUnit.Case
use Divo
import AsyncAssertion
Expand All @@ -15,17 +15,17 @@ defmodule Elsa.Group.CustomAcknowledgerTest do
@group "group-1a"
@topic "topic-1a"

test "custom acknowledger ack over privately managed connection" do
test "direct acknowledger ack over privately managed connection" do
:ok = Elsa.create_topic(@endpoints, @topic)

{:ok, _elsa_sup_pid} =
Elsa.Group.Supervisor.start_link(
name: :test_custom_acker,
name: :test_direct_acker,
endpoints: @endpoints,
group: @group,
topics: [@topic],
handler: MessageHandler,
custom_ack_hack: true,
direct_ack: true,
config: [
begin_offset: :earliest
]
Expand All @@ -36,7 +36,7 @@ defmodule Elsa.Group.CustomAcknowledgerTest do
Process.sleep(8_000)

assert_async(fn ->
assert 1 == get_committed_offsets(:test_custom_acker, @group, @topic, 0)
assert 1 == get_committed_offsets(:test_direct_acker, @group, @topic, 0)
end)
end

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
defmodule Elsa.Group.CustomAcknowledgerTest do
defmodule Elsa.Group.DirectAcknowledgerTest do
use ExUnit.Case
use Placebo
import AsyncAssertion

alias Elsa.Group.CustomAcknowledger
alias Elsa.Group.DirectAcknowledger

@client :brod_client
@group "group1"
Expand All @@ -23,7 +23,7 @@ defmodule Elsa.Group.CustomAcknowledgerTest do
allow :brod_kafka_request.offset_commit(any(), any()), return: :offset_commit_kafka_request
allow :brod_utils.request_sync(any(), any(), any()), return: {:ok, %{responses: []}}

{:ok, pid} = CustomAcknowledger.start_link(name: __MODULE__, client: @client, group: @group)
{:ok, pid} = DirectAcknowledger.start_link(name: __MODULE__, client: @client, group: @group)
on_exit(fn -> wait(pid) end)

[pid: pid]
Expand All @@ -43,7 +43,7 @@ defmodule Elsa.Group.CustomAcknowledgerTest do
generation_id = 7
offset = 32

:ok = CustomAcknowledger.ack(pid, member_id, topic, partition, generation_id, offset)
:ok = DirectAcknowledger.ack(pid, member_id, topic, partition, generation_id, offset)

assert_called :brod_utils.request_sync(:connection, :offset_commit_kafka_request, 5_000)
end
Expand All @@ -53,7 +53,7 @@ defmodule Elsa.Group.CustomAcknowledgerTest do
test "dies when unable to find group_coordinator" do
allow :brod_client.get_group_coordinator(any(), any()), return: {:error, :something_went_wrong}

{:ok, pid} = CustomAcknowledger.start_link(name: __MODULE__, client: @client, group: @group)
{:ok, pid} = DirectAcknowledger.start_link(name: __MODULE__, client: @client, group: @group)
on_exit(fn -> wait(pid) end)

assert_receive {:EXIT, ^pid, :something_went_wrong}
Expand All @@ -63,7 +63,7 @@ defmodule Elsa.Group.CustomAcknowledgerTest do
allow :brod_client.get_group_coordinator(any(), any()),
seq: [{:error, [error_code: :coordinator_not_available]}, {:error, :something_went_wrong}]

{:ok, pid} = CustomAcknowledger.start_link(name: __MODULE__, client: @client, group: @group)
{:ok, pid} = DirectAcknowledger.start_link(name: __MODULE__, client: @client, group: @group)
on_exit(fn -> wait(pid) end)

assert_receive {:EXIT, ^pid, :something_went_wrong}, 2_000
Expand All @@ -77,7 +77,7 @@ defmodule Elsa.Group.CustomAcknowledgerTest do

allow :kpro.connect(any(), any()), return: {:error, :bad_connection}

{:ok, pid} = CustomAcknowledger.start_link(name: __MODULE__, client: @client, group: @group)
{:ok, pid} = DirectAcknowledger.start_link(name: __MODULE__, client: @client, group: @group)
on_exit(fn -> wait(pid) end)

assert_receive {:EXIT, ^pid, :bad_connection}
Expand All @@ -92,7 +92,7 @@ defmodule Elsa.Group.CustomAcknowledgerTest do
allow :kpro.connect(any(), any()), return: {:ok, :connection}
allow :brod_kafka_request.offset_commit(any(), any()), return: :offset_commit_kafka_request

{:ok, pid} = CustomAcknowledger.start_link(name: __MODULE__, client: @client, group: @group)
{:ok, pid} = DirectAcknowledger.start_link(name: __MODULE__, client: @client, group: @group)
on_exit(fn -> wait(pid) end)
[pid: pid]
end
Expand All @@ -101,8 +101,8 @@ defmodule Elsa.Group.CustomAcknowledgerTest do
allow :brod_utils.request_sync(any(), any(), any()), return: {:error, "some reason"}

try do
CustomAcknowledger.ack(pid, :member_id, "topic1", 0, 7, 32)
flunk("Should have exited custom acknowledger")
DirectAcknowledger.ack(pid, :member_id, "topic1", 0, 7, 32)
flunk("Should have exited direct acknowledger")
catch
:exit, _ -> nil
end
Expand All @@ -124,8 +124,8 @@ defmodule Elsa.Group.CustomAcknowledgerTest do
allow :brod_utils.request_sync(any(), any(), any()), return: {:ok, response}

try do
CustomAcknowledger.ack(pid, :member_id, "topic1", 0, 7, 32)
flunk("Should have exited custom acknowledger")
DirectAcknowledger.ack(pid, :member_id, "topic1", 0, 7, 32)
flunk("Should have exited direct acknowledger")
catch
:exit, _ -> nil
end
Expand Down
2 changes: 1 addition & 1 deletion test/unit/elsa/group/lifecycle_hooks_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ defmodule Elsa.Group.LifecycleHooksTest do
allow WorkerManager.start_worker(any(), any(), any(), any()), return: :workers
allow WorkerManager.stop_all_workers(any()), return: :workers

:ets.new(:fake_test_name_hack_table, [:set, :public, :named_table])
:ets.new(:fake_test_name_elsa_table, [:set, :public, :named_table])

test_pid = self()

Expand Down

1 comment on commit c31c406

@jeffgrunewald
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Please sign in to comment.