Skip to content

Commit

Permalink
Merge pull request #32 from bbalser/ack_hack
Browse files Browse the repository at this point in the history
Custom ack hack: when configured, Elsa acks messages directly instead of going through the :brod_group_coordinator
  • Loading branch information
Brian Balser authored Aug 21, 2019
2 parents e5cfbc4 + c31c406 commit 36a1a49
Show file tree
Hide file tree
Showing 6 changed files with 388 additions and 15 deletions.
102 changes: 102 additions & 0 deletions lib/elsa/group/direct_acknowledger.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
defmodule Elsa.Group.DirectAcknowledger do
  @moduledoc """
  GenServer that commits consumer group offsets over its own connection
  instead of going through `:brod_group_coordinator`.

  After `init/1` the server asks the configured brod client for the group
  coordinator endpoint and opens a `:kpro` connection to it. `ack/6` then
  sends an offset commit request over that connection.
  """
  use GenServer
  require Logger

  # Timeout (ms) handed to `:brod_utils.request_sync/3` for each commit request.
  @timeout 5_000

  # Pause (ms) before retrying when the coordinator reports it is not available.
  @retry_delay 1_000

  @doc """
  Commits `offset` for `topic`/`partition` on behalf of `member_id` in `generation_id`.

  Returns `:ok` when the commit request succeeds and no partition reports an
  error. On any failure the server stops without replying, so the calling
  process exits from the `GenServer.call/2`.
  """
  @spec ack(GenServer.server(), term(), term(), term(), term(), integer()) :: :ok
  def ack(server, member_id, topic, partition, generation_id, offset) do
    GenServer.call(server, {:ack, member_id, topic, partition, generation_id, offset})
  end

  @doc """
  Starts the acknowledger.

  Required options: `:name` (registered name), `:client` (brod client) and
  `:group` (consumer group id). A missing option raises `KeyError`.
  """
  @spec start_link(keyword()) :: GenServer.on_start()
  def start_link(opts) do
    name = Keyword.fetch!(opts, :name)
    GenServer.start_link(__MODULE__, opts, name: name)
  end

  @impl true
  def init(opts) do
    state = %{
      client: Keyword.fetch!(opts, :client),
      group: Keyword.fetch!(opts, :group)
    }

    # Connecting is deferred to handle_continue/2 so start_link/1 returns promptly.
    {:ok, state, {:continue, :connect}}
  end

  @impl true
  def handle_continue(:connect, state) do
    with {:ok, {endpoint, conn_config}} <-
           :brod_client.get_group_coordinator(state.client, state.group),
         {:ok, connection} <- :kpro.connect(endpoint, conn_config) do
      Logger.debug(fn ->
        "#{__MODULE__}: Coordinator available for group #{state.group} on client #{state.client}"
      end)

      {:noreply, Map.put(state, :connection, connection)}
    else
      {:error, reason} ->
        if coordinator_not_available?(reason) do
          # Retry from inside the continue chain: no call is served until a
          # connection has been stored in the state.
          Process.sleep(@retry_delay)
          {:noreply, state, {:continue, :connect}}
        else
          {:stop, reason, state}
        end
    end
  end

  @impl true
  def handle_call({:ack, member_id, topic, partition, generation_id, offset}, _from, state) do
    request = make_request_body(state, member_id, topic, partition, generation_id, offset)

    with {:ok, response} <- :brod_utils.request_sync(state.connection, request, @timeout),
         :ok <- parse_response(response) do
      {:reply, :ok, state}
    else
      # No reply is sent: the server stops and the caller exits.
      {:error, reason} -> {:stop, reason, state}
    end
  end

  # True only when the error reason is a list whose :error_code entry is
  # :coordinator_not_available; every other reason is treated as fatal.
  defp coordinator_not_available?(reason) when is_list(reason) do
    Keyword.get(reason, :error_code) == :coordinator_not_available
  end

  defp coordinator_not_available?(_reason), do: false

  # Builds the offset commit request for a single topic/partition.
  defp make_request_body(state, member_id, topic, partition, generation_id, offset) do
    partitions = [
      %{
        partition: partition,
        # Commits the offset after the acknowledged one
        # (presumably the next offset to be consumed - confirm against Kafka docs).
        offset: offset + 1,
        metadata: "+1/#{:io_lib.format("~p/~p", [node(), self()])}"
      }
    ]

    topics = [
      %{
        topic: topic,
        partitions: partitions
      }
    ]

    request_body = %{
      group_id: state.group,
      generation_id: generation_id,
      member_id: member_id,
      retention_time: -1,
      topics: topics
    }

    :brod_kafka_request.offset_commit(state.connection, request_body)
  end

  # :ok when no partition reported an error, otherwise {:error, errors}.
  defp parse_response(response) do
    case parse_offset_commit_response(response) do
      [] -> :ok
      errors -> {:error, errors}
    end
  end

  # Collects the error entries of every topic in the response into one flat list.
  defp parse_offset_commit_response(response) do
    Enum.flat_map(response.responses, &parse_partition_responses/1)
  end

  # Returns one %{topic, error, partition} map per partition whose error code
  # is anything other than :no_error.
  defp parse_partition_responses(%{topic: topic, partition_responses: responses}) do
    responses
    |> Enum.reject(fn %{error_code: code} -> code == :no_error end)
    |> Enum.map(fn %{error_code: code, partition: partition} ->
      %{topic: topic, error: code, partition: partition}
    end)
  end
end
91 changes: 79 additions & 12 deletions lib/elsa/group/manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ defmodule Elsa.Group.Manager do
:handler,
:handler_init_args,
:workers,
:generation_id
:generation_id,
:direct_acknowledger_pid
]
end

Expand All @@ -114,8 +115,8 @@ defmodule Elsa.Group.Manager do
Trigger the assignment of workers to a given topic and partition
"""
@spec assignments_received(pid(), term(), integer(), [tuple()]) :: :ok
def assignments_received(pid, _group, generation_id, assignments) do
GenServer.call(pid, {:process_assignments, generation_id, assignments})
def assignments_received(pid, group_member_id, generation_id, assignments) do
GenServer.call(pid, {:process_assignments, group_member_id, generation_id, assignments})
end

@doc """
Expand All @@ -132,8 +133,27 @@ defmodule Elsa.Group.Manager do
"""
@spec ack(String.t(), String.t(), integer(), integer(), integer()) :: :ok
def ack(name, topic, partition, generation_id, offset) do
group_manager = {:via, Registry, {registry(name), __MODULE__}}
GenServer.cast(group_manager, {:ack, topic, partition, generation_id, offset})
case direct_ack?(name) do
false ->
group_manager = {:via, Registry, {registry(name), __MODULE__}}
GenServer.cast(group_manager, {:ack, topic, partition, generation_id, offset})

true ->
case :ets.lookup(table_name(name), :assignments) do
[{:assignments, member_id, assigned_generation_id}] when assigned_generation_id == generation_id ->
direct_acknowledger = {:via, Registry, {registry(name), Elsa.Group.DirectAcknowledger}}
Elsa.Group.DirectAcknowledger.ack(direct_acknowledger, member_id, topic, partition, generation_id, offset)

_ ->
Logger.warn(
"Invalid generation_id(#{generation_id}), ignoring ack - topic #{topic} partition #{partition} offset #{
offset
}"
)
end

:ok
end
end

@doc """
Expand Down Expand Up @@ -170,6 +190,10 @@ defmodule Elsa.Group.Manager do
workers: %{}
}

table_name = table_name(state.name)
:ets.new(table_name, [:set, :protected, :named_table])
:ets.insert(table_name, {:direct_ack, Keyword.get(opts, :direct_ack, false)})

{:ok, state, {:continue, :start_coordinator}}
end

Expand All @@ -185,23 +209,28 @@ defmodule Elsa.Group.Manager do

Registry.put_meta(registry(state.name), :group_coordinator, group_coordinator_pid)

{:noreply, %{state | client_pid: client_pid, group_coordinator_pid: group_coordinator_pid}}
{:noreply,
%{
state
| client_pid: client_pid,
group_coordinator_pid: group_coordinator_pid,
direct_acknowledger_pid: create_direct_acknowledger(state)
}}
catch
:exit, reason ->
wait_and_stop(reason, state)
end

def handle_call({:process_assignments, generation_id, assignments}, _from, state) do
def handle_call({:process_assignments, member_id, generation_id, assignments}, _from, state) do
case call_lifecycle_assignment_received(state, assignments, generation_id) do
{:error, reason} ->
{:stop, reason, {:error, reason}, state}

:ok ->
new_workers =
Enum.reduce(assignments, state.workers, fn assignment, workers ->
WorkerManager.start_worker(workers, generation_id, assignment, state)
end)
table_name = table_name(state.name)
:ets.insert(table_name, {:assignments, member_id, generation_id})

new_workers = start_workers(state, generation_id, assignments)
{:reply, :ok, %{state | workers: new_workers, generation_id: generation_id}}
end
end
Expand All @@ -210,6 +239,7 @@ defmodule Elsa.Group.Manager do
Logger.info("Assignments revoked for group #{state.group}")
new_workers = WorkerManager.stop_all_workers(state.workers)
:ok = apply(state.assignments_revoked_handler, [])
:ets.delete(table_name(state.name), :assignments)
{:reply, :ok, %{state | workers: new_workers, generation_id: nil}}
end

Expand All @@ -222,7 +252,12 @@ defmodule Elsa.Group.Manager do
{:noreply, %{state | workers: new_workers}}

false ->
Logger.warn("Invalid generation_id, ignoring ack - topic #{topic} parition #{partition} offset #{offset}")
Logger.warn(
"Invalid generation_id #{state.generation_id} == #{generation_id}, ignoring ack - topic #{topic} partition #{
partition
} offset #{offset}"
)

{:noreply, state}
end
end
Expand All @@ -246,8 +281,40 @@ defmodule Elsa.Group.Manager do
end)
end

# Folds over the assignments, handing each one to WorkerManager.start_worker/4
# together with the workers accumulated so far (seeded from state.workers),
# and returns the final accumulator.
defp start_workers(state, generation_id, assignments) do
  launch = fn assignment, acc ->
    WorkerManager.start_worker(acc, generation_id, assignment, state)
  end

  Enum.reduce(assignments, state.workers, launch)
end

# Sleeps for two seconds and then returns a stop tuple carrying `reason`
# and the unchanged state.
defp wait_and_stop(reason, state) do
  pause_ms = 2_000
  Process.sleep(pause_ms)

  {:stop, reason, state}
end

# Reads the :direct_ack entry from this group's ETS table and returns the
# stored value; falls back to false when no such entry exists.
defp direct_ack?(name) do
  name
  |> table_name()
  |> :ets.lookup(:direct_ack)
  |> case do
    [{:direct_ack, enabled}] -> enabled
    _ -> false
  end
end

# Starts an Elsa.Group.DirectAcknowledger registered under a :via Registry
# name and returns its pid when direct acking is enabled; returns nil when
# it is disabled. A failed start raises a MatchError.
defp create_direct_acknowledger(state) do
  enabled? = direct_ack?(state.name)

  case enabled? do
    true ->
      via_name = {:via, Registry, {registry(state.name), Elsa.Group.DirectAcknowledger}}
      start_opts = [name: via_name, client: state.name, group: state.group]

      {:ok, pid} = Elsa.Group.DirectAcknowledger.start_link(start_opts)
      pid

    false ->
      nil
  end
end

# Derives the ETS table name for a group: the given name followed by
# "_elsa_table", as an atom.
defp table_name(name) do
  String.to_atom("#{name}_elsa_table")
end
end
2 changes: 1 addition & 1 deletion mix.lock
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
%{
"brod": {:hex, :brod, "3.8.0", "069f199ec3dfaade68c73529162f1d21cc58ec492c4535c71e5712b406234075", [:make, :rebar, :rebar3], [{:kafka_protocol, "2.2.9", [hex: :kafka_protocol, repo: "hexpm", optional: false]}, {:supervisor3, "1.1.8", [hex: :supervisor3, repo: "hexpm", optional: false]}], "hexpm"},
"brod": {:hex, :brod, "3.8.1", "74426e2d27989cd7f973599bc209c899dca8588068d27a2a37ca1b4bf8823c2a", [:make, :rebar, :rebar3], [{:kafka_protocol, "2.2.9", [hex: :kafka_protocol, repo: "hexpm", optional: false]}, {:supervisor3, "1.1.8", [hex: :supervisor3, repo: "hexpm", optional: false]}], "hexpm"},
"checkov": {:hex, :checkov, "0.4.0", "a1974a5885f62f8943a9059f0d5252b222271e0e58dd872ffb60323f2e376433", [:mix], [], "hexpm"},
"crc32cer": {:hex, :crc32cer, "0.1.3", "8984906c4b4fae6aa292c48f286a1c83b19ad44bd102287acb94d696015967ce", [:make, :rebar, :rebar3], [], "hexpm"},
"dialyxir": {:hex, :dialyxir, "1.0.0-rc.6", "78e97d9c0ff1b5521dd68041193891aebebce52fc3b93463c0a6806874557d7d", [:mix], [{:erlex, "~> 0.2.1", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm"},
Expand Down
55 changes: 55 additions & 0 deletions test/integration/elsa/group/direct_acknowledger_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
defmodule Elsa.Group.DirectAcknowledgerTest do
  use ExUnit.Case
  use Divo
  import AsyncAssertion

  defmodule MessageHandler do
    use Elsa.Consumer.MessageHandler

    # Returns :ack for every batch of messages it is handed.
    def handle_messages(_messages) do
      :ack
    end
  end

  @endpoints Application.get_env(:elsa, :brokers)
  @group "group-1a"
  @topic "topic-1a"

  test "direct acknowledger ack over privately managed connection" do
    :ok = Elsa.create_topic(@endpoints, @topic)

    {:ok, _elsa_sup_pid} =
      Elsa.Group.Supervisor.start_link(
        name: :test_direct_acker,
        endpoints: @endpoints,
        group: @group,
        topics: [@topic],
        handler: MessageHandler,
        direct_ack: true,
        config: [
          begin_offset: :earliest
        ]
      )

    Elsa.produce(@endpoints, @topic, {"key1", "value1"}, partition: 0)

    # Fixed wait before polling; presumably gives the group time to join and
    # consume the message - TODO confirm whether assert_async alone suffices.
    Process.sleep(8_000)

    # One message was produced at offset 0, so the committed offset should be 1.
    assert_async(fn ->
      assert 1 == get_committed_offsets(:test_direct_acker, @group, @topic, 0)
    end)
  end

  # Fetches the offset committed by `group` for `topic`/`partition` through
  # the brod client. Returns :undefined when either the topic or the
  # partition is absent from the response, so the caller sees a failed
  # assertion instead of an error from dereferencing nil.
  defp get_committed_offsets(client, group, topic, partition) do
    {:ok, responses} = :brod.fetch_committed_offsets(client, group)

    with %{partition_responses: partition_responses} <-
           Enum.find(responses, fn %{topic: t} -> topic == t end),
         %{offset: offset} <-
           Enum.find(partition_responses, fn %{partition: p} -> partition == p end) do
      offset
    else
      nil -> :undefined
    end
  end
end
Loading

0 comments on commit 36a1a49

Please sign in to comment.