Skip to content

Commit

Permalink
Fix flaky tests
Browse files Browse the repository at this point in the history
Signed-off-by: Prudhvi Godithi <pgodithi@amazon.com>
  • Loading branch information
prudhvigodithi committed Jan 26, 2024
1 parent 761c9bb commit 9bce536
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 63 deletions.
3 changes: 3 additions & 0 deletions spi/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ task integTest(type: RestIntegTestTask) {
group 'verification'
systemProperty 'tests.security.manager', 'false'
dependsOn test
testLogging {
showStandardStreams = true
}
}
check.dependsOn integTest

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.junit.Before;
import org.junit.Ignore;
import org.mockito.Mockito;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.core.xcontent.XContentBuilder;
Expand All @@ -20,6 +21,7 @@
import org.opensearch.jobscheduler.spi.ScheduledJobParameter;
import org.opensearch.jobscheduler.spi.schedule.Schedule;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.transport.NodeDisconnectedException;

import java.io.IOException;
import java.time.Duration;
Expand All @@ -31,6 +33,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

Expand Down Expand Up @@ -86,20 +89,20 @@ public void setup() {
// thus the OpenSearchIntegTestCase.clusterService() will throw exception.
this.clusterService = Mockito.mock(ClusterService.class, Mockito.RETURNS_DEEP_STUBS);
Mockito.when(this.clusterService.state().routingTable().hasIndex(".opendistro-job-scheduler-lock"))
.thenReturn(false)
.thenReturn(true);
.thenReturn(false)
.thenReturn(true);
}

public void testSanity() throws Exception {
String uniqSuffix = "_sanity";
CountDownLatch latch = new CountDownLatch(1);
LockService lockService = new LockService(client(), this.clusterService);
final JobExecutionContext context = new JobExecutionContext(
Instant.now(),
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
Instant.now(),
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
);
Instant testTime = Instant.now();
lockService.setTime(testTime);
Expand All @@ -108,9 +111,9 @@ public void testSanity() throws Exception {
assertEquals("job_id does not match.", JOB_ID + uniqSuffix, lock.getJobId());
assertEquals("job_index_name does not match.", JOB_INDEX_NAME + uniqSuffix, lock.getJobIndexName());
assertEquals(
"lock_id does not match.",
LockModel.generateLockId(JOB_INDEX_NAME + uniqSuffix, JOB_ID + uniqSuffix),
lock.getLockId()
"lock_id does not match.",
LockModel.generateLockId(JOB_INDEX_NAME + uniqSuffix, JOB_ID + uniqSuffix),
lock.getLockId()
);
assertEquals("lock_duration_seconds does not match.", LOCK_DURATION_SECONDS, lock.getLockDurationSeconds());
assertEquals("lock_time does not match.", testTime.getEpochSecond(), lock.getLockTime().getEpochSecond());
Expand All @@ -133,11 +136,11 @@ public void testSanityWithCustomLockID() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
LockService lockService = new LockService(client(), this.clusterService);
final JobExecutionContext context = new JobExecutionContext(
Instant.now(),
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
Instant.now(),
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
);
Instant testTime = Instant.now();
lockService.setTime(testTime);
Expand Down Expand Up @@ -167,11 +170,11 @@ public void testSecondAcquireLockFail() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
LockService lockService = new LockService(client(), this.clusterService);
final JobExecutionContext context = new JobExecutionContext(
Instant.now(),
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
Instant.now(),
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
);

lockService.acquireLockWithId(context.getJobIndexName(), LOCK_DURATION_SECONDS, lockID, ActionListener.wrap(lock -> {
Expand All @@ -196,11 +199,11 @@ public void testAcquireLockWithLongIdFail() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
LockService lockService = new LockService(client(), this.clusterService);
final JobExecutionContext context = new JobExecutionContext(
Instant.now(),
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
Instant.now(),
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
);

lockService.acquireLockWithId(context.getJobIndexName(), LOCK_DURATION_SECONDS, lockID, ActionListener.wrap(lock -> {
Expand All @@ -218,11 +221,11 @@ public void testLockReleasedAndAcquired() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
LockService lockService = new LockService(client(), this.clusterService);
final JobExecutionContext context = new JobExecutionContext(
Instant.now(),
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
Instant.now(),
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
);

lockService.acquireLockWithId(context.getJobIndexName(), LOCK_DURATION_SECONDS, lockID, ActionListener.wrap(lock -> {
Expand Down Expand Up @@ -252,11 +255,11 @@ public void testLockExpired() throws Exception {
// Set lock time in the past.
lockService.setTime(Instant.now().minus(Duration.ofSeconds(LOCK_DURATION_SECONDS + LOCK_DURATION_SECONDS)));
final JobExecutionContext context = new JobExecutionContext(
Instant.now(),
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
Instant.now(),
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
);

lockService.acquireLockWithId(context.getJobIndexName(), LOCK_DURATION_SECONDS, lockID, ActionListener.wrap(lock -> {
Expand Down Expand Up @@ -316,11 +319,11 @@ public void testMultiThreadCreateLock() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
final LockService lockService = new LockService(client(), this.clusterService);
final JobExecutionContext context = new JobExecutionContext(
Instant.now(),
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
Instant.now(),
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
);

lockService.createLockIndex(ActionListener.wrap(created -> {
Expand Down Expand Up @@ -379,11 +382,11 @@ public void testMultiThreadAcquireLock() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
final LockService lockService = new LockService(client(), this.clusterService);
final JobExecutionContext context = new JobExecutionContext(
Instant.now(),
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
Instant.now(),
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
);

lockService.createLockIndex(ActionListener.wrap(created -> {
Expand All @@ -400,16 +403,16 @@ public void testMultiThreadAcquireLock() throws Exception {
Callable<Boolean> callable = () -> {
CountDownLatch callableLatch = new CountDownLatch(1);
lockService.acquireLockWithId(
context.getJobIndexName(),
LOCK_DURATION_SECONDS,
lockID,
ActionListener.wrap(lock -> {
if (lock != null) {
lockModelAtomicReference.set(lock);
Integer test = multiThreadAcquireLockCounter.getAndAdd(1);
}
callableLatch.countDown();
}, exception -> fail(exception.getMessage()))
context.getJobIndexName(),
LOCK_DURATION_SECONDS,
lockID,
ActionListener.wrap(lock -> {
if (lock != null) {
lockModelAtomicReference.set(lock);
Integer test = multiThreadAcquireLockCounter.getAndAdd(1);
}
callableLatch.countDown();
}, exception -> fail(exception.getMessage()))
);
callableLatch.await(5L, TimeUnit.SECONDS);
return true;
Expand Down Expand Up @@ -446,11 +449,11 @@ public void testRenewLock() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
LockService lockService = new LockService(client(), this.clusterService);
final JobExecutionContext context = new JobExecutionContext(
Instant.now(),
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
Instant.now(),
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
);

lockService.acquireLockWithId(context.getJobIndexName(), LOCK_DURATION_SECONDS, lockID, ActionListener.wrap(lock -> {
Expand All @@ -463,9 +466,9 @@ public void testRenewLock() throws Exception {
assertNotNull("Expected to successfully renew lock", renewedLock);
assertEquals("lock_time is expected to be the renewal time.", now, renewedLock.getLockTime());
assertEquals(
"lock_duration is expected to be unchanged.",
lock.getLockDurationSeconds(),
renewedLock.getLockDurationSeconds()
"lock_duration is expected to be unchanged.",
lock.getLockDurationSeconds(),
renewedLock.getLockDurationSeconds()
);
lockService.release(lock, ActionListener.wrap(released -> {
assertTrue("Failed to release lock.", released);
Expand All @@ -478,4 +481,38 @@ public void testRenewLock() throws Exception {
}, exception -> fail(exception.getMessage())));
latch.await(5L, TimeUnit.SECONDS);
}

/**
 * Attempts to release {@code lock} through {@code lockService}, retrying a bounded number of times.
 *
 * <p>Each attempt issues an asynchronous {@code release} call and waits for its listener to fire.
 * Every listener outcome (success, {@link NodeDisconnectedException}, or any other failure)
 * counts the latch down, and the wait itself is bounded, so a failed or lost response leads to
 * the next attempt instead of blocking the calling thread indefinitely.
 *
 * @param lockService service used to issue the release request
 * @param lock        the lock to release
 * @return {@code true} as soon as one attempt reports the lock as released; {@code false} if all
 *         attempts fail or time out, or if the calling thread is interrupted (the interrupt flag
 *         is restored in that case)
 */
private boolean retryReleaseLock(LockService lockService, LockModel lock) {
    final int maxRetries = 3;
    final long retryIntervalMillis = 1000; // 1 second retry interval
    final long releaseTimeoutSeconds = 5L; // same bound the other tests in this class use for latch waits

    for (int attempt = 1; attempt <= maxRetries; attempt++) {
        try {
            System.out.println("The attempt is " + attempt);
            // Fresh state per attempt so a late callback from an earlier attempt cannot leak in.
            AtomicBoolean released = new AtomicBoolean(false);
            CountDownLatch latch = new CountDownLatch(1);
            lockService.release(lock, ActionListener.wrap(releaseResult -> {
                released.set(releaseResult);
                latch.countDown();
            }, exception -> {
                if (exception instanceof NodeDisconnectedException) {
                    System.out.println("NodeDisconnectedException occurred, retrying releaseLock...");
                }
                // Always unblock the waiting thread; without this a NodeDisconnectedException
                // would leave the latch at 1 and the retry loop would never advance.
                latch.countDown();
            }));
            // Bounded wait: a listener that never fires must not hang the test.
            if (!latch.await(releaseTimeoutSeconds, TimeUnit.SECONDS)) {
                System.out.println("Timed out waiting for release on attempt " + attempt);
            }
            if (released.get()) {
                return true; // Lock released successfully
            }
            if (attempt < maxRetries) {
                Thread.sleep(retryIntervalMillis); // Wait before retrying; pointless after the final attempt
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller and stop retrying.
            Thread.currentThread().interrupt();
            break;
        }
    }
    return false;
}
}

0 comments on commit 9bce536

Please sign in to comment.