Skip to content

Commit

Permalink
KAFKA-17925 Convert Kafka Client integration tests to use KRaft (apac…
Browse files Browse the repository at this point in the history
…he#17670)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
  • Loading branch information
kirktrue authored Nov 10, 2024
1 parent 0bc91be commit 42ea29c
Show file tree
Hide file tree
Showing 89 changed files with 981 additions and 946 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness {
}

@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testListMaxTimestampWithEmptyLog(quorum: String): Unit = {
val maxTimestampOffset = runFetchOffsets(adminClient, OffsetSpec.maxTimestamp(), topicName)
assertEquals(ListOffsetsResponse.UNKNOWN_OFFSET, maxTimestampOffset.offset())
Expand All @@ -88,7 +88,7 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness {


@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testThreeCompressedRecordsInOneBatch(quorum: String): Unit = {
produceMessagesInOneBatch("gzip")
verifyListOffsets()
Expand All @@ -102,7 +102,7 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness {
}

@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testThreeNonCompressedRecordsInOneBatch(quorum: String): Unit = {
produceMessagesInOneBatch()
verifyListOffsets()
Expand All @@ -117,7 +117,7 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness {
}

@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testThreeNonCompressedRecordsInSeparateBatch(quorum: String): Unit = {
produceMessagesInSeparateBatch()
verifyListOffsets()
Expand Down Expand Up @@ -162,7 +162,7 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness {
}

@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testThreeRecordsInOneBatchHavingDifferentCompressionTypeWithServer(quorum: String): Unit = {
val props: Properties = new Properties()
props.setProperty(TopicConfig.COMPRESSION_TYPE_CONFIG, "lz4")
Expand All @@ -172,7 +172,7 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness {
}

@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testThreeRecordsInSeparateBatchHavingDifferentCompressionTypeWithServer(quorum: String): Unit = {
val props: Properties = new Properties()
props.setProperty(TopicConfig.COMPRESSION_TYPE_CONFIG, "lz4")
Expand All @@ -182,7 +182,7 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness {
}

@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testThreeCompressedRecordsInSeparateBatch(quorum: String): Unit = {
produceMessagesInSeparateBatch("gzip")
verifyListOffsets()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
}

@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testCreateRemoteTopicWithValidRetentionTime(quorum: String): Unit = {
val topicConfig = new Properties()
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
Expand All @@ -86,7 +86,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
}

@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testCreateRemoteTopicWithValidRetentionSize(quorum: String): Unit = {
val topicConfig = new Properties()
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
Expand All @@ -98,7 +98,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
}

@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testCreateRemoteTopicWithInheritedLocalRetentionTime(quorum: String): Unit = {
// inherited local retention ms is 1000
val topicConfig = new Properties()
Expand All @@ -110,7 +110,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
}

@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testCreateRemoteTopicWithInheritedLocalRetentionSize(quorum: String): Unit = {
// inherited local retention bytes is 1024
val topicConfig = new Properties()
Expand All @@ -122,7 +122,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
}

@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testCreateRemoteTopicWithInvalidRetentionTime(quorum: String): Unit = {
// inherited local retention ms is 1000
val topicConfig = new Properties()
Expand All @@ -134,7 +134,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
}

@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testCreateRemoteTopicWithInvalidRetentionSize(quorum: String): Unit = {
// inherited local retention bytes is 1024
val topicConfig = new Properties()
Expand All @@ -146,7 +146,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
}

@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testCreateCompactedRemoteStorage(quorum: String): Unit = {
val topicConfig = new Properties()
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
Expand Down Expand Up @@ -301,7 +301,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
}

@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testEnableRemoteLogOnExistingTopicTest(quorum: String): Unit = {
val admin = createAdminClient()
val topicConfig = new Properties()
Expand All @@ -319,7 +319,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
}

@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testEnableRemoteLogWhenSystemRemoteStorageIsDisabled(quorum: String): Unit = {
val admin = createAdminClient()

Expand All @@ -343,7 +343,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
}

@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testUpdateTopicConfigWithValidRetentionTimeTest(quorum: String): Unit = {
val admin = createAdminClient()
val topicConfig = new Properties()
Expand All @@ -364,7 +364,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
}

@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testUpdateTopicConfigWithValidRetentionSizeTest(quorum: String): Unit = {
val admin = createAdminClient()
val topicConfig = new Properties()
Expand All @@ -385,7 +385,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
}

@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testUpdateTopicConfigWithInheritedLocalRetentionTime(quorum: String): Unit = {
val admin = createAdminClient()
val topicConfig = new Properties()
Expand All @@ -405,7 +405,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
}

@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testUpdateTopicConfigWithInheritedLocalRetentionSize(quorum: String): Unit = {
val admin = createAdminClient()
val topicConfig = new Properties()
Expand Down Expand Up @@ -474,7 +474,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
}

@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testTopicDeletion(quorum: String): Unit = {
MyRemoteStorageManager.deleteSegmentEventCounter.set(0)
val numPartitions = 2
Expand All @@ -493,7 +493,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
}

@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testClusterWideDisablementOfTieredStorageWithEnabledTieredTopic(quorum: String): Unit = {
val topicConfig = new Properties()
topicConfig.setProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
Expand All @@ -515,7 +515,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
}

@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testClusterWithoutTieredStorageStartsSuccessfullyIfTopicWithTieringDisabled(quorum: String): Unit = {
val topicConfig = new Properties()
topicConfig.setProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, false.toString)
Expand Down
Loading

0 comments on commit 42ea29c

Please sign in to comment.