
Fix up the outstanding SourceRecordIterator
Signed-off-by: Aindriu Lavelle <aindriu.lavelle@aiven.io>
aindriu-aiven committed Jan 22, 2025
1 parent 5e004d6 commit 54851fe
Showing 6 changed files with 164 additions and 118 deletions.
OffsetManager.java
@@ -16,6 +16,7 @@

package io.aiven.kafka.connect.common.source;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
@@ -111,6 +112,17 @@ public void updateCurrentOffsets(final E entry) {
});
}

/**
* Gets any offset information stored in the offsetStorageReader and adds it to the local offsets map. This
* provides a performance improvement over checking whether each offset exists individually.
*
* @param partitionMaps
* A Collection of partition maps which identify individual offset entries
*/
public void hydrateOffsetManager(final Collection<Map<String, Object>> partitionMaps) {
offsets.putAll(context.offsetStorageReader().offsets(partitionMaps));
}

/**
* Removes the specified entry from the in memory table. Does not impact the records stored in the
* {@link SourceTaskContext}.
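The new hydrateOffsetManager gives a task a way to prime the in-memory offsets map in one round trip before processing begins. A minimal sketch of how a caller might use it at startup — the segment names ("bucket", "objectKey"), the wrapper method, and knownObjectKeys are illustrative assumptions, not code from this commit:

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

// Sketch only: segment names and the knownObjectKeys input are assumptions.
void primeOffsets(final OffsetManager<S3OffsetManagerEntry> offsetManager, final String bucket,
        final List<String> knownObjectKeys) {
    final Collection<Map<String, Object>> partitionMaps = new ArrayList<>();
    for (final String objectKey : knownObjectKeys) {
        // One partition map per offset entry we want hydrated.
        final Map<String, Object> partitionKey = Map.of("bucket", bucket, "objectKey", objectKey);
        partitionMaps.add(partitionKey);
    }
    // A single offsets() round trip against the OffsetStorageReader instead of one offset() lookup per key.
    offsetManager.hydrateOffsetManager(partitionMaps);
}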
OffsetManagerTest.java
@@ -17,10 +17,15 @@
package io.aiven.kafka.connect.common.source;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

@@ -74,6 +79,56 @@ void testNewEntryWithoutDataFromContext() {
assertThat(result).isNotPresent();
}

@Test
void testHydrateOffsetManagerWithPartitionMapsNoExistingEntries() {
final List<Map<String, Object>> partitionMaps = new ArrayList<>();

for (int i = 0; i < 10; i++) {
final Map<String, Object> partitionKey = new HashMap<>(); // NOPMD avoid instantiating objects in loops.
partitionKey.put("segment1", "topic" + i);
partitionKey.put("segment2", String.valueOf(i));
partitionKey.put("segment3", "something else" + i);
partitionMaps.add(partitionKey);
}

when(offsetStorageReader.offsets(eq(partitionMaps))).thenReturn(Map.of());

offsetManager.hydrateOffsetManager(partitionMaps);
verify(offsetStorageReader, times(1)).offsets(eq(partitionMaps));

// No existing entries, so we expect nothing to be found and the offset reader to be queried for this entry again.
final Optional<TestingOffsetManagerEntry> result = offsetManager.getEntry(() -> partitionMaps.get(0),
TestingOffsetManagerEntry::new);
assertThat(result).isEmpty();
verify(offsetStorageReader, times(1)).offset(eq(partitionMaps.get(0)));

}

@Test
void testHydrateOffsetManagerWithPartitionMapsAllEntriesExist() {
final List<Map<String, Object>> partitionMaps = new ArrayList<>();
final Map<Map<String, Object>, Map<String, Object>> offsetReaderMap = new HashMap<>();

for (int i = 0; i < 10; i++) {
final Map<String, Object> partitionKey = Map.of("segment1", "topic" + i, "segment2", String.valueOf(i),
"segment3", "somethingelse" + i); // NOPMD avoid instantiating objects in loops.
partitionMaps.add(partitionKey);
offsetReaderMap.put(partitionKey, Map.of("recordCount", (long) i));
}

when(offsetStorageReader.offsets(eq(partitionMaps))).thenReturn(offsetReaderMap);

offsetManager.hydrateOffsetManager(partitionMaps);
verify(offsetStorageReader, times(1)).offsets(eq(partitionMaps));

// All entries were hydrated up front, so the entry should be present without querying the offset reader again.
final Optional<TestingOffsetManagerEntry> result = offsetManager.getEntry(() -> partitionMaps.get(0),
TestingOffsetManagerEntry::new);
assertThat(result).isPresent();
verify(offsetStorageReader, times(0)).offset(eq(partitionMaps.get(0)));

}

@SuppressWarnings("PMD.TestClassWithoutTestCases") // TODO figure out why this fails.
public static class TestingOffsetManagerEntry // NOPMD the above suppress warnings does not work.
implements
AwsIntegrationTest.java
@@ -38,6 +38,7 @@
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -59,6 +60,7 @@
import io.aiven.kafka.connect.s3.source.utils.SourceRecordIterator;

import org.apache.avro.Schema;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
@@ -68,9 +70,9 @@
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.S3Object;

@Testcontainers
@SuppressWarnings("PMD.ExcessiveImports")
class AwsIntegrationTest implements IntegrationBase {

private static final String COMMON_PREFIX = "s3-source-connector-for-apache-kafka-AWS-test-";
@@ -274,6 +276,7 @@ void verifyIteratorRehydration(final TestInfo testInfo) {
// create 2 files.
final var topic = IntegrationBase.getTopic(testInfo);
final Map<String, String> configData = getConfig(topic, 1);
configData.put(TASK_ID, "0");

configData.put(INPUT_FORMAT_KEY, InputFormat.BYTES.getValue());

@@ -293,26 +296,52 @@

final S3SourceConfig s3SourceConfig = new S3SourceConfig(configData);
final AWSV2SourceClient sourceClient = new AWSV2SourceClient(s3SourceConfig);
final Iterator<S3Object> iter = sourceClient.getS3ObjectIterator(null);

assertThat(iter).hasNext();
S3Object object = iter.next();
actualKeys.add(object.key());
assertThat(iter).hasNext();
object = iter.next();
actualKeys.add(object.key());
assertThat(iter).isExhausted();
final Iterator<S3SourceRecord> iterator = new SourceRecordIterator(s3SourceConfig, createOffsetManager(),
TransformerFactory.getTransformer(InputFormat.BYTES), sourceClient);

assertThat(iterator).hasNext();
S3SourceRecord s3SourceRecord = iterator.next();
actualKeys.add(s3SourceRecord.getObjectKey());
assertThat(iterator).hasNext();
s3SourceRecord = iterator.next();
actualKeys.add(s3SourceRecord.getObjectKey());
assertThat(iterator).isExhausted();
assertThat(actualKeys).containsAll(expectedKeys);

// write 3rd object to s3
expectedKeys.add(writeToS3(topic, testData3.getBytes(StandardCharsets.UTF_8), "00000"));
assertThat(testBucketAccessor.listObjects()).hasSize(3);

assertThat(iter).hasNext();
object = iter.next();
actualKeys.add(object.key());
assertThat(iter).isExhausted();
assertThat(iterator).hasNext();
s3SourceRecord = iterator.next();
actualKeys.add(s3SourceRecord.getObjectKey());
assertThat(iterator).isExhausted();
assertThat(actualKeys).containsAll(expectedKeys);

}

private static @NotNull OffsetManager<S3OffsetManagerEntry> createOffsetManager() {
return new OffsetManager<>(new SourceTaskContext() {
@Override
public Map<String, String> configs() {
return Map.of();
}

@Override
public OffsetStorageReader offsetStorageReader() {
return new OffsetStorageReader() {
@Override
public <T> Map<String, Object> offset(final Map<String, T> map) {
return Map.of();
}

@Override
public <T> Map<Map<String, T>, Map<String, Object>> offsets(
final Collection<Map<String, T>> collection) {
return Map.of();
}
};
}
});
}
}
AWSV2SourceClient.java
@@ -76,7 +76,7 @@ public AWSV2SourceClient(final S3SourceConfig s3SourceConfig) {
* the beginning key, or {@code null} to start at the beginning.
* @return a Stream of S3Objects for the current state of the S3 storage.
*/
Stream<S3Object> getS3ObjectStream(final String startToken) {
public Stream<S3Object> getS3ObjectStream(final String startToken) {
final ListObjectsV2Request request = ListObjectsV2Request.builder()
.bucket(bucketName)
.maxKeys(s3SourceConfig.getS3ConfigFragment().getFetchPageSize() * PAGE_SIZE_FACTOR)
@@ -98,19 +98,6 @@ Stream<S3Object> getS3ObjectStream(final String startToken) {
}).flatMap(response -> response.contents().stream().filter(filterPredicate));
}

/**
* Creates an S3Object iterator that will return the objects from the current objects in S3 storage and then try to
* refresh on every {@code hasNext()} that returns false. This should pick up new files as they are dropped on the
* file system.
*
* @param startToken
* the beginning key, or {@code null} to start at the beginning.
* @return an Iterator on the S3Objects.
*/
public Iterator<S3Object> getS3ObjectIterator(final String startToken) {
return new S3ObjectIterator(startToken);
}

/**
* Gets an iterator of keys from the current S3 storage.
*
@@ -136,39 +123,4 @@ public void addPredicate(final Predicate<S3Object> objectPredicate) {
this.filterPredicate = this.filterPredicate.and(objectPredicate);
}

/**
* An iterator that reads from the S3 object stream and refreshes it when exhausted.
*/
public class S3ObjectIterator implements Iterator<S3Object> {

/** The current iterator. */
private Iterator<S3Object> inner;
/** The last object key that was seen. */
private String lastSeenObjectKey;

private S3ObjectIterator(final String initialKey) {
lastSeenObjectKey = initialKey;
inner = getS3ObjectStream(lastSeenObjectKey).iterator();
}
@Override
public boolean hasNext() {
if (!inner.hasNext()) {
inner = getS3ObjectStream(lastSeenObjectKey).iterator();
}
return inner.hasNext();
}

@Override
public S3Object next() {
final S3Object result = inner.next();
lastSeenObjectKey = result.key();
return result;
}

@Override
public void remove() {
throw new UnsupportedOperationException();
}
}

}
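With getS3ObjectStream promoted to public, the paged listing can now be consumed directly by callers. A minimal sketch, assuming an S3SourceConfig built elsewhere; printing keys stands in for real processing:

import java.util.stream.Stream;

import software.amazon.awssdk.services.s3.model.S3Object;

// Sketch only: s3SourceConfig is assumed to be configured elsewhere.
final AWSV2SourceClient sourceClient = new AWSV2SourceClient(s3SourceConfig);
try (Stream<S3Object> objects = sourceClient.getS3ObjectStream(null)) { // null starts at the beginning
    objects.map(S3Object::key).forEach(System.out::println); // stand-in for real processing
}

Passing a previously seen key instead of null resumes the listing after that key, which is what the iterator rehydration below relies on.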
SourceRecordIterator.java
@@ -51,7 +51,7 @@ public final class SourceRecordIterator implements Iterator<S3SourceRecord> {
private final Transformer transformer;
/** The AWS client that provides the S3Objects */
private final AWSV2SourceClient sourceClient;

/** The taskId of this running task */
private int taskId;

/** The S3 bucket we are processing */
@@ -61,13 +61,19 @@ public final class SourceRecordIterator implements Iterator<S3SourceRecord> {
private Iterator<S3SourceRecord> inner;
/** The outer iterator that provides S3SourceRecords */
private Iterator<S3SourceRecord> outer;

/** The topic(s) which have been configured with the 'topics' configuration */
private final Optional<String> targetTopics;

/** Checks whether the S3 object key is among the 'target' files configured to be extracted from S3 */
final FileMatching fileMatching;

/** The predicate which will determine if an S3Object should be assigned to this task for processing */
final Predicate<Optional<S3SourceRecord>> taskAssignment;
/** The utility to extract the context from the S3 Object Key */
private FilePatternUtils filePattern;
/**
* The object key which is currently being processed; when rehydrating from S3 we will seek S3 objects after
* this key
*/
private String lastSeenObjectKey;

private static final Logger LOGGER = LoggerFactory.getLogger(SourceRecordIterator.class);
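The refresh-on-exhaustion behaviour of the deleted S3ObjectIterator now hangs off lastSeenObjectKey inside SourceRecordIterator. A sketch of the implied rehydration step, assuming it lives as a helper in that class — the real method body is not part of this hunk:

// Sketch of the rehydration idea, not the exact connector code.
private Iterator<S3Object> rehydrate() {
    // Re-list the bucket starting after the last processed key so newly dropped files are picked up.
    // Mapping the returned S3Objects to S3SourceRecords is omitted here.
    return sourceClient.getS3ObjectStream(lastSeenObjectKey).iterator();
}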
