Skip to content

Commit

Permalink
Remove commented code
Browse files Browse the repository at this point in the history
  • Loading branch information
muralibasani committed Jan 7, 2025
1 parent 9edeb8a commit 0861d4c
Show file tree
Hide file tree
Showing 8 changed files with 25 additions and 56 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2024 Aiven Oy
* Copyright 2025 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,9 @@
public final class HashObjectDistributionStrategy implements ObjectDistributionStrategy {
private final static Logger LOG = LoggerFactory.getLogger(HashObjectDistributionStrategy.class);
// Number of tasks configured for this strategy; replaced by setMaxTasks and on reconfiguration.
private int maxTasks;
// Declared exactly once and non-final: the pattern is assigned inside configureDistributionStrategy(...),
// which a final field would not permit.
private Pattern filePattern;
/**
 * Creates a strategy for the given task count and expected source name format.
 *
 * @param maxTasks
 *            the number of tasks to store in this strategy
 * @param expectedSourceNameFormat
 *            the format from which the file pattern is built
 */
public HashObjectDistributionStrategy(final int maxTasks, final String expectedSourceNameFormat) {
    // Single initialisation path: the helper assigns both maxTasks and filePattern, so neither field is
    // written twice and the pattern is built only once.
    configureDistributionStrategy(maxTasks, expectedSourceNameFormat);
}

@Override
Expand All @@ -51,7 +50,7 @@ public boolean isPartOfTask(final int taskId, final String filenameToBeEvaluated

/**
 * Replaces the task count and rebuilds the file pattern from the supplied format.
 *
 * @param maxTasks
 *            the new number of tasks
 * @param expectedFormat
 *            the new expected source name format
 */
@Override
public void reconfigureDistributionStrategy(final int maxTasks, final String expectedFormat) {
    // Build the pattern before touching any field, so a format rejected by configurePattern leaves the
    // current configuration as it was.
    final Pattern reconfiguredPattern = configurePattern(expectedFormat);
    this.maxTasks = maxTasks;
    // expectedFormat used to be ignored here, which left getFilePattern() returning the old pattern.
    this.filePattern = reconfiguredPattern;
}

@Override
Expand All @@ -62,4 +61,9 @@ public Pattern getFilePattern() {
public void setMaxTasks(final int maxTasks) {
// Overwrites the stored task count only; the file pattern is left as it is.
this.maxTasks = maxTasks;
}

/**
 * Stores the task count and rebuilds the file pattern from the expected source name format.
 *
 * @param maxTasks
 *            the task count to store
 * @param expectedSourceNameFormat
 *            the format handed to {@code configurePattern}
 */
private void configureDistributionStrategy(final int maxTasks, final String expectedSourceNameFormat) {
    this.maxTasks = maxTasks;
    final Pattern builtPattern = configurePattern(expectedSourceNameFormat);
    this.filePattern = builtPattern;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,22 +36,15 @@ public interface ObjectDistributionStrategy {
// Non-capturing group matching one or more digits.
String NUMBER_REGEX_PATTERN = "(?:\\d+)";
// Use a named group to return the partition in a complex string to always get the correct information for the
// partition number.

// Key shared by the partition named group and the {{partition}} placeholder.
String PATTERN_PARTITION_KEY = "partition";
// Named capturing group "partition" matching one or more digits.
String PARTITION_NAMED_GROUP_REGEX_PATTERN = "(?<" + PATTERN_PARTITION_KEY + ">\\d+)";
// Placeholder text "{{partition}}" that a source name format is checked for.
String PARTITION_PATTERN = "{{" + PATTERN_PARTITION_KEY + "}}";

// Key shared by the topic named group and the {{topic}} placeholder.
String PATTERN_TOPIC_KEY = "topic";

// Placeholder text "{{topic}}".
String TOPIC_PATTERN = "{{" + PATTERN_TOPIC_KEY + "}}";

// Named capturing group "topic" matching one or more letters, digits, '-', '_' or '.'.
String TOPIC_NAMED_GROUP_REGEX_PATTERN = "(?<" + PATTERN_TOPIC_KEY + ">[a-zA-Z0-9\\-_.]+)";

// Placeholder texts for the start offset and the timestamp.
String START_OFFSET_PATTERN = "{{start_offset}}";
String TIMESTAMP_PATTERN = "{{timestamp}}";

// Default prefix layout written with the topic and partition placeholders.
String DEFAULT_PREFIX_FILE_PATH_PATTERN = "topics/{{topic}}/partition={{partition}}/";

// Regex matching any run of characters up to the end of the input.
String ANY_FILENAME_PATTERN = ".*$";

/**
Expand Down Expand Up @@ -118,7 +111,10 @@ default boolean toBeProcessedByThisTask(final int taskId, final int maxTasks, fi

/**
 * Returns the file pattern held by the implementing strategy.
 *
 * @return the strategy's file pattern
 */
Pattern getFilePattern();

default Pattern createPattern(final String expectedSourceNameFormat) {
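/**
 * Builds the {@link Pattern} a strategy uses from the expected file name or prefix format.
 *
 * @throws ConfigException
 *             if the format is {@code null} or does not contain the partition placeholder
 */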
default Pattern configurePattern(final String expectedSourceNameFormat) {
if (expectedSourceNameFormat == null || !expectedSourceNameFormat.contains(PARTITION_PATTERN)) {
throw new ConfigException(String.format(
"Source name format %s missing partition pattern {{partition}} please configure the expected source to include the partition pattern.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,8 @@ public final class PartitionInFilenameDistributionStrategy implements ObjectDist
private final static Logger LOG = LoggerFactory.getLogger(PartitionInFilenameDistributionStrategy.class);

public static final String PARTITION = "partition";
// Each field is declared exactly once and non-final: reconfigureDistributionStrategy(...) replaces both
// values after construction, which final fields would not permit.
private Pattern filePattern;
private int maxTasks;

@Override
public Pattern getFilePattern() {
Expand All @@ -45,7 +44,7 @@ public Pattern getFilePattern() {

/**
 * Creates a strategy for the given task count and expected source name format.
 *
 * @param maxTasks
 *            the number of tasks to store in this strategy
 * @param expectedSourceNameFormat
 *            the format from which the file pattern is built
 */
public PartitionInFilenameDistributionStrategy(final int maxTasks, final String expectedSourceNameFormat) {
    this.maxTasks = maxTasks;
    // Assign the pattern exactly once, through configurePattern; the earlier duplicate assignment built it twice.
    this.filePattern = configurePattern(expectedSourceNameFormat);
}

/**
Expand Down Expand Up @@ -79,6 +78,7 @@ public boolean isPartOfTask(final int taskId, final String sourceNameToBeEvaluat
*/
@Override
public void reconfigureDistributionStrategy(final int maxTasks, final String expectedSourceNameFormat) {
    // Build the pattern once and keep the result; the earlier extra call built a pattern and threw it away.
    // Doing this before any assignment means a rejected format leaves the current configuration untouched.
    final Pattern reconfiguredPattern = configurePattern(expectedSourceNameFormat);
    this.maxTasks = maxTasks;
    this.filePattern = reconfiguredPattern;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,12 @@
*/
public final class PartitionInPathDistributionStrategy implements ObjectDistributionStrategy {
private final static Logger LOG = LoggerFactory.getLogger(PartitionInPathDistributionStrategy.class);

// Portion of the expected path format that precedes the partition placeholder.
private String s3Prefix;
// Pattern built from the expected path format by configurePattern.
private Pattern filePattern;
// Task count supplied at construction or on reconfiguration.
private int maxTasks;

/**
 * Creates a strategy for the given task count and expected path format.
 *
 * @param maxTasks
 *            the number of tasks to store in this strategy
 * @param expectedPathFormat
 *            the path format from which the file pattern and the prefix are derived
 */
public PartitionInPathDistributionStrategy(final int maxTasks, final String expectedPathFormat) {
    // The helper sets maxTasks, filePattern and s3Prefix; doing the same work here as well only repeated it.
    configureDistributionStrategy(maxTasks, expectedPathFormat);
}

@Override
Expand Down Expand Up @@ -93,22 +90,17 @@ public boolean isPartOfTask(final int taskId, final String pathToBeEvaluated) {
*/
@Override
public void reconfigureDistributionStrategy(final int maxTasks, final String expectedPathFormat) {
    // Same initialisation path as the constructor: the helper updates maxTasks, filePattern and s3Prefix,
    // so the three assignments are not repeated here.
    configureDistributionStrategy(maxTasks, expectedPathFormat);
}

/**
 * Returns the file pattern currently held by this strategy.
 *
 * @return the current file pattern
 */
@Override
public Pattern getFilePattern() {
return filePattern;
}

// private void configureDistributionStrategy(final String expectedPathFormat) {
// if (StringUtils.isEmpty(expectedPathFormat) || !expectedPathFormat.contains(PARTITION_PATTERN)) {
// throw new ConfigException(String.format(
// "Expected path format %s is missing the identifier '%s' to correctly select the partition",
// expectedPathFormat, PARTITION_PATTERN));
// }
// s3Prefix = StringUtils.substringBefore(expectedPathFormat, PARTITION_PATTERN);
// }
/**
 * Stores the task count, rebuilds the file pattern and recomputes the prefix from the expected path format.
 *
 * @param maxTasks
 *            the task count to store
 * @param expectedPathFormat
 *            the path format handed to {@code configurePattern} and used to derive the prefix
 */
private void configureDistributionStrategy(final int maxTasks, final String expectedPathFormat) {
    this.maxTasks = maxTasks;
    final Pattern builtPattern = configurePattern(expectedPathFormat);
    this.filePattern = builtPattern;
    // Everything in the format ahead of the partition placeholder is kept as the prefix.
    this.s3Prefix = StringUtils.substringBefore(expectedPathFormat, PARTITION_PATTERN);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ final class IntegrationTest implements IntegrationBase {
private static final String TEST_BUCKET_NAME = "test-bucket0";

private static String s3Endpoint;
// private String s3Prefix;
private static BucketAccessor testBucketAccessor;

@Container
Expand All @@ -117,8 +116,6 @@ final class IntegrationTest implements IntegrationBase {

@BeforeAll
static void setUpAll() throws IOException, InterruptedException {
// s3Prefix = COMMON_PREFIX + ZonedDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME) + "/";

s3Client = IntegrationBase.createS3Client(LOCALSTACK);
s3Endpoint = LOCALSTACK.getEndpoint().toString();
testBucketAccessor = new BucketAccessor(s3Client, TEST_BUCKET_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,17 +151,6 @@ public void addFailedObjectKeys(final String objectKey) {
this.failedObjectKeys.add(objectKey);
}

// public void setFilterPredicate(final Predicate<S3Object> predicate) {
// filterPredicate = predicate;
// }
//
// private boolean assignObjectToTask(final String objectKey) {
// final int maxTasks = Integer.parseInt(s3SourceConfig.originals().get("tasks.max").toString());
// final int taskId = Integer.parseInt(s3SourceConfig.originals().get("task.id").toString()) % maxTasks;
// final int taskAssignment = Math.floorMod(objectKey.hashCode(), maxTasks);
// return taskAssignment == taskId;
// }

/**
 * Closes {@code s3Client}.
 */
public void shutdown() {
s3Client.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,6 @@
*/
public final class SourceRecordIterator implements Iterator<S3SourceRecord> {
private static final Logger LOGGER = LoggerFactory.getLogger(SourceRecordIterator.class);

// TODO this path/config is yet to be defined.
// public static final String FILE_PATH_DEFAULT = "/PREFIX/{{partition}}/YYYY/MM/DD/mm/}}";
//
// public static final String FILE_DEFAULT_PATTERN_STR = "(?<topicName>[^/]+?)-" + "(?<partition>\\d)-"
// + "(?<uniqueId>[a-zA-Z0-9]+)" + "\\.(?<fileExtension>[^.]+)$";
//
// public static final Pattern FILE_DEFAULT_PATTERN = Pattern.compile(FILE_DEFAULT_PATTERN_STR); // topic-00001.txt

private final Pattern filePattern;
public static final long BYTES_TRANSFORMATION_NUM_OF_RECS = 1L;
private String currentObjectKey;
Expand Down

0 comments on commit 0861d4c

Please sign in to comment.