From fc17f35e84513d2486e84f1ebf53013db783d363 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Francesco=20Chicchiricc=C3=B2?= Date: Tue, 3 Dec 2024 11:28:25 +0100 Subject: [PATCH] Refinements --- .../tirasa/connid/bundles/kafka/KafkaConnector.java | 4 ++-- .../tirasa/connid/bundles/kafka/Messages.properties | 4 ++-- .../tirasa/connid/bundles/kafka/Messages_it.properties | 4 ++-- .../connid/bundles/kafka/KafkaConnectorTests.java | 10 +++++++++- 4 files changed, 15 insertions(+), 7 deletions(-) diff --git a/src/main/java/net/tirasa/connid/bundles/kafka/KafkaConnector.java b/src/main/java/net/tirasa/connid/bundles/kafka/KafkaConnector.java index 881c411..197ae9b 100644 --- a/src/main/java/net/tirasa/connid/bundles/kafka/KafkaConnector.java +++ b/src/main/java/net/tirasa/connid/bundles/kafka/KafkaConnector.java @@ -163,14 +163,14 @@ public void test() { throw new ConnectorException("No Kafka producer configured"); } try { - producer.clientInstanceId(Duration.ofSeconds(10)); + producer.metrics(); } catch (Exception e) { LOG.error(e, "While testing Kafka producer"); throw new ConnectionFailedException(e); } try (KafkaConsumer consumer = createConsumer(ObjectClass.ACCOUNT)) { - consumer.clientInstanceId(Duration.ofSeconds(10)); + consumer.listTopics(Duration.ofSeconds(10)); } catch (Exception e) { LOG.error(e, "While testing Kafka consumer"); throw new ConnectionFailedException(e); diff --git a/src/main/resources/net/tirasa/connid/bundles/kafka/Messages.properties b/src/main/resources/net/tirasa/connid/bundles/kafka/Messages.properties index feeaad9..266a1dc 100644 --- a/src/main/resources/net/tirasa/connid/bundles/kafka/Messages.properties +++ b/src/main/resources/net/tirasa/connid/bundles/kafka/Messages.properties @@ -15,8 +15,8 @@ # kafka.connector.display=Kafka Connector -bootstrapServers.display=Sample Property -bootstrapServers.help=Only a sample property +bootstrapServers.display=Bootstrap servers +bootstrapServers.help=A list of host/port pairs used to establish the initial connection to the Kafka cluster clientId.display=Client id clientId.help=Client id for subscription autoOffsetReset.display=Auto offset reset diff --git a/src/main/resources/net/tirasa/connid/bundles/kafka/Messages_it.properties b/src/main/resources/net/tirasa/connid/bundles/kafka/Messages_it.properties index 05dd336..56211ab 100644 --- a/src/main/resources/net/tirasa/connid/bundles/kafka/Messages_it.properties +++ b/src/main/resources/net/tirasa/connid/bundles/kafka/Messages_it.properties @@ -15,8 +15,8 @@ # kafka.connector.display=Connettore Kafka -bootstrapServers.display=Propriet\u00e0 d'esempio -bootstrapServers.help=Solo una propriet\u00e0 d'esempio +bootstrapServers.display=Server di bootstrap +bootstrapServers.help=Un elenco di coppie host/porta utilizzate per stabilire la connessione iniziale al cluster Kafka clientId.display=Id client clientId.help=Id client per sottoscrizione autoOffsetReset.display=Reset auto offset diff --git a/src/test/java/net/tirasa/connid/bundles/kafka/KafkaConnectorTests.java b/src/test/java/net/tirasa/connid/bundles/kafka/KafkaConnectorTests.java index 5136fe2..40f43cb 100644 --- a/src/test/java/net/tirasa/connid/bundles/kafka/KafkaConnectorTests.java +++ b/src/test/java/net/tirasa/connid/bundles/kafka/KafkaConnectorTests.java @@ -19,6 +19,7 @@ import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.ArrayList; import java.util.List; @@ -102,6 +103,13 @@ void delete() { @Test void livesync() throws InterruptedException, ExecutionException { + List deltas = new ArrayList<>(); + newFacade().livesync( + ObjectClass.GROUP, + deltas::add, + new OperationOptionsBuilder().build()); + assertTrue(deltas.isEmpty()); + // create a producer and send a new event Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); @@ -115,7 +123,7 @@ void livesync() throws InterruptedException, ExecutionException { } // live sync - List deltas = new ArrayList<>(); + deltas.clear(); await().atMost(5, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until(() -> { newFacade().livesync( ObjectClass.GROUP,