Skip to content

Commit

Permalink
Add comments explaining changes and improve the method name for consuming messages.
Browse files Browse the repository at this point in the history

Signed-off-by: Aindriu Lavelle <aindriu.lavelle@aiven.io>
  • Loading branch information
aindriu-aiven committed Jan 3, 2025
1 parent baab1ec commit 8267489
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -161,20 +161,23 @@ static <K, V> List<V> consumeMessages(final String topic, final int expectedMess

final List<V> recordValues = new ArrayList<>();
await().atMost(expectedMaxDuration).pollInterval(Duration.ofSeconds(1)).untilAsserted(() -> {
assertThat(assertAllRecordsConsumed(consumer, recordValues)).hasSize(expectedMessageCount);
assertThat(consumeRecordsInProgress(consumer, recordValues)).hasSize(expectedMessageCount);
});
return recordValues;
}
}

private static <K, V> List<V> assertAllRecordsConsumed(KafkaConsumer<K, V> consumer, List<V> recordValues) {
int recordsRetrieved = 0;
/**
 * Drains the currently available records from {@code consumer}, appending each record's value to
 * {@code recordValues}.
 * <p>
 * Polls in a loop with a 500 ms timeout and stops as soon as a poll returns 10 or fewer records,
 * i.e. when the flow of in-progress records appears to have tapered off. Callers (see
 * {@code consumeMessages}) invoke this repeatedly inside an Awaitility
 * {@code untilAsserted(...)} block, so it does not need to drain the topic completely in a single
 * call — any remainder is picked up on the next retry.
 *
 * @param consumer     the Kafka consumer to poll; assumed to be already subscribed/assigned —
 *                     TODO confirm (subscription happens outside this view)
 * @param recordValues accumulator list that record values are appended to; mutated in place
 * @return the same {@code recordValues} list, for call-chaining convenience
 */
private static <K, V> List<V> consumeRecordsInProgress(KafkaConsumer<K, V> consumer, List<V> recordValues) {
    int recordsRetrieved;
    do {
        final ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(500L));
        recordsRetrieved = records.count();
        for (final ConsumerRecord<K, V> record : records) {
            recordValues.add(record.value());
        }
        // The threshold of 10 keeps polling while records are still arriving in bulk, yet lets
        // integration tests that configure a small max.poll.records terminate promptly. A larger
        // threshold would also work but was measured to be slightly less efficient here.
    } while (recordsRetrieved > 10);
    return recordValues;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,12 +230,10 @@ void avroTest(final TestInfo testInfo) throws IOException {
assertThat(testBucketAccessor.listObjects()).hasSize(5);

// Poll Avro messages from the Kafka topic and deserialize them
// Waiting for 25k kafka records in this test so a longer Duration is added.
final List<GenericRecord> records = IntegrationBase.consumeAvroMessages(topicName, numOfRecsFactor * 5,
Duration.ofMinutes(3), connectRunner.getBootstrapServers(), schemaRegistry.getSchemaRegistryUrl()); // Ensure
// this
// method
// deserializes
// Avro
Duration.ofMinutes(3), connectRunner.getBootstrapServers(), schemaRegistry.getSchemaRegistryUrl());
// Ensure this method deserializes Avro

// Verify that the correct data is read from the S3 bucket and pushed to Kafka
assertThat(records).map(record -> entry(record.get("id"), String.valueOf(record.get("message"))))
Expand Down Expand Up @@ -272,6 +270,7 @@ void parquetTest(final TestInfo testInfo) throws IOException {
Files.delete(path);
}

// Waiting for a small number of messages so using a smaller Duration of a minute
final List<GenericRecord> records = IntegrationBase.consumeAvroMessages(topicName, 100, Duration.ofSeconds(60),
connectRunner.getBootstrapServers(), schemaRegistry.getSchemaRegistryUrl());
final List<String> expectedRecordNames = IntStream.range(0, 100)
Expand Down

0 comments on commit 8267489

Please sign in to comment.