Skip to content

Commit 8ffec25

Browse files
committed
feat(wal): reduce concurrent conflicts between block write operations and poll operations (AutoMQ#1550)
1 parent 3cc61aa commit 8ffec25

File tree

1 file changed

+10
-6
lines changed

1 file changed

+10
-6
lines changed

s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/SlidingWindowService.java

+10-6
Original file line numberDiff line numberDiff line change
@@ -107,9 +107,9 @@ public class SlidingWindowService {
107107
private volatile long lastWriteTimeNanos = 0;
108108

109109
/**
110-
* The maximum offset currently written into writeBlocks.*
110+
* The maximum alignment offset in {@link #writingBlocks}.*
111111
*/
112-
private long maxWriteBlockOffset = 0;
112+
private long maxAlignWriteBlockOffset = 0;
113113

114114
public SlidingWindowService(WALChannel walChannel, int ioThreadNums, long upperLimit, long scaleUnit,
115115
long blockSoftLimit, int writeRateLimit, WALHeaderFlusher flusher) {
@@ -321,14 +321,18 @@ private BlockBatch pollBlocksLocked() {
321321
}
322322
}
323323

324+
if (pendingBlocks.isEmpty()) {
325+
return null;
326+
}
324327
Collection<Block> blocks = new LinkedList<>();
328+
Block leastBlock = null;
325329
while (!pendingBlocks.isEmpty()) {
326-
blocks.add(pendingBlocks.poll());
330+
leastBlock = pendingBlocks.poll();
331+
blocks.add(leastBlock);
327332
}
328-
329333
BlockBatch blockBatch = new BlockBatch(blocks);
330334
writingBlocks.add(blockBatch.startOffset());
331-
maxWriteBlockOffset = blockBatch.endOffset();
335+
maxAlignWriteBlockOffset = nextBlockStartOffset(leastBlock);
332336

333337
return blockBatch;
334338
}
@@ -352,7 +356,7 @@ private long wroteBlocksLocked(BlockBatch wroteBlocks) {
352356
boolean removed = writingBlocks.remove(wroteBlocks.startOffset());
353357
assert removed;
354358
if (writingBlocks.isEmpty()) {
355-
return this.maxWriteBlockOffset;
359+
return this.maxAlignWriteBlockOffset;
356360
}
357361
return writingBlocks.peek();
358362
}

0 commit comments

Comments
 (0)