diff --git a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationBase.java b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationBase.java
index 932923e8..442993bf 100644
--- a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationBase.java
+++ b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationBase.java
@@ -161,20 +161,23 @@ static List<String> consumeMessages(final String topic, final int expectedMess
             final List<String> recordValues = new ArrayList<>();
             await().atMost(expectedMaxDuration).pollInterval(Duration.ofSeconds(1)).untilAsserted(() -> {
-                assertThat(assertAllRecordsConsumed(consumer, recordValues)).hasSize(expectedMessageCount);
+                assertThat(consumeRecordsInProgress(consumer, recordValues)).hasSize(expectedMessageCount);
             });
             return recordValues;
         }
     }
 
-    private static List<String> assertAllRecordsConsumed(KafkaConsumer<String, String> consumer,
-            List<String> recordValues) {
-        int recordsRetrieved = 0;
+    private static List<String> consumeRecordsInProgress(KafkaConsumer<String, String> consumer,
+            List<String> recordValues) {
+        int recordsRetrieved;
         do {
             final ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500L));
             recordsRetrieved = records.count();
             for (final ConsumerRecord<String, String> record : records) {
                 recordValues.add(record.value());
             }
+            // A threshold of 10 records keeps this loop efficient while still allowing integration
+            // tests with a smaller max poll to be added later; a larger threshold would also work
+            // but would be slightly less efficient.
         } while (recordsRetrieved > 10);
         return recordValues;
     }
diff --git a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java
index 1991b356..5a573395 100644
--- a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java
+++ b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java
@@ -230,12 +230,10 @@ void avroTest(final TestInfo testInfo) throws IOException {
         assertThat(testBucketAccessor.listObjects()).hasSize(5);
 
         // Poll Avro messages from the Kafka topic and deserialize them
+        // This test waits for 25k Kafka records, so a longer Duration is used.
         final List<GenericRecord> records = IntegrationBase.consumeAvroMessages(topicName, numOfRecsFactor * 5,
-                Duration.ofMinutes(3), connectRunner.getBootstrapServers(), schemaRegistry.getSchemaRegistryUrl()); // Ensure
-                                                                                                                    // this
-                                                                                                                    // method
-                                                                                                                    // deserializes
-                                                                                                                    // Avro
+                Duration.ofMinutes(3), connectRunner.getBootstrapServers(), schemaRegistry.getSchemaRegistryUrl());
+        // Ensure this method deserializes Avro
 
         // Verify that the correct data is read from the S3 bucket and pushed to Kafka
         assertThat(records).map(record -> entry(record.get("id"), String.valueOf(record.get("message"))))
@@ -272,6 +270,7 @@ void parquetTest(final TestInfo testInfo) throws IOException {
             Files.delete(path);
         }
 
+        // Only a small number of messages is expected here, so a shorter Duration of one minute suffices.
        final List<GenericRecord> records = IntegrationBase.consumeAvroMessages(topicName, 100, Duration.ofSeconds(60),
                 connectRunner.getBootstrapServers(), schemaRegistry.getSchemaRegistryUrl());
         final List<String> expectedRecordNames = IntStream.range(0, 100)
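
For context, here is a minimal, self-contained sketch of the consume-in-progress pattern the first hunk introduces, with imports and generics filled in so it compiles on its own. The class name and the `IN_PROGRESS_THRESHOLD` constant are illustrative assumptions, not part of the PR; the polling logic mirrors the diff.

```java
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

final class ConsumeInProgressSketch {

    // Illustrative name for the PR's hard-coded threshold of 10.
    private static final int IN_PROGRESS_THRESHOLD = 10;

    static List<String> consumeMessages(final KafkaConsumer<String, String> consumer,
            final int expectedMessageCount, final Duration expectedMaxDuration) {
        final List<String> recordValues = new ArrayList<>();
        // Awaitility re-runs the assertion every second until it passes or the deadline expires.
        await().atMost(expectedMaxDuration).pollInterval(Duration.ofSeconds(1)).untilAsserted(
                () -> assertThat(consumeRecordsInProgress(consumer, recordValues)).hasSize(expectedMessageCount));
        return recordValues;
    }

    private static List<String> consumeRecordsInProgress(final KafkaConsumer<String, String> consumer,
            final List<String> recordValues) {
        int recordsRetrieved;
        do {
            final ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500L));
            recordsRetrieved = records.count();
            for (final ConsumerRecord<String, String> record : records) {
                recordValues.add(record.value());
            }
            // Keep polling while records are still arriving in bulk; once a poll returns
            // IN_PROGRESS_THRESHOLD records or fewer, hand control back to Awaitility.
        } while (recordsRetrieved > IN_PROGRESS_THRESHOLD);
        return recordValues;
    }
}
```

The do-while guarantees at least one poll per Awaitility attempt, and the threshold stops the inner loop from monopolizing the await deadline once the topic looks drained.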