MINOR: Convert kafka.api.MetricsTest to KRaft (apache#17744)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
cmccabe authored Nov 11, 2024
1 parent a57b7d8 commit beb790e
Showing 1 changed file with 22 additions and 52 deletions.
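In brief, the change drops the ZooKeeper-only parameterization, the old-message-format setup, and the ZK metrics assertions, and switches broker references from the ZK-specific KafkaServer class to the mode-agnostic KafkaBroker trait. As orientation before the diff, here is a condensed before/after sketch of the test signature, assembled from the hunks below (it is not extra code in the commit, and the bodies are elided):

// Before: ZooKeeper mode only, exercising the old message format.
@nowarn("cat=deprecation")
@ParameterizedTest(name = "{displayName}.quorum={0}.groupProtocol={1}.systemRemoteStorageEnabled={2}")
@CsvSource(Array("zk,classic,true", "zk,classic,false"))
def testMetrics(quorum: String, groupProtocol: String, systemRemoteStorageEnabled: Boolean): Unit = { /* ... */ }

// After: KRaft mode only; the consumer group protocol is pinned to "classic" in setUp.
@ParameterizedTest(name = "testMetrics with systemRemoteStorageEnabled: {1}")
@CsvSource(Array("kraft, true", "kraft, false"))
def testMetrics(quorum: String, systemRemoteStorageEnabled: Boolean): Unit = { /* ... */ }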
74 changes: 22 additions & 52 deletions core/src/test/scala/integration/kafka/api/MetricsTest.scala
@@ -14,26 +14,26 @@ package kafka.api

import com.yammer.metrics.core.{Gauge, Histogram, Meter}
import kafka.security.JaasTestUtils
-import kafka.server.KafkaServer
+import kafka.server.KafkaBroker
import kafka.utils.TestUtils
-import org.apache.kafka.clients.consumer.Consumer
+import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
-import org.apache.kafka.common.config.{SaslConfigs, TopicConfig}
+import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.errors.{InvalidTopicException, UnknownTopicOrPartitionException}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.security.authenticator.TestJaasConfig
import org.apache.kafka.common.{Metric, MetricName, TopicPartition}
-import org.apache.kafka.server.config.{ReplicationConfigs, ServerLogConfigs, ZkConfigs}
+import org.apache.kafka.server.config.ServerLogConfigs
import org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, NoOpRemoteStorageManager, RemoteLogManagerConfig, RemoteStorageMetrics}
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.CsvSource


import java.util.{Locale, Properties}
-import scala.annotation.nowarn
import scala.jdk.CollectionConverters._

class MetricsTest extends IntegrationTestHarness with SaslSetup {
@@ -45,9 +45,7 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup {
private val kafkaServerSaslMechanisms = List(kafkaClientSaslMechanism)
private val kafkaServerJaasEntryName =
s"${listenerName.value.toLowerCase(Locale.ROOT)}.${JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME}"
-this.serverConfig.setProperty(ZkConfigs.ZK_ENABLE_SECURE_ACLS_CONFIG, "false")
this.serverConfig.setProperty(ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, "false")
-this.serverConfig.setProperty(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG, "2.8")
this.producerConfig.setProperty(ProducerConfig.LINGER_MS_CONFIG, "10")
// intentionally slow down message conversion via gzip compression to ensure we can measure the time it takes
this.producerConfig.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip")
@@ -65,6 +63,7 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup {
this.serverConfig.setProperty(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteStorageManager].getName)
this.serverConfig.setProperty(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteLogMetadataManager].getName)
}
+this.consumerConfig.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "classic")
verifyNoRequestMetrics("Request metrics not removed in a previous test")
startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), KafkaSasl, kafkaServerJaasEntryName))
super.setUp(testInfo)
@@ -80,22 +79,21 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup {
/**
* Verifies some of the metrics of producer, consumer as well as server.
*/
@nowarn("cat=deprecation")
@ParameterizedTest(name = "{displayName}.quorum={0}.groupProtocol={1}.systemRemoteStorageEnabled={2}")
@CsvSource(Array("zk,classic,true", "zk,classic,false"))
def testMetrics(quorum: String, groupProtocol: String, systemRemoteStorageEnabled: Boolean): Unit = {
val topic = "topicWithOldMessageFormat"
val props = new Properties
props.setProperty(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG, "0.9.0")
createTopic(topic, numPartitions = 1, replicationFactor = 1, props)
@ParameterizedTest(name = "testMetrics with systemRemoteStorageEnabled: {1}")
@CsvSource(Array("kraft, true", "kraft, false"))
def testMetrics(quorum: String, systemRemoteStorageEnabled: Boolean): Unit = {
val topic = "mytopic"
createTopic(topic,
numPartitions = 1,
replicationFactor = 1,
listenerName = interBrokerListenerName,
adminClientConfig = adminClientConfig)
val tp = new TopicPartition(topic, 0)

// Produce and consume some records
val numRecords = 10
val recordSize = 100000
-val prop = new Properties()
-// idempotence producer doesn't support old version of messages
-prop.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false")
-val producer = createProducer(configOverrides = prop)
+val producer = createProducer()
sendRecords(producer, numRecords, recordSize, tp)

@@ -108,10 +106,9 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup {
verifyClientVersionMetrics(consumer.metrics, "Consumer")
verifyClientVersionMetrics(producer.metrics, "Producer")

-val server = servers.head
-verifyBrokerMessageConversionMetrics(server, recordSize, tp)
-verifyBrokerErrorMetrics(servers.head)
-verifyBrokerZkMetrics(server, topic)
+val server = brokers.head
+verifyBrokerMessageMetrics(server, recordSize, tp)
+verifyBrokerErrorMetrics(server)

generateAuthenticationFailure(tp)
verifyBrokerAuthenticationMetrics(server)
@@ -195,7 +192,7 @@
}
}

-private def verifyBrokerAuthenticationMetrics(server: KafkaServer): Unit = {
+private def verifyBrokerAuthenticationMetrics(server: KafkaBroker): Unit = {
val metrics = server.metrics.metrics
TestUtils.waitUntilTrue(() =>
maxKafkaMetricValue("failed-authentication-total", metrics, "Broker", Some("socket-server-metrics")) > 0,
@@ -206,37 +203,20 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup {
verifyKafkaMetricRecorded("failed-authentication-total", metrics, "Broker", Some("socket-server-metrics"))
}

-private def verifyBrokerMessageConversionMetrics(server: KafkaServer, recordSize: Int, tp: TopicPartition): Unit = {
+private def verifyBrokerMessageMetrics(server: KafkaBroker, recordSize: Int, tp: TopicPartition): Unit = {
val requestMetricsPrefix = "kafka.network:type=RequestMetrics"
val requestBytes = verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=RequestBytes,request=Produce")
val tempBytes = verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=TemporaryMemoryBytes,request=Produce")
assertTrue(tempBytes >= recordSize, s"Unexpected temporary memory size requestBytes $requestBytes tempBytes $tempBytes")

verifyYammerMetricRecorded(s"kafka.server:type=BrokerTopicMetrics,name=ProduceMessageConversionsPerSec")
// if message conversion run too fast, the Math.round(value) may be 0.0, so using count to check whether the metric is updated
assertTrue(yammerHistogram(s"$requestMetricsPrefix,name=MessageConversionsTimeMs,request=Produce").count() > 0, "MessageConversionsTimeMs count should be > 0")
verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=RequestBytes,request=Fetch")
verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=TemporaryMemoryBytes,request=Fetch", value => value == 0.0)

// request size recorded for all request types, check one
verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=RequestBytes,request=Metadata")
}

-private def verifyBrokerZkMetrics(server: KafkaServer, topic: String): Unit = {
-val histogram = yammerHistogram("kafka.server:type=ZooKeeperClientMetrics,name=ZooKeeperRequestLatencyMs")
-// Latency is rounded to milliseconds, so check the count instead
-val initialCount = histogram.count
-servers.head.zkClient.getLeaderForPartition(new TopicPartition(topic, 0))
-val newCount = histogram.count
-assertTrue(newCount > initialCount, "ZooKeeper latency not recorded")
-
-val min = histogram.min
-assertTrue(min >= 0, s"Min latency should not be negative: $min")
-
-assertEquals("CONNECTED", yammerMetricValue("SessionState"), s"Unexpected ZK state")
-}

-private def verifyBrokerErrorMetrics(server: KafkaServer): Unit = {
+private def verifyBrokerErrorMetrics(server: KafkaBroker): Unit = {

def errorMetricCount = KafkaYammerMetrics.defaultRegistry.allMetrics.keySet.asScala.count(_.getName == "ErrorsPerSec")

@@ -255,7 +235,7 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup {
// Check that error metrics are registered dynamically
val currentErrorMetricCount = errorMetricCount
assertEquals(startErrorMetricCount + 1, currentErrorMetricCount)
-assertTrue(currentErrorMetricCount < 10, s"Too many error metrics $currentErrorMetricCount")
+assertTrue(currentErrorMetricCount < 14, s"Too many error metrics $currentErrorMetricCount")

try {
consumer.partitionsFor("non-existing-topic")
@@ -300,16 +280,6 @@
}
}

-private def yammerHistogram(name: String): Histogram = {
-val allMetrics = KafkaYammerMetrics.defaultRegistry.allMetrics.asScala
-val (_, metric) = allMetrics.find { case (n, _) => n.getMBeanName.endsWith(name) }
-.getOrElse(fail(s"Unable to find broker metric $name: allMetrics: ${allMetrics.keySet.map(_.getMBeanName)}"))
-metric match {
-case m: Histogram => m
-case m => throw new AssertionError(s"Unexpected broker metric of class ${m.getClass}")
-}
-}

private def verifyYammerMetricRecorded(name: String, verify: Double => Boolean = d => d > 0): Double = {
val metricValue = yammerMetricValue(name).asInstanceOf[Double]
assertTrue(verify(metricValue), s"Broker metric not recorded correctly for $name value $metricValue")
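For readers skimming the helpers: verifyYammerMetricRecorded (partially shown above) resolves a broker metric by the suffix of its MBean name, the same registry-scan pattern used by the removed yammerHistogram. A minimal standalone sketch of that lookup follows; the name findYammerMetric is chosen here for illustration and is not part of the commit:

import org.apache.kafka.server.metrics.KafkaYammerMetrics
import scala.jdk.CollectionConverters._

// Scan the default Yammer registry for the first metric whose MBean name ends
// with the given suffix; returns None if nothing matches.
def findYammerMetric(nameSuffix: String): Option[com.yammer.metrics.core.Metric] =
  KafkaYammerMetrics.defaultRegistry.allMetrics.asScala
    .collectFirst { case (name, metric) if name.getMBeanName.endsWith(nameSuffix) => metric }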
