Skip to content

Commit e5c7479

Browse files
feat(tests): test s3 wal in automq enhanced e2e tests (AutoMQ#1712)
* feat(tests): enable s3 wal in e2e test

  Signed-off-by: SSpirits <admin@lv5.moe>

* feat(tests): test s3 wal in automq enhanced e2e tests

  Signed-off-by: SSpirits <admin@lv5.moe>

---------

Signed-off-by: SSpirits <admin@lv5.moe>
1 parent f4d718b commit e5c7479

File tree

4 files changed

+30
-23
lines changed

4 files changed

+30
-23
lines changed

tests/kafkatest/automq/autobalancer_test.py

+15-12
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@
99
from ducktape.mark import parametrize
1010
from ducktape.mark.resource import cluster
1111
from ducktape.tests.test import Test
12-
from kafkatest.automq.automq_e2e_util import (run_simple_load, TOPIC, append_info)
12+
from ducktape.mark import matrix
13+
from kafkatest.automq.automq_e2e_util import (FILE_WAL, S3_WAL, run_simple_load, TOPIC, append_info)
1314
from kafkatest.services.kafka import KafkaService
1415

1516
# Configuration constants for the AutoBalancer
@@ -94,7 +95,7 @@ def __init__(self, test_context):
9495
self.avg_deviation = 0.2
9596
self.maximum_broker_deviation_percentage = 0.15
9697

97-
def create_kafka(self, num_nodes=1, partition=1, exclude_broker=None, exclude_topic=None, replica_assignment=None):
98+
def create_kafka(self, num_nodes=1, partition=1, exclude_broker=None, exclude_topic=None, replica_assignment=None, wal='file'):
9899
"""
99100
Create and configure a Kafka cluster for testing.
100101
@@ -123,6 +124,7 @@ def create_kafka(self, num_nodes=1, partition=1, exclude_broker=None, exclude_to
123124
[REPORT_INTERVAL, str(4000)],
124125
[DETECT_INTERVAL, str(8000)],
125126
[METRIC_REPORTERS, 'kafka.autobalancer.metricsreporter.AutoBalancerMetricsReporter'],
127+
['s3.wal.path', FILE_WAL if wal == 'file' else S3_WAL],
126128
]
127129

128130
if exclude_broker:
@@ -153,16 +155,16 @@ def create_kafka(self, num_nodes=1, partition=1, exclude_broker=None, exclude_to
153155
self.start = True
154156

155157
@cluster(num_nodes=5)
156-
@parametrize(automq_num_nodes=2, partition=4, replica_assignment='1,1,1,2')
157-
def test_action(self, automq_num_nodes, partition, replica_assignment):
158+
@matrix(automq_num_nodes=[2], partition=[4], replica_assignment=['1,1,1,2'], wal=['file', 's3'])
159+
def test_action(self, automq_num_nodes, partition, replica_assignment, wal):
158160
"""
159161
Test throughput distribution across brokers
160162
:param automq_num_nodes: Number of automq
161163
:param partition: Number of partitions
162164
:param replica_assignment: Replica assignment for partitions
163165
"""
164166
success, msg = True, ''
165-
self.create_kafka(num_nodes=automq_num_nodes, partition=partition, replica_assignment=replica_assignment)
167+
self.create_kafka(num_nodes=automq_num_nodes, partition=partition, replica_assignment=replica_assignment, wal=wal)
166168
self.kafka.start()
167169

168170
run_simple_load(test_context=self.context, kafka=self.kafka, logger=self.logger, topic=self.topic,
@@ -176,8 +178,8 @@ def test_action(self, automq_num_nodes, partition, replica_assignment):
176178
assert success, msg
177179

178180
@cluster(num_nodes=4)
179-
@parametrize(automq_num_nodes=2, exclude_broker='2', partition=4, replica_assignment='1,1,1,2')
180-
def test_broker_white_list(self, automq_num_nodes, exclude_broker, partition, replica_assignment):
181+
@matrix(automq_num_nodes=[2], exclude_broker=['2'], partition=[4], replica_assignment=['1,1,1,2'], wal=['file', 's3'])
182+
def test_broker_white_list(self, automq_num_nodes, exclude_broker, partition, replica_assignment, wal):
181183
"""
182184
Test broker exclusion functionality
183185
:param automq_num_nodes: Number of automq
@@ -187,22 +189,23 @@ def test_broker_white_list(self, automq_num_nodes, exclude_broker, partition, re
187189
"""
188190
success, msg = True, ''
189191
self.create_kafka(num_nodes=automq_num_nodes, exclude_broker=exclude_broker, partition=partition,
190-
replica_assignment=replica_assignment)
192+
replica_assignment=replica_assignment, wal=wal)
191193
self.kafka.start()
192194
before = self.kafka.parse_describe_topic(self.kafka.describe_topic(TOPIC))
193195
run_simple_load(test_context=self.context, kafka=self.kafka, logger=self.logger, topic=self.topic,
194196
num_records=20000, throughput=1300)
195197
after = self.kafka.parse_describe_topic(self.kafka.describe_topic(TOPIC))
196198

197-
success_, msg_ = check_partition_eq(topic_info1=before, topic_info2=after)
199+
success_, msg_ = check_partition_eq(topic_info1=before,
200+
topic_info2=after)
198201
success = success and success_
199202
msg = append_info(msg, success_, msg_)
200203

201204
assert success, msg
202205

203206
@cluster(num_nodes=6)
204-
@parametrize(automq_num_nodes=2)
205-
def test_topic_white_list(self, automq_num_nodes):
207+
@matrix(automq_num_nodes=[2], wal=['file', 's3'])
208+
def test_topic_white_list(self, automq_num_nodes, wal):
206209
"""
207210
Test topic exclusion functionality
208211
:param automq_num_nodes: Number of automq
@@ -224,7 +227,7 @@ def test_topic_white_list(self, automq_num_nodes):
224227
"configs": {"min.insync.replicas": 1},
225228
"replica-assignment": '1,1,1,2',
226229
}
227-
self.create_kafka(num_nodes=automq_num_nodes, exclude_topic=topic1, partition=1, replica_assignment='1')
230+
self.create_kafka(num_nodes=automq_num_nodes, exclude_topic=topic1, partition=1, replica_assignment='1', wal=wal)
228231
self.kafka.start()
229232
self.kafka.create_topic(topic_cfg1)
230233
self.kafka.create_topic(topic_cfg2)

tests/kafkatest/automq/automq_e2e_util.py

+2
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,8 @@ def publish_broker_configuration(kafka, producer_byte_rate, consumer_byte_rate,
123123
JMX_TOPIC_IN = f'kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic={TOPIC}'
124124
JMX_TOPIC_OUT = f'kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec,topic={TOPIC}'
125125
JMX_ONE_MIN = ':OneMinuteRate'
126+
FILE_WAL = '0@file:///mnt/kafka/kafka-data-logs-1/s3wal'
127+
S3_WAL = '0@s3://ko3?region=us-east-1&endpoint=http://10.5.0.2:4566&pathStyle=false&batchInterval=100&maxBytesInBatch=4194304&maxUnflushedBytes=1073741824&maxInflightUploadCount=50'
126128

127129

128130
def run_perf_producer(test_context, kafka, num_records=RECORD_NUM, throughput=DEFAULT_THROUGHPUT,

tests/kafkatest/automq/memory_occupancy_test.py

+7-6
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
from kafkatest.services.kafka import KafkaService
2020
from kafkatest.version import DEV_BRANCH, KafkaVersion
2121
from kafkatest.services.performance import ProducerPerformanceService, ConsumerPerformanceService
22-
from kafkatest.automq.automq_e2e_util import formatted_time, parse_log_entry, parse_producer_performance_stdout
22+
from kafkatest.automq.automq_e2e_util import FILE_WAL, S3_WAL, formatted_time, parse_log_entry, parse_producer_performance_stdout
2323

2424

2525
class TestJVMMemoryOccupancy(Test):
@@ -35,7 +35,7 @@ def __init__(self, test_context):
3535
self.consume_group = 'test_group'
3636
self.records_consumed = []
3737

38-
def create_kafka(self, num_nodes=1, partition=None, log_size=None, block_size=None, **kwargs):
38+
def create_kafka(self, num_nodes=1, partition=None, log_size=None, block_size=None, wal='file', **kwargs):
3939
"""
4040
Create and configure Kafka service.
4141
@@ -52,7 +52,8 @@ def create_kafka(self, num_nodes=1, partition=None, log_size=None, block_size=No
5252
['s3.wal.cache.size', str(log_size)],
5353
['s3.wal.capacity', str(log_size)],
5454
['s3.wal.upload.threshold', str(log_size // 4)],
55-
['s3.block.cache.size', str(block_size)]
55+
['s3.block.cache.size', str(block_size)],
56+
['s3.wal.path', FILE_WAL if wal == 'file' else S3_WAL],
5657
]
5758

5859
self.kafka = KafkaService(
@@ -105,8 +106,8 @@ def check_the_consumption_quantity(self, records):
105106
assert int(receive_num) == records, f"Receive count does not match the expected records count: expected {records}, but got {receive_num}"
106107

107108
@cluster(num_nodes=3)
108-
@matrix(partition=[128, 512], log_size=[256 * 1024 * 1024], block_size=[128 * 1024 * 1024, 256 * 1024 * 1024])
109-
def test(self, partition, log_size, block_size):
109+
@matrix(partition=[128, 512], log_size=[256 * 1024 * 1024], block_size=[128 * 1024 * 1024, 256 * 1024 * 1024], wal=['file', 's3'])
110+
def test(self, partition, log_size, block_size, wal):
110111
"""
111112
At any time, 1/writable record in Metric<=log cache size+100MB
112113
At any time, 11/block_cache in Metric<=block cache size
@@ -116,7 +117,7 @@ def test(self, partition, log_size, block_size):
116117
:param block_size: Block size for Kafka configuration.
117118
"""
118119
# Start Kafka
119-
self.create_kafka(partition=partition, log_size=log_size, block_size=block_size)
120+
self.create_kafka(partition=partition, log_size=log_size, block_size=block_size, wal=wal)
120121
self.kafka.start()
121122

122123
# Check Kafka configuration

tests/kafkatest/automq/quota_test.py

+6-5
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
from kafkatest.services.kafka import KafkaService
2121
from kafkatest.services.performance import ProducerPerformanceService
2222
from kafkatest.version import DEV_BRANCH
23-
from kafkatest.automq.automq_e2e_util import publish_broker_configuration
23+
from kafkatest.automq.automq_e2e_util import FILE_WAL, S3_WAL, publish_broker_configuration
2424

2525

2626
class QuotaTest(Test):
@@ -46,7 +46,7 @@ def __init__(self, test_context):
4646
self.success = True
4747
self.msg = ''
4848

49-
def create_kafka(self, test_context, broker_quota_in, broker_quota_out):
49+
def create_kafka(self, test_context, broker_quota_in, broker_quota_out, broker_wal):
5050
log_size = 256 * 1024 * 1024
5151
block_size = 256 * 1024 * 1024
5252
server_prop_overrides = [
@@ -57,6 +57,7 @@ def create_kafka(self, test_context, broker_quota_in, broker_quota_out):
5757
['s3.wal.capacity', str(log_size)],
5858
['s3.wal.upload.threshold', str(log_size // 4)],
5959
['s3.block.cache.size', str(block_size)],
60+
['s3.wal.path', FILE_WAL if broker_wal == 'file' else S3_WAL],
6061
]
6162
self.kafka = KafkaService(test_context, num_nodes=1, zk=None,
6263
kafka_heap_opts="-Xmx2048m -Xms2048m",
@@ -100,9 +101,9 @@ def start_console_consumer(self):
100101
assert len(messages) > 0, "consumer %d didn't consume any message before timeout" % idx
101102

102103
@cluster(num_nodes=5)
103-
@matrix(broker_in=[2500000], broker_out=[2000000])
104-
def test_quota(self, broker_in, broker_out):
105-
self.create_kafka(self.test_context, broker_in, broker_out)
104+
@matrix(broker_in=[2500000], broker_out=[2000000], wal=['file', 's3'])
105+
def test_quota(self, broker_in, broker_out, wal):
106+
self.create_kafka(self.test_context, broker_in, broker_out, wal)
106107
self.kafka.start()
107108
records = 50000
108109
self.logger.info(f'update to {broker_in},{broker_out}')

0 commit comments

Comments (0)