Skip to content

Commit

Permalink
Address PR Comments
Browse files Browse the repository at this point in the history
Signed-off-by: Aindriu Lavelle <aindriu.lavelle@aiven.io>
  • Loading branch information
aindriu-aiven committed Jan 27, 2025
1 parent cb84ae7 commit a1561ba
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,21 +53,8 @@ public class OffsetManager<E extends OffsetManager.OffsetManagerEntry<E>> {
* the context for this instance to use.
*/
public OffsetManager(final SourceTaskContext context) {
this(context, new ConcurrentHashMap<>());
}

/**
* Package private for testing.
*
* @param context
* the context for this instance to use.
* @param offsets
* the offsets
*/
protected OffsetManager(final SourceTaskContext context,
final ConcurrentMap<Map<String, Object>, Map<String, Object>> offsets) {
this.context = context;
this.offsets = offsets;
this.offsets = new ConcurrentHashMap<>();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ static List<String> consumeByteMessages(final String topic, final int expectedMe
String bootstrapServers) {
final Properties consumerProperties = getConsumerProperties(bootstrapServers, ByteArrayDeserializer.class,
ByteArrayDeserializer.class);
final List<byte[]> objects = consumeMessages(topic, expectedMessageCount, Duration.ofSeconds(90),
final List<byte[]> objects = consumeMessages(topic, expectedMessageCount, Duration.ofSeconds(120),
consumerProperties);
return objects.stream().map(String::new).collect(Collectors.toList());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ void bytesTest(final boolean addPrefix) {
addPrefix, localS3Prefix, prefixPattern, fileNamePatternSeparator);

connectorConfig.put(INPUT_FORMAT_KEY, InputFormat.BYTES.getValue());
connectRunner.configureConnector(CONNECTOR_NAME, connectorConfig);


final String testData1 = "Hello, Kafka Connect S3 Source! object 1";
final String testData2 = "Hello, Kafka Connect S3 Source! object 2";
Expand All @@ -193,10 +193,13 @@ void bytesTest(final boolean addPrefix) {
// write 5 objects to s3
offsetKeys.add(writeToS3(topic, testData1.getBytes(StandardCharsets.UTF_8), "0", localS3Prefix));
offsetKeys.add(writeToS3(topic, testData2.getBytes(StandardCharsets.UTF_8), "00000", localS3Prefix));
offsetKeys.add(writeToS3(topic, testData1.getBytes(StandardCharsets.UTF_8), "1", localS3Prefix));
offsetKeys.add(writeToS3(topic, testData2.getBytes(StandardCharsets.UTF_8), "00001", localS3Prefix));
offsetKeys.add(writeToS3(topic, testData1.getBytes(StandardCharsets.UTF_8), "00001", localS3Prefix));
offsetKeys.add(writeToS3(topic, testData2.getBytes(StandardCharsets.UTF_8), "1", localS3Prefix));
offsetKeys.add(writeToS3(topic, new byte[0], "3"));

//Start the Connector
connectRunner.configureConnector(CONNECTOR_NAME, connectorConfig);

assertThat(testBucketAccessor.listObjects()).hasSize(5);
// Poll messages from the Kafka topic and verify the consumed data
final List<String> records = IntegrationBase.consumeByteMessages(topic, 4, connectRunner.getBootstrapServers());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,11 @@ public void setValueData(final SchemaAndValue valueData) {
}

public String getTopic() {
return context.getTopic().isPresent() ? context.getTopic().get() : null;
return context.getTopic().orElse(null);
}

public Integer getPartition() {
return context.getPartition().isPresent() ? context.getPartition().get() : null;
return context.getPartition().orElse(null);
}

public String getObjectKey() {
Expand Down

0 comments on commit a1561ba

Please sign in to comment.