diff --git a/janusgraph-core/src/test/java/org/janusgraph/diskstorage/util/backpressure/QueryBackPressureTest.java b/janusgraph-core/src/test/java/org/janusgraph/diskstorage/util/backpressure/QueryBackPressureTest.java index 8bad620f29..3bffb8b68e 100644 --- a/janusgraph-core/src/test/java/org/janusgraph/diskstorage/util/backpressure/QueryBackPressureTest.java +++ b/janusgraph-core/src/test/java/org/janusgraph/diskstorage/util/backpressure/QueryBackPressureTest.java @@ -15,15 +15,22 @@ package org.janusgraph.diskstorage.util.backpressure; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; -import java.util.concurrent.CountDownLatch; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import static org.janusgraph.util.system.ExecuteUtil.gracefulExecutorServiceShutdown; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; +@Timeout(60000) public class QueryBackPressureTest { @Test @@ -38,7 +45,7 @@ public void testNoExtraPermitsAreAddedWithSemaphoreProtectedReleaseQueryBackPres for(int i=0;i<10;i++){ semaphore.releaseAfterQuery(); } - while (semaphore.availablePermits() != 5){ + while (semaphore.availablePermits() < 5){ Thread.sleep(10); } @@ -62,7 +69,7 @@ public void testExtraPermitsAreAddedWithSemaphoreQueryBackPressure() throws Inte for(int i=0;i<10;i++){ semaphore.releaseAfterQuery(); } - while (semaphore.availablePermits() != 10){ + while (semaphore.availablePermits() < 10){ Thread.sleep(10); } @@ -99,25 +106,35 @@ private void testSemaphoreBasedQueryBackPressureInMultipleThreads(int backPressu } assertEquals(0, availablePermits(backPressure)); - CountDownLatch acquiresCountDownLatch = new CountDownLatch(acquiresThreads); + List> aquireFutureList = new ArrayList<>(acquiresThreads); for(int i=0;i { + aquireFutureList.add(acquireExecutorService.submit(() -> { backPressure.acquireBeforeQuery(); - acquiresCountDownLatch.countDown(); - }); + return true; + })); } // Sleep for 2 seconds and check that no acquires are happened due to `backPressure` being exhausted Thread.sleep(2000); - assertEquals(acquiresThreads, acquiresCountDownLatch.getCount()); + aquireFutureList.forEach(booleanFuture -> assertFalse(booleanFuture.isDone())); + List> releaseFutureList = new ArrayList<>(acquiresThreads); for(int i=0;i { + backPressure.releaseAfterQuery(); + return true; + })); } - acquiresCountDownLatch.await(); + releaseFutureList.forEach(booleanFuture -> { + try { + booleanFuture.get(); + } catch (InterruptedException | ExecutionException e) { + fail("Couldn't release semaphore", e); + } + }); - gracefulExecutorServiceShutdown(acquireExecutorService, Long.MAX_VALUE); - gracefulExecutorServiceShutdown(releaseExecutorService, Long.MAX_VALUE); + gracefulExecutorServiceShutdown(acquireExecutorService, 10000); + gracefulExecutorServiceShutdown(releaseExecutorService, 10000); assertEquals(0, availablePermits(backPressure)); @@ -128,6 +145,8 @@ private void testSemaphoreBasedQueryBackPressureInMultipleThreads(int backPressu backPressure.close(); assertEquals(backPressureLimit, availablePermits(backPressure)); + + aquireFutureList.forEach(booleanFuture -> assertTrue(booleanFuture.isDone())); } private int availablePermits(QueryBackPressure backPressure){