From 99983385dacb212285b3fa944ab6474b9b472cbe Mon Sep 17 00:00:00 2001 From: Denis Otkidach Date: Wed, 17 Jan 2024 14:57:49 +0200 Subject: [PATCH] Fix spec for GroupCoordinatorResponse_v1 --- aiokafka/admin/client.py | 7 ++----- aiokafka/protocol/commit.py | 10 ++++++++-- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/aiokafka/admin/client.py b/aiokafka/admin/client.py index cb436a47..ece22478 100644 --- a/aiokafka/admin/client.py +++ b/aiokafka/admin/client.py @@ -46,7 +46,7 @@ class AIOKafkaAdminClient: each request to servers and can be used to identify specific server-side log entries that correspond to this client. Also submitted to GroupCoordinator for logging with respect to - consumer group administration. Default: 'kafka-python-{version}' + consumer group administration. Default: 'aiokafka-{version}' request_timeout_ms (int): Client request timeout in milliseconds. Default: 40000. connections_max_idle_ms: Close idle connections after the number of @@ -508,10 +508,7 @@ async def find_coordinator(self, group_id: str, coordinator_type: int = 0) -> in :return int: the acting coordinator broker id """ - # FIXME GroupCoordinatorRequest_v1 in kafka-python 2.0.2 doesn't match - # spec causing "ValueError: Buffer underrun decoding string" - # version = self._matching_api_version(GroupCoordinatorRequest) - version = self._matching_api_version(GroupCoordinatorRequest[:1]) + version = self._matching_api_version(GroupCoordinatorRequest) if version == 0 and coordinator_type: raise IncompatibleBrokerVersion( "Cannot query for transaction id on current broker version" diff --git a/aiokafka/protocol/commit.py b/aiokafka/protocol/commit.py index 81185397..b0fda8c3 100644 --- a/aiokafka/protocol/commit.py +++ b/aiokafka/protocol/commit.py @@ -280,6 +280,7 @@ class GroupCoordinatorResponse_v1(Response): API_KEY = 10 API_VERSION = 1 SCHEMA = Schema( + ("throttle_time_ms", Int32), ("error_code", Int16), ("error_message", String("utf-8")), ("coordinator_id", Int32), @@ -292,14 +293,19 @@ class GroupCoordinatorRequest_v0(Request): API_KEY = 10 API_VERSION = 0 RESPONSE_TYPE = GroupCoordinatorResponse_v0 - SCHEMA = Schema(("consumer_group", String("utf-8"))) + SCHEMA = Schema( + ("consumer_group", String("utf-8")), + ) class GroupCoordinatorRequest_v1(Request): API_KEY = 10 API_VERSION = 1 RESPONSE_TYPE = GroupCoordinatorResponse_v1 - SCHEMA = Schema(("coordinator_key", String("utf-8")), ("coordinator_type", Int8)) + SCHEMA = Schema( + ("coordinator_key", String("utf-8")), + ("coordinator_type", Int8), + ) GroupCoordinatorRequest = [GroupCoordinatorRequest_v0, GroupCoordinatorRequest_v1]