Skip to content

Commit 0847d72

Browse files
committed
feat(wal):reduce concurrent conflicts between block write operations and poll operations (AutoMQ#1550)
1 parent b46383f commit 0847d72

File tree

2 files changed

+21
-11
lines changed

2 files changed

+21
-11
lines changed

s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/BlockBatch.java

-8
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ public class BlockBatch {
2121
private final Collection<Block> blocks;
2222
private final long startOffset;
2323
private final long endOffset;
24-
private final long blockBatchSize;
2524

2625
public BlockBatch(Collection<Block> blocks) {
2726
assert !blocks.isEmpty();
@@ -34,9 +33,6 @@ public BlockBatch(Collection<Block> blocks) {
3433
.map(b -> b.startOffset() + b.size())
3534
.max(Long::compareTo)
3635
.orElseThrow();
37-
this.blockBatchSize = blocks.stream()
38-
.mapToLong(Block::size)
39-
.sum();
4036
}
4137

4238
public long startOffset() {
@@ -51,10 +47,6 @@ public Collection<Block> blocks() {
5147
return Collections.unmodifiableCollection(blocks);
5248
}
5349

54-
public long blockBatchSize(){
55-
return blockBatchSize;
56-
}
57-
5850
public Iterator<CompletableFuture<AppendResult.CallbackResult>> futures() {
5951
return new Iterator<>() {
6052
private final Iterator<Block> blockIterator = blocks.iterator();

s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/SlidingWindowService.java

+21-3
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,10 @@
2323
import java.util.Collection;
2424
import java.util.LinkedList;
2525
import java.util.Queue;
26+
import java.util.PriorityQueue;
2627
import java.util.concurrent.BlockingQueue;
2728
import java.util.concurrent.ExecutorService;
2829
import java.util.concurrent.LinkedBlockingQueue;
29-
import java.util.concurrent.PriorityBlockingQueue;
3030
import java.util.concurrent.ScheduledExecutorService;
3131
import java.util.concurrent.TimeUnit;
3232
import java.util.concurrent.atomic.AtomicBoolean;
@@ -71,7 +71,7 @@ public class SlidingWindowService {
7171
/**
7272
* Blocks that are being written.
7373
*/
74-
private final Queue<Long> writingBlocks = new PriorityBlockingQueue<>();
74+
private final Queue<Long> writingBlocks = new PriorityQueue<>();
7575
/**
7676
* Whether the service is initialized.
7777
* After the service is initialized, data in {@link #windowCoreData} is valid.
@@ -106,6 +106,11 @@ public class SlidingWindowService {
106106
*/
107107
private volatile long lastWriteTimeNanos = 0;
108108

109+
/**
110+
* The maximum offset currently written into writeBlocks.*
111+
*/
112+
private long maxWriteBlockOffset = 0;
113+
109114
public SlidingWindowService(WALChannel walChannel, int ioThreadNums, long upperLimit, long scaleUnit,
110115
long blockSoftLimit, int writeRateLimit, WALHeaderFlusher flusher) {
111116
this.walChannel = walChannel;
@@ -323,6 +328,7 @@ private BlockBatch pollBlocksLocked() {
323328

324329
BlockBatch blockBatch = new BlockBatch(blocks);
325330
writingBlocks.add(blockBatch.startOffset());
331+
maxWriteBlockOffset = blockBatch.endOffset();
326332

327333
return blockBatch;
328334
}
@@ -331,10 +337,22 @@ private BlockBatch pollBlocksLocked() {
331337
* Finish the given block batch, and return the start offset of the first block which has not been flushed yet.
332338
*/
333339
private long wroteBlocks(BlockBatch wroteBlocks) {
340+
this.pollBlockLock.lock();
341+
try {
342+
return wroteBlocksLocked(wroteBlocks);
343+
} finally {
344+
this.pollBlockLock.unlock();
345+
}
346+
}
347+
/**
348+
* Finish the given block batch, and return the start offset of the first block which has not been flushed yet.
349+
* Note: this method is NOT thread safe, and it should be called with {@link #pollBlockLock} locked.
350+
*/
351+
private long wroteBlocksLocked(BlockBatch wroteBlocks) {
334352
boolean removed = writingBlocks.remove(wroteBlocks.startOffset());
335353
assert removed;
336354
if (writingBlocks.isEmpty()) {
337-
return wroteBlocks.startOffset() + WALUtil.alignLargeByBlockSize(wroteBlocks.blockBatchSize());
355+
return this.maxWriteBlockOffset;
338356
}
339357
return writingBlocks.peek();
340358
}

0 commit comments

Comments
 (0)