Skip to content

Commit

Permalink
Fix flaky tests
Browse files: browse the repository at this point in the history
Signed-off-by: Prudhvi Godithi <pgodithi@amazon.com>
  • Loading branch information
prudhvigodithi committed Jan 29, 2024
1 parent 761c9bb commit 84b8cf6
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 70 deletions.
3 changes: 3 additions & 0 deletions spi/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ task integTest(type: RestIntegTestTask) {
group 'verification'
systemProperty 'tests.security.manager', 'false'
dependsOn test
testLogging {
showStandardStreams = true
}
}
check.dependsOn integTest

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.junit.Before;
import org.junit.Ignore;
import org.mockito.Mockito;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.core.xcontent.XContentBuilder;
Expand All @@ -20,6 +21,7 @@
import org.opensearch.jobscheduler.spi.ScheduledJobParameter;
import org.opensearch.jobscheduler.spi.schedule.Schedule;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.transport.NodeDisconnectedException;

import java.io.IOException;
import java.time.Duration;
Expand All @@ -31,8 +33,10 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.IntStream;

public class LockServiceIT extends OpenSearchIntegTestCase {

Expand Down Expand Up @@ -86,20 +90,20 @@ public void setup() {
// thus the OpenSearchIntegTestCase.clusterService() will throw exception.
this.clusterService = Mockito.mock(ClusterService.class, Mockito.RETURNS_DEEP_STUBS);
Mockito.when(this.clusterService.state().routingTable().hasIndex(".opendistro-job-scheduler-lock"))
.thenReturn(false)
.thenReturn(true);
.thenReturn(false)
.thenReturn(true);
}

public void testSanity() throws Exception {
String uniqSuffix = "_sanity";
CountDownLatch latch = new CountDownLatch(1);
LockService lockService = new LockService(client(), this.clusterService);
final JobExecutionContext context = new JobExecutionContext(
Instant.now(),
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
Instant.now(),
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
);
Instant testTime = Instant.now();
lockService.setTime(testTime);
Expand All @@ -108,9 +112,9 @@ public void testSanity() throws Exception {
assertEquals("job_id does not match.", JOB_ID + uniqSuffix, lock.getJobId());
assertEquals("job_index_name does not match.", JOB_INDEX_NAME + uniqSuffix, lock.getJobIndexName());
assertEquals(
"lock_id does not match.",
LockModel.generateLockId(JOB_INDEX_NAME + uniqSuffix, JOB_ID + uniqSuffix),
lock.getLockId()
"lock_id does not match.",
LockModel.generateLockId(JOB_INDEX_NAME + uniqSuffix, JOB_ID + uniqSuffix),
lock.getLockId()
);
assertEquals("lock_duration_seconds does not match.", LOCK_DURATION_SECONDS, lock.getLockDurationSeconds());
assertEquals("lock_time does not match.", testTime.getEpochSecond(), lock.getLockTime().getEpochSecond());
Expand All @@ -133,11 +137,11 @@ public void testSanityWithCustomLockID() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
LockService lockService = new LockService(client(), this.clusterService);
final JobExecutionContext context = new JobExecutionContext(
Instant.now(),
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
Instant.now(),
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
);
Instant testTime = Instant.now();
lockService.setTime(testTime);
Expand Down Expand Up @@ -167,11 +171,11 @@ public void testSecondAcquireLockFail() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
LockService lockService = new LockService(client(), this.clusterService);
final JobExecutionContext context = new JobExecutionContext(
Instant.now(),
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
Instant.now(),
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
);

lockService.acquireLockWithId(context.getJobIndexName(), LOCK_DURATION_SECONDS, lockID, ActionListener.wrap(lock -> {
Expand All @@ -196,11 +200,11 @@ public void testAcquireLockWithLongIdFail() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
LockService lockService = new LockService(client(), this.clusterService);
final JobExecutionContext context = new JobExecutionContext(
Instant.now(),
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
Instant.now(),
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
);

lockService.acquireLockWithId(context.getJobIndexName(), LOCK_DURATION_SECONDS, lockID, ActionListener.wrap(lock -> {
Expand All @@ -218,11 +222,11 @@ public void testLockReleasedAndAcquired() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
LockService lockService = new LockService(client(), this.clusterService);
final JobExecutionContext context = new JobExecutionContext(
Instant.now(),
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
Instant.now(),
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
);

lockService.acquireLockWithId(context.getJobIndexName(), LOCK_DURATION_SECONDS, lockID, ActionListener.wrap(lock -> {
Expand Down Expand Up @@ -252,11 +256,11 @@ public void testLockExpired() throws Exception {
// Set lock time in the past.
lockService.setTime(Instant.now().minus(Duration.ofSeconds(LOCK_DURATION_SECONDS + LOCK_DURATION_SECONDS)));
final JobExecutionContext context = new JobExecutionContext(
Instant.now(),
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
Instant.now(),
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
);

lockService.acquireLockWithId(context.getJobIndexName(), LOCK_DURATION_SECONDS, lockID, ActionListener.wrap(lock -> {
Expand Down Expand Up @@ -316,11 +320,11 @@ public void testMultiThreadCreateLock() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
final LockService lockService = new LockService(client(), this.clusterService);
final JobExecutionContext context = new JobExecutionContext(
Instant.now(),
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
Instant.now(),
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
);

lockService.createLockIndex(ActionListener.wrap(created -> {
Expand Down Expand Up @@ -379,11 +383,11 @@ public void testMultiThreadAcquireLock() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
final LockService lockService = new LockService(client(), this.clusterService);
final JobExecutionContext context = new JobExecutionContext(
Instant.now(),
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
Instant.now(),
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
);

lockService.createLockIndex(ActionListener.wrap(created -> {
Expand All @@ -400,16 +404,16 @@ public void testMultiThreadAcquireLock() throws Exception {
Callable<Boolean> callable = () -> {
CountDownLatch callableLatch = new CountDownLatch(1);
lockService.acquireLockWithId(
context.getJobIndexName(),
LOCK_DURATION_SECONDS,
lockID,
ActionListener.wrap(lock -> {
if (lock != null) {
lockModelAtomicReference.set(lock);
Integer test = multiThreadAcquireLockCounter.getAndAdd(1);
}
callableLatch.countDown();
}, exception -> fail(exception.getMessage()))
context.getJobIndexName(),
LOCK_DURATION_SECONDS,
lockID,
ActionListener.wrap(lock -> {
if (lock != null) {
lockModelAtomicReference.set(lock);
Integer test = multiThreadAcquireLockCounter.getAndAdd(1);
}
callableLatch.countDown();
}, exception -> fail(exception.getMessage()))
);
callableLatch.await(5L, TimeUnit.SECONDS);
return true;
Expand Down Expand Up @@ -446,11 +450,11 @@ public void testRenewLock() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
LockService lockService = new LockService(client(), this.clusterService);
final JobExecutionContext context = new JobExecutionContext(
Instant.now(),
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
Instant.now(),
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
);

lockService.acquireLockWithId(context.getJobIndexName(), LOCK_DURATION_SECONDS, lockID, ActionListener.wrap(lock -> {
Expand All @@ -463,17 +467,34 @@ public void testRenewLock() throws Exception {
assertNotNull("Expected to successfully renew lock", renewedLock);
assertEquals("lock_time is expected to be the renewal time.", now, renewedLock.getLockTime());
assertEquals(
"lock_duration is expected to be unchanged.",
lock.getLockDurationSeconds(),
renewedLock.getLockDurationSeconds()
"lock_duration is expected to be unchanged.",
lock.getLockDurationSeconds(),
renewedLock.getLockDurationSeconds()
);
lockService.release(lock, ActionListener.wrap(released -> {
assertTrue("Failed to release lock.", released);
lockService.deleteLock(lock.getLockId(), ActionListener.wrap(deleted -> {
assertTrue("Failed to delete lock.", deleted);
latch.countDown();
}, exception -> fail(exception.getMessage())));
}, exception -> fail(exception.getMessage())));
AtomicBoolean released = new AtomicBoolean(false);
IntStream.range(0, 3).anyMatch(retry -> {
try {
CountDownLatch releaseLatch = new CountDownLatch(1);
lockService.release(lock, ActionListener.wrap(release -> {
released.set(release);
releaseLatch.countDown();
}, exception -> {
// Handle NodeDisconnectedException by retrying
if (exception instanceof NodeDisconnectedException && retry < 2) {
System.out.println("NodeDisconnectedException occurred, retrying releaseLock...");
} else {
releaseLatch.countDown();
}
}));
releaseLatch.await();
return released.get(); // Exit loop if lock released successfully
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return true; // Exit loop if interrupted
}
});
assertTrue("Failed to release lock.", released.get());
latch.countDown();
}, exception -> fail(exception.getMessage())));
}, exception -> fail(exception.getMessage())));
latch.await(5L, TimeUnit.SECONDS);
Expand Down

0 comments on commit 84b8cf6

Please sign in to comment.