Skip to content

Commit

Permalink
KAFKA-17779: Fix flaky RemoteLogManager test (apache#17724)
Browse files Browse the repository at this point in the history
Reviewers: Kamal Chandraprakash <kamal.chandraprakash@gmail.com>
  • Loading branch information
wperlichek authored Nov 10, 2024
1 parent 42ea29c commit 9c0fe85
Showing 1 changed file with 26 additions and 5 deletions.
31 changes: 26 additions & 5 deletions core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
Expand Down Expand Up @@ -1719,9 +1720,29 @@ void testFetchOffsetByTimestampWithTieredStorageDoesNotFetchIndexWhenExistsLocal
FileRecords.TimestampAndOffset expectedRemoteResult = new FileRecords.TimestampAndOffset(timestamp + 999, 999, Optional.of(Integer.MAX_VALUE));
Partition mockFollowerPartition = mockPartition(tpId);

LogSegment logSegment = mockLogSegment(50L, timestamp, null);
LogSegment logSegment1 = mockLogSegment(100L, timestamp + 1, expectedLocalResult);
when(mockLog.logSegments()).thenReturn(Arrays.asList(logSegment, logSegment1));
LogSegment logSegmentBaseOffset50 = mockLogSegment(50L, timestamp, null);
LogSegment logSegmentBaseOffset100 = mockLogSegment(100L, timestamp + 1, expectedLocalResult);
LogSegment logSegmentBaseOffset101 = mockLogSegment(101L, timestamp + 1, expectedLocalResult);

// Constants representing the states of local log segments
final int twoSegmentsBaseOffsets50and100 = 0;
final int oneSegmentBaseOffset100 = 1;
final int oneSegmentBaseOffset101 = 2;

AtomicInteger localLogOffsetState = new AtomicInteger(twoSegmentsBaseOffsets50and100);

when(mockLog.logSegments()).thenAnswer(invocation -> {
if (localLogOffsetState.get() == twoSegmentsBaseOffsets50and100) {
return Arrays.asList(logSegmentBaseOffset50, logSegmentBaseOffset100);
} else if (localLogOffsetState.get() == oneSegmentBaseOffset100) {
return Collections.singletonList(logSegmentBaseOffset100);
} else if (localLogOffsetState.get() == oneSegmentBaseOffset101) {
return Collections.singletonList(logSegmentBaseOffset101);
} else {
throw new IllegalStateException("Unexpected localLogOffsetState");
}
});

when(mockLog.logEndOffset()).thenReturn(300L);
remoteLogManager = new RemoteLogManager(config.remoteLogManagerConfig(), brokerId, logDir, clusterId, time,
partition -> Optional.of(mockLog),
Expand All @@ -1747,12 +1768,12 @@ Optional<FileRecords.TimestampAndOffset> lookupTimestamp(RemoteLogSegmentMetadat

// Move the local-log start offset to 100L, still the read from the remote storage should be short-circuited
// as the message with (timestamp + 1) exists in the local log
when(mockLog.logSegments()).thenReturn(Collections.singletonList(logSegment1));
localLogOffsetState.set(oneSegmentBaseOffset100);
assertEquals(Optional.of(expectedLocalResult), remoteLogManager.findOffsetByTimestamp(tp, timestamp + 1, 0L, cache));

// Move the local log start offset to 101L, now message with (timestamp + 1) does not exist in the local log and
// the indexes needs to be fetched from the remote storage
when(logSegment1.baseOffset()).thenReturn(101L);
localLogOffsetState.set(oneSegmentBaseOffset101);
assertEquals(Optional.of(expectedRemoteResult), remoteLogManager.findOffsetByTimestamp(tp, timestamp + 1, 0L, cache));
}

Expand Down

0 comments on commit 9c0fe85

Please sign in to comment.