Skip to content

Commit b46383f

Browse files
committed
feat(wal):reduce concurrent conflicts between block write operations and poll operations (AutoMQ#1550)
1 parent 1ccbeff commit b46383f

File tree

2 files changed

+11
-16
lines changed

2 files changed

+11
-16
lines changed

s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/BlockBatch.java

+8
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ public class BlockBatch {
2121
private final Collection<Block> blocks;
2222
private final long startOffset;
2323
private final long endOffset;
24+
private final long blockBatchSize;
2425

2526
public BlockBatch(Collection<Block> blocks) {
2627
assert !blocks.isEmpty();
@@ -33,6 +34,9 @@ public BlockBatch(Collection<Block> blocks) {
3334
.map(b -> b.startOffset() + b.size())
3435
.max(Long::compareTo)
3536
.orElseThrow();
37+
this.blockBatchSize = blocks.stream()
38+
.mapToLong(Block::size)
39+
.sum();
3640
}
3741

3842
public long startOffset() {
@@ -47,6 +51,10 @@ public Collection<Block> blocks() {
4751
return Collections.unmodifiableCollection(blocks);
4852
}
4953

54+
public long blockBatchSize(){
55+
return blockBatchSize;
56+
}
57+
5058
public Iterator<CompletableFuture<AppendResult.CallbackResult>> futures() {
5159
return new Iterator<>() {
5260
private final Iterator<Block> blockIterator = blocks.iterator();

s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/SlidingWindowService.java

+3-16
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,11 @@
2222
import com.automq.stream.utils.Threads;
2323
import java.util.Collection;
2424
import java.util.LinkedList;
25-
import java.util.PriorityQueue;
2625
import java.util.Queue;
2726
import java.util.concurrent.BlockingQueue;
2827
import java.util.concurrent.ExecutorService;
2928
import java.util.concurrent.LinkedBlockingQueue;
29+
import java.util.concurrent.PriorityBlockingQueue;
3030
import java.util.concurrent.ScheduledExecutorService;
3131
import java.util.concurrent.TimeUnit;
3232
import java.util.concurrent.atomic.AtomicBoolean;
@@ -71,7 +71,7 @@ public class SlidingWindowService {
7171
/**
7272
* Blocks that are being written.
7373
*/
74-
private final Queue<Long> writingBlocks = new PriorityQueue<>();
74+
private final Queue<Long> writingBlocks = new PriorityBlockingQueue<>();
7575
/**
7676
* Whether the service is initialized.
7777
* After the service is initialized, data in {@link #windowCoreData} is valid.
@@ -331,23 +331,10 @@ private BlockBatch pollBlocksLocked() {
331331
* Finish the given block batch, and return the start offset of the first block which has not been flushed yet.
332332
*/
333333
private long wroteBlocks(BlockBatch wroteBlocks) {
334-
blockLock.lock();
335-
try {
336-
return wroteBlocksLocked(wroteBlocks);
337-
} finally {
338-
blockLock.unlock();
339-
}
340-
}
341-
342-
/**
343-
* Finish the given block batch, and return the start offset of the first block which has not been flushed yet.
344-
* Note: this method is NOT thread safe, and it should be called with {@link #blockLock} locked.
345-
*/
346-
private long wroteBlocksLocked(BlockBatch wroteBlocks) {
347334
boolean removed = writingBlocks.remove(wroteBlocks.startOffset());
348335
assert removed;
349336
if (writingBlocks.isEmpty()) {
350-
return getCurrentBlockLocked().startOffset();
337+
return wroteBlocks.startOffset() + WALUtil.alignLargeByBlockSize(wroteBlocks.blockBatchSize());
351338
}
352339
return writingBlocks.peek();
353340
}

0 commit comments

Comments
 (0)