From d51f981a20ed4c316e1c899b2fc493937411c742 Mon Sep 17 00:00:00 2001 From: Seungjin Kim Date: Thu, 2 Jan 2025 16:08:36 -0800 Subject: [PATCH 01/18] ADD: module logic to get the consumer lag --- lib/kafee/consumer/broadway_monitor.ex | 91 ++++++++++++++++++++++++++ 1 file changed, 91 insertions(+) create mode 100644 lib/kafee/consumer/broadway_monitor.ex diff --git a/lib/kafee/consumer/broadway_monitor.ex b/lib/kafee/consumer/broadway_monitor.ex new file mode 100644 index 0000000..962a0cb --- /dev/null +++ b/lib/kafee/consumer/broadway_monitor.ex @@ -0,0 +1,91 @@ +defmodule Kafee.Consumer.BroadwayMonitor do + @moduledoc """ + Utility module for monitoring Broadway - covering consumer lags + + On top of the [telemetry](https://hexdocs.pm/broadway_kafka/BroadwayKafka.Producer.html#module-telemetry) that BroadwayKafka gives us, + it is highly beneficial to track the consumer lag as well. + + As a refresher, a “consumer lag” equals the latest message offset that has reached + the broker (i.e. last message published) minus last message + that has been consumed for the consumer group. + Calculating this per partition is the goal. + + """ + require Logger + + def get_consumer_lag(client_id, endpoints, topic, consumer_group_id, options \\ []) do + # Step 1: Get partitions and latest offsets + + partitions = get_partitions(endpoints, topic, options) + latest_offsets = get_latest_offsets(endpoints, topic, partitions, options) + # Step 2: Get committed offsets and filter to current node + {:ok, committed_offsets} = get_committed_offsets(client_id, consumer_group_id) + topic_offsets_map = Enum.find(committed_offsets, &(&1.name == topic)) + node_name = Atom.to_string(Node.self()) + + filtered_committed_offsets = + limit_committed_offset_data_to_current_node(topic_offsets_map, node_name) + + # Step 3: Calculate lag + calculate_lag(latest_offsets, filtered_committed_offsets) + end + + def limit_committed_offset_data_to_current_node(committed_offsets_map, node_name) do + committed_offsets_per_partitions = committed_offsets_map.partitions + + offsets_on_node = + Enum.filter(committed_offsets_per_partitions, fn %{metadata: metadata} -> + String.contains?(metadata, node_name) + end) + + %{committed_offsets_map | partitions: offsets_on_node} + end + + def get_committed_offsets(client_id, consumer_group_id) do + :brod.fetch_committed_offsets(client_id, consumer_group_id) + end + + def get_partitions(endpoints, topic, options \\ []) do + case :brod.get_metadata(endpoints, [topic], options) do + {:ok, %{topics: [%{partitions: partitions}]}} -> + Enum.map(partitions, fn %{partition_index: id} -> id end) + + _ -> + [] + end + end + + def get_latest_offsets(endpoints, topic, partitions, options) do + Enum.map(partitions, fn partition -> + case :brod.resolve_offset(endpoints, topic, partition, :latest, options) do + {:ok, offset} -> + {partition, offset} + + {:error, reason} -> + Logger.warning("Error getting offset for partition #{partition}: #{inspect(reason)}") + {partition, 0} + end + end) + end + + def committed_offsets_by_partition(committed_offsets) do + committed_offsets + |> Enum.map(fn %{partition_index: partition, committed_offset: committed_offset} -> + {partition, committed_offset} + end) + |> Enum.into(%{}) + end + + defp calculate_lag(latest_offsets, %{partitions: _} = topic_offsets) do + partition_to_committed_offsets_map = committed_offsets_by_partition(topic_offsets.partitions) + + partition_to_latest_offsets_map = Enum.into(latest_offsets, %{}) + + lags_map = + Map.intersect(partition_to_latest_offsets_map, partition_to_committed_offsets_map, fn _k, latest, committed -> + latest - committed + end) + + {:ok, lags_map} + end +end From ce4eac85649c4413e756146306eecc68161f661a Mon Sep 17 00:00:00 2001 From: Seungjin Kim Date: Thu, 2 Jan 2025 16:15:12 -0800 Subject: [PATCH 02/18] RENAME: module --- lib/kafee/consumer/{broadway_monitor.ex => brod_monitor.ex} | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) rename lib/kafee/consumer/{broadway_monitor.ex => brod_monitor.ex} (92%) diff --git a/lib/kafee/consumer/broadway_monitor.ex b/lib/kafee/consumer/brod_monitor.ex similarity index 92% rename from lib/kafee/consumer/broadway_monitor.ex rename to lib/kafee/consumer/brod_monitor.ex index 962a0cb..797ef72 100644 --- a/lib/kafee/consumer/broadway_monitor.ex +++ b/lib/kafee/consumer/brod_monitor.ex @@ -1,9 +1,8 @@ -defmodule Kafee.Consumer.BroadwayMonitor do +defmodule Kafee.Consumer.BrodMonitor do @moduledoc """ Utility module for monitoring Broadway - covering consumer lags - On top of the [telemetry](https://hexdocs.pm/broadway_kafka/BroadwayKafka.Producer.html#module-telemetry) that BroadwayKafka gives us, - it is highly beneficial to track the consumer lag as well. + In addition to existing telemetry data, it is highly beneficial to track the consumer lag as well. As a refresher, a “consumer lag” equals the latest message offset that has reached the broker (i.e. last message published) minus last message From e4f104d41975fd0233058f93b3ad7ab10ab05961 Mon Sep 17 00:00:00 2001 From: Seungjin Kim Date: Thu, 2 Jan 2025 16:16:05 -0800 Subject: [PATCH 03/18] ADD: test for module --- test/kafee/consumer/brod_monitor_test.exs | 27 +++++++++++++++++++++++ 1 file changed, 27 insertions(+) create mode 100644 test/kafee/consumer/brod_monitor_test.exs diff --git a/test/kafee/consumer/brod_monitor_test.exs b/test/kafee/consumer/brod_monitor_test.exs new file mode 100644 index 0000000..133bc45 --- /dev/null +++ b/test/kafee/consumer/brod_monitor_test.exs @@ -0,0 +1,27 @@ +defmodule Kafee.Consumer.BrodMonitorTest do + use Kafee.BrodCase + alias Kafee.Consumer.{Adapter, BrodMonitor} + + setup do + spy(Kafee.Consumer.Adapter) + on_exit(fn -> restore(Kafee.Consumer.Adapter) end) + + spy(Datadog.DataStreams.Integrations.Kafka) + on_exit(fn -> restore(Datadog.DataStreams.Integrations.Kafka) end) + end + + describe "get_consumer_lag/5" do + test "should correctly return consumer lags per partition", %{brod_client_id: brod_client_id, topic: topic} do + message = + Kafee.BrodApi.generate_consumer_message( + consumer_group: "my-consumer-group", + topic: "test-topic", + partition: 2, + offset: 4 + ) + + BroadwayMonitor.get_consumer_lag(brod_client_id, endpoints, topic, consumer_group_id, options \\ []) + :ok = Adapter.push_message(MyConsumer, @consumer_options, message) + end + end +end From f92b31b1d67f61305b0cf59403aa4c15f7797ae9 Mon Sep 17 00:00:00 2001 From: Seungjin Kim Date: Thu, 2 Jan 2025 17:05:18 -0800 Subject: [PATCH 04/18] UPDATE: tweak code and test to work for no consumption case --- lib/kafee/consumer/brod_monitor.ex | 2 +- test/kafee/consumer/brod_monitor_test.exs | 96 +++++++++++++++++++---- 2 files changed, 80 insertions(+), 18 deletions(-) diff --git a/lib/kafee/consumer/brod_monitor.ex b/lib/kafee/consumer/brod_monitor.ex index 797ef72..efa40e6 100644 --- a/lib/kafee/consumer/brod_monitor.ex +++ b/lib/kafee/consumer/brod_monitor.ex @@ -19,7 +19,7 @@ defmodule Kafee.Consumer.BrodMonitor do latest_offsets = get_latest_offsets(endpoints, topic, partitions, options) # Step 2: Get committed offsets and filter to current node {:ok, committed_offsets} = get_committed_offsets(client_id, consumer_group_id) - topic_offsets_map = Enum.find(committed_offsets, &(&1.name == topic)) + topic_offsets_map = Enum.find(committed_offsets, %{partitions: []}, &(&1.name == topic)) node_name = Atom.to_string(Node.self()) filtered_committed_offsets = diff --git a/test/kafee/consumer/brod_monitor_test.exs b/test/kafee/consumer/brod_monitor_test.exs index 133bc45..d7e68a6 100644 --- a/test/kafee/consumer/brod_monitor_test.exs +++ b/test/kafee/consumer/brod_monitor_test.exs @@ -1,27 +1,89 @@ defmodule Kafee.Consumer.BrodMonitorTest do - use Kafee.BrodCase + use Kafee.KafkaCase alias Kafee.Consumer.{Adapter, BrodMonitor} - setup do - spy(Kafee.Consumer.Adapter) - on_exit(fn -> restore(Kafee.Consumer.Adapter) end) + defmodule MyProducer do + use Kafee.Producer, + adapter: Kafee.Producer.AsyncAdapter, + partition_fun: :random - spy(Datadog.DataStreams.Integrations.Kafka) - on_exit(fn -> restore(Datadog.DataStreams.Integrations.Kafka) end) + def publish(_type, messages) do + :ok = produce(messages) + end + end + + setup %{topic: topic} do + start_supervised!( + {MyProducer, + [ + host: KafkaApi.host(), + port: KafkaApi.port(), + topic: topic + ]} + ) + + :ok + end + + setup(%{topic: topic}) do + consumer_group_id = Kafee.KafkaApi.generate_consumer_group_id() + + consumer_pid = + start_supervised!(%{ + id: MyConsumer, + start: + {Kafee.Consumer, :start_link, + [ + MyConsumer, + [ + adapter: Kafee.Consumer.BroadwayAdapter, + host: Kafee.KafkaApi.host(), + port: Kafee.KafkaApi.port(), + consumer_group_id: consumer_group_id, + topic: topic + ] + ]} + }) + + [consumer_pid: consumer_pid, consumer_group_id: consumer_group_id] end describe "get_consumer_lag/5" do - test "should correctly return consumer lags per partition", %{brod_client_id: brod_client_id, topic: topic} do - message = - Kafee.BrodApi.generate_consumer_message( - consumer_group: "my-consumer-group", - topic: "test-topic", - partition: 2, - offset: 4 - ) - - BroadwayMonitor.get_consumer_lag(brod_client_id, endpoints, topic, consumer_group_id, options \\ []) - :ok = Adapter.push_message(MyConsumer, @consumer_options, message) + test "should correctly handle zero consumed case", %{ + brod_client_id: brod_client_id, + topic: topic, + partitions: partitions, + consumer_group_id: consumer_group_id + } do + assert :ok = + MyProducer.publish(:some_type, [ + %Kafee.Producer.Message{ + key: "something_huge_above_4mb", + value: %{"some" => "message", "test_pid" => inspect(self())} |> Jason.encode!(), + topic: topic, + partition: 0 + } + ]) + + partitions_list = Enum.map(0..(partitions - 1), & &1) + poll_until_offset_tick(topic, partitions_list, [{0, 1}]) + + assert {:ok, %{}} = + BrodMonitor.get_consumer_lag(brod_client_id, Kafee.BrodApi.endpoints(), topic, consumer_group_id) + end + end + + defp poll_until_offset_tick(topic, partitions_list, expected_result, attempt_left \\ 5) + defp poll_until_offset_tick(_topic, _partitions_list, _expected_result, 0), do: {:error, :timeout} + + defp poll_until_offset_tick(topic, partitions_list, expected_result, attempt_left) do + case BrodMonitor.get_latest_offsets(Kafee.BrodApi.endpoints(), topic, partitions_list, []) do + ^expected_result -> + :ok + + _ -> + Process.sleep(100) + poll_until_offset_tick(topic, partitions_list, expected_result, attempt_left - 1) end end end From e78d1f8d4e7591081c9848543d2018882aa2229b Mon Sep 17 00:00:00 2001 From: Seungjin Kim Date: Thu, 2 Jan 2025 17:05:48 -0800 Subject: [PATCH 05/18] UPDATE: example --- test/kafee/consumer/brod_monitor_test.exs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/kafee/consumer/brod_monitor_test.exs b/test/kafee/consumer/brod_monitor_test.exs index d7e68a6..d9a25df 100644 --- a/test/kafee/consumer/brod_monitor_test.exs +++ b/test/kafee/consumer/brod_monitor_test.exs @@ -58,7 +58,7 @@ defmodule Kafee.Consumer.BrodMonitorTest do assert :ok = MyProducer.publish(:some_type, [ %Kafee.Producer.Message{ - key: "something_huge_above_4mb", + key: "some_key", value: %{"some" => "message", "test_pid" => inspect(self())} |> Jason.encode!(), topic: topic, partition: 0 From e234a99ce655ab4ec67b91c093b72546119f4b3a Mon Sep 17 00:00:00 2001 From: Seungjin Kim Date: Thu, 2 Jan 2025 17:51:04 -0800 Subject: [PATCH 06/18] UPDATE: lag asseerting test correctly asserts --- test/kafee/consumer/brod_monitor_test.exs | 81 +++++++++++++++++------ 1 file changed, 59 insertions(+), 22 deletions(-) diff --git a/test/kafee/consumer/brod_monitor_test.exs b/test/kafee/consumer/brod_monitor_test.exs index d9a25df..1299c67 100644 --- a/test/kafee/consumer/brod_monitor_test.exs +++ b/test/kafee/consumer/brod_monitor_test.exs @@ -1,6 +1,6 @@ defmodule Kafee.Consumer.BrodMonitorTest do use Kafee.KafkaCase - alias Kafee.Consumer.{Adapter, BrodMonitor} + alias Kafee.Consumer.BrodMonitor defmodule MyProducer do use Kafee.Producer, @@ -12,6 +12,17 @@ defmodule Kafee.Consumer.BrodMonitorTest do end end + defmodule MyLaggyConsumer do + use Kafee.Consumer, + adapter: {Kafee.Consumer.BroadwayAdapter, []} + + def handle_message(%Kafee.Consumer.Message{} = message) do + test_pid = Application.get_env(:kafee, :test_pid, self()) + Process.sleep(100) + send(test_pid, {:consume_message, message}) + end + end + setup %{topic: topic} do start_supervised!( {MyProducer, @@ -22,33 +33,34 @@ defmodule Kafee.Consumer.BrodMonitorTest do ]} ) - :ok + [topic: topic] end - setup(%{topic: topic}) do - consumer_group_id = Kafee.KafkaApi.generate_consumer_group_id() + setup %{topic: topic} do + consumer_group_id = KafkaApi.generate_consumer_group_id() + + [consumer_group_id: consumer_group_id] + end - consumer_pid = - start_supervised!(%{ - id: MyConsumer, - start: - {Kafee.Consumer, :start_link, + describe "get_consumer_lag/5" do + setup %{topic: topic, consumer_group_id: consumer_group_id} do + Application.put_env(:kafee, :test_pid, self()) + + consumer_pid = + start_supervised!( + {MyLaggyConsumer, [ - MyConsumer, - [ - adapter: Kafee.Consumer.BroadwayAdapter, - host: Kafee.KafkaApi.host(), - port: Kafee.KafkaApi.port(), - consumer_group_id: consumer_group_id, - topic: topic - ] + host: KafkaApi.host(), + port: KafkaApi.port(), + topic: topic, + consumer_group_id: consumer_group_id ]} - }) + ) - [consumer_pid: consumer_pid, consumer_group_id: consumer_group_id] - end + Process.sleep(10_000) + [consumer_pid: consumer_pid] + end - describe "get_consumer_lag/5" do test "should correctly handle zero consumed case", %{ brod_client_id: brod_client_id, topic: topic, @@ -66,11 +78,36 @@ defmodule Kafee.Consumer.BrodMonitorTest do ]) partitions_list = Enum.map(0..(partitions - 1), & &1) - poll_until_offset_tick(topic, partitions_list, [{0, 1}]) + assert :ok = poll_until_offset_tick(topic, partitions_list, [{0, 1}]) assert {:ok, %{}} = BrodMonitor.get_consumer_lag(brod_client_id, Kafee.BrodApi.endpoints(), topic, consumer_group_id) end + + test "should correctly return consumer lag", %{ + brod_client_id: brod_client_id, + topic: topic, + partitions: partitions, + consumer_group_id: consumer_group_id + } do + for i <- 1..100 do + :ok = :brod.produce_sync(brod_client_id, topic, :hash, "key-#{i}", "test value") + end + + assert_receive {:consume_message, _} + + partitions_list = Enum.map(0..(partitions - 1), & &1) + assert :ok = poll_until_offset_tick(topic, partitions_list, [{0, 100}]) + + # wait a bit for consumer lag to build up + Process.sleep(100) + + # since LaggyConsumer takes some time to process each message, we'll see lags + assert {:ok, %{0 => lag}} = + BrodMonitor.get_consumer_lag(brod_client_id, Kafee.BrodApi.endpoints(), topic, consumer_group_id) + + assert lag > 20 + end end defp poll_until_offset_tick(topic, partitions_list, expected_result, attempt_left \\ 5) From 29f993409d660051183e94784a01d5ee90903982 Mon Sep 17 00:00:00 2001 From: Seungjin Kim Date: Thu, 2 Jan 2025 17:52:05 -0800 Subject: [PATCH 07/18] UPDATE: comments --- test/kafee/consumer/brod_monitor_test.exs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/test/kafee/consumer/brod_monitor_test.exs b/test/kafee/consumer/brod_monitor_test.exs index 1299c67..a8cc793 100644 --- a/test/kafee/consumer/brod_monitor_test.exs +++ b/test/kafee/consumer/brod_monitor_test.exs @@ -18,6 +18,7 @@ defmodule Kafee.Consumer.BrodMonitorTest do def handle_message(%Kafee.Consumer.Message{} = message) do test_pid = Application.get_env(:kafee, :test_pid, self()) + # Each message processing is slow Process.sleep(100) send(test_pid, {:consume_message, message}) end @@ -90,6 +91,8 @@ defmodule Kafee.Consumer.BrodMonitorTest do partitions: partitions, consumer_group_id: consumer_group_id } do + # producing 100 messages + # since LaggyConsumer takes some time to process each message, we'll see lags for i <- 1..100 do :ok = :brod.produce_sync(brod_client_id, topic, :hash, "key-#{i}", "test value") end @@ -102,7 +105,6 @@ defmodule Kafee.Consumer.BrodMonitorTest do # wait a bit for consumer lag to build up Process.sleep(100) - # since LaggyConsumer takes some time to process each message, we'll see lags assert {:ok, %{0 => lag}} = BrodMonitor.get_consumer_lag(brod_client_id, Kafee.BrodApi.endpoints(), topic, consumer_group_id) From 98e22933ea467745c6e182bbff169c97f44880ca Mon Sep 17 00:00:00 2001 From: Seungjin Kim Date: Fri, 3 Jan 2025 17:01:11 -0800 Subject: [PATCH 08/18] UDPATE: clean up code --- lib/kafee/consumer/brod_monitor.ex | 68 +++++++++++++++++++---- test/kafee/consumer/brod_monitor_test.exs | 4 +- 2 files changed, 59 insertions(+), 13 deletions(-) diff --git a/lib/kafee/consumer/brod_monitor.ex b/lib/kafee/consumer/brod_monitor.ex index efa40e6..2281051 100644 --- a/lib/kafee/consumer/brod_monitor.ex +++ b/lib/kafee/consumer/brod_monitor.ex @@ -12,18 +12,64 @@ defmodule Kafee.Consumer.BrodMonitor do """ require Logger + @doc """ + Returns the consumer lag per consumer group per partition. + A “consumer lag” equals the latest message offset that has reached + the broker (i.e. last message published) minus last message + that has been consumed for the consumer group. + + Options: + * includes the connection options: [ssl: __, sasl: ___] + * `:node_only`: boolean, default `false`. If `true`, filter to only partitions handled in node. + - note: this is eventually going to be up to date. There is a lag due to reporting coming from `:brod.fetch_committed_offsets/2`'s metadata field + reporting with delay on which node name the partition is at. + + What this means in an example: + + ``` + iex(warehouse@10.4.1.4)95> committed_offsets = BrodMonitor.get_committed_offsets(client_id, consumer_group_id) + iex(warehouse@10.4.1.4)95> topic_offsets_map = Enum.find(committed_offsets, &(&1.name == topic)) + %{ + name: "wms-service--firehose", + partitions: [ + %{ + # This metadata value could have a node name that is different from what shows up in Confluent cloud. + # Either one could be the stale one - we don't know that deep. + # The assumption is the value will eventually sync up to the correct node name. + metadata: "+1/'warehouse@10.4.2.142'/<0.7948.0>", + error_code: :no_error, + partition_index: 2, + committed_offset: 14052635 + }, + ... + + ``` + + ## Note on how to use this information: + + In observations and tests, it seems fine to trigger a shutdown of the process `*.Broadway.ProducerSupervisor` + on any of the node in the cluster - doing so will trigger a rebalancing across the cluster which will restart the supervisor tree, thereby + kickstarting the consumption on the lagging partitions. + + Above comment is based on `Kafee.Consumer.BroadwayAdapter`, but since the functions are using `:brod`, it would be true + also for `Kafee.Consumer.BrodAdapter`. + + """ def get_consumer_lag(client_id, endpoints, topic, consumer_group_id, options \\ []) do # Step 1: Get partitions and latest offsets + connection_options = Keyword.take(options, [:ssl, :sasl]) + partitions = get_partitions(endpoints, topic, connection_options) + latest_offsets = get_latest_offsets(endpoints, topic, partitions, connection_options) - partitions = get_partitions(endpoints, topic, options) - latest_offsets = get_latest_offsets(endpoints, topic, partitions, options) # Step 2: Get committed offsets and filter to current node {:ok, committed_offsets} = get_committed_offsets(client_id, consumer_group_id) topic_offsets_map = Enum.find(committed_offsets, %{partitions: []}, &(&1.name == topic)) node_name = Atom.to_string(Node.self()) filtered_committed_offsets = - limit_committed_offset_data_to_current_node(topic_offsets_map, node_name) + if options[:node_only], + do: limit_committed_offset_data_to_current_node(topic_offsets_map, node_name), + else: topic_offsets_map # Step 3: Calculate lag calculate_lag(latest_offsets, filtered_committed_offsets) @@ -67,14 +113,6 @@ defmodule Kafee.Consumer.BrodMonitor do end) end - def committed_offsets_by_partition(committed_offsets) do - committed_offsets - |> Enum.map(fn %{partition_index: partition, committed_offset: committed_offset} -> - {partition, committed_offset} - end) - |> Enum.into(%{}) - end - defp calculate_lag(latest_offsets, %{partitions: _} = topic_offsets) do partition_to_committed_offsets_map = committed_offsets_by_partition(topic_offsets.partitions) @@ -87,4 +125,12 @@ defmodule Kafee.Consumer.BrodMonitor do {:ok, lags_map} end + + def committed_offsets_by_partition(committed_offsets) do + committed_offsets + |> Enum.map(fn %{partition_index: partition, committed_offset: committed_offset} -> + {partition, committed_offset} + end) + |> Enum.into(%{}) + end end diff --git a/test/kafee/consumer/brod_monitor_test.exs b/test/kafee/consumer/brod_monitor_test.exs index a8cc793..2c251a2 100644 --- a/test/kafee/consumer/brod_monitor_test.exs +++ b/test/kafee/consumer/brod_monitor_test.exs @@ -37,9 +37,8 @@ defmodule Kafee.Consumer.BrodMonitorTest do [topic: topic] end - setup %{topic: topic} do + setup do consumer_group_id = KafkaApi.generate_consumer_group_id() - [consumer_group_id: consumer_group_id] end @@ -58,6 +57,7 @@ defmodule Kafee.Consumer.BrodMonitorTest do ]} ) + # takes some time to start up the consumer Process.sleep(10_000) [consumer_pid: consumer_pid] end From 55caa6fbfd90a653dc439a054f716dcbe6b03fc2 Mon Sep 17 00:00:00 2001 From: Seungjin Kim Date: Fri, 3 Jan 2025 17:04:49 -0800 Subject: [PATCH 09/18] UPDATE: credo fix --- test/kafee/consumer/brod_monitor_test.exs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/kafee/consumer/brod_monitor_test.exs b/test/kafee/consumer/brod_monitor_test.exs index 2c251a2..9433350 100644 --- a/test/kafee/consumer/brod_monitor_test.exs +++ b/test/kafee/consumer/brod_monitor_test.exs @@ -72,7 +72,7 @@ defmodule Kafee.Consumer.BrodMonitorTest do MyProducer.publish(:some_type, [ %Kafee.Producer.Message{ key: "some_key", - value: %{"some" => "message", "test_pid" => inspect(self())} |> Jason.encode!(), + value: Jason.encode!(%{"some" => "message", "test_pid" => inspect(self())}), topic: topic, partition: 0 } From 1575faedc88dfcf601be203414be83c42487d259 Mon Sep 17 00:00:00 2001 From: Seungjin Kim Date: Fri, 3 Jan 2025 17:14:42 -0800 Subject: [PATCH 10/18] UPDATE: do not use Map.intersect --- lib/kafee/consumer/brod_monitor.ex | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/lib/kafee/consumer/brod_monitor.ex b/lib/kafee/consumer/brod_monitor.ex index 2281051..9e1d50a 100644 --- a/lib/kafee/consumer/brod_monitor.ex +++ b/lib/kafee/consumer/brod_monitor.ex @@ -118,10 +118,19 @@ defmodule Kafee.Consumer.BrodMonitor do partition_to_latest_offsets_map = Enum.into(latest_offsets, %{}) + common_map_keys = + partition_to_latest_offsets_map + |> Map.keys() + |> MapSet.new() + |> MapSet.intersection(MapSet.new(Map.keys(partition_to_committed_offsets_map))) + |> MapSet.to_list() + lags_map = - Map.intersect(partition_to_latest_offsets_map, partition_to_committed_offsets_map, fn _k, latest, committed -> + partition_to_latest_offsets_map + |> Map.merge(partition_to_committed_offsets_map, fn _k, latest, committed -> latest - committed end) + |> Map.take(common_map_keys) {:ok, lags_map} end From 89e21a61ed10fd95d48321b471c3d590e3239b24 Mon Sep 17 00:00:00 2001 From: Seungjin Kim Date: Fri, 3 Jan 2025 17:20:07 -0800 Subject: [PATCH 11/18] UDPATE: credo fix --- lib/kafee/consumer/brod_monitor.ex | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/lib/kafee/consumer/brod_monitor.ex b/lib/kafee/consumer/brod_monitor.ex index 9e1d50a..0856c5b 100644 --- a/lib/kafee/consumer/brod_monitor.ex +++ b/lib/kafee/consumer/brod_monitor.ex @@ -118,11 +118,13 @@ defmodule Kafee.Consumer.BrodMonitor do partition_to_latest_offsets_map = Enum.into(latest_offsets, %{}) + committed_offsets_keys_mapset = partition_to_committed_offsets_map |> Map.keys() |> MapSet.new() + common_map_keys = partition_to_latest_offsets_map |> Map.keys() |> MapSet.new() - |> MapSet.intersection(MapSet.new(Map.keys(partition_to_committed_offsets_map))) + |> MapSet.intersection(committed_offsets_keys_mapset) |> MapSet.to_list() lags_map = From 568ad308db1b91b821f9787227a0d76d14039f54 Mon Sep 17 00:00:00 2001 From: seungjinstord <121889101+seungjinstord@users.noreply.github.com> Date: Wed, 8 Jan 2025 09:00:03 -0800 Subject: [PATCH 12/18] Update lib/kafee/consumer/brod_monitor.ex Co-authored-by: Matt Sutkowski --- lib/kafee/consumer/brod_monitor.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/kafee/consumer/brod_monitor.ex b/lib/kafee/consumer/brod_monitor.ex index 0856c5b..1890026 100644 --- a/lib/kafee/consumer/brod_monitor.ex +++ b/lib/kafee/consumer/brod_monitor.ex @@ -1,6 +1,6 @@ defmodule Kafee.Consumer.BrodMonitor do @moduledoc """ - Utility module for monitoring Broadway - covering consumer lags + Utility module for monitoring consumer lag In addition to existing telemetry data, it is highly beneficial to track the consumer lag as well. From 4cc9549c7bd925577af8cdbc77aa0a778278787c Mon Sep 17 00:00:00 2001 From: Seungjin Kim Date: Thu, 9 Jan 2025 11:06:00 -0800 Subject: [PATCH 13/18] REMOVE: the filtering capability because data can be outdated and might hide the most up-to-date topology --- lib/kafee/consumer/brod_monitor.ex | 40 ++++++++++++------------------ 1 file changed, 16 insertions(+), 24 deletions(-) diff --git a/lib/kafee/consumer/brod_monitor.ex b/lib/kafee/consumer/brod_monitor.ex index 1890026..0248d01 100644 --- a/lib/kafee/consumer/brod_monitor.ex +++ b/lib/kafee/consumer/brod_monitor.ex @@ -20,13 +20,15 @@ defmodule Kafee.Consumer.BrodMonitor do Options: * includes the connection options: [ssl: __, sasl: ___] - * `:node_only`: boolean, default `false`. If `true`, filter to only partitions handled in node. - - note: this is eventually going to be up to date. There is a lag due to reporting coming from `:brod.fetch_committed_offsets/2`'s metadata field - reporting with delay on which node name the partition is at. - What this means in an example: + ## Note on committed offsets: the metadata has the Node name, and it takes time to be up to date. + + There is a lag due to reporting coming from `:brod.fetch_committed_offsets/2`'s metadata field + reporting with delay on which node name the partition is at. + + What this means in an example: + - ``` iex(warehouse@10.4.1.4)95> committed_offsets = BrodMonitor.get_committed_offsets(client_id, consumer_group_id) iex(warehouse@10.4.1.4)95> topic_offsets_map = Enum.find(committed_offsets, &(&1.name == topic)) %{ @@ -43,7 +45,6 @@ defmodule Kafee.Consumer.BrodMonitor do }, ... - ``` ## Note on how to use this information: @@ -55,6 +56,14 @@ defmodule Kafee.Consumer.BrodMonitor do also for `Kafee.Consumer.BrodAdapter`. """ + @spec get_consumer_lag( + client_id :: pid(), + endpoints :: [{host :: binary(), port :: integer()}], + topic :: binary(), + consumer_group_id :: binary(), + :brod.conn_config() + ) :: + {:ok, %{(partition :: integer()) => consumer_lag :: integer()}} def get_consumer_lag(client_id, endpoints, topic, consumer_group_id, options \\ []) do # Step 1: Get partitions and latest offsets connection_options = Keyword.take(options, [:ssl, :sasl]) @@ -64,26 +73,9 @@ defmodule Kafee.Consumer.BrodMonitor do # Step 2: Get committed offsets and filter to current node {:ok, committed_offsets} = get_committed_offsets(client_id, consumer_group_id) topic_offsets_map = Enum.find(committed_offsets, %{partitions: []}, &(&1.name == topic)) - node_name = Atom.to_string(Node.self()) - - filtered_committed_offsets = - if options[:node_only], - do: limit_committed_offset_data_to_current_node(topic_offsets_map, node_name), - else: topic_offsets_map # Step 3: Calculate lag - calculate_lag(latest_offsets, filtered_committed_offsets) - end - - def limit_committed_offset_data_to_current_node(committed_offsets_map, node_name) do - committed_offsets_per_partitions = committed_offsets_map.partitions - - offsets_on_node = - Enum.filter(committed_offsets_per_partitions, fn %{metadata: metadata} -> - String.contains?(metadata, node_name) - end) - - %{committed_offsets_map | partitions: offsets_on_node} + calculate_lag(latest_offsets, topic_offsets_map) end def get_committed_offsets(client_id, consumer_group_id) do From a08d6250262d9d6d1545f6ba7fa3cffc99ba0105 Mon Sep 17 00:00:00 2001 From: Seungjin Kim Date: Thu, 9 Jan 2025 11:12:10 -0800 Subject: [PATCH 14/18] UDPATE: privatize get_partitions() --- lib/kafee/consumer/brod_monitor.ex | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/kafee/consumer/brod_monitor.ex b/lib/kafee/consumer/brod_monitor.ex index 0248d01..5162e04 100644 --- a/lib/kafee/consumer/brod_monitor.ex +++ b/lib/kafee/consumer/brod_monitor.ex @@ -82,7 +82,7 @@ defmodule Kafee.Consumer.BrodMonitor do :brod.fetch_committed_offsets(client_id, consumer_group_id) end - def get_partitions(endpoints, topic, options \\ []) do + defp get_partitions(endpoints, topic, options) do case :brod.get_metadata(endpoints, [topic], options) do {:ok, %{topics: [%{partitions: partitions}]}} -> Enum.map(partitions, fn %{partition_index: id} -> id end) @@ -129,7 +129,7 @@ defmodule Kafee.Consumer.BrodMonitor do {:ok, lags_map} end - def committed_offsets_by_partition(committed_offsets) do + defp committed_offsets_by_partition(committed_offsets) do committed_offsets |> Enum.map(fn %{partition_index: partition, committed_offset: committed_offset} -> {partition, committed_offset} From 5b80e316deb5fd4ebd5ac427b59ed8d88d81804e Mon Sep 17 00:00:00 2001 From: Seungjin Kim Date: Thu, 9 Jan 2025 11:18:20 -0800 Subject: [PATCH 15/18] ADD: more specs --- lib/kafee/consumer/brod_monitor.ex | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/lib/kafee/consumer/brod_monitor.ex b/lib/kafee/consumer/brod_monitor.ex index 5162e04..4de460e 100644 --- a/lib/kafee/consumer/brod_monitor.ex +++ b/lib/kafee/consumer/brod_monitor.ex @@ -58,7 +58,7 @@ defmodule Kafee.Consumer.BrodMonitor do """ @spec get_consumer_lag( client_id :: pid(), - endpoints :: [{host :: binary(), port :: integer()}], + endpoints :: [:brod.endpoint()], topic :: binary(), consumer_group_id :: binary(), :brod.conn_config() @@ -78,6 +78,8 @@ defmodule Kafee.Consumer.BrodMonitor do calculate_lag(latest_offsets, topic_offsets_map) end + @spec get_committed_offsets(client_id :: pid(), consumer_group_id :: binary()) :: + {:ok, [:kpro.struct()]} | {:error, any()} def get_committed_offsets(client_id, consumer_group_id) do :brod.fetch_committed_offsets(client_id, consumer_group_id) end @@ -92,6 +94,13 @@ defmodule Kafee.Consumer.BrodMonitor do end end + @spec get_latest_offsets( + endpoints :: [:brod.endpoint()], + topic :: binary(), + partitions :: list(integer()), + :brod.conn_config() + ) :: + list({partition :: integer(), offset :: integer()}) def get_latest_offsets(endpoints, topic, partitions, options) do Enum.map(partitions, fn partition -> case :brod.resolve_offset(endpoints, topic, partition, :latest, options) do From 3c0ac446dec1864d125bf3580dd6804a2de8bd51 Mon Sep 17 00:00:00 2001 From: Seungjin Kim Date: Tue, 14 Jan 2025 09:51:15 -0800 Subject: [PATCH 16/18] UPDATE: with more explanation of what is being tested --- test/kafee/consumer/brod_monitor_test.exs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/test/kafee/consumer/brod_monitor_test.exs b/test/kafee/consumer/brod_monitor_test.exs index 9433350..bf6b58b 100644 --- a/test/kafee/consumer/brod_monitor_test.exs +++ b/test/kafee/consumer/brod_monitor_test.exs @@ -108,6 +108,8 @@ defmodule Kafee.Consumer.BrodMonitorTest do assert {:ok, %{0 => lag}} = BrodMonitor.get_consumer_lag(brod_client_id, Kafee.BrodApi.endpoints(), topic, consumer_group_id) + # assert lag is above 20, meaning the offset number difference between last published message and last consumed message is + # more than 20 messages apart assert lag > 20 end end From 94e19b73266aba3a32112564a3bb08297388228d Mon Sep 17 00:00:00 2001 From: Seungjin Kim Date: Tue, 14 Jan 2025 09:54:03 -0800 Subject: [PATCH 17/18] ADD: more comment in the test --- test/kafee/consumer/brod_monitor_test.exs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/test/kafee/consumer/brod_monitor_test.exs b/test/kafee/consumer/brod_monitor_test.exs index bf6b58b..9a20c77 100644 --- a/test/kafee/consumer/brod_monitor_test.exs +++ b/test/kafee/consumer/brod_monitor_test.exs @@ -100,6 +100,8 @@ defmodule Kafee.Consumer.BrodMonitorTest do assert_receive {:consume_message, _} partitions_list = Enum.map(0..(partitions - 1), & &1) + # last argument in poll_until_offset_tick() the expected target latest offset number for the partition. + # therefore we're waiting until all the messages received by the broker. assert :ok = poll_until_offset_tick(topic, partitions_list, [{0, 100}]) # wait a bit for consumer lag to build up From 009e776f2f0722d95de9a0ae31499851f69ed25b Mon Sep 17 00:00:00 2001 From: Seungjin Kim Date: Tue, 14 Jan 2025 09:56:29 -0800 Subject: [PATCH 18/18] UPDATE: typo no 2 --- test/kafee/consumer/brod_monitor_test.exs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/kafee/consumer/brod_monitor_test.exs b/test/kafee/consumer/brod_monitor_test.exs index 9a20c77..613c231 100644 --- a/test/kafee/consumer/brod_monitor_test.exs +++ b/test/kafee/consumer/brod_monitor_test.exs @@ -100,8 +100,8 @@ defmodule Kafee.Consumer.BrodMonitorTest do assert_receive {:consume_message, _} partitions_list = Enum.map(0..(partitions - 1), & &1) - # last argument in poll_until_offset_tick() the expected target latest offset number for the partition. - # therefore we're waiting until all the messages received by the broker. + # last argument in poll_until_offset_tick() is the expected target latest offset number for the partition. + # therefore we're waiting until all the messages are received by the broker. assert :ok = poll_until_offset_tick(topic, partitions_list, [{0, 100}]) # wait a bit for consumer lag to build up