From beb790e256db15174757c57f4288560be9c5c077 Mon Sep 17 00:00:00 2001 From: Colin Patrick McCabe Date: Sun, 10 Nov 2024 19:54:26 -0800 Subject: [PATCH] MINOR: Convert kafka.api.MetricsTest to KRaft (#17744) Reviewers: Chia-Ping Tsai --- .../integration/kafka/api/MetricsTest.scala | 74 ++++++------------- 1 file changed, 22 insertions(+), 52 deletions(-) diff --git a/core/src/test/scala/integration/kafka/api/MetricsTest.scala b/core/src/test/scala/integration/kafka/api/MetricsTest.scala index baf0e4bd4536a..71d2764aee856 100644 --- a/core/src/test/scala/integration/kafka/api/MetricsTest.scala +++ b/core/src/test/scala/integration/kafka/api/MetricsTest.scala @@ -14,17 +14,17 @@ package kafka.api import com.yammer.metrics.core.{Gauge, Histogram, Meter} import kafka.security.JaasTestUtils -import kafka.server.KafkaServer +import kafka.server.KafkaBroker import kafka.utils.TestUtils -import org.apache.kafka.clients.consumer.Consumer +import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig} import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} -import org.apache.kafka.common.config.{SaslConfigs, TopicConfig} +import org.apache.kafka.common.config.SaslConfigs import org.apache.kafka.common.errors.{InvalidTopicException, UnknownTopicOrPartitionException} import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.security.authenticator.TestJaasConfig import org.apache.kafka.common.{Metric, MetricName, TopicPartition} -import org.apache.kafka.server.config.{ReplicationConfigs, ServerLogConfigs, ZkConfigs} +import org.apache.kafka.server.config.ServerLogConfigs import org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, NoOpRemoteStorageManager, RemoteLogManagerConfig, RemoteStorageMetrics} import org.apache.kafka.server.metrics.KafkaYammerMetrics import org.junit.jupiter.api.Assertions._ @@ -32,8 +32,8 @@ import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo} import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.CsvSource + import java.util.{Locale, Properties} -import scala.annotation.nowarn import scala.jdk.CollectionConverters._ class MetricsTest extends IntegrationTestHarness with SaslSetup { @@ -45,9 +45,7 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup { private val kafkaServerSaslMechanisms = List(kafkaClientSaslMechanism) private val kafkaServerJaasEntryName = s"${listenerName.value.toLowerCase(Locale.ROOT)}.${JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME}" - this.serverConfig.setProperty(ZkConfigs.ZK_ENABLE_SECURE_ACLS_CONFIG, "false") this.serverConfig.setProperty(ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, "false") - this.serverConfig.setProperty(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG, "2.8") this.producerConfig.setProperty(ProducerConfig.LINGER_MS_CONFIG, "10") // intentionally slow message down conversion via gzip compression to ensure we can measure the time it takes this.producerConfig.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip") @@ -65,6 +63,7 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup { this.serverConfig.setProperty(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteStorageManager].getName) this.serverConfig.setProperty(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteLogMetadataManager].getName) } + this.consumerConfig.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "classic") verifyNoRequestMetrics("Request metrics not removed in a previous test") startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), KafkaSasl, kafkaServerJaasEntryName)) super.setUp(testInfo) @@ -80,22 +79,21 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup { /** * Verifies some of the metrics of producer, consumer as well as server. */ - @nowarn("cat=deprecation") - @ParameterizedTest(name = "{displayName}.quorum={0}.groupProtocol={1}.systemRemoteStorageEnabled={2}") - @CsvSource(Array("zk,classic,true", "zk,classic,false")) - def testMetrics(quorum: String, groupProtocol: String, systemRemoteStorageEnabled: Boolean): Unit = { - val topic = "topicWithOldMessageFormat" - val props = new Properties - props.setProperty(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG, "0.9.0") - createTopic(topic, numPartitions = 1, replicationFactor = 1, props) + @ParameterizedTest(name = "testMetrics with systemRemoteStorageEnabled: {1}") + @CsvSource(Array("kraft, true", "kraft, false")) + def testMetrics(quorum: String, systemRemoteStorageEnabled: Boolean): Unit = { + val topic = "mytopic" + createTopic(topic, + numPartitions = 1, + replicationFactor = 1, + listenerName = interBrokerListenerName, + adminClientConfig = adminClientConfig) val tp = new TopicPartition(topic, 0) // Produce and consume some records val numRecords = 10 val recordSize = 100000 val prop = new Properties() - // idempotence producer doesn't support old version of messages - prop.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false") val producer = createProducer(configOverrides = prop) sendRecords(producer, numRecords, recordSize, tp) @@ -108,10 +106,9 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup { verifyClientVersionMetrics(consumer.metrics, "Consumer") verifyClientVersionMetrics(producer.metrics, "Producer") - val server = servers.head - verifyBrokerMessageConversionMetrics(server, recordSize, tp) - verifyBrokerErrorMetrics(servers.head) - verifyBrokerZkMetrics(server, topic) + val server = brokers.head + verifyBrokerMessageMetrics(server, recordSize, tp) + verifyBrokerErrorMetrics(server) generateAuthenticationFailure(tp) verifyBrokerAuthenticationMetrics(server) @@ -195,7 +192,7 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup { } } - private def verifyBrokerAuthenticationMetrics(server: KafkaServer): Unit = { + private def verifyBrokerAuthenticationMetrics(server: KafkaBroker): Unit = { val metrics = server.metrics.metrics TestUtils.waitUntilTrue(() => maxKafkaMetricValue("failed-authentication-total", metrics, "Broker", Some("socket-server-metrics")) > 0, @@ -206,15 +203,12 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup { verifyKafkaMetricRecorded("failed-authentication-total", metrics, "Broker", Some("socket-server-metrics")) } - private def verifyBrokerMessageConversionMetrics(server: KafkaServer, recordSize: Int, tp: TopicPartition): Unit = { + private def verifyBrokerMessageMetrics(server: KafkaBroker, recordSize: Int, tp: TopicPartition): Unit = { val requestMetricsPrefix = "kafka.network:type=RequestMetrics" val requestBytes = verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=RequestBytes,request=Produce") val tempBytes = verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=TemporaryMemoryBytes,request=Produce") assertTrue(tempBytes >= recordSize, s"Unexpected temporary memory size requestBytes $requestBytes tempBytes $tempBytes") - verifyYammerMetricRecorded(s"kafka.server:type=BrokerTopicMetrics,name=ProduceMessageConversionsPerSec") - // if message conversion run too fast, the Math.round(value) may be 0.0, so using count to check whether the metric is updated - assertTrue(yammerHistogram(s"$requestMetricsPrefix,name=MessageConversionsTimeMs,request=Produce").count() > 0, "MessageConversionsTimeMs count should be > 0") verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=RequestBytes,request=Fetch") verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=TemporaryMemoryBytes,request=Fetch", value => value == 0.0) @@ -222,21 +216,7 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup { verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=RequestBytes,request=Metadata") } - private def verifyBrokerZkMetrics(server: KafkaServer, topic: String): Unit = { - val histogram = yammerHistogram("kafka.server:type=ZooKeeperClientMetrics,name=ZooKeeperRequestLatencyMs") - // Latency is rounded to milliseconds, so check the count instead - val initialCount = histogram.count - servers.head.zkClient.getLeaderForPartition(new TopicPartition(topic, 0)) - val newCount = histogram.count - assertTrue(newCount > initialCount, "ZooKeeper latency not recorded") - - val min = histogram.min - assertTrue(min >= 0, s"Min latency should not be negative: $min") - - assertEquals("CONNECTED", yammerMetricValue("SessionState"), s"Unexpected ZK state") - } - - private def verifyBrokerErrorMetrics(server: KafkaServer): Unit = { + private def verifyBrokerErrorMetrics(server: KafkaBroker): Unit = { def errorMetricCount = KafkaYammerMetrics.defaultRegistry.allMetrics.keySet.asScala.count(_.getName == "ErrorsPerSec") @@ -255,7 +235,7 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup { // Check that error metrics are registered dynamically val currentErrorMetricCount = errorMetricCount assertEquals(startErrorMetricCount + 1, currentErrorMetricCount) - assertTrue(currentErrorMetricCount < 10, s"Too many error metrics $currentErrorMetricCount") + assertTrue(currentErrorMetricCount < 14, s"Too many error metrics $currentErrorMetricCount") try { consumer.partitionsFor("non-existing-topic") @@ -300,16 +280,6 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup { } } - private def yammerHistogram(name: String): Histogram = { - val allMetrics = KafkaYammerMetrics.defaultRegistry.allMetrics.asScala - val (_, metric) = allMetrics.find { case (n, _) => n.getMBeanName.endsWith(name) } - .getOrElse(fail(s"Unable to find broker metric $name: allMetrics: ${allMetrics.keySet.map(_.getMBeanName)}")) - metric match { - case m: Histogram => m - case m => throw new AssertionError(s"Unexpected broker metric of class ${m.getClass}") - } - } - private def verifyYammerMetricRecorded(name: String, verify: Double => Boolean = d => d > 0): Double = { val metricValue = yammerMetricValue(name).asInstanceOf[Double] assertTrue(verify(metricValue), s"Broker metric not recorded correctly for $name value $metricValue")