Skip to content

Commit f059474

Browse files
feat(s3stream): support limit write bandwidth for WALBlockDeviceChannel (AutoMQ#1941)
* feat(s3stream): support limit write bandwidth for WALBlockDeviceChannel * feat(s3stream): support limit write bandwidth for WALBlockDeviceChannel * feat(s3stream): support limit write bandwidth for WALBlockDeviceChannel * feat(s3stream): support limit write bandwidth for WALBlockDeviceChannel * feat(s3stream): support limit write bandwidth for WALBlockDeviceChannel * feat(s3stream): support limit write bandwidth for WALBlockDeviceChannel * feat(s3stream): support limit write bandwidth for WALBlockDeviceChannel * feat(s3stream): support limit write bandwidth for WALBlockDeviceChannel
1 parent 40f0c64 commit f059474

File tree

5 files changed

+141
-5
lines changed

5 files changed

+141
-5
lines changed

core/src/main/java/kafka/automq/AutoMQConfig.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public class AutoMQConfig {
5353
public static final String S3_OPS_BUCKETS_DOC = "With the same format as s3.data.buckets";
5454

5555
public static final String S3_WAL_PATH_CONFIG = "s3.wal.path";
56-
public static final String S3_WAL_PATH_DOC = "The local WAL path for AutoMQ can be set to a block device path such as 0@file:///dev/xxx?iops=3000&iodepth=8 or a filesystem file path." +
56+
public static final String S3_WAL_PATH_DOC = "The local WAL path for AutoMQ can be set to a block device path such as 0@file:///dev/xxx?iops=3000&iodepth=8&iobandwidth=157286400 or a filesystem file path." +
5757
"It is recommended to use a block device for better write performance.";
5858

5959
public static final String S3_WAL_CACHE_SIZE_CONFIG = "s3.wal.cache.size";

s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/BlockWALService.java

+11
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,7 @@ public static BlockWALServiceBuilder builder(IdURI uri) {
155155
BlockWALService.BlockWALServiceBuilder builder = BlockWALService.builder(uri.path(), uri.extensionLong("capacity", 2147483648L));
156156
Optional.ofNullable(uri.extensionString("iops")).filter(StringUtils::isNumeric).ifPresent(v -> builder.writeRateLimit(Integer.parseInt(v)));
157157
Optional.ofNullable(uri.extensionString("iodepth")).filter(StringUtils::isNumeric).ifPresent(v -> builder.ioThreadNums(Integer.parseInt(v)));
158+
Optional.ofNullable(uri.extensionString("iobandwidth")).filter(StringUtils::isNumeric).ifPresent(v -> builder.writeBandwidthLimit(Long.parseLong(v)));
158159
return builder;
159160
}
160161

@@ -556,7 +557,10 @@ public static class BlockWALServiceBuilder {
556557
private long slidingWindowUpperLimit = 1 << 29; // 512MiB
557558
private long slidingWindowScaleUnit = 1 << 22; // 4MiB
558559
private long blockSoftLimit = 1 << 18; // 256KiB
560+
// wal io request limit
559561
private int writeRateLimit = 3000;
562+
// wal io bandwidth limit
563+
private long writeBandwidthLimit = 1024 * 1024 * 1024; // 1GB/s
560564
private int nodeId = NOOP_NODE_ID;
561565
private long epoch = NOOP_EPOCH;
562566
private boolean recoveryMode = false;
@@ -631,6 +635,11 @@ public BlockWALServiceBuilder writeRateLimit(int writeRateLimit) {
631635
return this;
632636
}
633637

638+
public BlockWALServiceBuilder writeBandwidthLimit(long writeBandwidthLimit) {
639+
this.writeBandwidthLimit = writeBandwidthLimit;
640+
return this;
641+
}
642+
634643
public BlockWALServiceBuilder nodeId(int nodeId) {
635644
this.nodeId = nodeId;
636645
return this;
@@ -654,6 +663,7 @@ public BlockWALService build() {
654663
BlockWALService blockWALService = new BlockWALService();
655664

656665
WALChannel.WALChannelBuilder walChannelBuilder = WALChannel.builder(blockDevicePath)
666+
.writeBandWidthLimit(writeBandwidthLimit)
657667
.capacity(blockDeviceCapacityWant)
658668
.initBufferSize(initBufferSize)
659669
.maxBufferSize(maxBufferSize)
@@ -710,6 +720,7 @@ public String toString() {
710720
+ ", slidingWindowScaleUnit=" + slidingWindowScaleUnit
711721
+ ", blockSoftLimit=" + blockSoftLimit
712722
+ ", writeRateLimit=" + writeRateLimit
723+
+ ", writeBandwidthLimit=" + writeBandwidthLimit
713724
+ ", nodeId=" + nodeId
714725
+ ", epoch=" + epoch
715726
+ ", recoveryMode=" + recoveryMode

s3stream/src/main/java/com/automq/stream/s3/wal/util/WALBlockDeviceChannel.java

+60-3
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,14 @@
1616
import com.automq.stream.thirdparty.moe.cnkirito.kdio.DirectIOLib;
1717
import com.automq.stream.thirdparty.moe.cnkirito.kdio.DirectIOUtils;
1818
import com.automq.stream.thirdparty.moe.cnkirito.kdio.DirectRandomAccessFile;
19+
import com.google.common.annotations.VisibleForTesting;
20+
import io.github.bucket4j.BlockingBucket;
21+
import io.github.bucket4j.Bucket;
1922
import io.netty.buffer.ByteBuf;
2023
import java.io.File;
2124
import java.io.IOException;
2225
import java.nio.ByteBuffer;
26+
import java.time.Duration;
2327
import java.util.concurrent.ExecutionException;
2428
import io.netty.util.concurrent.FastThreadLocal;
2529
import org.slf4j.Logger;
@@ -31,6 +35,10 @@
3135
public class WALBlockDeviceChannel extends AbstractWALChannel {
3236
private static final Logger LOGGER = LoggerFactory.getLogger(WALBlockDeviceChannel.class);
3337
private static final String CHECK_DIRECT_IO_AVAILABLE_FORMAT = "%s.check_direct_io_available";
38+
public static final long MAX_RATE_LIMIT_BYTES_PER_MS = 1_000_000L;
39+
// the write size is expected to be at least BLOCK_SIZE (4096)
40+
// use scale when acquire bandwidth rate limiter.
41+
public static final long BANDWIDTH_RATE_LIMIT_SCALE_FACTOR = 2;
3442
final String path;
3543
final long capacityWant;
3644
final boolean recoveryMode;
@@ -59,12 +67,16 @@ protected ByteBuffer initialValue() {
5967
}
6068
};
6169

70+
private final long writeBandwidthLimit;
71+
private final BlockingBucket bucket;
72+
6273
public WALBlockDeviceChannel(String path, long capacityWant) {
63-
this(path, capacityWant, 0, 0, false);
74+
this(path, capacityWant, 0, 0, false,
75+
MAX_RATE_LIMIT_BYTES_PER_MS * 1000L);
6476
}
6577

6678
public WALBlockDeviceChannel(String path, long capacityWant, int initTempBufferSize, int maxTempBufferSize,
67-
boolean recoveryMode) {
79+
boolean recoveryMode, long writeBandwidthBytePerSecondLimit) {
6880
this.path = path;
6981
this.recoveryMode = recoveryMode;
7082
if (recoveryMode) {
@@ -88,9 +100,44 @@ public WALBlockDeviceChannel(String path, long capacityWant, int initTempBufferS
88100
throw new RuntimeException(String.format("block size %d is not a multiple of %d, update it by jvm option: -D%s=%d",
89101
WALUtil.BLOCK_SIZE, blockSize, WALUtil.BLOCK_SIZE_PROPERTY, blockSize));
90102
}
103+
104+
this.writeBandwidthLimit = writeBandwidthBytePerSecondLimit;
105+
this.bucket = buildBandWidthRateLimiter(writeBandwidthBytePerSecondLimit, blockSize).asBlocking();
91106
this.directIOLib = lib;
92107
}
93108

109+
/**
110+
* bucket4j limit max 1 token/nanosecond
111+
* after scale real limit is 4 bytes/nanosecond = 3814.7 MB/s.
112+
*/
113+
@VisibleForTesting
114+
public static long validRefillPerMs(long writeBandwidthBytePerSecondLimit, int blockSize) {
115+
if (WALUtil.BLOCK_SIZE % BANDWIDTH_RATE_LIMIT_SCALE_FACTOR != 0
116+
|| WALUtil.BLOCK_SIZE >> BANDWIDTH_RATE_LIMIT_SCALE_FACTOR <= 0) {
117+
throw new RuntimeException(String.format("block size %d is not a multiple of %d, update it by jvm option: -D%s=%d",
118+
WALUtil.BLOCK_SIZE, BANDWIDTH_RATE_LIMIT_SCALE_FACTOR, WALUtil.BLOCK_SIZE_PROPERTY, blockSize));
119+
}
120+
121+
long refillPerMs = (writeBandwidthBytePerSecondLimit / 1000) >> BANDWIDTH_RATE_LIMIT_SCALE_FACTOR;
122+
123+
if (refillPerMs > MAX_RATE_LIMIT_BYTES_PER_MS || refillPerMs <= 0) {
124+
throw new RuntimeException(String.format("block device writeBandwidthLimit %d not valid after scale by %d refillPerMs %d",
125+
writeBandwidthBytePerSecondLimit, BANDWIDTH_RATE_LIMIT_SCALE_FACTOR, refillPerMs));
126+
}
127+
128+
return refillPerMs;
129+
}
130+
131+
@VisibleForTesting
132+
public static Bucket buildBandWidthRateLimiter(long writeBandwidthBytePerSecondLimit, int blockSize) {
133+
long refillPerMs = validRefillPerMs(writeBandwidthBytePerSecondLimit, blockSize);
134+
135+
return Bucket.builder().addLimit(limit -> limit
136+
.capacity(writeBandwidthBytePerSecondLimit >> BANDWIDTH_RATE_LIMIT_SCALE_FACTOR)
137+
.refillIntervally(refillPerMs, Duration.ofMillis(1)))
138+
.build();
139+
}
140+
94141
/**
95142
* Check whether the {@link WALBlockDeviceChannel} is available for the given path.
96143
*
@@ -283,7 +330,17 @@ private void unalignedWrite(ByteBuf src, long position) throws IOException {
283330
}
284331

285332
private int write(ByteBuffer src, long position) throws IOException {
286-
assert WALUtil.isAligned(src.remaining());
333+
int size = src.remaining();
334+
assert WALUtil.isAligned(size);
335+
336+
if (size > 0) {
337+
try {
338+
// the write size is isAligned to BLOCK_SIZE so it is ok to scale
339+
bucket.consume(size >> BANDWIDTH_RATE_LIMIT_SCALE_FACTOR);
340+
} catch (InterruptedException e) {
341+
throw new IOException("write rate limit consume interrupted", e);
342+
}
343+
}
287344

288345
int bytesWritten = 0;
289346
while (src.hasRemaining()) {

s3stream/src/main/java/com/automq/stream/s3/wal/util/WALChannel.java

+9-1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.slf4j.LoggerFactory;
2222

2323
import static com.automq.stream.s3.Constants.CAPACITY_NOT_SET;
24+
import static com.automq.stream.s3.wal.util.WALBlockDeviceChannel.MAX_RATE_LIMIT_BYTES_PER_MS;
2425
import static com.automq.stream.s3.wal.util.WALUtil.isBlockDevice;
2526

2627
/**
@@ -131,6 +132,7 @@ class WALChannelBuilder {
131132
private int initBufferSize;
132133
private int maxBufferSize;
133134
private boolean recoveryMode;
135+
private long writeBandwidthLimit = MAX_RATE_LIMIT_BYTES_PER_MS * 1000L;
134136

135137
private WALChannelBuilder(String path) {
136138
this.path = path;
@@ -162,6 +164,12 @@ public WALChannelBuilder recoveryMode(boolean recoveryMode) {
162164
return this;
163165
}
164166

167+
168+
public WALChannelBuilder writeBandWidthLimit(long writeBandwidthLimit) {
169+
this.writeBandwidthLimit = writeBandwidthLimit;
170+
return this;
171+
}
172+
165173
public WALChannel build() {
166174
String directNotAvailableMsg = WALBlockDeviceChannel.checkAvailable(path);
167175
boolean isBlockDevice = isBlockDevice(path);
@@ -186,7 +194,7 @@ public WALChannel build() {
186194
}
187195

188196
if (useDirect) {
189-
return new WALBlockDeviceChannel(path, capacity, initBufferSize, maxBufferSize, recoveryMode);
197+
return new WALBlockDeviceChannel(path, capacity, initBufferSize, maxBufferSize, recoveryMode, writeBandwidthLimit);
190198
} else {
191199
LOGGER.warn("Direct IO not used for WAL, which may cause performance degradation. path: {}, isBlockDevice: {}, reason: {}",
192200
new File(path).getAbsolutePath(), isBlockDevice, directNotAvailableMsg);

s3stream/src/test/java/com/automq/stream/s3/wal/util/WALChannelTest.java

+60
Original file line numberDiff line numberDiff line change
@@ -12,19 +12,28 @@
1212
package com.automq.stream.s3.wal.util;
1313

1414
import com.automq.stream.s3.TestUtils;
15+
import io.github.bucket4j.Bucket;
1516
import io.netty.buffer.ByteBuf;
1617
import io.netty.buffer.Unpooled;
1718
import java.io.IOException;
1819
import java.nio.ByteBuffer;
20+
import java.util.concurrent.TimeUnit;
21+
1922
import org.junit.jupiter.api.AfterEach;
23+
import org.junit.jupiter.api.Assertions;
2024
import org.junit.jupiter.api.BeforeEach;
2125
import org.junit.jupiter.api.Tag;
2226
import org.junit.jupiter.api.Test;
2327

28+
import static org.junit.jupiter.api.Assertions.assertTrue;
29+
2430
@Tag("S3Unit")
2531
public class WALChannelTest {
2632
public static final String TEST_BLOCK_DEVICE_KEY = "WAL_TEST_BLOCK_DEVICE";
2733

34+
long gbPerSeconds = 1024 * 1024 * 1024L;
35+
long mbPerSeconds = 1024 * 1024;
36+
2837
WALChannel walChannel;
2938

3039
@BeforeEach
@@ -78,4 +87,55 @@ void testWriteAndRead() throws IOException {
7887
assert read == content.length();
7988
assert readString.equals(content);
8089
}
90+
91+
@Test
92+
void testWALBandwidthLimit() throws InterruptedException {
93+
Bucket bucket = WALBlockDeviceChannel.buildBandWidthRateLimiter(150 * mbPerSeconds, 4096);
94+
TimeUnit.MILLISECONDS.sleep(10);
95+
Assertions.assertFalse(bucket.tryConsume((150 * mbPerSeconds >> WALBlockDeviceChannel.BANDWIDTH_RATE_LIMIT_SCALE_FACTOR) + 2));
96+
Assertions.assertTrue(bucket.tryConsume(150 * mbPerSeconds >> WALBlockDeviceChannel.BANDWIDTH_RATE_LIMIT_SCALE_FACTOR));
97+
}
98+
99+
@Test
100+
void testWALBandwidthLimitConfig() {
101+
try {
102+
WALBlockDeviceChannel.validRefillPerMs(
103+
150 * mbPerSeconds,
104+
512);
105+
106+
WALBlockDeviceChannel.validRefillPerMs(
107+
150 * mbPerSeconds,
108+
4096);
109+
110+
WALBlockDeviceChannel.validRefillPerMs(
111+
3 * gbPerSeconds,
112+
512);
113+
114+
WALBlockDeviceChannel.validRefillPerMs(
115+
3 * gbPerSeconds,
116+
4096);
117+
118+
WALBlockDeviceChannel.validRefillPerMs(
119+
WALBlockDeviceChannel.MAX_RATE_LIMIT_BYTES_PER_MS * 1000L,
120+
512);
121+
122+
WALBlockDeviceChannel.validRefillPerMs(
123+
WALBlockDeviceChannel.MAX_RATE_LIMIT_BYTES_PER_MS * 1000L,
124+
4096);
125+
126+
} catch (Exception e) {
127+
Assertions.fail("should not throw exception", e);
128+
}
129+
130+
boolean exceptionThrows = false;
131+
try {
132+
WALBlockDeviceChannel.validRefillPerMs(Long.MAX_VALUE, 4096);
133+
} catch (Exception e) {
134+
// expected.
135+
exceptionThrows = true;
136+
}
137+
138+
assertTrue(exceptionThrows);
139+
140+
}
81141
}

0 commit comments

Comments
 (0)