fix: update kafee async worker to log only the headers + topic for large message errors instead of the payload, as it can go over the Datadog limit (#108)

## Related Ticket(s)
SIGNAL-7183
<!--
Enter the Jira issue below in the following format: PROJECT-##
-->

## Checklist

<!--
For each bullet, ensure your pr meets the criteria and write a note
explaining how this PR relates. Mark them as complete as they are done.
All top-level checkboxes should be checked regardless of their relevance
to the pr with a note explaining whether they are relevant or not.
-->

- [x] Code conforms to the [Elixir
Styleguide](https://github.com/christopheradams/elixir_style_guide)

## Problem

https://stord.atlassian.net/wiki/spaces/PD/pages/3528982601/WMS+integration+bridge+receipt+confirmations+triaging

We had an issue where errors for large messages were not logged at all; the log entry was silently dropped.
The suspicion is that the message payload was too large for Datadog.

## Details

We need at least some data to help trace which message the dropped log entry was for.
Therefore, instead of the actual body of the message, we will log the
message's headers (metadata) plus its topic.
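As a sketch of what ends up in the log, assuming hypothetical header values and topic name (Kafka headers are key/value pairs, which the helper folds into a map before merging in the topic):

```elixir
# Hypothetical message headers, as Kafka header key/value pairs.
headers = [{"trace_id", "abc-123"}, {"source_system", "wms"}]

# Mirror of the helper's transformation: headers become a map and the
# topic is merged in, so the payload itself is never logged.
data = headers |> Enum.into(%{}) |> Map.put(:topic, "receipt-confirmations")

# data is now (key order not significant):
# %{:topic => "receipt-confirmations", "source_system" => "wms", "trace_id" => "abc-123"}
```

Header keys stay as strings while `:topic` is an atom, so a string-keyed header named `"topic"` could not silently collide with it.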
seungjinstord authored Oct 17, 2024
1 parent 7eef0e0 commit 1d98363
Showing 1 changed file with 12 additions and 2 deletions.
14 changes: 12 additions & 2 deletions lib/kafee/producer/async_worker.ex
```diff
@@ -177,7 +177,7 @@ defmodule Kafee.Producer.AsyncWorker do
     case error do
       {:error, {:producer_down, {:not_retriable, {_, _, _, _, :message_too_large}}}}
       when length(sent_messages) == 1 ->
-        Logger.error("Message in queue is too large", data: sent_messages)
+        Logger.error("Message in queue is too large", data: data_to_log_for_large_message_error(sent_messages))
         %{state | queue: :queue.drop(queue)}

       {:error, {:producer_down, {:not_retriable, {_, _, _, _, :message_too_large}}}} ->
```
```diff
@@ -363,7 +363,9 @@ defmodule Kafee.Producer.AsyncWorker do
     messages_beyond_max_bytes = Enum.reverse(messages_beyond_max_bytes_reversed)

     Enum.each(messages_beyond_max_bytes, fn message ->
-      Logger.error("Message in queue is too large, will not push to Kafka", data: message)
+      Logger.error("Message in queue is too large, will not push to Kafka",
+        data: data_to_log_for_large_message_error(message)
+      )
     end)

     messages_within_max_bytes_queue
```
```diff
@@ -373,6 +375,14 @@ defmodule Kafee.Producer.AsyncWorker do
     max_request_bytes > kafka_message_size_bytes(message)
   end

+  defp data_to_log_for_large_message_error(%Kafee.Producer.Message{} = message) do
+    message.headers |> Enum.into(%{}) |> Map.put(:topic, message.topic)
+  end
+
+  defp data_to_log_for_large_message_error([%Kafee.Producer.Message{} = message]) do
+    data_to_log_for_large_message_error(message)
+  end
+
   @spec emit_queue_telemetry(State.t(), non_neg_integer()) :: :ok
   defp emit_queue_telemetry(state, count) do
     :telemetry.execute([:kafee, :queue], %{count: count}, %{
```
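Both call sites funnel into the same helper: the batch path passes a bare `%Kafee.Producer.Message{}` struct, while the single-message path passes a one-element list. A standalone sketch of that two-clause dispatch, using a simplified stand-in struct (the real `Kafee.Producer.Message` has more fields, but only `:headers` and `:topic` are read here):

```elixir
defmodule LogDataSketch do
  # Simplified stand-in for %Kafee.Producer.Message{}.
  defstruct headers: [], topic: nil

  # Clause for a single message struct.
  def data_to_log(%__MODULE__{} = message) do
    message.headers |> Enum.into(%{}) |> Map.put(:topic, message.topic)
  end

  # Clause for a one-element list, as produced by the retry path.
  def data_to_log([%__MODULE__{} = message]), do: data_to_log(message)
end

message = %LogDataSketch{headers: [{"trace_id", "abc-123"}], topic: "receipts"}

# Both shapes produce the same log data.
LogDataSketch.data_to_log(message) == LogDataSketch.data_to_log([message])
```

Note the list clause only matches a single-element list, mirroring the `length(sent_messages) == 1` guard at the first call site; larger batches never reach the helper in list form.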
