From cfa60f9f2d4b3ec7b2dc73938fb922e14d11da10 Mon Sep 17 00:00:00 2001 From: chopatate Date: Sun, 21 Nov 2021 16:55:55 +0100 Subject: [PATCH] Support Describe log dirs I implemented API KEY 35 from the official Apache Kafka documentation. This functionality is requested in issue # 2163 and this is an implementation proposal. --- kafka/admin/client.py | 18 +++++++++++++++++- kafka/protocol/admin.py | 42 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 59 insertions(+), 1 deletion(-) diff --git a/kafka/admin/client.py b/kafka/admin/client.py index fd4d66110..084421b91 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -20,7 +20,7 @@ from kafka.protocol.admin import ( CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest, ListGroupsRequest, DescribeGroupsRequest, DescribeAclsRequest, CreateAclsRequest, DeleteAclsRequest, - DeleteGroupsRequest + DeleteGroupsRequest, DescribeLogDirsRequest ) from kafka.protocol.commit import GroupCoordinatorRequest, OffsetFetchRequest from kafka.protocol.metadata import MetadataRequest @@ -1344,3 +1344,19 @@ def _wait_for_futures(self, futures): if future.failed(): raise future.exception # pylint: disable-msg=raising-bad-type + + def describe_log_dirs(self): + """Send a DescribeLogDirsRequest request to a broker. + + :return: A message future + """ + version = self._matching_api_version(DescribeLogDirsRequest) + if version <= 1: + request = DescribeLogDirsRequest[version]() + future = self._send_request_to_node(self._client.least_loaded_node(), request) + self._wait_for_futures([future]) + else: + raise NotImplementedError( + "Support for DescribeLogDirsRequest_v{} has not yet been added to KafkaAdminClient." + .format(version)) + return future.value diff --git a/kafka/protocol/admin.py b/kafka/protocol/admin.py index f9d61e5cd..fb93b26ab 100644 --- a/kafka/protocol/admin.py +++ b/kafka/protocol/admin.py @@ -790,6 +790,48 @@ class DescribeConfigsRequest_v2(Request): ] +class DescribeLogDirsResponse_v0(Response): + API_KEY = 35 + API_VERSION = 0 + FLEXIBLE_VERSION = True + SCHEMA = Schema( + ('throttle_time_ms', Int32), + ('log_dirs', Array( + ('error_code', Int16), + ('log_dir', String('utf-8')), + ('topics', Array( + ('name', String('utf-8')), + ('partitions', Array( + ('partition_index', Int32), + ('partition_size', Int64), + ('offset_lag', Int64), + ('is_future_key', Boolean) + )) + )) + )) + ) + + +class DescribeLogDirsRequest_v0(Request): + API_KEY = 35 + API_VERSION = 0 + RESPONSE_TYPE = DescribeLogDirsResponse_v0 + SCHEMA = Schema( + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Int32) + )) + ) + + +DescribeLogDirsResponse = [ + DescribeLogDirsResponse_v0, +] +DescribeLogDirsRequest = [ + DescribeLogDirsRequest_v0, +] + + class SaslAuthenticateResponse_v0(Response): API_KEY = 36 API_VERSION = 0