Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: add consumer lag utility for brod #131

Merged
merged 19 commits into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 148 additions & 0 deletions lib/kafee/consumer/brod_monitor.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
defmodule Kafee.Consumer.BrodMonitor do
  @moduledoc """
  Utility module for monitoring consumer lag.

  In addition to existing telemetry data, it is highly beneficial to track
  the consumer lag as well.

  As a refresher, "consumer lag" equals the latest message offset that has
  reached the broker (i.e. the last message published) minus the last message
  that has been consumed for the consumer group. Calculating this per
  partition is the goal.
  """

  require Logger

  @doc """
  Returns the consumer lag per consumer group per partition.

  "Consumer lag" equals the latest message offset that has reached the broker
  (i.e. the last message published) minus the last message that has been
  consumed for the consumer group.

  Options:
    * includes the connection options: `[ssl: _, sasl: _]`

  ## Note on committed offsets: the metadata has the node name, and it takes time to be up to date

  There is a lag due to reporting coming from `:brod.fetch_committed_offsets/2`'s
  metadata field reporting with delay on which node name the partition is at.

  What this means in an example:

      iex(warehouse@10.4.1.4)95> committed_offsets = BrodMonitor.get_committed_offsets(client_id, consumer_group_id)
      iex(warehouse@10.4.1.4)95> topic_offsets_map = Enum.find(committed_offsets, &(&1.name == topic))
      %{
        name: "wms-service--firehose",
        partitions: [
          %{
            # This metadata value could have a node name that is different from what shows up in Confluent cloud.
            # Either one could be the stale one - we don't know that deep.
            # The assumption is the value will eventually sync up to the correct node name.
            metadata: "+1/'warehouse@10.4.2.142'/<0.7948.0>",
            error_code: :no_error,
            partition_index: 2,
            committed_offset: 14052635
          },
          ...

  ## Note on how to use this information

  In observations and tests, it seems fine to trigger a shutdown of the process
  `*.Broadway.ProducerSupervisor` on any of the nodes in the cluster - doing so
  will trigger a rebalancing across the cluster which will restart the
  supervisor tree, thereby kickstarting the consumption on the lagging
  partitions.

  The above comment is based on `Kafee.Consumer.BroadwayAdapter`, but since the
  functions are using `:brod`, it would also be true for
  `Kafee.Consumer.BrodAdapter`.
  """
  # NOTE(review): the spec previously said `client_id :: pid()`, but the value
  # is handed straight to `:brod.fetch_committed_offsets/2`, which takes a brod
  # client (a registered atom or a pid) - widened accordingly.
  @spec get_consumer_lag(
          client_id :: :brod.client(),
          endpoints :: [:brod.endpoint()],
          topic :: binary(),
          consumer_group_id :: binary(),
          options :: :brod.conn_config()
        ) ::
          {:ok, %{(partition :: integer()) => consumer_lag :: integer()}}
  def get_consumer_lag(client_id, endpoints, topic, consumer_group_id, options \\ []) do
    # Step 1: Get partitions and latest (high watermark) offsets per partition.
    connection_options = Keyword.take(options, [:ssl, :sasl])
    partitions = get_partitions(endpoints, topic, connection_options)
    latest_offsets = get_latest_offsets(endpoints, topic, partitions, connection_options)

    # Step 2: Get the offsets the consumer group has committed for this topic.
    # Falls back to an empty partition list when the group has not committed
    # anything for `topic` yet.
    {:ok, committed_offsets} = get_committed_offsets(client_id, consumer_group_id)
    topic_offsets_map = Enum.find(committed_offsets, %{partitions: []}, &(&1.name == topic))

    # Step 3: Calculate lag (latest - committed) per partition.
    calculate_lag(latest_offsets, topic_offsets_map)
  end

  @doc """
  Fetches the committed offsets for `consumer_group_id` via
  `:brod.fetch_committed_offsets/2`.
  """
  @spec get_committed_offsets(client_id :: :brod.client(), consumer_group_id :: binary()) ::
          {:ok, [:kpro.struct()]} | {:error, any()}
  def get_committed_offsets(client_id, consumer_group_id) do
    :brod.fetch_committed_offsets(client_id, consumer_group_id)
  end

  # Returns the list of partition ids for `topic`, or `[]` when the metadata
  # cannot be fetched (callers then produce an empty lag map).
  defp get_partitions(endpoints, topic, options) do
    case :brod.get_metadata(endpoints, [topic], options) do
      {:ok, %{topics: [%{partitions: partitions}]}} ->
        Enum.map(partitions, fn %{partition_index: id} -> id end)

      _ ->
        []
    end
  end

  @doc """
  Resolves the latest offset for each of the given partitions.

  Partitions whose offset cannot be resolved are logged and reported with
  offset `0`.
  """
  @spec get_latest_offsets(
          endpoints :: [:brod.endpoint()],
          topic :: binary(),
          partitions :: list(integer()),
          options :: :brod.conn_config()
        ) ::
          list({partition :: integer(), offset :: integer()})
  def get_latest_offsets(endpoints, topic, partitions, options) do
    Enum.map(partitions, fn partition ->
      case :brod.resolve_offset(endpoints, topic, partition, :latest, options) do
        {:ok, offset} ->
          {partition, offset}

        {:error, reason} ->
          Logger.warning("Error getting offset for partition #{partition}: #{inspect(reason)}")
          {partition, 0}
      end
    end)
  end

  # Computes lag per partition, keeping only partitions that appear in BOTH
  # the latest-offset list and the committed-offset list (a partition with no
  # committed offset has no meaningful lag to report).
  defp calculate_lag(latest_offsets, %{partitions: _} = topic_offsets) do
    committed_by_partition = committed_offsets_by_partition(topic_offsets.partitions)
    latest_by_partition = Map.new(latest_offsets)

    lags_map =
      for {partition, latest} <- latest_by_partition,
          is_map_key(committed_by_partition, partition),
          into: %{} do
        {partition, latest - Map.fetch!(committed_by_partition, partition)}
      end

    {:ok, lags_map}
  end

  # Indexes committed-offset structs by partition id for O(1) lookup.
  defp committed_offsets_by_partition(committed_offsets) do
    Map.new(committed_offsets, fn %{partition_index: partition, committed_offset: committed_offset} ->
      {partition, committed_offset}
    end)
  end
end
132 changes: 132 additions & 0 deletions test/kafee/consumer/brod_monitor_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
defmodule Kafee.Consumer.BrodMonitorTest do
  use Kafee.KafkaCase
  alias Kafee.Consumer.BrodMonitor

  defmodule MyProducer do
    use Kafee.Producer,
      adapter: Kafee.Producer.AsyncAdapter,
      partition_fun: :random

    def publish(_type, messages) do
      :ok = produce(messages)
    end
  end

  defmodule MyLaggyConsumer do
    use Kafee.Consumer,
      adapter: {Kafee.Consumer.BroadwayAdapter, []}

    def handle_message(%Kafee.Consumer.Message{} = message) do
      test_pid = Application.get_env(:kafee, :test_pid, self())
      # Each message processing is deliberately slow so consumer lag builds up.
      Process.sleep(100)
      send(test_pid, {:consume_message, message})
    end
  end

  setup %{topic: topic} do
    start_supervised!(
      {MyProducer,
       [
         host: KafkaApi.host(),
         port: KafkaApi.port(),
         topic: topic
       ]}
    )

    [topic: topic]
  end

  setup do
    consumer_group_id = KafkaApi.generate_consumer_group_id()
    [consumer_group_id: consumer_group_id]
  end

  describe "get_consumer_lag/5" do
    setup %{topic: topic, consumer_group_id: consumer_group_id} do
      Application.put_env(:kafee, :test_pid, self())

      consumer_pid =
        start_supervised!(
          {MyLaggyConsumer,
           [
             host: KafkaApi.host(),
             port: KafkaApi.port(),
             topic: topic,
             consumer_group_id: consumer_group_id
           ]}
        )

      # It takes some time for the consumer to start up and join the group.
      Process.sleep(10_000)
      [consumer_pid: consumer_pid]
    end

    test "should correctly handle zero consumed case", %{
      brod_client_id: brod_client_id,
      topic: topic,
      partitions: partitions,
      consumer_group_id: consumer_group_id
    } do
      assert :ok =
               MyProducer.publish(:some_type, [
                 %Kafee.Producer.Message{
                   key: "some_key",
                   value: Jason.encode!(%{"some" => "message", "test_pid" => inspect(self())}),
                   topic: topic,
                   partition: 0
                 }
               ])

      partitions_list = Enum.to_list(0..(partitions - 1))
      assert :ok = poll_until_offset_tick(topic, partitions_list, [{0, 1}])

      assert {:ok, %{}} =
               BrodMonitor.get_consumer_lag(brod_client_id, Kafee.BrodApi.endpoints(), topic, consumer_group_id)
    end

    test "should correctly return consumer lag", %{
      brod_client_id: brod_client_id,
      topic: topic,
      partitions: partitions,
      consumer_group_id: consumer_group_id
    } do
      # Producing 100 messages.
      # Since MyLaggyConsumer takes some time to process each message, we'll see lags.
      for i <- 1..100 do
        :ok = :brod.produce_sync(brod_client_id, topic, :hash, "key-#{i}", "test value")
      end

      assert_receive {:consume_message, _}

      partitions_list = Enum.to_list(0..(partitions - 1))
      # The last argument in poll_until_offset_tick/3 is the expected target latest
      # offset number for the partition, therefore we're waiting until all the
      # messages are received by the broker.
      assert :ok = poll_until_offset_tick(topic, partitions_list, [{0, 100}])

      # Wait a bit for consumer lag to build up.
      Process.sleep(100)

      assert {:ok, %{0 => lag}} =
               BrodMonitor.get_consumer_lag(brod_client_id, Kafee.BrodApi.endpoints(), topic, consumer_group_id)

      # Assert lag is above 20, meaning the offset number difference between the last
      # published message and the last consumed message is more than 20 messages.
      assert lag > 20
    end
  end

  # Polls (up to `attempt_left` times, 100ms apart) until the broker reports
  # exactly `expected_result` as the latest offsets for `partitions_list`.
  defp poll_until_offset_tick(topic, partitions_list, expected_result, attempt_left \\ 5)
  defp poll_until_offset_tick(_topic, _partitions_list, _expected_result, 0), do: {:error, :timeout}

  defp poll_until_offset_tick(topic, partitions_list, expected_result, attempt_left) do
    case BrodMonitor.get_latest_offsets(Kafee.BrodApi.endpoints(), topic, partitions_list, []) do
      ^expected_result ->
        :ok

      _ ->
        Process.sleep(100)
        poll_until_offset_tick(topic, partitions_list, expected_result, attempt_left - 1)
    end
  end
end
Loading