
Commit

Fix conflicts
muralibasani committed Jan 9, 2025
1 parent a7d4e9b commit 6c437ae
Showing 12 changed files with 320 additions and 196 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -16,14 +16,6 @@

package io.aiven.kafka.connect.common.source.input.utils;

import static io.aiven.kafka.connect.common.source.input.utils.FileExtractionPatterns.NUMBER_REGEX_PATTERN;
import static io.aiven.kafka.connect.common.source.input.utils.FileExtractionPatterns.PARTITION_NAMED_GROUP_REGEX_PATTERN;
import static io.aiven.kafka.connect.common.source.input.utils.FileExtractionPatterns.PARTITION_PATTERN;
import static io.aiven.kafka.connect.common.source.input.utils.FileExtractionPatterns.START_OFFSET_PATTERN;
import static io.aiven.kafka.connect.common.source.input.utils.FileExtractionPatterns.TIMESTAMP_PATTERN;
import static io.aiven.kafka.connect.common.source.input.utils.FileExtractionPatterns.TOPIC_NAMED_GROUP_REGEX_PATTERN;
import static io.aiven.kafka.connect.common.source.input.utils.FileExtractionPatterns.TOPIC_PATTERN;

import java.util.regex.Pattern;

import org.apache.kafka.common.config.ConfigException;
@@ -32,6 +24,20 @@

public final class FilePatternUtils {

public static final String PATTERN_PARTITION_KEY = "partition";
public static final String PATTERN_TOPIC_KEY = "topic";
public static final String START_OFFSET_PATTERN = "{{start_offset}}";
public static final String TIMESTAMP_PATTERN = "{{timestamp}}";
public static final String PARTITION_PATTERN = "{{" + PATTERN_PARTITION_KEY + "}}";
public static final String TOPIC_PATTERN = "{{" + PATTERN_TOPIC_KEY + "}}";

// Use a named group to return the partition in a complex string to always get the correct information for the
// partition number.
public static final String PARTITION_NAMED_GROUP_REGEX_PATTERN = "(?<" + PATTERN_PARTITION_KEY + ">\\d+)";
public static final String NUMBER_REGEX_PATTERN = "(?:\\d+)";
public static final String TOPIC_NAMED_GROUP_REGEX_PATTERN = "(?<" + PATTERN_TOPIC_KEY + ">[a-zA-Z0-9\\-_.]+)";
public static final String ANY_FILENAME_PATTERN = ".*$";

private FilePatternUtils() {
// hidden
}
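The template placeholders and named-group regexes above are the building blocks for the `FilePatternUtils.configurePattern(...)` calls that appear in the updated tests below. The diff does not show `configurePattern` itself, so the following is only a minimal sketch, assuming the helper substitutes each placeholder with its named-group (or non-capturing) regex and compiles the result:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustration only: resolve "{{topic}}-{{partition}}-{{start_offset}}" using the
// constants now declared on FilePatternUtils. The real configurePattern helper may
// behave differently; only the constant values themselves are taken from the diff.
public class FilePatternSketch {
    public static void main(final String[] args) {
        final String template = "{{topic}}-{{partition}}-{{start_offset}}";
        final String regex = template
                .replace("{{topic}}", "(?<topic>[a-zA-Z0-9\\-_.]+)") // TOPIC_NAMED_GROUP_REGEX_PATTERN
                .replace("{{partition}}", "(?<partition>\\d+)")      // PARTITION_NAMED_GROUP_REGEX_PATTERN
                .replace("{{start_offset}}", "(?:\\d+)")             // NUMBER_REGEX_PATTERN
                + ".*$";                                             // ANY_FILENAME_PATTERN
        final Matcher matcher = Pattern.compile(regex).matcher("my-topic-3-1736390000000.txt");
        if (matcher.find()) {
            System.out.println(matcher.group("topic"));     // my-topic
            System.out.println(matcher.group("partition")); // 3
        }
    }
}
```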
Original file line number Diff line number Diff line change
@@ -16,7 +16,7 @@

package io.aiven.kafka.connect.common.source.task;

import static io.aiven.kafka.connect.common.source.input.utils.FileExtractionPatterns.PATTERN_PARTITION_KEY;
import static io.aiven.kafka.connect.common.source.input.utils.FilePatternUtils.PATTERN_PARTITION_KEY;

import java.util.regex.Matcher;
import java.util.regex.Pattern;
Original file line number Diff line number Diff line change
@@ -16,7 +16,7 @@

package io.aiven.kafka.connect.common.source.task;

import static io.aiven.kafka.connect.common.source.input.utils.FileExtractionPatterns.PATTERN_PARTITION_KEY;
import static io.aiven.kafka.connect.common.source.input.utils.FilePatternUtils.PATTERN_PARTITION_KEY;

import java.util.regex.Matcher;
import java.util.regex.Pattern;
Original file line number Diff line number Diff line change
@@ -23,7 +23,6 @@
import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_ACCESS_KEY_ID_CONFIG;
import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_S3_BUCKET_NAME_CONFIG;
import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_S3_ENDPOINT_CONFIG;
import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_S3_PREFIX_CONFIG;
import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_SECRET_ACCESS_KEY_CONFIG;
import static io.aiven.kafka.connect.s3.source.S3SourceTask.OBJECT_KEY;
import static io.aiven.kafka.connect.s3.source.utils.OffsetManager.SEPARATOR;
@@ -34,8 +33,6 @@

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -49,6 +46,8 @@

import io.aiven.kafka.connect.common.source.input.InputFormat;
import io.aiven.kafka.connect.common.source.input.TransformerFactory;
import io.aiven.kafka.connect.common.source.input.utils.FilePatternUtils;
import io.aiven.kafka.connect.common.source.task.HashDistributionStrategy;
import io.aiven.kafka.connect.s3.source.config.S3SourceConfig;
import io.aiven.kafka.connect.s3.source.testutils.BucketAccessor;
import io.aiven.kafka.connect.s3.source.utils.AWSV2SourceClient;
@@ -58,7 +57,6 @@

import org.apache.avro.Schema;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
@@ -76,28 +74,16 @@ class AwsIntegrationTest implements IntegrationBase {
@Container
public static final LocalStackContainer LOCALSTACK = IntegrationBase.createS3Container();

private static String s3Prefix;

private S3Client s3Client;
private String s3Endpoint;

private BucketAccessor testBucketAccessor;

@Override
public String getS3Prefix() {
return s3Prefix;
}

@Override
public S3Client getS3Client() {
return s3Client;
}

@BeforeAll
static void setUpAll() {
s3Prefix = COMMON_PREFIX + ZonedDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME) + "/";
}

@BeforeEach
void setupAWS() {
s3Client = IntegrationBase.createS3Client(LOCALSTACK);
@@ -118,7 +104,6 @@ private Map<String, String> getConfig(final String topics, final int maxTasks) {
config.put(AWS_SECRET_ACCESS_KEY_CONFIG, S3_SECRET_ACCESS_KEY);
config.put(AWS_S3_ENDPOINT_CONFIG, s3Endpoint);
config.put(AWS_S3_BUCKET_NAME_CONFIG, TEST_BUCKET_NAME);
config.put(AWS_S3_PREFIX_CONFIG, getS3Prefix());
config.put(TARGET_TOPIC_PARTITIONS, "0,1");
config.put(TARGET_TOPICS, topics);
config.put("key.converter", "org.apache.kafka.connect.converters.ByteArrayConverter");
@@ -146,14 +131,14 @@ void sourceRecordIteratorBytesTest(final TestInfo testInfo) {
final List<String> offsetKeys = new ArrayList<>();
final List<String> expectedKeys = new ArrayList<>();
// write 2 objects to s3
expectedKeys.add(writeToS3(topicName, testData1.getBytes(StandardCharsets.UTF_8), "00000"));
expectedKeys.add(writeToS3(topicName, testData2.getBytes(StandardCharsets.UTF_8), "00000"));
expectedKeys.add(writeToS3(topicName, testData1.getBytes(StandardCharsets.UTF_8), "00001"));
expectedKeys.add(writeToS3(topicName, testData2.getBytes(StandardCharsets.UTF_8), "00001"));
expectedKeys.add(writeToS3(topicName, testData1.getBytes(StandardCharsets.UTF_8), "0"));
expectedKeys.add(writeToS3(topicName, testData2.getBytes(StandardCharsets.UTF_8), "0"));
expectedKeys.add(writeToS3(topicName, testData1.getBytes(StandardCharsets.UTF_8), "1"));
expectedKeys.add(writeToS3(topicName, testData2.getBytes(StandardCharsets.UTF_8), "1"));

// we don't expect the empty one.
offsetKeys.addAll(expectedKeys);
offsetKeys.add(writeToS3(topicName, new byte[0], "00003"));
offsetKeys.add(writeToS3(topicName, new byte[0], "3"));

assertThat(testBucketAccessor.listObjects()).hasSize(5);

@@ -168,7 +153,8 @@ void sourceRecordIteratorBytesTest(final TestInfo testInfo) {
final AWSV2SourceClient sourceClient = new AWSV2SourceClient(s3SourceConfig, new HashSet<>());

final Iterator<S3SourceRecord> sourceRecordIterator = new SourceRecordIterator(s3SourceConfig, offsetManager,
TransformerFactory.getTransformer(InputFormat.BYTES), sourceClient);
TransformerFactory.getTransformer(InputFormat.BYTES), sourceClient, new HashDistributionStrategy(1),
FilePatternUtils.configurePattern("{{topic}}-{{partition}}-{{start_offset}}"), 0);

final HashSet<String> seenKeys = new HashSet<>();
while (sourceRecordIterator.hasNext()) {
@@ -183,8 +169,10 @@
@Test
void sourceRecordIteratorAvroTest(final TestInfo testInfo) throws IOException {
final var topicName = IntegrationBase.topicName(testInfo);
final int maxTasks = 1;
final int taskId = 0;

final Map<String, String> configData = getConfig(topicName, 1);
final Map<String, String> configData = getConfig(topicName, maxTasks);

configData.put(INPUT_FORMAT_KEY, InputFormat.AVRO.getValue());
configData.put(VALUE_CONVERTER_KEY, "io.confluent.connect.avro.AvroConverter");
@@ -211,12 +199,12 @@ void sourceRecordIteratorAvroTest(final TestInfo testInfo) throws IOException {

final Set<String> offsetKeys = new HashSet<>();

offsetKeys.add(writeToS3(topicName, outputStream1, "00001"));
offsetKeys.add(writeToS3(topicName, outputStream2, "00001"));
offsetKeys.add(writeToS3(topicName, outputStream1, "1"));
offsetKeys.add(writeToS3(topicName, outputStream2, "1"));

offsetKeys.add(writeToS3(topicName, outputStream3, "00002"));
offsetKeys.add(writeToS3(topicName, outputStream4, "00002"));
offsetKeys.add(writeToS3(topicName, outputStream5, "00002"));
offsetKeys.add(writeToS3(topicName, outputStream3, "2"));
offsetKeys.add(writeToS3(topicName, outputStream4, "2"));
offsetKeys.add(writeToS3(topicName, outputStream5, "2"));

assertThat(testBucketAccessor.listObjects()).hasSize(5);

@@ -231,7 +219,9 @@ void sourceRecordIteratorAvroTest(final TestInfo testInfo) throws IOException {
final AWSV2SourceClient sourceClient = new AWSV2SourceClient(s3SourceConfig, new HashSet<>());

final Iterator<S3SourceRecord> sourceRecordIterator = new SourceRecordIterator(s3SourceConfig, offsetManager,
TransformerFactory.getTransformer(InputFormat.AVRO), sourceClient);
TransformerFactory.getTransformer(InputFormat.AVRO), sourceClient,
new HashDistributionStrategy(maxTasks),
FilePatternUtils.configurePattern("{{topic}}-{{partition}}-{{start_offset}}"), taskId);

final HashSet<String> seenKeys = new HashSet<>();
final Map<String, List<Long>> seenRecords = new HashMap<>();
@@ -275,9 +265,9 @@ void verifyIteratorRehydration(final TestInfo testInfo) {
final List<String> actualKeys = new ArrayList<>();

// write 2 objects to s3
expectedKeys.add(writeToS3(topicName, testData1.getBytes(StandardCharsets.UTF_8), "00000")
expectedKeys.add(writeToS3(topicName, testData1.getBytes(StandardCharsets.UTF_8), "0")
.substring((OBJECT_KEY + SEPARATOR).length()));
expectedKeys.add(writeToS3(topicName, testData2.getBytes(StandardCharsets.UTF_8), "00000")
expectedKeys.add(writeToS3(topicName, testData2.getBytes(StandardCharsets.UTF_8), "0")
.substring((OBJECT_KEY + SEPARATOR).length()));

assertThat(testBucketAccessor.listObjects()).hasSize(2);
@@ -296,7 +286,7 @@ void verifyIteratorRehydration(final TestInfo testInfo) {
assertThat(actualKeys).containsAll(expectedKeys);

// write 3rd object to s3
expectedKeys.add(writeToS3(topicName, testData3.getBytes(StandardCharsets.UTF_8), "00000")
expectedKeys.add(writeToS3(topicName, testData3.getBytes(StandardCharsets.UTF_8), "0")
.substring((OBJECT_KEY + SEPARATOR).length()));
assertThat(testBucketAccessor.listObjects()).hasSize(3);

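Both iterator tests now construct the `SourceRecordIterator` with a `HashDistributionStrategy`, a compiled file-name pattern, and a task id. The strategy's implementation is not part of this diff; the sketch below only illustrates the general idea of hash-based task assignment, assuming an object key is hashed modulo the task count (the class, method, and key names here are hypothetical):

```java
// Hypothetical sketch of hash-based task assignment; not the actual
// HashDistributionStrategy implementation, which is outside this diff.
final class HashAssignmentSketch {
    private final int maxTasks;

    HashAssignmentSketch(final int maxTasks) {
        this.maxTasks = maxTasks;
    }

    // True if the object with this key should be handled by the given task.
    boolean ownedByTask(final String objectKey, final int taskId) {
        return Math.floorMod(objectKey.hashCode(), maxTasks) == taskId;
    }

    public static void main(final String[] args) {
        // With maxTasks = 1 every key lands in bucket 0, so task 0 sees all objects,
        // matching the single-task setup in sourceRecordIteratorBytesTest.
        final HashAssignmentSketch strategy = new HashAssignmentSketch(1);
        System.out.println(strategy.ownedByTask("topic-0-1736390000000.txt", 0)); // true
    }
}
```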
Original file line number Diff line number Diff line change
@@ -102,8 +102,6 @@ static byte[] generateNextAvroMessagesStartingFromId(final int messageId, final

S3Client getS3Client();

String getS3Prefix();

/**
* Write file to s3 with the specified key and data.
*
@@ -134,8 +132,7 @@ default void writeToS3WithKey(final String objectKey, final byte[] testDataBytes
* {@link io.aiven.kafka.connect.s3.source.utils.OffsetManager#SEPARATOR}
*/
default String writeToS3(final String topicName, final byte[] testDataBytes, final String partitionId) {
final String objectKey = org.apache.commons.lang3.StringUtils.defaultIfBlank(getS3Prefix(), "") + topicName
+ "-" + partitionId + "-" + System.currentTimeMillis() + ".txt";
final String objectKey = topicName + "-" + partitionId + "-" + System.currentTimeMillis() + ".txt";
writeToS3WithKey(objectKey, testDataBytes);
return OBJECT_KEY + SEPARATOR + objectKey;
}
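With the prefix removed, `writeToS3` now produces keys of the form `<topic>-<partition>-<millis>.txt`, which lines up with the `{{topic}}-{{partition}}-{{start_offset}}` template the tests configure (the millisecond timestamp fills the numeric start-offset slot). A self-contained check of that shape, using a regex equivalent to the constants above rather than the connector classes themselves:

```java
import java.util.regex.Pattern;

// Stand-alone check that a key built the way the updated writeToS3 builds it
// matches a "{{topic}}-{{partition}}-{{start_offset}}" style regex. The regex is
// written out literally here instead of going through FilePatternUtils.
public class ObjectKeyShapeCheck {
    public static void main(final String[] args) {
        final String topicName = "bytes-topic";
        final String partitionId = "0";
        final String objectKey = topicName + "-" + partitionId + "-" + System.currentTimeMillis() + ".txt";

        final Pattern pattern = Pattern.compile("(?<topic>[a-zA-Z0-9\\-_.]+)-(?<partition>\\d+)-(?:\\d+).*$");
        System.out.println(objectKey + " matches: " + pattern.matcher(objectKey).matches()); // true
    }
}
```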
