diff --git a/commons/src/test/java/io/aiven/kafka/connect/common/source/input/utils/FilePatternUtilsTest.java b/commons/src/test/java/io/aiven/kafka/connect/common/source/input/utils/FilePatternUtilsTest.java
index a893fe88..3f2f5b72 100644
--- a/commons/src/test/java/io/aiven/kafka/connect/common/source/input/utils/FilePatternUtilsTest.java
+++ b/commons/src/test/java/io/aiven/kafka/connect/common/source/input/utils/FilePatternUtilsTest.java
@@ -41,7 +41,9 @@ void checkTopicDistribution(final String expectedSourceFormat, final String sour
     }

     @ParameterizedTest
-    @CsvSource({ "{{topic}}-{{partition}}-{{start_offset}}.txt, logs2-1-0001.txt, logs2,1,0001",
+    @CsvSource({ "{{topic}}-{{partition}}-{{start_offset}}.txt, logs2-1-1-0001.txt, logs2-1,1,0001",
+            "{{topic}}-{{start_offset}}-{{partition}}.txt, logs2_1-0001-1.txt, logs2_1,0001,1",
+            "{{topic}}-{{start_offset}}-{{partition}}.txt, logs2.1-0001-00000001.txt, logs2.1,0001,1",
             "{{topic}}-{{start_offset}}-{{partition}}.txt, logs2-0001-1.txt, logs2,0001,1",
             "{{topic}}-{{start_offset}}-{{partition}}.txt, logs2-99999-1.txt, logs2,1,99999",
             "{{partition}}-{{start_offset}}-{{topic}}.txt, logs2-1-logs2.txt, logs2,2,0001",
@@ -49,6 +51,9 @@ void checkTopicDistribution(final String expectedSourceFormat, final String sour
             "{{topic}}-{{start_offset}}-{{partition}}.txt, logs2-99999-0001.txt, logs2,1,99999",
             "{{partition}}-{{start_offset}}-{{topic}}.txt, logs0002-01-logs2.txt, logs2,2,0001",
             "{{partition}}-{{start_offset}}-{{topic}}.txt, logs002-1-logs2.txt, logs2,2,0001",
+            "topic/{{topic}}/partition/{{partition}}/startOffset/{{start_offset}},"
+                    + " topic/logs0002-12_2.topic/partition/000001/startOffset/432342322/file.txt, logs0002-12_2.topic,1,432342322",
+            "{{topic}}/{{partition}}/{{start_offset}}, logs0002/01/4323422/file.txt, logs0002,1,4323422",
             "{{partition}}-{{start_offset}}-{{topic}}.txt, logs002-9223372036854775807-logs2.txt, logs2,2,9223372036854775807",
             "{{partition}}-{{start_offset}}-{{topic}}.txt, logs002-8273685692-logs2.txt, logs2,2,8273685692" })
     void checkTopicDistribution(final String expectedSourceFormat, final String sourceName, final String expectedTopic,
diff --git a/s3-source-connector/README.md b/s3-source-connector/README.md
index a5952008..7860b698 100644
--- a/s3-source-connector/README.md
+++ b/s3-source-connector/README.md
@@ -55,19 +55,30 @@ It is also important to specify `aws.sts.role.external.id` for the security reas

 ### File name format

-> File name format is tightly related to [Record Grouping](#record-grouping)
-
 The connector uses the following format for input files (blobs):
 `<prefix><filename>`.

 `<prefix>` is the optional prefix that can be used, for example, for subdirectories in the bucket.
-`<filename>` is the file name. The connector has a fixed
+`<filename>` is the file name. The connector has a configurable
 template for file names.

-    Fixed template for file : `{{topic}}-{{partition}}-{{start_offset}}`
+The configuration property `file.name.template` is mandatory; if it is not set, objects will not be processed.
+Example templates are shown below.
+
+The template supports placeholders with variable names:
+`{{ variable_name }}`. Currently, the supported variables are:
+- `topic` - the Kafka topic;
+- `partition` - the Kafka partition;
+- `start_offset` - the Kafka offset of the first record in the file;
+- `timestamp` - the timestamp when the Kafka record was processed by the connector.
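+
+For example, the template can be set in the connector configuration as follows
+(the `.txt` extension here is purely illustrative):
+
+```properties
+file.name.template={{topic}}-{{partition}}-{{start_offset}}.txt
+```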

-Example object name : customertopic-00001-1734445664111.txt
+Example object names:
+- `{{topic}}-{{partition}}-{{start_offset}}`: customer-topic-1-1734445664111.txt
+- `{{topic}}/{{partition}}/{{start_offset}}`: customer-topic/1/1734445664111.txt
+- `topic/{{topic}}/partition/{{partition}}/startOffset/{{start_offset}}`: topic/customer-topic/partition/1/startOffset/1734445664111.txt
+
+`{{topic}}` is the destination Kafka topic that data is produced to; it is used when the `topics` configuration is not defined.

 ## Data Format

@@ -240,11 +251,6 @@ input.type=jsonl
 # See https://kafka.apache.org/documentation/#connect_configuring
 topics=topic1,topic2

-# A comma-separated list of topic partitions where the connector's offset storage reader
-# can read the stored offsets for those partitions. If not mentioned, s3 objects will be read again if
-# available in the bucket
-topic.partitions=1,2,3
-
 ### Connector-specific configuration
 ### Fill in you values
 # AWS Access Key ID
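For context on the new test cases: `checkTopicDistribution` verifies that topic, partition, and start offset are extracted from an object name by matching it against the configured template. The sketch below shows one way such a template can be compiled into a regex with named capture groups; it is a minimal illustration under assumed semantics, and the class name `FilePatternSketch`, the method `toPattern`, and the character classes are hypothetical, not the actual `FilePatternUtils` API.

```java
// Minimal illustration of template-to-regex extraction; names below are
// hypothetical, not the actual FilePatternUtils API.
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public final class FilePatternSketch {

    // Compiles a template such as "{{topic}}-{{partition}}-{{start_offset}}.txt"
    // into a regex with named capture groups; text outside the placeholders is
    // matched literally.
    static Pattern toPattern(final String template) {
        final Matcher placeholder = Pattern
                .compile("\\{\\{\\s*(topic|partition|start_offset|timestamp)\\s*\\}\\}")
                .matcher(template);
        final StringBuilder regex = new StringBuilder();
        int last = 0;
        while (placeholder.find()) {
            regex.append(Pattern.quote(template.substring(last, placeholder.start())));
            final String name = placeholder.group(1);
            if ("topic".equals(name)) {
                // Topic names may themselves contain '-', '_' and '.', so the
                // group matches greedily over that wider character set.
                regex.append("(?<topic>[a-zA-Z0-9\\-_.]+)");
            } else if ("start_offset".equals(name)) {
                // Java named groups must be alphanumeric, hence "startOffset".
                regex.append("(?<startOffset>\\d+)");
            } else {
                regex.append("(?<").append(name).append(">\\d+)");
            }
            last = placeholder.end();
        }
        regex.append(Pattern.quote(template.substring(last)));
        return Pattern.compile(regex.toString());
    }

    public static void main(final String[] args) {
        // Mirrors the first new test case: the extra '-' is absorbed by the
        // greedy topic group, so "logs2-1-1-0001.txt" yields topic "logs2-1".
        final Matcher matcher = toPattern("{{topic}}-{{partition}}-{{start_offset}}.txt")
                .matcher("logs2-1-1-0001.txt");
        if (matcher.matches()) {
            System.out.println(matcher.group("topic"));                       // logs2-1
            System.out.println(Integer.parseInt(matcher.group("partition"))); // 1
            System.out.println(Long.parseLong(matcher.group("startOffset"))); // 1
        }
    }
}
```

Greedy matching is what lets `logs2-1-1-0001.txt` resolve to topic `logs2-1` rather than fail on the extra hyphen, which is exactly the behavior the first new `@CsvSource` entry pins down.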