diff --git a/lib/kafee/consumer/broadway_adapter.ex b/lib/kafee/consumer/broadway_adapter.ex index 224644e..9836b79 100644 --- a/lib/kafee/consumer/broadway_adapter.ex +++ b/lib/kafee/consumer/broadway_adapter.ex @@ -288,8 +288,12 @@ defmodule Kafee.Consumer.BroadwayAdapter do # function above because `Kafee.Consumer.Adapter.push_message/2` will catch any # errors. - error = %RuntimeError{message: "Error converting a Broadway message to Kafee.Consumer.Message"} - messages |> List.wrap() |> Enum.each(&consumer.handle_failure(error, &1)) + messages + |> List.wrap() + |> Enum.each(fn message -> + error = %RuntimeError{message: "Error occurred processing a message - #{inspect(message.status)}"} + consumer.handle_failure(error, message) + end) messages end diff --git a/test/kafee/consumer/broadway_adapter_integration_test.exs b/test/kafee/consumer/broadway_adapter_integration_test.exs index 67a587c..39173bd 100644 --- a/test/kafee/consumer/broadway_adapter_integration_test.exs +++ b/test/kafee/consumer/broadway_adapter_integration_test.exs @@ -130,7 +130,9 @@ defmodule Kafee.Consumer.BroadwayAdapterIntegrationTest do assert_receive message IO.inspect(message) - assert_receive {:error_reason, "%RuntimeError{message: \"Error handling a message for key-fail-2\"}"} + + assert_receive {:error_reason, + "%RuntimeError{message: \"Error converting a Broadway message to Kafee.Consumer.Message\"}"} end end end