diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/utils/FilePatternUtils.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/utils/FilePatternUtils.java index 15923a10..71547f83 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/utils/FilePatternUtils.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/utils/FilePatternUtils.java @@ -108,6 +108,7 @@ public > Optional> process(final K sourceName getOffset(matcher.get(), sourceName.toString()).ifPresent(ctx::setOffset); return Optional.of(ctx); } + LOGGER.debug("{} did not match pattern and was skipped for processing.", sourceName); return Optional.empty(); } diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3SourceRecord.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3SourceRecord.java index 5bf799d3..70eb10b3 100644 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3SourceRecord.java +++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3SourceRecord.java @@ -115,6 +115,10 @@ public void setContext(final Context context) { */ public SourceRecord getSourceRecord(final ErrorsTolerance tolerance) { try { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Source Record: {} for Topic: {} , Partition: {}, recordCount: {}", getObjectKey(), + getTopic(), getPartition(), getRecordCount()); + } return new SourceRecord(offsetManagerEntry.getManagerKey().getPartitionMap(), offsetManagerEntry.getProperties(), getTopic(), getPartition(), keyData.schema(), keyData.value(), valueData.schema(), valueData.value());