Skip to content

Commit 2758656

Browse files
committed
feat(wal):reduce concurrent conflicts between block write operations and poll operations AutoMQ#1550
1 parent 7d92cc0 commit 2758656

File tree

3 files changed

+49
-25
lines changed

3 files changed

+49
-25
lines changed

s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/Block.java

+5
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,11 @@ public interface Block {
2828
*/
2929
long startOffset();
3030

31+
/**
32+
* The start time of this block*
33+
* @return
34+
*/
35+
long startTime();
3136
/**
3237
* Append a record to this block.
3338
* Cannot be called after {@link #data()} is called.

s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/BlockImpl.java

+10
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,11 @@ public long startOffset() {
6565
return startOffset;
6666
}
6767

68+
@Override
69+
public long startTime() {
70+
return this.startTime;
71+
}
72+
6873
/**
6974
* Note: this method is NOT thread safe.
7075
*/
@@ -120,4 +125,9 @@ public long size() {
120125
public void polled() {
121126
StorageOperationStats.getInstance().appendWALBlockPolledStats.record(TimerUtil.durationElapsedAs(startTime, TimeUnit.NANOSECONDS));
122127
}
128+
129+
@Override
130+
public boolean isEmpty() {
131+
return records.isEmpty();
132+
}
123133
}

s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/SlidingWindowService.java

+34-25
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@
2424
import java.util.LinkedList;
2525
import java.util.PriorityQueue;
2626
import java.util.Queue;
27+
import java.util.concurrent.BlockingQueue;
2728
import java.util.concurrent.ExecutorService;
29+
import java.util.concurrent.LinkedBlockingQueue;
2830
import java.util.concurrent.ScheduledExecutorService;
2931
import java.util.concurrent.TimeUnit;
3032
import java.util.concurrent.atomic.AtomicBoolean;
@@ -62,6 +64,8 @@ public class SlidingWindowService {
6264
* The lock of {@link #pendingBlocks}, {@link #writingBlocks}, {@link #currentBlock}.
6365
*/
6466
private final Lock blockLock = new ReentrantLock();
67+
68+
private final Lock pollBlocKLock = new ReentrantLock();
6569
/**
6670
* Blocks that are being written.
6771
*/
@@ -80,11 +84,11 @@ public class SlidingWindowService {
8084
* Blocks that are waiting to be written.
8185
* All blocks in this queue are ordered by the start offset.
8286
*/
83-
private Queue<Block> pendingBlocks = new LinkedList<>();
87+
private BlockingQueue<Block> pendingBlocks = new LinkedBlockingQueue<>();
8488
/**
8589
* The current block, records are added to this block.
8690
*/
87-
private Block currentBlock;
91+
private volatile Block currentBlock;
8892

8993
/**
9094
* The thread pool for write operations.
@@ -98,7 +102,7 @@ public class SlidingWindowService {
98102
/**
99103
* The last time when a batch of blocks is written to the disk.
100104
*/
101-
private long lastWriteTimeNanos = 0;
105+
private volatile long lastWriteTimeNanos = 0;
102106

103107
public SlidingWindowService(WALChannel walChannel, int ioThreadNums, long upperLimit, long scaleUnit,
104108
long blockSoftLimit, int writeRateLimit, WALHeaderFlusher flusher) {
@@ -276,38 +280,43 @@ private Block nextBlock(Block previousBlock) {
276280
* Get all blocks to be written. If there is no non-empty block, return null.
277281
*/
278282
private BlockBatch pollBlocks() {
279-
blockLock.lock();
280-
try {
281-
return pollBlocksLocked();
282-
} finally {
283-
blockLock.unlock();
283+
if (this.pollBlocKLock.tryLock()) {
284+
try {
285+
return pollBlocksLocked();
286+
} finally {
287+
this.pollBlocKLock.unlock();
288+
}
284289
}
290+
return null;
285291
}
286292

287293
/**
288294
* Get all blocks to be written. If there is no non-empty block, return null.
289295
* Note: this method is NOT thread safe, and it should be called with {@link #blockLock} locked.
290296
*/
291297
private BlockBatch pollBlocksLocked() {
292-
Block currentBlock = getCurrentBlockLocked();
293-
294-
boolean isPendingBlockEmpty = pendingBlocks.isEmpty();
295-
boolean isCurrentBlockEmpty = currentBlock == null || currentBlock.isEmpty();
296-
if (isPendingBlockEmpty && isCurrentBlockEmpty) {
297-
// No record to be written
298-
return null;
298+
Block currentBlock = this.currentBlock;
299+
boolean isCurrentBlockNotEmpty = currentBlock != null && !currentBlock.isEmpty();
300+
if (isCurrentBlockNotEmpty) {
301+
long time = System.nanoTime();
302+
if (time - currentBlock.startTime() > minWriteIntervalNanos) {
303+
if (this.blockLock.tryLock()) {
304+
try {
305+
currentBlock = this.getCurrentBlockLocked();
306+
if (time - currentBlock.startTime() > minWriteIntervalNanos) {
307+
pendingBlocks.add(currentBlock);
308+
setCurrentBlockLocked(nextBlock(currentBlock));
309+
}
310+
} finally {
311+
this.blockLock.unlock();
312+
}
313+
}
314+
}
299315
}
300316

301-
Collection<Block> blocks;
302-
if (!isPendingBlockEmpty) {
303-
blocks = pendingBlocks;
304-
pendingBlocks = new LinkedList<>();
305-
} else {
306-
blocks = new LinkedList<>();
307-
}
308-
if (!isCurrentBlockEmpty) {
309-
blocks.add(currentBlock);
310-
setCurrentBlockLocked(nextBlock(currentBlock));
317+
Collection<Block> blocks = new LinkedList<>();
318+
while (!pendingBlocks.isEmpty()) {
319+
blocks.add(pendingBlocks.poll());
311320
}
312321

313322
BlockBatch blockBatch = new BlockBatch(blocks);

0 commit comments

Comments
 (0)