Brod 9 acking (#53)
Removes the acknowledgement bottleneck in the `Group.Manager` process during disruptive events such as rebalances.
jeffgrunewald authored Nov 19, 2019
1 parent 3d2b5ea commit 90e50a9
Showing 13 changed files with 251 additions and 101 deletions.
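
At a glance: acknowledgement of consumed messages moves out of the `Elsa.Group.Manager` GenServer and into a dedicated `Elsa.Group.Acknowledger` process, so acks no longer queue in the manager's mailbox while it coordinates a rebalance. A minimal sketch of the call-site difference, assuming a hypothetical caller (`MyApp.AckPath` and its arguments are placeholders, not code from this commit):

```elixir
defmodule MyApp.AckPath do
  # Illustration only: the module name is hypothetical and the arguments are
  # whatever a consumer worker is already tracking for its topic/partition.
  def ack_processed(connection, topic, partition, generation_id, offset) do
    # Before this commit, every ack was serialized through the Group.Manager process:
    #
    #   Elsa.Group.Manager.ack(connection, topic, partition, generation_id, offset)
    #
    # After this commit, workers cast directly to the dedicated acknowledger;
    # Group.Manager.ack/5 remains as a delegate for backward compatibility.
    Elsa.Group.Acknowledger.ack(connection, topic, partition, generation_id, offset)
  end
end
```
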
12 changes: 9 additions & 3 deletions README.md
@@ -18,7 +18,7 @@ The package can be installed by adding `elsa` to your list of dependencies in `mix.exs`:
```elixir
def deps do
[
{:elsa, "~> 0.10"}
{:elsa, "~> 0.11"}
]
end
```
@@ -49,6 +49,12 @@ config :my_app, :elsa,
config: [
begin_offset: :earliest
]
],
consumer: [
topic: "incoming-stream",
partition: 0,
begin_offset: :earliest,
handler: MyApp.MessageHandler
]
```

@@ -60,8 +66,8 @@ registry).
Producers may be a single producer or a list of producers, differentiated by their
topic; Elsa therefore allows a one-to-many association of a supervision tree to producers.

Consumer groups, in contrast, have a one-to-one relationship to an Elsa supervision
tree; therefore you cannot define a nested list of `group_consumer` keyword arguments
Consumers and consumer groups, in contrast, have a one-to-one relationship to an Elsa supervision
tree; therefore you cannot define a nested list of `consumer` or `group_consumer` keyword arguments
within your Elsa configuration.
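
To make that constraint concrete, here is an illustrative configuration sketch. The topic names, group name, and handler module are invented, and the exact shape of a multi-producer list should be checked against the Elsa documentation:

```elixir
config :my_app, :elsa,
  connection: :my_connection,
  endpoints: [localhost: 9092],
  # One supervision tree may own several producers, one per topic...
  producer: [
    [topic: "outgoing-stream-a"],
    [topic: "outgoing-stream-b"]
  ],
  # ...but only a single consumer or group_consumer block.
  group_consumer: [
    group: "example-consumer-group",
    topics: ["incoming-stream"],
    handler: MyApp.MessageHandler,
    config: [begin_offset: :earliest]
  ]
```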

## Testing
2 changes: 1 addition & 1 deletion lib/elsa/consumer/worker.ex
@@ -139,7 +139,7 @@ defmodule Elsa.Consumer.Worker do
end

defp ack_messages(topic, partition, offset, state) do
Elsa.Group.Manager.ack(state.connection, topic, partition, state.generation_id, offset)
Elsa.Group.Acknowledger.ack(state.connection, topic, partition, state.generation_id, offset)

offset
end
117 changes: 117 additions & 0 deletions lib/elsa/group/acknowledger.ex
@@ -0,0 +1,117 @@
defmodule Elsa.Group.Acknowledger do
@moduledoc """
Handles acknowledgement of messages to the
group coordinator to prevent the group manager
from queuing up messages for acknowledgement
when events such as a rebalance occur.
"""
use GenServer
require Logger
import Elsa.Supervisor, only: [registry: 1]

@doc """
Trigger acknowledgement of processed messages back to the cluster.
"""
@spec ack(String.t(), Elsa.topic(), Elsa.partition(), Elsa.Group.Manager.generation_id(), integer()) :: :ok
def ack(connection, topic, partition, generation_id, offset) do
acknowledger = {:via, Elsa.Registry, {registry(connection), __MODULE__}}
GenServer.cast(acknowledger, {:ack, topic, partition, generation_id, offset})
end

@doc """
Trigger acknowledgement of processed messages back to the cluster.
"""
@spec ack(String.t(), %{
topic: Elsa.topic(),
partition: Elsa.partition(),
generation_id: Elsa.Group.Manager.generation_id(),
offset: integer()
}) :: :ok
def ack(connection, %{topic: topic, partition: partition, generation_id: generation_id, offset: offset}) do
ack(connection, topic, partition, generation_id, offset)
end

@doc """
Sync the group generation ID back to the acknowledger state for validation.
"""
@spec update_generation_id(GenServer.server(), Elsa.Group.Manager.generation_id()) :: :ok
def update_generation_id(acknowledger, generation_id) do
GenServer.cast(acknowledger, {:update_generation, generation_id})
end

@doc """
Retrieve the latest offset for a topic and partition. Primarily used for reinitializing
consumer workers to the latest unacknowledged offset after a rebalance or other disruption.
"""
@spec get_latest_offset(GenServer.server(), Elsa.topic(), Elsa.partition()) :: Elsa.Group.Manager.begin_offset()
def get_latest_offset(acknowledger, topic, partition) do
GenServer.call(acknowledger, {:get_latest_offset, topic, partition})
end

@doc """
Instantiate an acknowledger process and register it to the Elsa registry.
"""
@spec start_link(term()) :: GenServer.on_start()
def start_link(opts) do
connection = Keyword.fetch!(opts, :connection)
GenServer.start_link(__MODULE__, opts, name: {:via, Elsa.Registry, {registry(connection), __MODULE__}})
end

@impl GenServer
def init(opts) do
connection = Keyword.fetch!(opts, :connection)

state = %{
connection: connection,
current_offsets: %{},
generation_id: nil,
group_coordinator_pid: nil
}

{:ok, state, {:continue, :get_coordinator}}
end

@impl GenServer
def handle_continue(:get_coordinator, state) do
group_coordinator_pid = Elsa.Registry.whereis_name({registry(state.connection), :brod_group_coordinator})

{:noreply, %{state | group_coordinator_pid: group_coordinator_pid}}
end

@impl GenServer
def handle_call({:get_latest_offset, topic, partition}, _pid, %{current_offsets: offsets} = state) do
latest_offset = Map.get(offsets, {topic, partition})

{:reply, latest_offset, state}
end

@impl GenServer
def handle_cast({:update_generation, generation_id}, state) do
{:noreply, %{state | generation_id: generation_id}}
end

@impl GenServer
def handle_cast({:ack, topic, partition, generation_id, offset}, state) do
case state.generation_id == generation_id do
true ->
:ok = :brod_group_coordinator.ack(state.group_coordinator_pid, generation_id, topic, partition, offset)
:ok = Elsa.Consumer.ack(state.connection, topic, partition, offset)

new_offsets = update_offset(state.current_offsets, topic, partition, offset)
{:noreply, %{state | current_offsets: new_offsets}}

false ->
Logger.warn(
"Invalid generation_id #{state.generation_id} == #{generation_id}, ignoring ack - topic #{topic} partition #{
partition
} offset #{offset}"
)

{:noreply, state}
end
end

defp update_offset(offsets, topic, partition, offset) do
Map.put(offsets, {topic, partition}, offset + 1)
end
end
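
The new process registers itself under `Elsa.Registry`, so callers address it with a via tuple; this is the same lookup the worker manager performs further down when restarting a worker. A hedged sketch, assuming a placeholder connection name:

```elixir
# Placeholder connection name, topic, and partition. `registry/1` is the public
# Elsa.Supervisor helper imported at the top of the module above.
acknowledger =
  {:via, Elsa.Registry, {Elsa.Supervisor.registry(:my_connection), Elsa.Group.Acknowledger}}

# Returns the next offset to resume from for that topic/partition, or nil if
# nothing has been acknowledged yet (see get_latest_offset/3 and update_offset/4).
Elsa.Group.Acknowledger.get_latest_offset(acknowledger, "incoming-stream", 0)
```
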
97 changes: 28 additions & 69 deletions lib/elsa/group/manager.ex
@@ -77,6 +77,8 @@ defmodule Elsa.Group.Manager do
config: consumer_config()
]

@default_delay 5_000

defmodule State do
@moduledoc """
The running state of the consumer group manager process.
@@ -86,25 +88,32 @@
:group,
:topics,
:config,
:group_coordinator_pid,
:supervisor_pid,
:assignment_received_handler,
:assignments_revoked_handler,
:start_time,
:delay,
:handler,
:handler_init_args,
:workers,
:generation_id
]
end

@doc """
Provides convenience for backward compatibility with previous versions of Elsa where acking for
a consumer group was handled by the Elsa.Group.Manager module.
"""
defdelegate ack(connection, topic, partition, generation_id, offset), to: Elsa.Group.Acknowledger

def get_committed_offsets(_pid, _topic) do
{:ok, []}
end

@doc """
Trigger the assignment of workers to a given topic and partition
"""
@spec assignments_received(pid(), term(), generation_id(), [tuple()]) :: :ok
@spec assignments_received(GenServer.server(), term(), generation_id(), [tuple()]) :: :ok
def assignments_received(pid, group_member_id, generation_id, assignments) do
GenServer.call(pid, {:process_assignments, group_member_id, generation_id, assignments})
end
@@ -113,33 +122,11 @@
Trigger deallocation of all workers from the consumer group and stop
worker processes.
"""
@spec assignments_revoked(pid()) :: :ok
@spec assignments_revoked(GenServer.server()) :: :ok
def assignments_revoked(pid) do
GenServer.call(pid, :revoke_assignments, 30_000)
end

@doc """
Trigger acknowledgement of processed messages back to the cluster.
"""
@spec ack(String.t(), Elsa.topic(), Elsa.partition(), generation_id(), integer()) :: :ok
def ack(connection, topic, partition, generation_id, offset) do
group_manager = {:via, Elsa.Registry, {registry(connection), __MODULE__}}
GenServer.cast(group_manager, {:ack, topic, partition, generation_id, offset})
end

@doc """
Trigger acknowledgement of processed messages back to the cluster.
"""
@spec ack(String.t(), %{
topic: Elsa.topic(),
partition: Elsa.partition(),
generation_id: generation_id(),
offset: integer()
}) :: :ok
def ack(connection, %{topic: topic, partition: partition, generation_id: generation_id, offset: offset}) do
ack(connection, topic, partition, generation_id, offset)
end

@doc """
Start the group manager process and register a name with the process registry.
"""
@@ -159,25 +146,15 @@
supervisor_pid: Keyword.fetch!(opts, :supervisor_pid),
assignment_received_handler: Keyword.get(opts, :assignment_received_handler, fn _g, _t, _p, _gen -> :ok end),
assignments_revoked_handler: Keyword.get(opts, :assignments_revoked_handler, fn -> :ok end),
start_time: :erlang.system_time(:milli_seconds),
delay: Keyword.get(opts, :delay, @default_delay),
handler: Keyword.fetch!(opts, :handler),
handler_init_args: Keyword.get(opts, :handler_init_args, %{}),
config: Keyword.get(opts, :config, []),
workers: %{}
}

{:ok, state, {:continue, :start_coordinator}}
end

def handle_continue(:start_coordinator, state) do
{:ok, group_coordinator_pid} =
:brod_group_coordinator.start_link(state.connection, state.group, state.topics, state.config, __MODULE__, self())

Elsa.Registry.register_name({registry(state.connection), :brod_group_coordinator}, group_coordinator_pid)

{:noreply, %{state | group_coordinator_pid: group_coordinator_pid}}
catch
:exit, reason ->
wait_and_stop(reason, state)
{:ok, state}
end

def handle_call({:process_assignments, _member_id, generation_id, assignments}, _from, state) do
@@ -186,6 +163,11 @@
{:stop, reason, {:error, reason}, state}

:ok ->
Elsa.Group.Acknowledger.update_generation_id(
{:via, Elsa.Registry, {registry(state.connection), Elsa.Group.Acknowledger}},
generation_id
)

new_workers = start_workers(state, generation_id, assignments)
{:reply, :ok, %{state | workers: new_workers, generation_id: generation_id}}
end
@@ -198,43 +180,25 @@
{:reply, :ok, %{state | workers: new_workers, generation_id: nil}}
end

def handle_cast({:ack, topic, partition, generation_id, offset}, state) do
case state.generation_id == generation_id do
true ->
:ok = :brod_group_coordinator.ack(state.group_coordinator_pid, generation_id, topic, partition, offset)
:ok = Elsa.Consumer.ack(state.connection, topic, partition, offset)
new_workers = WorkerManager.update_offset(state.workers, topic, partition, offset)
{:noreply, %{state | workers: new_workers}}

false ->
Logger.warn(
"Invalid generation_id #{state.generation_id} == #{generation_id}, ignoring ack - topic #{topic} partition #{
partition
} offset #{offset}"
)

{:noreply, state}
end
end

def handle_info({:DOWN, ref, :process, _object, _reason}, state) do
new_workers = WorkerManager.restart_worker(state.workers, ref, state)

{:noreply, %{state | workers: new_workers}}
end

def handle_info({:EXIT, _from, reason}, state) do
wait_and_stop(reason, state)
def handle_info({:EXIT, _pid, reason}, %State{delay: delay, start_time: started} = state) do
lifetime = :erlang.system_time(:milli_seconds) - started

max(delay - lifetime, 0)
|> Process.sleep()

{:stop, reason, state}
end

def terminate(reason, %{group_coordinator_pid: group_coordinator_pid} = state) do
def terminate(reason, state) do
Logger.info("#{__MODULE__} : Terminating #{state.connection}")
WorkerManager.stop_all_workers(state.workers)

if group_coordinator_pid != nil && Process.alive?(group_coordinator_pid) do
Process.exit(group_coordinator_pid, reason)
end

reason
end

@@ -252,9 +216,4 @@
WorkerManager.start_worker(workers, generation_id, assignment, state)
end)
end

defp wait_and_stop(reason, state) do
Process.sleep(2_000)
{:stop, reason, state}
end
end
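
One note on the rewritten `handle_info({:EXIT, ...})` clause above: instead of the old fixed two-second `wait_and_stop/2`, the manager now sleeps only for whatever remains of a configurable `delay` (default 5,000 ms) measured from `start_time`, which appears intended to space out supervisor restarts without penalizing a manager that has already run long enough. A small sketch of that arithmetic, using invented numbers:

```elixir
# Invented numbers. With the default delay of 5_000 ms, a manager that exits
# after 1_200 ms of uptime sleeps for the remaining 3_800 ms before stopping;
# one that has outlived the delay stops immediately (max/2 clamps the sleep at 0).
delay = 5_000
lifetime = 1_200
Process.sleep(max(delay - lifetime, 0))
```
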
21 changes: 9 additions & 12 deletions lib/elsa/group/manager/worker_manager.ex
@@ -19,21 +19,12 @@ defmodule Elsa.Group.Manager.WorkerManager do
Retrieve the generation id, used in tracking assignments of workers to topic/partition,
from the worker state map.
"""
@spec get_generation_id(map(), Elsa.topic(), Elsa.partition()) :: integer()
@spec get_generation_id(map(), Elsa.topic(), Elsa.partition()) :: Elsa.Group.Manager.generation_id()
def get_generation_id(workers, topic, partition) do
Map.get(workers, {topic, partition})
|> Map.get(:generation_id)
end

@doc """
Update the current offset for a given worker with respect to messages consumed
from its topic/partition.
"""
@spec update_offset(map(), Elsa.topic(), Elsa.partition(), integer()) :: map() | no_return()
def update_offset(workers, topic, partition, offset) do
Map.update!(workers, {topic, partition}, fn worker -> %{worker | latest_offset: offset + 1} end)
end

@doc """
Iterate over all workers managed by the group manager and issue the unsubscribe call
to disengage from the topic/partition and shut down gracefully.
@@ -59,8 +50,14 @@
def restart_worker(workers, ref, %Elsa.Group.Manager.State{} = state) do
worker = get_by_ref(workers, ref)

assignment =
brod_received_assignment(topic: worker.topic, partition: worker.partition, begin_offset: worker.latest_offset)
latest_offset =
Elsa.Group.Acknowledger.get_latest_offset(
{:via, Elsa.Registry, {registry(state.connection), Elsa.Group.Acknowledger}},
worker.topic,
worker.partition
)

assignment = brod_received_assignment(topic: worker.topic, partition: worker.partition, begin_offset: latest_offset)

start_worker(workers, worker.generation_id, assignment, state)
end