Skip to content

Commit

Permalink
REMOVE: the filtering capability because data can be outdated and mig…
Browse files Browse the repository at this point in the history
…ht hide the most up-to-date topology
  • Loading branch information
seungjinstord committed Jan 9, 2025
1 parent 568ad30 commit 4cc9549
Showing 1 changed file with 16 additions and 24 deletions.
40 changes: 16 additions & 24 deletions lib/kafee/consumer/brod_monitor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@ defmodule Kafee.Consumer.BrodMonitor do
Options:
* includes the connection options: [ssl: __, sasl: ___]
* `:node_only`: boolean, default `false`. If `true`, filter to only partitions handled in node.
- note: this is eventually going to be up to date. There is a lag due to reporting coming from `:brod.fetch_committed_offsets/2`'s metadata field
reporting with delay on which node name the partition is at.
What this means in an example:
## Note on committed offsets: the metadata has the Node name, and it takes time to be up to date.
There is a lag due to reporting coming from `:brod.fetch_committed_offsets/2`'s metadata field
reporting with delay on which node name the partition is at.
What this means in an example:
```
iex(warehouse@10.4.1.4)95> committed_offsets = BrodMonitor.get_committed_offsets(client_id, consumer_group_id)
iex(warehouse@10.4.1.4)95> topic_offsets_map = Enum.find(committed_offsets, &(&1.name == topic))
%{
Expand All @@ -43,7 +45,6 @@ defmodule Kafee.Consumer.BrodMonitor do
},
...
```
## Note on how to use this information:
Expand All @@ -55,6 +56,14 @@ defmodule Kafee.Consumer.BrodMonitor do
also for `Kafee.Consumer.BrodAdapter`.
"""
@spec get_consumer_lag(
client_id :: pid(),
endpoints :: [{host :: binary(), port :: integer()}],
topic :: binary(),
consumer_group_id :: binary(),
:brod.conn_config()
) ::
{:ok, %{(partition :: integer()) => consumer_lag :: integer()}}
def get_consumer_lag(client_id, endpoints, topic, consumer_group_id, options \\ []) do
# Step 1: Get partitions and latest offsets
connection_options = Keyword.take(options, [:ssl, :sasl])
Expand All @@ -64,26 +73,9 @@ defmodule Kafee.Consumer.BrodMonitor do
# Step 2: Get committed offsets and filter to current node
{:ok, committed_offsets} = get_committed_offsets(client_id, consumer_group_id)
topic_offsets_map = Enum.find(committed_offsets, %{partitions: []}, &(&1.name == topic))
node_name = Atom.to_string(Node.self())

filtered_committed_offsets =
if options[:node_only],
do: limit_committed_offset_data_to_current_node(topic_offsets_map, node_name),
else: topic_offsets_map

# Step 3: Calculate lag
calculate_lag(latest_offsets, filtered_committed_offsets)
end

def limit_committed_offset_data_to_current_node(committed_offsets_map, node_name) do
committed_offsets_per_partitions = committed_offsets_map.partitions

offsets_on_node =
Enum.filter(committed_offsets_per_partitions, fn %{metadata: metadata} ->
String.contains?(metadata, node_name)
end)

%{committed_offsets_map | partitions: offsets_on_node}
calculate_lag(latest_offsets, topic_offsets_map)
end

def get_committed_offsets(client_id, consumer_group_id) do
Expand Down

0 comments on commit 4cc9549

Please sign in to comment.