26
26
import java .util .concurrent .ScheduledExecutorService ;
27
27
import java .util .concurrent .ScheduledFuture ;
28
28
import java .util .concurrent .TimeUnit ;
29
+ import java .util .concurrent .locks .ReentrantLock ;
29
30
import java .util .function .Predicate ;
30
31
31
32
import org .jctools .maps .NonBlockingHashMap ;
@@ -215,7 +216,11 @@ public static KubernetesEndpointGroupBuilder builder(Config kubeConfig) {
215
216
@ Nullable
216
217
private final String portName ;
217
218
private final Predicate <? super NodeAddress > nodeAddressFilter ;
219
+ private final long maxWatchAgeMillis ;
218
220
221
+ // watchLock may be held for a long time while a watcher is being (re)started, so it is kept separate from the short-lived schedulerLock.
222
+ @ SuppressWarnings ("PreferReentrantShortLock" )
223
+ private final ReentrantLock watchLock = new ReentrantLock ();
219
224
@ Nullable
220
225
private volatile Watch nodeWatch ;
221
226
@ Nullable
@@ -230,10 +235,19 @@ public static KubernetesEndpointGroupBuilder builder(Config kubeConfig) {
230
235
@ Nullable
231
236
private volatile Integer nodePort ;
232
237
233
- private final ReentrantShortLock lock = new ReentrantShortLock ();
234
- @ GuardedBy ("lock " )
238
+ private final ReentrantShortLock schedulerLock = new ReentrantShortLock ();
239
+ @ GuardedBy ("schedulerLock " )
235
240
@ Nullable
236
- private ScheduledFuture <?> scheduledFuture ;
241
+ private ScheduledFuture <?> updateScheduledFuture ;
242
+ @ GuardedBy ("schedulerLock" )
243
+ @ Nullable
244
+ private ScheduledFuture <?> serviceScheduledFuture ;
245
+ @ GuardedBy ("schedulerLock" )
246
+ @ Nullable
247
+ private ScheduledFuture <?> nodeScheduledFuture ;
248
+ @ GuardedBy ("schedulerLock" )
249
+ @ Nullable
250
+ private ScheduledFuture <?> podScheduledFuture ;
237
251
238
252
private volatile boolean closed ;
239
253
private volatile int numServiceFailures ;
@@ -243,40 +257,52 @@ public static KubernetesEndpointGroupBuilder builder(Config kubeConfig) {
243
257
/**
 * Creates a new endpoint group, stores the given settings and submits the initial node and
 * service watch jobs.
 *
 * @param maxWatchAgeMillis the age in milliseconds after which a service or node watcher is
 *                          rescheduled. {@link Long#MAX_VALUE} is stored as {@code 0}, and the
 *                          watch methods reschedule only when the stored value is positive.
 */
KubernetesEndpointGroup(KubernetesClient client, @Nullable String namespace, String serviceName,
                        @Nullable String portName, Predicate<? super NodeAddress> nodeAddressFilter,
                        boolean autoClose, EndpointSelectionStrategy selectionStrategy,
                        boolean allowEmptyEndpoints, long selectionTimeoutMillis,
                        long maxWatchAgeMillis) {
    super(selectionStrategy, allowEmptyEndpoints, selectionTimeoutMillis);

    // Connection target.
    this.client = client;
    this.namespace = namespace;
    this.serviceName = serviceName;
    this.portName = portName;

    // Behavioral options.
    this.nodeAddressFilter = nodeAddressFilter;
    this.autoClose = autoClose;
    if (maxWatchAgeMillis == Long.MAX_VALUE) {
        // Long.MAX_VALUE is normalized to 0.
        this.maxWatchAgeMillis = 0;
    } else {
        this.maxWatchAgeMillis = maxWatchAgeMillis;
    }

    // Submit the initial watch jobs only after every field above has been assigned.
    watchJob(this::watchNode);
    watchJob(this::watchService);
}
257
272
258
273
private void watchService () {
259
- final Watch oldServiceWatch = serviceWatch ;
260
- if (oldServiceWatch != null ) {
261
- oldServiceWatch .close ();
262
- }
263
-
264
- if (closed ) {
265
- return ;
266
- }
267
- final Watch newServiceWatch ;
274
+ watchLock .lock ();
268
275
try {
269
- newServiceWatch = doWatchService ();
270
- } catch (Exception e ) {
271
- logger .warn ("[{}/{}] Failed to start the service watcher." , namespace , serviceName , e );
272
- return ;
273
- }
274
- // Recheck the closed flag because the doWatchService() method may take a while.
275
- if (closed ) {
276
- newServiceWatch .close ();
277
- } else {
276
+ final Watch oldServiceWatch = serviceWatch ;
277
+ if (oldServiceWatch != null ) {
278
+ oldServiceWatch .close ();
279
+ }
280
+ if (closed ) {
281
+ return ;
282
+ }
283
+ final Watch newServiceWatch ;
284
+ logger .info ("[{}/{}] Start the service watcher..." , namespace , serviceName );
285
+ try {
286
+ newServiceWatch = doWatchService ();
287
+ } catch (Exception e ) {
288
+ logger .warn ("[{}/{}] Failed to start the service watcher." , namespace , serviceName , e );
289
+ return ;
290
+ }
291
+ // Recheck the closed flag because the doWatchService() method may take a while.
292
+ if (closed ) {
293
+ newServiceWatch .close ();
294
+ return ;
295
+ }
278
296
serviceWatch = newServiceWatch ;
279
297
logger .info ("[{}/{}] Service watcher is started." , namespace , serviceName );
298
+ } finally {
299
+ watchLock .unlock ();
300
+ }
301
+
302
+ if (maxWatchAgeMillis > 0 ) {
303
+ logger .debug ("[{}/{}] Schedule a new Service watcher to start in {} ms." , namespace ,
304
+ serviceName , maxWatchAgeMillis );
305
+ scheduleWatchService (maxWatchAgeMillis );
280
306
}
281
307
}
282
308
@@ -333,14 +359,17 @@ public void eventReceived(Action action, Service service0) {
333
359
334
360
@ Override
335
361
public void onClose (WatcherException cause ) {
362
+ // Note: As per the JavaDoc of Watcher.onClose(), the method should not be implemented in a
363
+ // blocking way.
336
364
if (closed ) {
337
365
return ;
338
366
}
339
367
logger .warn ("[{}/{}] Service watcher is closed." , namespace , serviceName , cause );
340
368
logger .info ("[{}/{}] Reconnecting the service watcher..." , namespace , serviceName );
341
369
342
370
// Immediately retry on the first failure.
343
- watchJob (() -> watchService (), numServiceFailures ++);
371
+ final long delayMillis = delayMillis (numServiceFailures ++);
372
+ scheduleWatchService (delayMillis );
344
373
}
345
374
346
375
@ Override
@@ -357,27 +386,33 @@ public void onClose() {
357
386
}
358
387
359
388
private void watchPod () {
360
- final Watch oldPodWatch = podWatch ;
361
- if (oldPodWatch != null ) {
362
- oldPodWatch .close ();
363
- }
364
-
365
- if (closed ) {
366
- return ;
367
- }
368
- final Watch newPodwatch ;
389
+ watchLock .lock ();
369
390
try {
370
- newPodwatch = doWatchPod ();
371
- } catch (Exception e ) {
372
- logger .warn ("[{}/{}] Failed to start the pod watcher." , namespace , serviceName , e );
373
- return ;
374
- }
375
- // Recheck the closed flag because the doWatchPod() method may take a while.
376
- if (closed ) {
377
- newPodwatch .close ();
378
- } else {
379
- podWatch = newPodwatch ;
380
- logger .info ("[{}/{}] Pod watcher is started." , namespace , serviceName );
391
+ final Watch oldPodWatch = podWatch ;
392
+ if (oldPodWatch != null ) {
393
+ oldPodWatch .close ();
394
+ }
395
+
396
+ if (closed ) {
397
+ return ;
398
+ }
399
+ final Watch newPodwatch ;
400
+ logger .info ("[{}/{}] Start the pod watcher..." , namespace , serviceName );
401
+ try {
402
+ newPodwatch = doWatchPod ();
403
+ } catch (Exception e ) {
404
+ logger .warn ("[{}/{}] Failed to start the pod watcher." , namespace , serviceName , e );
405
+ return ;
406
+ }
407
+ // Recheck the closed flag because the doWatchPod() method may take a while.
408
+ if (closed ) {
409
+ newPodwatch .close ();
410
+ } else {
411
+ podWatch = newPodwatch ;
412
+ logger .info ("[{}/{}] Pod watcher is started." , namespace , serviceName );
413
+ }
414
+ } finally {
415
+ watchLock .unlock ();
381
416
}
382
417
}
383
418
@@ -428,7 +463,8 @@ public void onClose(WatcherException cause) {
428
463
logger .warn ("[{}/{}] Pod watcher is closed." , namespace , serviceName , cause );
429
464
logger .info ("[{}/{}] Reconnecting the pod watcher..." , namespace , serviceName );
430
465
431
- watchJob (() -> watchPod (), numPodFailures ++);
466
+ final long delayMillis = delayMillis (numPodFailures ++);
467
+ scheduleWatchPod (delayMillis );
432
468
}
433
469
434
470
@ Override
@@ -449,21 +485,33 @@ public void onClose() {
449
485
}
450
486
451
487
private void watchNode () {
452
- final Watch oldNodeWatch = nodeWatch ;
453
- if (oldNodeWatch != null ) {
454
- oldNodeWatch .close ();
455
- }
488
+ watchLock .lock ();
489
+ try {
490
+ final Watch oldNodeWatch = nodeWatch ;
491
+ if (oldNodeWatch != null ) {
492
+ oldNodeWatch .close ();
493
+ }
456
494
457
- if (closed ) {
458
- return ;
459
- }
460
- final Watch newNodeWatch = doWatchNode ();
461
- // Recheck the closed flag because the doWatchNode() method may take a while.
462
- if (closed ) {
463
- newNodeWatch .close ();
464
- } else {
495
+ if (closed ) {
496
+ return ;
497
+ }
498
+ logger .info ("[{}/{}] Start the node watcher..." , namespace , serviceName );
499
+ final Watch newNodeWatch = doWatchNode ();
500
+ // Recheck the closed flag because the doWatchNode() method may take a while.
501
+ if (closed ) {
502
+ newNodeWatch .close ();
503
+ return ;
504
+ }
465
505
nodeWatch = newNodeWatch ;
466
506
logger .info ("[{}/{}] Node watcher is started." , namespace , serviceName );
507
+ } finally {
508
+ watchLock .unlock ();
509
+ }
510
+
511
+ if (maxWatchAgeMillis > 0 ) {
512
+ logger .debug ("[{}/{}] Schedule a new Node watcher to start in {} ms." ,
513
+ namespace , serviceName , maxWatchAgeMillis );
514
+ scheduleWatchNode (maxWatchAgeMillis );
467
515
}
468
516
}
469
517
@@ -516,7 +564,8 @@ public void onClose(WatcherException cause) {
516
564
}
517
565
logger .warn ("[{}/{}] Node watcher is closed." , namespace , serviceName , cause );
518
566
logger .info ("[{}/{}] Reconnecting the node watcher..." , namespace , serviceName );
519
- watchJob (() -> watchNode (), numNodeFailures ++);
567
+ final long delayMillis = Backoff .ofDefault ().nextDelayMillis (numNodeFailures ++);
568
+ scheduleWatchNode (delayMillis );
520
569
}
521
570
522
571
@ Override
@@ -529,23 +578,81 @@ public void onClose() {
529
578
}
530
579
531
580
private void watchJob (Runnable job ) {
532
- watchJob (job , 0 );
581
+ final Runnable safeRunnable = safeRunnable (job );
582
+ worker .execute (safeRunnable );
583
+ }
584
+
585
+ private ScheduledFuture <?> scheduleJob (Runnable job , long delayMillis ) {
586
+ return worker .schedule (safeRunnable (job ), delayMillis , TimeUnit .MILLISECONDS );
587
+ }
588
+
589
+ private void scheduleWatchService (long delayMillis ) {
590
+ schedulerLock .lock ();
591
+ try {
592
+ final ScheduledFuture <?> serviceScheduledFuture = this .serviceScheduledFuture ;
593
+ if (serviceScheduledFuture != null ) {
594
+ serviceScheduledFuture .cancel (false );
595
+ }
596
+
597
+ if (delayMillis == 0 ) {
598
+ watchJob (this ::watchService );
599
+ return ;
600
+ }
601
+ this .serviceScheduledFuture = scheduleJob (this ::watchService , delayMillis );
602
+ } finally {
603
+ schedulerLock .unlock ();
604
+ }
605
+ }
606
+
607
+ private void scheduleWatchNode (long delayMillis ) {
608
+ schedulerLock .lock ();
609
+ try {
610
+ final ScheduledFuture <?> nodeScheduledFuture = this .nodeScheduledFuture ;
611
+ if (nodeScheduledFuture != null ) {
612
+ nodeScheduledFuture .cancel (false );
613
+ }
614
+ if (delayMillis == 0 ) {
615
+ watchJob (this ::watchNode );
616
+ return ;
617
+ }
618
+ this .nodeScheduledFuture = scheduleJob (this ::watchNode , delayMillis );
619
+ } finally {
620
+ schedulerLock .unlock ();
621
+ }
533
622
}
534
623
535
- private void watchJob (Runnable job , int numAttempts ) {
536
- final Runnable safeRunnable = () -> {
624
+ private void scheduleWatchPod (long delayMillis ) {
625
+ schedulerLock .lock ();
626
+ try {
627
+ final ScheduledFuture <?> podScheduledFuture = this .podScheduledFuture ;
628
+ if (podScheduledFuture != null ) {
629
+ podScheduledFuture .cancel (false );
630
+ }
631
+ if (delayMillis == 0 ) {
632
+ watchJob (this ::watchPod );
633
+ return ;
634
+ }
635
+ this .podScheduledFuture = scheduleJob (this ::watchPod , delayMillis );
636
+ } finally {
637
+ schedulerLock .unlock ();
638
+ }
639
+ }
640
+
641
+ private Runnable safeRunnable (Runnable job ) {
642
+ return () -> {
537
643
try {
538
644
job .run ();
539
645
} catch (Exception e ) {
540
646
logger .warn ("[{}/{}] Failed to run a watch job." , namespace , serviceName , e );
541
647
}
542
648
};
649
+ }
650
+
651
+ private static long delayMillis (int numAttempts ) {
543
652
if (numAttempts == 0 ) {
544
- worker .execute (safeRunnable );
545
- } else {
546
- worker .schedule (safeRunnable , Backoff .ofDefault ().nextDelayMillis (numAttempts ),
547
- TimeUnit .MILLISECONDS );
653
+ return 0 ;
548
654
}
655
+ return Backoff .ofDefault ().nextDelayMillis (numAttempts );
549
656
}
550
657
551
658
private void maybeUpdateEndpoints (boolean scheduledJob ) {
@@ -567,22 +674,22 @@ private void maybeUpdateEndpoints(boolean scheduledJob) {
567
674
return ;
568
675
}
569
676
570
- lock .lock ();
677
+ schedulerLock .lock ();
571
678
try {
572
679
if (scheduledJob ) {
573
- scheduledFuture = null ;
680
+ updateScheduledFuture = null ;
574
681
} else {
575
- if (scheduledFuture != null ) {
682
+ if (updateScheduledFuture != null ) {
576
683
// A scheduled job is already scheduled.
577
684
return ;
578
685
}
579
686
// Schedule a job to debounce the update of the endpoints.
580
- scheduledFuture = worker .schedule (() -> maybeUpdateEndpoints (true ),
581
- DEBOUNCE_MILLIS , TimeUnit .MILLISECONDS );
687
+ updateScheduledFuture = worker .schedule (() -> maybeUpdateEndpoints (true ),
688
+ DEBOUNCE_MILLIS , TimeUnit .MILLISECONDS );
582
689
return ;
583
690
}
584
691
} finally {
585
- lock .unlock ();
692
+ schedulerLock .unlock ();
586
693
}
587
694
588
695
assert nodePort != null ;
0 commit comments