From 4f4c205ff0cb54e4c0405f4bffa6ba8601a1f44f Mon Sep 17 00:00:00 2001 From: Jamie Hewland Date: Mon, 14 Aug 2023 14:05:09 +0100 Subject: [PATCH] Skip control batches (dpkp/kafka-python#2361) --- kafka/consumer/fetcher.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 5434c36a2..99d11f274 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -464,6 +464,13 @@ def _unpack_message_set(self, tp, records): except AttributeError: pass + # Control messages are used to enable transactions in Kafka and are generated by the + # broker. Clients should not return control batches (ie. those with this bit set) to + # applications. (since 0.11.0.0) + if getattr(batch, "is_control_batch", False): + batch = records.next_batch() + continue + for record in batch: key_size = len(record.key) if record.key is not None else -1 value_size = len(record.value) if record.value is not None else -1