From 1d98363f8970dad7ce355fff1734e76caddd388e Mon Sep 17 00:00:00 2001 From: seungjinstord <121889101+seungjinstord@users.noreply.github.com> Date: Thu, 17 Oct 2024 15:08:22 -0700 Subject: [PATCH] fix: UPDATE: kafee async worker to log only the headers + topic for large message error instead of the payload as it can go over the datadog limit (#108) ## Related Ticket(s) SIGNAL-7183 ## Checklist - [x] Code conforms to the [Elixir Styleguide](https://github.com/christopheradams/elixir_style_guide) ## Problem https://stord.atlassian.net/wiki/spaces/PD/pages/3528982601/WMS+integration+bridge+receipt+confirmations+triaging We had an issue where errors for large messages were not logged at all; the log entries were silently omitted. The suspicion is that the logged message was too large for Datadog. ## Details We need to log at least enough information to trace which message or data a dropped message belonged to. Therefore, instead of the actual body of the message, we will log the headers + topic of the message. --- lib/kafee/producer/async_worker.ex | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/lib/kafee/producer/async_worker.ex b/lib/kafee/producer/async_worker.ex index c5158a8..d2cb483 100644 --- a/lib/kafee/producer/async_worker.ex +++ b/lib/kafee/producer/async_worker.ex @@ -177,7 +177,7 @@ defmodule Kafee.Producer.AsyncWorker do case error do {:error, {:producer_down, {:not_retriable, {_, _, _, _, :message_too_large}}}} when length(sent_messages) == 1 -> - Logger.error("Message in queue is too large", data: sent_messages) + Logger.error("Message in queue is too large", data: data_to_log_for_large_message_error(sent_messages)) %{state | queue: :queue.drop(queue)} {:error, {:producer_down, {:not_retriable, {_, _, _, _, :message_too_large}}}} -> @@ -363,7 +363,9 @@ defmodule Kafee.Producer.AsyncWorker do messages_beyond_max_bytes = Enum.reverse(messages_beyond_max_bytes_reversed) Enum.each(messages_beyond_max_bytes, fn message -> - Logger.error("Message in queue is too large, will 
not push to Kafka", data: message) + Logger.error("Message in queue is too large, will not push to Kafka", + data: data_to_log_for_large_message_error(message) + ) end) messages_within_max_bytes_queue @@ -373,6 +375,14 @@ defmodule Kafee.Producer.AsyncWorker do max_request_bytes > kafka_message_size_bytes(message) end + defp data_to_log_for_large_message_error(%Kafee.Producer.Message{} = message) do + message.headers |> Enum.into(%{}) |> Map.put(:topic, message.topic) + end + + defp data_to_log_for_large_message_error([%Kafee.Producer.Message{} = message]) do + data_to_log_for_large_message_error(message) + end + @spec emit_queue_telemetry(State.t(), non_neg_integer()) :: :ok defp emit_queue_telemetry(state, count) do :telemetry.execute([:kafee, :queue], %{count: count}, %{