20
20
import com .automq .stream .utils .FutureUtil ;
21
21
import com .automq .stream .utils .ThreadUtils ;
22
22
import com .automq .stream .utils .Threads ;
23
- import java .util .Collection ;
24
23
import java .util .LinkedList ;
24
+ import java .util .List ;
25
25
import java .util .Queue ;
26
- import java .util .PriorityQueue ;
27
26
import java .util .concurrent .BlockingQueue ;
28
27
import java .util .concurrent .ExecutorService ;
29
28
import java .util .concurrent .LinkedBlockingQueue ;
29
+ import java .util .concurrent .PriorityBlockingQueue ;
30
30
import java .util .concurrent .ScheduledExecutorService ;
31
31
import java .util .concurrent .TimeUnit ;
32
32
import java .util .concurrent .atomic .AtomicBoolean ;
@@ -64,14 +64,10 @@ public class SlidingWindowService {
64
64
* The lock of {@link #pendingBlocks}, {@link #currentBlock}.
65
65
*/
66
66
private final Lock blockLock = new ReentrantLock ();
67
- /**
68
- * The lock of {@link #pendingBlocks}, {@link #writingBlocks}.
69
- */
70
- private final Lock pollBlockLock = new ReentrantLock ();
71
67
/**
72
68
* Blocks that are being written.
73
69
*/
74
- private final Queue <Long > writingBlocks = new PriorityQueue <>();
70
+ private final Queue <Long > writingBlocks = new PriorityBlockingQueue <>();
75
71
/**
76
72
* Whether the service is initialized.
77
73
* After the service is initialized, data in {@link #windowCoreData} is valid.
@@ -109,7 +105,7 @@ public class SlidingWindowService {
109
105
/**
110
106
* The maximum alignment offset in {@link #writingBlocks}.*
111
107
*/
112
- private long maxAlignWriteBlockOffset = 0 ;
108
+ private volatile long maxAlignWriteBlockOffset = 0 ;
113
109
114
110
public SlidingWindowService (WALChannel walChannel , int ioThreadNums , long upperLimit , long scaleUnit ,
115
111
long blockSoftLimit , int writeRateLimit , WALHeaderFlusher flusher ) {
@@ -161,6 +157,18 @@ public boolean shutdown(long timeout, TimeUnit unit) {
161
157
return gracefulShutdown ;
162
158
}
163
159
160
+ /**
161
+ * Try to wake up the @{@link #pollBlockScheduler}.*
162
+ */
163
+ public void tryWakeupPoll () {
164
+ // Avoid frequently wake-ups
165
+ long now = System .nanoTime ();
166
+ if (now - lastWriteTimeNanos >= minWriteIntervalNanos ) {
167
+ this .pollBlockScheduler .schedule (this ::tryWriteBlock , 0 , TimeUnit .NANOSECONDS );
168
+ }
169
+
170
+ }
171
+
164
172
/**
165
173
* Try to write a block. If it exceeds the rate limit, it will return immediately.
166
174
*/
@@ -179,7 +187,7 @@ public void tryWriteBlock() {
179
187
/**
180
188
* Try to acquire the write rate limit.
181
189
*/
182
- synchronized private boolean tryAcquireWriteRateLimit () {
190
+ private boolean tryAcquireWriteRateLimit () {
183
191
long now = System .nanoTime ();
184
192
if (now - lastWriteTimeNanos < minWriteIntervalNanos ) {
185
193
return false ;
@@ -287,21 +295,27 @@ private Block nextBlock(Block previousBlock) {
287
295
* Get all blocks to be written. If there is no non-empty block, return null.
288
296
*/
289
297
private BlockBatch pollBlocks () {
290
- if (this .pollBlockLock .tryLock ()) {
291
- try {
292
- return pollBlocksLocked ();
293
- } finally {
294
- this .pollBlockLock .unlock ();
295
- }
298
+ fetchFromCurrentBlock ();
299
+ if (pendingBlocks .isEmpty ()) {
300
+ return null ;
301
+ }
302
+ List <Block > blocks = new LinkedList <>();
303
+
304
+ while (!pendingBlocks .isEmpty ()) {
305
+ blocks .add (pendingBlocks .poll ());
296
306
}
297
- return null ;
307
+ BlockBatch blockBatch = new BlockBatch (blocks );
308
+ writingBlocks .add (blockBatch .startOffset ());
309
+
310
+ maxAlignWriteBlockOffset = nextBlockStartOffset (blocks .get (blocks .size () - 1 ));
311
+
312
+ return blockBatch ;
298
313
}
299
314
300
315
/**
301
- * Get all blocks to be written. If there is no non-empty block, return null.
302
- * Note: this method is NOT thread safe, and it should be called with {@link #pollBlockLock} locked.
316
+ * Fetch a block that is not empty and has been created for a duration longer than `minWriteIntervalNanos`.
303
317
*/
304
- private BlockBatch pollBlocksLocked () {
318
+ private void fetchFromCurrentBlock () {
305
319
Block currentBlock = this .currentBlock ;
306
320
boolean isCurrentBlockNotEmpty = currentBlock != null && !currentBlock .isEmpty ();
307
321
if (isCurrentBlockNotEmpty ) {
@@ -320,39 +334,12 @@ private BlockBatch pollBlocksLocked() {
320
334
}
321
335
}
322
336
}
323
-
324
- if (pendingBlocks .isEmpty ()) {
325
- return null ;
326
- }
327
- Collection <Block > blocks = new LinkedList <>();
328
- Block leastBlock = null ;
329
- while (!pendingBlocks .isEmpty ()) {
330
- leastBlock = pendingBlocks .poll ();
331
- blocks .add (leastBlock );
332
- }
333
- BlockBatch blockBatch = new BlockBatch (blocks );
334
- writingBlocks .add (blockBatch .startOffset ());
335
- maxAlignWriteBlockOffset = nextBlockStartOffset (leastBlock );
336
-
337
- return blockBatch ;
338
337
}
339
338
340
339
/**
341
340
* Finish the given block batch, and return the start offset of the first block which has not been flushed yet.
342
341
*/
343
342
private long wroteBlocks (BlockBatch wroteBlocks ) {
344
- this .pollBlockLock .lock ();
345
- try {
346
- return wroteBlocksLocked (wroteBlocks );
347
- } finally {
348
- this .pollBlockLock .unlock ();
349
- }
350
- }
351
- /**
352
- * Finish the given block batch, and return the start offset of the first block which has not been flushed yet.
353
- * Note: this method is NOT thread safe, and it should be called with {@link #pollBlockLock} locked.
354
- */
355
- private long wroteBlocksLocked (BlockBatch wroteBlocks ) {
356
343
boolean removed = writingBlocks .remove (wroteBlocks .startOffset ());
357
344
assert removed ;
358
345
if (writingBlocks .isEmpty ()) {
0 commit comments