Skip to content

Commit

Permalink
Fix flaky tests
Browse files: browse the repository at this point in the history
Signed-off-by: Prudhvi Godithi <pgodithi@amazon.com>
  • Loading branch information
prudhvigodithi committed Jan 29, 2024
1 parent 761c9bb commit 07530ea
Showing 1 changed file with 95 additions and 70 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.opensearch.jobscheduler.spi.ScheduledJobParameter;
import org.opensearch.jobscheduler.spi.schedule.Schedule;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.transport.NodeDisconnectedException;

import java.io.IOException;
import java.time.Duration;
Expand All @@ -31,6 +32,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

Expand Down Expand Up @@ -86,20 +88,20 @@ public void setup() {
// thus the OpenSearchIntegTestCase.clusterService() will throw an exception.
this.clusterService = Mockito.mock(ClusterService.class, Mockito.RETURNS_DEEP_STUBS);
Mockito.when(this.clusterService.state().routingTable().hasIndex(".opendistro-job-scheduler-lock"))
.thenReturn(false)
.thenReturn(true);
.thenReturn(false)
.thenReturn(true);
}

public void testSanity() throws Exception {
String uniqSuffix = "_sanity";
CountDownLatch latch = new CountDownLatch(1);
LockService lockService = new LockService(client(), this.clusterService);
final JobExecutionContext context = new JobExecutionContext(
Instant.now(),
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
Instant.now(),
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
);
Instant testTime = Instant.now();
lockService.setTime(testTime);
Expand All @@ -108,9 +110,9 @@ public void testSanity() throws Exception {
assertEquals("job_id does not match.", JOB_ID + uniqSuffix, lock.getJobId());
assertEquals("job_index_name does not match.", JOB_INDEX_NAME + uniqSuffix, lock.getJobIndexName());
assertEquals(
"lock_id does not match.",
LockModel.generateLockId(JOB_INDEX_NAME + uniqSuffix, JOB_ID + uniqSuffix),
lock.getLockId()
"lock_id does not match.",
LockModel.generateLockId(JOB_INDEX_NAME + uniqSuffix, JOB_ID + uniqSuffix),
lock.getLockId()
);
assertEquals("lock_duration_seconds does not match.", LOCK_DURATION_SECONDS, lock.getLockDurationSeconds());
assertEquals("lock_time does not match.", testTime.getEpochSecond(), lock.getLockTime().getEpochSecond());
Expand All @@ -133,11 +135,11 @@ public void testSanityWithCustomLockID() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
LockService lockService = new LockService(client(), this.clusterService);
final JobExecutionContext context = new JobExecutionContext(
Instant.now(),
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
Instant.now(),
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
);
Instant testTime = Instant.now();
lockService.setTime(testTime);
Expand Down Expand Up @@ -167,11 +169,11 @@ public void testSecondAcquireLockFail() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
LockService lockService = new LockService(client(), this.clusterService);
final JobExecutionContext context = new JobExecutionContext(
Instant.now(),
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
Instant.now(),
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
);

lockService.acquireLockWithId(context.getJobIndexName(), LOCK_DURATION_SECONDS, lockID, ActionListener.wrap(lock -> {
Expand All @@ -196,11 +198,11 @@ public void testAcquireLockWithLongIdFail() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
LockService lockService = new LockService(client(), this.clusterService);
final JobExecutionContext context = new JobExecutionContext(
Instant.now(),
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
Instant.now(),
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
);

lockService.acquireLockWithId(context.getJobIndexName(), LOCK_DURATION_SECONDS, lockID, ActionListener.wrap(lock -> {
Expand All @@ -218,11 +220,11 @@ public void testLockReleasedAndAcquired() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
LockService lockService = new LockService(client(), this.clusterService);
final JobExecutionContext context = new JobExecutionContext(
Instant.now(),
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
Instant.now(),
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
);

lockService.acquireLockWithId(context.getJobIndexName(), LOCK_DURATION_SECONDS, lockID, ActionListener.wrap(lock -> {
Expand Down Expand Up @@ -252,11 +254,11 @@ public void testLockExpired() throws Exception {
// Set lock time in the past.
lockService.setTime(Instant.now().minus(Duration.ofSeconds(LOCK_DURATION_SECONDS + LOCK_DURATION_SECONDS)));
final JobExecutionContext context = new JobExecutionContext(
Instant.now(),
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
Instant.now(),
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
);

lockService.acquireLockWithId(context.getJobIndexName(), LOCK_DURATION_SECONDS, lockID, ActionListener.wrap(lock -> {
Expand Down Expand Up @@ -316,11 +318,11 @@ public void testMultiThreadCreateLock() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
final LockService lockService = new LockService(client(), this.clusterService);
final JobExecutionContext context = new JobExecutionContext(
Instant.now(),
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
Instant.now(),
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
);

lockService.createLockIndex(ActionListener.wrap(created -> {
Expand Down Expand Up @@ -379,11 +381,11 @@ public void testMultiThreadAcquireLock() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
final LockService lockService = new LockService(client(), this.clusterService);
final JobExecutionContext context = new JobExecutionContext(
Instant.now(),
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
Instant.now(),
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
);

lockService.createLockIndex(ActionListener.wrap(created -> {
Expand All @@ -400,16 +402,16 @@ public void testMultiThreadAcquireLock() throws Exception {
Callable<Boolean> callable = () -> {
CountDownLatch callableLatch = new CountDownLatch(1);
lockService.acquireLockWithId(
context.getJobIndexName(),
LOCK_DURATION_SECONDS,
lockID,
ActionListener.wrap(lock -> {
if (lock != null) {
lockModelAtomicReference.set(lock);
Integer test = multiThreadAcquireLockCounter.getAndAdd(1);
}
callableLatch.countDown();
}, exception -> fail(exception.getMessage()))
context.getJobIndexName(),
LOCK_DURATION_SECONDS,
lockID,
ActionListener.wrap(lock -> {
if (lock != null) {
lockModelAtomicReference.set(lock);
Integer test = multiThreadAcquireLockCounter.getAndAdd(1);
}
callableLatch.countDown();
}, exception -> fail(exception.getMessage()))
);
callableLatch.await(5L, TimeUnit.SECONDS);
return true;
Expand Down Expand Up @@ -446,11 +448,11 @@ public void testRenewLock() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
LockService lockService = new LockService(client(), this.clusterService);
final JobExecutionContext context = new JobExecutionContext(
Instant.now(),
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
Instant.now(),
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
);

lockService.acquireLockWithId(context.getJobIndexName(), LOCK_DURATION_SECONDS, lockID, ActionListener.wrap(lock -> {
Expand All @@ -463,17 +465,40 @@ public void testRenewLock() throws Exception {
assertNotNull("Expected to successfully renew lock", renewedLock);
assertEquals("lock_time is expected to be the renewal time.", now, renewedLock.getLockTime());
assertEquals(
"lock_duration is expected to be unchanged.",
lock.getLockDurationSeconds(),
renewedLock.getLockDurationSeconds()
"lock_duration is expected to be unchanged.",
lock.getLockDurationSeconds(),
renewedLock.getLockDurationSeconds()
);
lockService.release(lock, ActionListener.wrap(released -> {
assertTrue("Failed to release lock.", released);
lockService.deleteLock(lock.getLockId(), ActionListener.wrap(deleted -> {
assertTrue("Failed to delete lock.", deleted);
latch.countDown();
}, exception -> fail(exception.getMessage())));
}, exception -> fail(exception.getMessage())));

// Retry logic for releasing the lock
int maxRetries = 3;
AtomicInteger retryCount = new AtomicInteger(0);
AtomicBoolean released = new AtomicBoolean(false);
while (retryCount.get() < maxRetries && !released.get()) {
try {
lockService.release(lock, ActionListener.wrap(releaseResponse -> {
if (releaseResponse) {
released.set(true);
latch.countDown();
}
}, exception -> {
if (exception instanceof NodeDisconnectedException && retryCount.incrementAndGet() < maxRetries) {
System.out.println("Failed to release lock. Retrying...");
try {
Thread.sleep(1000); // 1 second delay between retry attempts
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
} else {
fail(exception.getMessage());
}
}));
} catch (Exception e) {
fail(e.getMessage());
}
}

assertTrue("Failed to release lock.", released.get()); // Assert release status
}, exception -> fail(exception.getMessage())));
}, exception -> fail(exception.getMessage())));
latch.await(5L, TimeUnit.SECONDS);
Expand Down

0 comments on commit 07530ea

Please sign in to comment.