From 10db53d72e939687a282eb09a78ec76ff4393259 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Francesco=20Chicchiricc=C3=B2?= Date: Tue, 19 Nov 2024 14:36:44 +0100 Subject: [PATCH] Initial commit --- LICENSE | 4 +- NOTICE | 9 + pom.xml | 344 ++++++++++++++++++ src/assemble/connector.xml | 51 +++ .../bundles/kafka/KafkaConfiguration.java | 172 +++++++++ .../connid/bundles/kafka/KafkaConnector.java | 306 ++++++++++++++++ .../AbstractValueDeserializer.java | 64 ++++ .../AbstractValueSerializer.java | 61 ++++ .../serialization/AttributeDeserializer.java | 49 +++ .../serialization/AttributeSerializer.java | 38 ++ .../GuardedStringDeserializer.java | 99 +++++ .../GuardedStringSerializer.java | 87 +++++ .../serialization/SyncDeltaDeserializer.java | 38 ++ .../serialization/SyncDeltaSerializer.java | 37 ++ .../serialization/SyncTokenDeserializer.java | 84 +++++ .../serialization/SyncTokenSerializer.java | 57 +++ .../connid/bundles/kafka/Message.properties | 35 ++ .../bundles/kafka/Message_it.properties | 35 ++ .../bundles/kafka/KafkaConnectorTests.java | 131 +++++++ src/test/resources/simplelogger.properties | 20 + 20 files changed, 1719 insertions(+), 2 deletions(-) create mode 100644 NOTICE create mode 100644 pom.xml create mode 100644 src/assemble/connector.xml create mode 100644 src/main/java/net/tirasa/connid/bundles/kafka/KafkaConfiguration.java create mode 100644 src/main/java/net/tirasa/connid/bundles/kafka/KafkaConnector.java create mode 100644 src/main/java/net/tirasa/connid/bundles/kafka/serialization/AbstractValueDeserializer.java create mode 100644 src/main/java/net/tirasa/connid/bundles/kafka/serialization/AbstractValueSerializer.java create mode 100644 src/main/java/net/tirasa/connid/bundles/kafka/serialization/AttributeDeserializer.java create mode 100644 src/main/java/net/tirasa/connid/bundles/kafka/serialization/AttributeSerializer.java create mode 100644 src/main/java/net/tirasa/connid/bundles/kafka/serialization/GuardedStringDeserializer.java create mode 100644 src/main/java/net/tirasa/connid/bundles/kafka/serialization/GuardedStringSerializer.java create mode 100644 src/main/java/net/tirasa/connid/bundles/kafka/serialization/SyncDeltaDeserializer.java create mode 100644 src/main/java/net/tirasa/connid/bundles/kafka/serialization/SyncDeltaSerializer.java create mode 100644 src/main/java/net/tirasa/connid/bundles/kafka/serialization/SyncTokenDeserializer.java create mode 100644 src/main/java/net/tirasa/connid/bundles/kafka/serialization/SyncTokenSerializer.java create mode 100644 src/main/resources/net/tirasa/connid/bundles/kafka/Message.properties create mode 100644 src/main/resources/net/tirasa/connid/bundles/kafka/Message_it.properties create mode 100644 src/test/java/net/tirasa/connid/bundles/kafka/KafkaConnectorTests.java create mode 100644 src/test/resources/simplelogger.properties diff --git a/LICENSE b/LICENSE index 261eeb9..8dada3e 100644 --- a/LICENSE +++ b/LICENSE @@ -178,7 +178,7 @@ APPENDIX: How to apply the Apache License to your work. To apply the Apache License to your work, attach the following - boilerplate notice, with the fields enclosed by brackets "[]" + boilerplate notice, with the fields enclosed by brackets "{}" replaced with your own identifying information. (Don't include the brackets!) The text should be enclosed in the appropriate comment syntax for the file format. We also recommend that a @@ -186,7 +186,7 @@ same "printed page" as the copyright notice for easier identification within third-party archives. - Copyright [yyyy] [name of copyright owner] + Copyright {yyyy} {name of copyright owner} Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/NOTICE b/NOTICE new file mode 100644 index 0000000..8fa8165 --- /dev/null +++ b/NOTICE @@ -0,0 +1,9 @@ +ConnId Kafka bundle +Copyright 2024 ConnId + +This product includes software developed by: +The ConnId project (https://connid.tirasa.net/). + +The following copyright notice(s) were affixed to portions of this code +with which this file is now or was at one time distributed. + diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..d54b1b8 --- /dev/null +++ b/pom.xml @@ -0,0 +1,344 @@ + + + + + 4.0.0 + + + net.tirasa.connid + connid + 1.6.0.0-SNAPSHOT + + + net.tirasa.connid.bundles + net.tirasa.connid.bundles.kafka + + ConnId Bundles: Kafka + + jar + + http://connid.tirasa.net + 2024 + + + + The Apache Software License, Version 2.0 + http://www.apache.org/licenses/LICENSE-2.0.txt + repo + + + + + scm:git:git@github.com:Tirasa/ConnIdKafkaBundle.git + scm:git:git@github.com:Tirasa/ConnIdKafkaBundle.git + scm:git:git@github.com:Tirasa/ConnIdKafkaBundle.git + HEAD + + + + jira + https://connid.atlassian.net/browse/KAFKA + + + + GitHub Workflow + https://github.com/Tirasa/ConnIdKafkaBundle/actions + + + + + connid-dev + connid-dev@googlegroups.com + http://groups.google.com/group/connid-dev + + + connid-users + connid-users@googlegroups.com + http://groups.google.com/group/connid-users + + + + + 1.6.0.0-SNAPSHOT + 3.9.0 + 2.17.3 + 1.20.3 + + 11 + UTF-8 + + + + + org.apache.kafka + kafka-clients + ${kafka.version} + + + + com.fasterxml.jackson.core + jackson-databind + ${jackson.version} + + + com.fasterxml.jackson.datatype + jackson-datatype-jsr310 + ${jackson.version} + + + + net.tirasa.connid + connector-framework + ${connid.version} + + + net.tirasa.connid + connector-framework-internal + ${connid.version} + + + + + org.slf4j + slf4j-simple + 2.0.16 + test + + + org.testcontainers + kafka + ${testcontainers.version} + test + + + org.testcontainers + junit-jupiter + ${testcontainers.version} + test + + + org.awaitility + awaitility + 4.2.2 + test + + + net.tirasa.connid + connector-test-common + ${connid.version} + test + + + org.junit.jupiter + junit-jupiter + ${junit.version} + test + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.13.0 + false + + ${targetJdk} + false + true + true + -Xlint:unchecked + + + + + org.gaul + modernizer-maven-plugin + 2.9.0 + + ${targetJdk} + + + + modernizer-check + verify + + modernizer + + + + + + + com.mycila + license-maven-plugin + false + + true + + + + + org.apache.rat + apache-rat-plugin + 0.16.1 + + + **/nbactions.xml + **/nb-configuration.xml + **/META-INF/cxf/** + **/META-INF/services/** + **/META-INF/MANIFEST.MF + **/*.csv + **/archetype-resources/** + **/AdminLTE*/** + **/goal.txt + **/rat.txt + **/build-copy-javadoc-files.xml + **/maven-eclipse.xml + **/*.iml + **/*.log + **/.externalToolBuilders/** + .git/** + .idea/** + **/.settings/** + **/.* + **/deb/control/conffiles + **/deb/control/control + **/*.lst + **/*.json + **/banner.txt + **/target/** + + + + + rat-check + verify + + check + + + + + + + + org.apache.maven.plugins + maven-resources-plugin + + + copy-artifact-legal-files + process-resources + + copy-resources + + + ${project.build.outputDirectory}/META-INF + + + . + + LICENSE + NOTICE + + + + + + + copy-javadoc-legal-files + process-resources + + copy-resources + + + ${project.build.directory}/reports/apidocs/META-INF + + + . + + LICENSE + NOTICE + + + + + + + + + + org.apache.maven.plugins + maven-assembly-plugin + true + + true + + src/assemble/connector.xml + + + true + + ${connid.version} + ${project.artifactId} + ${project.version} + + + + + + make-assembly + package + + single + + + + + + + + + src/main/resources + + + . + META-INF + + LICENSE + NOTICE + + + + + + + + sonatype + https://oss.sonatype.org/content/repositories/snapshots + + false + + + + + + diff --git a/src/assemble/connector.xml b/src/assemble/connector.xml new file mode 100644 index 0000000..c59d570 --- /dev/null +++ b/src/assemble/connector.xml @@ -0,0 +1,51 @@ + + + + + bundle + + + jar + + + false + + + + ${project.build.directory}/classes + + + + + + + lib + false + false + runtime + + net.tirasa.connid:connector-framework + net.tirasa.connid:connector-framework-internal + + + + diff --git a/src/main/java/net/tirasa/connid/bundles/kafka/KafkaConfiguration.java b/src/main/java/net/tirasa/connid/bundles/kafka/KafkaConfiguration.java new file mode 100644 index 0000000..7533f41 --- /dev/null +++ b/src/main/java/net/tirasa/connid/bundles/kafka/KafkaConfiguration.java @@ -0,0 +1,172 @@ +/** + * Copyright (C) 2024 ConnId (connid-dev@googlegroups.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.tirasa.connid.bundles.kafka; + +import net.tirasa.connid.bundles.kafka.serialization.SyncDeltaSerializer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.identityconnectors.common.StringUtil; +import org.identityconnectors.framework.common.exceptions.ConfigurationException; +import org.identityconnectors.framework.common.objects.ObjectClass; +import org.identityconnectors.framework.spi.AbstractConfiguration; +import org.identityconnectors.framework.spi.ConfigurationProperty; + +public class KafkaConfiguration extends AbstractConfiguration { + + private String bootstrapServers; + + private String clientId; + + private String consumerGroupId; + + private String autoOffsetReset = "earliest"; + + private String valueSerializerClassName = SyncDeltaSerializer.class.getName(); + + private String valueDeserializerClassName = StringDeserializer.class.getName(); + + private String accountTopic = ObjectClass.ACCOUNT_NAME; + + private String groupTopic = ObjectClass.GROUP_NAME; + + private String allTopic = ObjectClass.ALL_NAME; + + private long consumerPollMillis = 100; + + @ConfigurationProperty(displayMessageKey = "bootstrapServers.display", + helpMessageKey = "bootstrapServers.help", required = true, order = 1) + public String getBootstrapServers() { + return bootstrapServers; + } + + public void setBootstrapServers(final String bootstrapServers) { + this.bootstrapServers = bootstrapServers; + } + + @ConfigurationProperty(displayMessageKey = "clientId.display", + helpMessageKey = "clientId.help", required = true, order = 2) + public String getClientId() { + return clientId; + } + + public void setClientId(final String clientId) { + this.clientId = clientId; + } + + @ConfigurationProperty(displayMessageKey = "consumerGroupId.display", + helpMessageKey = "consumerGroupId.help", required = true, order = 3) + public String getConsumerGroupId() { + return consumerGroupId; + } + + public void setConsumerGroupId(final String consumerGroupId) { + this.consumerGroupId = consumerGroupId; + } + + @ConfigurationProperty(displayMessageKey = "autoOffsetReset.display", + helpMessageKey = "autoOffsetReset.help", required = true, order = 4, + allowedValues = { "earliest", "latest", "none" }) + public String getAutoOffsetReset() { + return autoOffsetReset; + } + + public void setAutoOffsetReset(final String autoOffsetReset) { + this.autoOffsetReset = autoOffsetReset; + } + + @ConfigurationProperty(displayMessageKey = "valueSerializerClassName.display", + helpMessageKey = "valueSerializerClassName.help", required = true, order = 5) + public String getValueSerializerClassName() { + return valueSerializerClassName; + } + + public void setValueSerializerClassName(final String valueSerializerClassName) { + this.valueSerializerClassName = valueSerializerClassName; + } + + @ConfigurationProperty(displayMessageKey = "valueDeserializerClassName.display", + helpMessageKey = "valueDeserializerClassName.help", required = true, order = 6) + public String getValueDeserializerClassName() { + return valueDeserializerClassName; + } + + public void setValueDeserializerClassName(final String valueDeserializerClassName) { + this.valueDeserializerClassName = valueDeserializerClassName; + } + + @ConfigurationProperty(displayMessageKey = "accountTopic.display", + helpMessageKey = "accountTopic.help", order = 7) + public String getAccountTopic() { + return accountTopic; + } + + public void setAccountTopic(final String accountTopic) { + this.accountTopic = accountTopic; + } + + @ConfigurationProperty(displayMessageKey = "groupTopic.display", + helpMessageKey = "groupTopic.help", order = 8) + public String getGroupTopic() { + return groupTopic; + } + + public void setGroupTopic(final String groupTopic) { + this.groupTopic = groupTopic; + } + + @ConfigurationProperty(displayMessageKey = "allTopic.display", + helpMessageKey = "allTopic.help", order = 9) + public String getAllTopic() { + return allTopic; + } + + public void setAllTopic(final String allTopic) { + this.allTopic = allTopic; + } + + @ConfigurationProperty(displayMessageKey = "consumerPollMillis.display", + helpMessageKey = "consumerPollMillis.help", order = 10) + public long getConsumerPollMillis() { + return consumerPollMillis; + } + + public void setConsumerPollMillis(final long consumerPollMillis) { + this.consumerPollMillis = consumerPollMillis; + } + + @Override + public void validate() { + if (StringUtil.isBlank(bootstrapServers)) { + throw new ConfigurationException("Missing " + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG); + } + if (StringUtil.isBlank(clientId)) { + throw new ConfigurationException("Missing " + ProducerConfig.CLIENT_ID_CONFIG); + } + if (StringUtil.isBlank(consumerGroupId)) { + throw new ConfigurationException("Missing " + ConsumerConfig.GROUP_ID_CONFIG); + } + if (StringUtil.isBlank(autoOffsetReset)) { + throw new ConfigurationException("Missing " + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG); + } + if (StringUtil.isBlank(valueSerializerClassName)) { + throw new ConfigurationException("Missing " + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG); + } + if (StringUtil.isBlank(valueDeserializerClassName)) { + throw new ConfigurationException("Missing " + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG); + } + } +} diff --git a/src/main/java/net/tirasa/connid/bundles/kafka/KafkaConnector.java b/src/main/java/net/tirasa/connid/bundles/kafka/KafkaConnector.java new file mode 100644 index 0000000..c86e58c --- /dev/null +++ b/src/main/java/net/tirasa/connid/bundles/kafka/KafkaConnector.java @@ -0,0 +1,306 @@ +/** + * Copyright (C) 2024 ConnId (connid-dev@googlegroups.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.tirasa.connid.bundles.kafka; + +import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility; +import com.fasterxml.jackson.annotation.PropertyAccessor; +import com.fasterxml.jackson.core.Version; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.databind.json.JsonMapper; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import java.time.Duration; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; +import java.util.UUID; +import net.tirasa.connid.bundles.kafka.serialization.AttributeDeserializer; +import net.tirasa.connid.bundles.kafka.serialization.AttributeSerializer; +import net.tirasa.connid.bundles.kafka.serialization.GuardedStringDeserializer; +import net.tirasa.connid.bundles.kafka.serialization.GuardedStringSerializer; +import net.tirasa.connid.bundles.kafka.serialization.SyncTokenDeserializer; +import net.tirasa.connid.bundles.kafka.serialization.SyncTokenSerializer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.identityconnectors.common.logging.Log; +import org.identityconnectors.common.security.GuardedString; +import org.identityconnectors.framework.api.operations.APIOperation; +import org.identityconnectors.framework.common.exceptions.ConnectionFailedException; +import org.identityconnectors.framework.common.exceptions.ConnectorException; +import org.identityconnectors.framework.common.exceptions.PreconditionFailedException; +import org.identityconnectors.framework.common.objects.Attribute; +import org.identityconnectors.framework.common.objects.AttributeBuilder; +import org.identityconnectors.framework.common.objects.AttributeUtil; +import org.identityconnectors.framework.common.objects.ConnectorObjectBuilder; +import org.identityconnectors.framework.common.objects.Name; +import org.identityconnectors.framework.common.objects.ObjectClass; +import org.identityconnectors.framework.common.objects.ObjectClassInfo; +import org.identityconnectors.framework.common.objects.OperationOptionInfo; +import org.identityconnectors.framework.common.objects.OperationOptions; +import org.identityconnectors.framework.common.objects.Schema; +import org.identityconnectors.framework.common.objects.SyncDelta; +import org.identityconnectors.framework.common.objects.SyncDeltaBuilder; +import org.identityconnectors.framework.common.objects.SyncDeltaType; +import org.identityconnectors.framework.common.objects.SyncResultsHandler; +import org.identityconnectors.framework.common.objects.SyncToken; +import org.identityconnectors.framework.common.objects.Uid; +import org.identityconnectors.framework.spi.Configuration; +import org.identityconnectors.framework.spi.Connector; +import org.identityconnectors.framework.spi.ConnectorClass; +import org.identityconnectors.framework.spi.operations.CreateOp; +import org.identityconnectors.framework.spi.operations.DeleteOp; +import org.identityconnectors.framework.spi.operations.SchemaOp; +import org.identityconnectors.framework.spi.operations.SyncOp; +import org.identityconnectors.framework.spi.operations.TestOp; +import org.identityconnectors.framework.spi.operations.UpdateOp; + +@ConnectorClass(configurationClass = KafkaConfiguration.class, displayNameKey = "kafka.connector.display") +public class KafkaConnector + implements Connector, CreateOp, UpdateOp, DeleteOp, SchemaOp, SyncOp, TestOp { + + private static final Log LOG = Log.getLog(KafkaConnector.class); + + public static final JsonMapper MAPPER; + + static { + SimpleModule pojoModule = new SimpleModule("KafkaConnectorModule", new Version(1, 0, 0, null, null, null)); + pojoModule.addSerializer(GuardedString.class, new GuardedStringSerializer()); + pojoModule.addSerializer(Attribute.class, new AttributeSerializer()); + pojoModule.addSerializer(SyncToken.class, new SyncTokenSerializer()); + pojoModule.addDeserializer(GuardedString.class, new GuardedStringDeserializer()); + pojoModule.addDeserializer(Attribute.class, new AttributeDeserializer()); + pojoModule.addDeserializer(SyncToken.class, new SyncTokenDeserializer()); + + MAPPER = JsonMapper.builder(). + addModule(pojoModule). + addModule(new JavaTimeModule()). + disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS). + visibility(PropertyAccessor.ALL, Visibility.NONE). + visibility(PropertyAccessor.FIELD, Visibility.ANY). + build(); + } + + private KafkaConfiguration configuration; + + private KafkaProducer producer; + + @Override + public KafkaConfiguration getConfiguration() { + return configuration; + } + + @Override + public void init(final Configuration configuration) { + this.configuration = (KafkaConfiguration) configuration; + this.configuration.validate(); + + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.configuration.getBootstrapServers()); + props.put(ProducerConfig.CLIENT_ID_CONFIG, this.configuration.getClientId()); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, this.configuration.getValueSerializerClassName()); + try { + producer = new KafkaProducer<>(props); + } catch (KafkaException e) { + LOG.error(e, "While creating Kafka producer"); + throw new ConnectionFailedException(e); + } + + LOG.ok("Connector {0} successfully inited", getClass().getName()); + } + + @Override + public void dispose() { + try { + Optional.ofNullable(producer).ifPresent(KafkaProducer::close); + } catch (KafkaException e) { + LOG.error(e, "While closing Kafka producer"); + throw new ConnectorException(e); + } + } + + private KafkaConsumer createConsumer(final ObjectClass objectClass) { + Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.configuration.getBootstrapServers()); + props.put(ConsumerConfig.CLIENT_ID_CONFIG, this.configuration.getClientId()); + props.put(ConsumerConfig.GROUP_ID_CONFIG, this.configuration.getConsumerGroupId()); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, this.configuration.getAutoOffsetReset()); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, this.configuration.getValueDeserializerClassName()); + + KafkaConsumer consumer = new KafkaConsumer<>(props); + consumer.subscribe(List.of(getTopic(objectClass))); + return consumer; + } + + @Override + public void test() { + if (producer == null) { + throw new ConnectorException("No Kafka producer configured"); + } + try { + producer.clientInstanceId(Duration.ofSeconds(10)); + } catch (Exception e) { + LOG.error(e, "While testing Kafka producer"); + throw new ConnectionFailedException(e); + } + + try (KafkaConsumer consumer = createConsumer(ObjectClass.ACCOUNT)) { + consumer.clientInstanceId(Duration.ofSeconds(10)); + } catch (Exception e) { + LOG.error(e, "While testing Kafka consumer"); + throw new ConnectionFailedException(e); + } + } + + @Override + public Schema schema() { + return new Schema( + Collections.emptySet(), + Collections.emptySet(), + Collections., Set>emptyMap(), + Collections., Set>emptyMap()); + } + + private String getTopic(final ObjectClass objectClass) { + if (ObjectClass.ACCOUNT.equals(objectClass)) { + return configuration.getAccountTopic(); + } + if (ObjectClass.GROUP.equals(objectClass)) { + return configuration.getGroupTopic(); + } + if (ObjectClass.ALL.equals(objectClass)) { + return configuration.getAllTopic(); + } + throw new PreconditionFailedException("Unsupported object class: " + objectClass.getObjectClassValue()); + } + + @Override + public Uid create( + final ObjectClass objectClass, + final Set createAttributes, + final OperationOptions options) { + + Uid uid = new Uid(Optional.ofNullable(AttributeUtil.getNameFromAttributes(createAttributes)). + orElseThrow(() -> new ConnectorException(Name.NAME + " not found in create attributes")). + getNameValue()); + + SyncDelta syncDelta = new SyncDeltaBuilder(). + setDeltaType(SyncDeltaType.CREATE). + setObjectClass(objectClass). + setUid(uid). + setObject(new ConnectorObjectBuilder().addAttributes(createAttributes).setUid(uid).build()). + setToken(new SyncToken(System.currentTimeMillis())). + build(); + try { + producer.send(new ProducerRecord<>(getTopic(objectClass), syncDelta)); + } catch (Exception e) { + throw new ConnectorException("Could not send the create event to " + getTopic(objectClass), e); + } + + return uid; + } + + @Override + public Uid update( + final ObjectClass objectClass, + final Uid uid, + final Set replaceAttributes, + final OperationOptions options) { + + SyncDelta syncDelta = new SyncDeltaBuilder(). + setDeltaType(SyncDeltaType.UPDATE). + setObjectClass(objectClass). + setUid(uid). + setObject(new ConnectorObjectBuilder().addAttributes(replaceAttributes).setUid(uid).build()). + setToken(new SyncToken(System.currentTimeMillis())). + build(); + try { + producer.send(new ProducerRecord<>(getTopic(objectClass), syncDelta)); + } catch (Exception e) { + throw new ConnectorException("Could not send the update event to " + getTopic(objectClass), e); + } + + return uid; + } + + @Override + public void delete( + final ObjectClass objectClass, + final Uid uid, + final OperationOptions options) { + + SyncDelta syncDelta = new SyncDeltaBuilder(). + setDeltaType(SyncDeltaType.DELETE). + setObjectClass(objectClass). + setUid(uid). + setToken(new SyncToken(System.currentTimeMillis())). + build(); + try { + producer.send(new ProducerRecord<>(getTopic(objectClass), syncDelta)); + } catch (Exception e) { + throw new ConnectorException("Could not send the delete event to " + getTopic(objectClass), e); + } + } + + @Override + public void sync( + final ObjectClass objectClass, + final SyncToken token, + final SyncResultsHandler handler, + final OperationOptions options) { + + try (KafkaConsumer consumer = createConsumer(objectClass)) { + consumer.poll(Duration.ofMillis(configuration.getConsumerPollMillis())).forEach(record -> { + LOG.ok("Processing {0} for topic {1}", record.key(), record.topic()); + + Uid uid = new Uid(Optional.ofNullable(record.key()).orElseGet(() -> UUID.randomUUID().toString())); + Map headers = new HashMap<>(); + record.headers().forEach(header -> headers.put(header.key(), new String(header.value()))); + + handler.handle(new SyncDeltaBuilder(). + setDeltaType(SyncDeltaType.CREATE_OR_UPDATE). + setObjectClass(objectClass). + setUid(uid). + setObject(new ConnectorObjectBuilder(). + addAttribute(new Name(uid.getUidValue())). + addAttribute(AttributeBuilder.build("record.headers", headers)). + addAttribute(AttributeBuilder.build("record.value", record.value())). + setUid(uid). + build()). + setToken(new SyncToken(record.timestamp())). + build()); + }); + } catch (Exception e) { + throw new ConnectorException("While polling events from " + getTopic(objectClass), e); + } + } + + @Override + public SyncToken getLatestSyncToken(final ObjectClass objectClass) { + return new SyncToken(System.currentTimeMillis()); + } +} diff --git a/src/main/java/net/tirasa/connid/bundles/kafka/serialization/AbstractValueDeserializer.java b/src/main/java/net/tirasa/connid/bundles/kafka/serialization/AbstractValueDeserializer.java new file mode 100644 index 0000000..ca97a91 --- /dev/null +++ b/src/main/java/net/tirasa/connid/bundles/kafka/serialization/AbstractValueDeserializer.java @@ -0,0 +1,64 @@ +/** + * Copyright (C) 2024 ConnId (connid-dev@googlegroups.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.tirasa.connid.bundles.kafka.serialization; + +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Base64; +import java.util.List; +import org.identityconnectors.common.StringUtil; +import org.identityconnectors.common.security.GuardedString; + +public abstract class AbstractValueDeserializer extends JsonDeserializer { + + protected List doDeserialize(final JsonNode value, final JsonParser jp) throws IOException { + List values = new ArrayList<>(); + + for (JsonNode node : value) { + if (node.isNull()) { + values.add(null); + } else if (node.isObject()) { + values.add(((ObjectNode) node).traverse(jp.getCodec()).readValueAs(GuardedString.class)); + } else if (node.isBoolean()) { + values.add(node.asBoolean()); + } else if (node.isDouble()) { + values.add(node.asDouble()); + } else if (node.isLong()) { + values.add(node.asLong()); + } else if (node.isInt()) { + values.add(node.asInt()); + } else { + String text = node.asText(); + if (text.startsWith(AbstractValueSerializer.BYTE_ARRAY_PREFIX) + && text.endsWith(AbstractValueSerializer.BYTE_ARRAY_SUFFIX)) { + + values.add(Base64.getDecoder().decode(StringUtil.substringBetween( + text, + AbstractValueSerializer.BYTE_ARRAY_PREFIX, + AbstractValueSerializer.BYTE_ARRAY_SUFFIX))); + } else { + values.add(text); + } + } + } + + return values; + } +} diff --git a/src/main/java/net/tirasa/connid/bundles/kafka/serialization/AbstractValueSerializer.java b/src/main/java/net/tirasa/connid/bundles/kafka/serialization/AbstractValueSerializer.java new file mode 100644 index 0000000..97d1355 --- /dev/null +++ b/src/main/java/net/tirasa/connid/bundles/kafka/serialization/AbstractValueSerializer.java @@ -0,0 +1,61 @@ +/** + * Copyright (C) 2024 ConnId (connid-dev@googlegroups.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.tirasa.connid.bundles.kafka.serialization; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonSerializer; +import java.io.IOException; +import java.util.Base64; +import java.util.List; +import org.identityconnectors.common.security.GuardedString; + +public abstract class AbstractValueSerializer extends JsonSerializer { + + public static final String BYTE_ARRAY_PREFIX = ""; + + public static final String BYTE_ARRAY_SUFFIX = ""; + + protected void doSerialize(final List value, final JsonGenerator jgen) throws IOException { + if (value == null) { + jgen.writeNull(); + } else { + jgen.writeStartArray(); + for (Object v : value) { + if (v == null) { + jgen.writeNull(); + } else if (v instanceof GuardedString) { + jgen.writeObject(v); + } else if (v instanceof Integer) { + jgen.writeNumber((Integer) v); + } else if (v instanceof Long) { + jgen.writeNumber((Long) v); + } else if (v instanceof Double) { + jgen.writeNumber((Double) v); + } else if (v instanceof Boolean) { + jgen.writeBoolean((Boolean) v); + } else if (v instanceof byte[]) { + jgen.writeString( + BYTE_ARRAY_PREFIX + + Base64.getEncoder().encodeToString((byte[]) v) + + BYTE_ARRAY_SUFFIX); + } else { + jgen.writeString(v.toString()); + } + } + jgen.writeEndArray(); + } + } +} diff --git a/src/main/java/net/tirasa/connid/bundles/kafka/serialization/AttributeDeserializer.java b/src/main/java/net/tirasa/connid/bundles/kafka/serialization/AttributeDeserializer.java new file mode 100644 index 0000000..cf32ed2 --- /dev/null +++ b/src/main/java/net/tirasa/connid/bundles/kafka/serialization/AttributeDeserializer.java @@ -0,0 +1,49 @@ +/** + * Copyright (C) 2024 ConnId (connid-dev@googlegroups.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.tirasa.connid.bundles.kafka.serialization; + +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.node.ObjectNode; +import java.io.IOException; +import java.util.List; +import org.identityconnectors.framework.common.objects.Attribute; +import org.identityconnectors.framework.common.objects.AttributeBuilder; +import org.identityconnectors.framework.common.objects.Name; +import org.identityconnectors.framework.common.objects.Uid; + +@SuppressWarnings("squid:S3776") +public class AttributeDeserializer extends AbstractValueDeserializer { + + @Override + public Attribute deserialize(final JsonParser jp, final DeserializationContext ctx) throws IOException { + ObjectNode tree = jp.readValueAsTree(); + + String name = tree.get("name").asText(); + + List values = doDeserialize(tree.get("value"), jp); + + if (Uid.NAME.equals(name)) { + return new Uid(values.isEmpty() || values.get(0) == null ? null : values.get(0).toString()); + } else { + if (Name.NAME.equals(name)) { + return new Name(values.isEmpty() || values.get(0) == null ? null : values.get(0).toString()); + } + + return AttributeBuilder.build(name, values); + } + } +} diff --git a/src/main/java/net/tirasa/connid/bundles/kafka/serialization/AttributeSerializer.java b/src/main/java/net/tirasa/connid/bundles/kafka/serialization/AttributeSerializer.java new file mode 100644 index 0000000..be91123 --- /dev/null +++ b/src/main/java/net/tirasa/connid/bundles/kafka/serialization/AttributeSerializer.java @@ -0,0 +1,38 @@ +/** + * Copyright (C) 2024 ConnId (connid-dev@googlegroups.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.tirasa.connid.bundles.kafka.serialization; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.SerializerProvider; +import java.io.IOException; +import org.identityconnectors.framework.common.objects.Attribute; + +public class AttributeSerializer extends AbstractValueSerializer { + + @Override + public void serialize(final Attribute source, final JsonGenerator jgen, final SerializerProvider sp) + throws IOException { + + jgen.writeStartObject(); + + jgen.writeStringField("name", source.getName()); + + jgen.writeFieldName("value"); + doSerialize(source.getValue(), jgen); + + jgen.writeEndObject(); + } +} diff --git a/src/main/java/net/tirasa/connid/bundles/kafka/serialization/GuardedStringDeserializer.java b/src/main/java/net/tirasa/connid/bundles/kafka/serialization/GuardedStringDeserializer.java new file mode 100644 index 0000000..e192689 --- /dev/null +++ b/src/main/java/net/tirasa/connid/bundles/kafka/serialization/GuardedStringDeserializer.java @@ -0,0 +1,99 @@ +/** + * Copyright (C) 2024 ConnId (connid-dev@googlegroups.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.tirasa.connid.bundles.kafka.serialization; + +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.node.ObjectNode; +import java.io.IOException; +import java.lang.reflect.Field; +import java.util.Base64; +import org.identityconnectors.common.logging.Log; +import org.identityconnectors.common.security.EncryptorFactory; +import org.identityconnectors.common.security.GuardedString; + +public class GuardedStringDeserializer extends JsonDeserializer { + + private static final Log LOG = Log.getLog(GuardedStringDeserializer.class); + + private static final String READONLY = "readOnly"; + + private static final String DISPOSED = "disposed"; + + private static final String ENCRYPTED_BYTES = "encryptedBytes"; + + private static final String BASE64_SHA1_HASH = "base64SHA1Hash"; + + private static final String LOG_ERROR_MESSAGE = "Could not set field value to {}"; + + @Override + public GuardedString deserialize(final JsonParser jp, final DeserializationContext ctx) + throws IOException { + + ObjectNode tree = jp.readValueAsTree(); + + boolean readOnly = false; + if (tree.has(READONLY)) { + readOnly = tree.get(READONLY).asBoolean(); + } + boolean disposed = false; + if (tree.has(DISPOSED)) { + disposed = tree.get(DISPOSED).asBoolean(); + } + byte[] encryptedBytes = null; + if (tree.has(ENCRYPTED_BYTES)) { + encryptedBytes = Base64.getDecoder().decode(tree.get(ENCRYPTED_BYTES).asText()); + } + String base64SHA1Hash = null; + if (tree.has(BASE64_SHA1_HASH)) { + base64SHA1Hash = tree.get(BASE64_SHA1_HASH).asText(); + } + + final byte[] clearBytes = EncryptorFactory.getInstance().getDefaultEncryptor().decrypt(encryptedBytes); + + GuardedString dest = new GuardedString(new String(clearBytes).toCharArray()); + + try { + Field field = GuardedString.class.getDeclaredField(READONLY); + field.setAccessible(true); + field.set(dest, readOnly); + } catch (Exception e) { + LOG.error(LOG_ERROR_MESSAGE, readOnly, e); + } + + try { + Field field = GuardedString.class.getDeclaredField(DISPOSED); + field.setAccessible(true); + field.set(dest, disposed); + } catch (Exception e) { + LOG.error(LOG_ERROR_MESSAGE, disposed, e); + } + + if (base64SHA1Hash != null) { + try { + Field field = GuardedString.class.getDeclaredField(BASE64_SHA1_HASH); + field.setAccessible(true); + field.set(dest, base64SHA1Hash); + } catch (Exception e) { + LOG.error(LOG_ERROR_MESSAGE, base64SHA1Hash, e); + } + } + + return dest; + } + +} diff --git a/src/main/java/net/tirasa/connid/bundles/kafka/serialization/GuardedStringSerializer.java b/src/main/java/net/tirasa/connid/bundles/kafka/serialization/GuardedStringSerializer.java new file mode 100644 index 0000000..272854a --- /dev/null +++ b/src/main/java/net/tirasa/connid/bundles/kafka/serialization/GuardedStringSerializer.java @@ -0,0 +1,87 @@ +/** + * Copyright (C) 2024 ConnId (connid-dev@googlegroups.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.tirasa.connid.bundles.kafka.serialization; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.SerializerProvider; +import java.io.IOException; +import java.lang.reflect.Field; +import java.util.Base64; +import org.identityconnectors.common.logging.Log; +import org.identityconnectors.common.security.EncryptorFactory; +import org.identityconnectors.common.security.GuardedString; +import org.identityconnectors.common.security.SecurityUtil; + +public class GuardedStringSerializer extends JsonSerializer { + + private static final Log LOG = Log.getLog(GuardedStringDeserializer.class); + + private static final String READONLY = "readOnly"; + + private static final String DISPOSED = "disposed"; + + private static final String ENCRYPTED_BYTES = "encryptedBytes"; + + private static final String BASE64_SHA1_HASH = "base64SHA1Hash"; + + private static final String LOG_ERROR_MESSAGE = "Could not get field value"; + + @Override + public void serialize(final GuardedString source, final JsonGenerator jgen, final SerializerProvider sp) + throws IOException { + + jgen.writeStartObject(); + + boolean readOnly = false; + try { + Field field = GuardedString.class.getDeclaredField(READONLY); + field.setAccessible(true); + readOnly = field.getBoolean(source); + } catch (Exception e) { + LOG.error(LOG_ERROR_MESSAGE, e); + } + jgen.writeBooleanField(READONLY, readOnly); + + boolean disposed = false; + try { + Field field = GuardedString.class.getDeclaredField(DISPOSED); + field.setAccessible(true); + disposed = field.getBoolean(source); + } catch (Exception e) { + LOG.error(LOG_ERROR_MESSAGE, e); + } + jgen.writeBooleanField(DISPOSED, disposed); + + byte[] encryptedBytes = + EncryptorFactory.getInstance().getDefaultEncryptor().encrypt(SecurityUtil.decrypt(source).getBytes()); + jgen.writeStringField(ENCRYPTED_BYTES, Base64.getEncoder().encodeToString(encryptedBytes)); + + String base64SHA1Hash = null; + try { + Field field = GuardedString.class.getDeclaredField(BASE64_SHA1_HASH); + field.setAccessible(true); + base64SHA1Hash = field.get(source).toString(); + } catch (Exception e) { + LOG.error(LOG_ERROR_MESSAGE, e); + } + if (base64SHA1Hash != null) { + jgen.writeStringField(BASE64_SHA1_HASH, base64SHA1Hash); + } + + jgen.writeEndObject(); + } +} diff --git a/src/main/java/net/tirasa/connid/bundles/kafka/serialization/SyncDeltaDeserializer.java b/src/main/java/net/tirasa/connid/bundles/kafka/serialization/SyncDeltaDeserializer.java new file mode 100644 index 0000000..607a62f --- /dev/null +++ b/src/main/java/net/tirasa/connid/bundles/kafka/serialization/SyncDeltaDeserializer.java @@ -0,0 +1,38 @@ +/** + * Copyright (C) 2024 ConnId (connid-dev@googlegroups.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.tirasa.connid.bundles.kafka.serialization; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import net.tirasa.connid.bundles.kafka.KafkaConnector; +import org.apache.kafka.common.serialization.Deserializer; +import org.identityconnectors.common.logging.Log; +import org.identityconnectors.framework.common.objects.SyncDelta; + +public class SyncDeltaDeserializer implements Deserializer { + + private static final Log LOG = Log.getLog(SyncDeltaDeserializer.class); + + @Override + public SyncDelta deserialize(final String topic, final byte[] data) { + try { + return KafkaConnector.MAPPER.readValue(new ByteArrayInputStream(data), SyncDelta.class); + } catch (IOException e) { + LOG.error(e, "While deserializing {0}", new String(data)); + return null; + } + } +} diff --git a/src/main/java/net/tirasa/connid/bundles/kafka/serialization/SyncDeltaSerializer.java b/src/main/java/net/tirasa/connid/bundles/kafka/serialization/SyncDeltaSerializer.java new file mode 100644 index 0000000..ec2bf87 --- /dev/null +++ b/src/main/java/net/tirasa/connid/bundles/kafka/serialization/SyncDeltaSerializer.java @@ -0,0 +1,37 @@ +/** + * Copyright (C) 2024 ConnId (connid-dev@googlegroups.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.tirasa.connid.bundles.kafka.serialization; + +import java.io.IOException; +import net.tirasa.connid.bundles.kafka.KafkaConnector; +import org.apache.kafka.common.serialization.Serializer; +import org.identityconnectors.common.logging.Log; +import org.identityconnectors.framework.common.objects.SyncDelta; + +public class SyncDeltaSerializer implements Serializer { + + private static final Log LOG = Log.getLog(SyncDeltaSerializer.class); + + @Override + public byte[] serialize(final String topic, final SyncDelta data) { + try { + return KafkaConnector.MAPPER.writeValueAsBytes(data); + } catch (IOException e) { + LOG.error(e, "While serializing {0}", data); + return null; + } + } +} diff --git a/src/main/java/net/tirasa/connid/bundles/kafka/serialization/SyncTokenDeserializer.java b/src/main/java/net/tirasa/connid/bundles/kafka/serialization/SyncTokenDeserializer.java new file mode 100644 index 0000000..7bf968a --- /dev/null +++ b/src/main/java/net/tirasa/connid/bundles/kafka/serialization/SyncTokenDeserializer.java @@ -0,0 +1,84 @@ +/** + * Copyright (C) 2024 ConnId (connid-dev@googlegroups.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.tirasa.connid.bundles.kafka.serialization; + +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import java.io.IOException; +import java.util.Base64; +import java.util.Objects; +import org.identityconnectors.framework.common.objects.SyncToken; + +public class SyncTokenDeserializer extends JsonDeserializer { + + @Override + public SyncToken deserialize(final JsonParser jp, final DeserializationContext ctx) throws IOException { + ObjectNode tree = jp.readValueAsTree(); + + Object value = tree.has("value") + ? tree.has("type") + ? deserialize(tree.get("value"), tree.get("type")) + : deserialize(tree.get("value")) + : null; + + return new SyncToken(Objects.requireNonNull(value)); + } + + private Object deserialize(final JsonNode value, final JsonNode type) { + if (Boolean.class.getSimpleName().equals(type.asText())) { + return value.asBoolean(); + } + + if (Double.class.getSimpleName().equals(type.asText())) { + return value.asDouble(); + } + if (Long.class.getSimpleName().equals(type.asText())) { + return value.asLong(); + } + if (Integer.class.getSimpleName().equals(type.asText())) { + return value.asInt(); + } + + return value.asText(); + } + + private Object deserialize(final JsonNode value) { + if (value.isBoolean()) { + return value.asBoolean(); + } + + if (value.isDouble()) { + return value.asDouble(); + } + + if (value.isLong()) { + return value.asLong(); + } + + if (value.isInt()) { + return value.asInt(); + } + + try { + return Base64.getDecoder().decode(value.asText()); + } catch (RuntimeException e) { + return value.asText(); + } + } +} diff --git a/src/main/java/net/tirasa/connid/bundles/kafka/serialization/SyncTokenSerializer.java b/src/main/java/net/tirasa/connid/bundles/kafka/serialization/SyncTokenSerializer.java new file mode 100644 index 0000000..82f47d7 --- /dev/null +++ b/src/main/java/net/tirasa/connid/bundles/kafka/serialization/SyncTokenSerializer.java @@ -0,0 +1,57 @@ +/** + * Copyright (C) 2024 ConnId (connid-dev@googlegroups.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.tirasa.connid.bundles.kafka.serialization; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.SerializerProvider; +import java.io.IOException; +import java.util.Base64; +import org.identityconnectors.framework.common.objects.SyncToken; + +public class SyncTokenSerializer extends JsonSerializer { + + @Override + public void serialize(final SyncToken source, final JsonGenerator jgen, final SerializerProvider sp) + throws IOException { + + jgen.writeStartObject(); + + if (source.getValue() == null) { + jgen.writeNullField("value"); + } else if (source.getValue() instanceof Boolean) { + jgen.writeStringField("type", Boolean.class.getSimpleName()); + jgen.writeBooleanField("value", (Boolean) source.getValue()); + } else if (source.getValue() instanceof Double) { + jgen.writeStringField("type", Double.class.getSimpleName()); + jgen.writeNumberField("value", (Double) source.getValue()); + } else if (source.getValue() instanceof Long) { + jgen.writeStringField("type", Long.class.getSimpleName()); + jgen.writeNumberField("value", (Long) source.getValue()); + } else if (source.getValue() instanceof Integer) { + jgen.writeStringField("type", Integer.class.getSimpleName()); + jgen.writeNumberField("value", (Integer) source.getValue()); + } else if (source.getValue() instanceof byte[]) { + jgen.writeStringField("value", Base64.getEncoder().encodeToString((byte[]) source.getValue())); + } else { + jgen.writeStringField("type", String.class.getSimpleName()); + jgen.writeStringField("value", source.getValue().toString()); + } + + jgen.writeEndObject(); + } + +} diff --git a/src/main/resources/net/tirasa/connid/bundles/kafka/Message.properties b/src/main/resources/net/tirasa/connid/bundles/kafka/Message.properties new file mode 100644 index 0000000..03baf46 --- /dev/null +++ b/src/main/resources/net/tirasa/connid/bundles/kafka/Message.properties @@ -0,0 +1,35 @@ +# +# Copyright (C) 2016 ConnId (connid-dev@googlegroups.com) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +kafka.connector.display=Kafka Connector +bootstrapServers.display=Sample Property +bootstrapServers.help=Only a sample property +clientId.display=Client id +clientId.help=Client id for subscription +autoOffsetReset.display=Auto offset reset +autoOffsetReset.help=What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted); defaults to 'earliest' in order to make sure the producer sent all messages before the consumer starts. +valueSerializerClassName.display=Value serializer class +valueSerializerClassName.help=Serializer class for value that implements the 'org.apache.kafka.common.serialization.Serializer' interface. Defaults to 'net.tirasa.connid.bundles.kafka.serialization.SyncDeltaSerializer' +valueDeserializerClassName.display=Value deserializer class +valueDeserializerClassName.help=Deserializer class for value that implements the 'org.apache.kafka.common.serialization.Deserializer' interface. Defaults to 'org.apache.kafka.common.serialization.StringDeserializer' +accountTopic.display=Topic to publish to and being subscribed to for object class __ACCOUNT__ +accountTopic.help=A topic is similar to a folder in a filesystem, and the events are the files in that folder. +groupTopic.display=Topic to publish to and being subscribed to for object class __GROUP__ +groupTopic.help=A topic is similar to a folder in a filesystem, and the events are the files in that folder. +allTopic.display=Topic to publish to and being subscribed to for object class __ALL__ +allTopic.help=A topic is similar to a folder in a filesystem, and the events are the files in that folder. +consumerPollMillis.display=The maximum number of milliseconds to block while polling +consumerPollMillis.help=Polling returns immediately if there are records available or if the position advances past control records; otherwise, it will await the passed timeout: if the timeout expires, an empty record set will be returned. diff --git a/src/main/resources/net/tirasa/connid/bundles/kafka/Message_it.properties b/src/main/resources/net/tirasa/connid/bundles/kafka/Message_it.properties new file mode 100644 index 0000000..c38e3b5 --- /dev/null +++ b/src/main/resources/net/tirasa/connid/bundles/kafka/Message_it.properties @@ -0,0 +1,35 @@ +# +# Copyright (C) 2016 ConnId (connid-dev@googlegroups.com) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +kafka.connector.display=Connettore Kafka +bootstrapServers.display=Propriet\u00e0 d'esempio +bootstrapServers.help=Solo una propriet\u00e0 d'esempio +clientId.display=Id client +clientId.help=Id client per sottoscrizione +autoOffsetReset.display=Reset auto offset +autoOffsetReset.help=Cosa fare quando non c'\u00e8 alcun offset iniziale in Kafka o se l'offset corrente non esiste pi\u00f9 sul server (ad esempio perch\u00e9 i dati sono stati eliminati); il valore predefinito \u00e8 "earliest" per assicurarsi che il producer abbia inviato tutti i messaggi prima che il consumer inizi. +valueSerializerClassName.display=Classe serializzazione valori +valueSerializerClassName.help=Classe per la serializzazione dei valori che implementa l'interfaccia org.apache.kafka.common.serialization.Serializer. Il valore predefinito \u00e8 'net.tirasa.connid.bundles.kafka.serialization.SyncDeltaSerializer' +valueDeserializerClassName.display=Classe deserializzazione valori +valueDeserializerClassName.help=Classe per la deserializzazione dei valori che implementa l'interfaccia 'org.apache.kafka.common.serialization.Deserializer'. Il valore predefinito \u00e8 'org.apache.kafka.common.serialization.StringDeserializer' +accountTopic.display=Topic su cui pubblicare e di cui essere abbonati per l'object class __ACCOUNT__ +accountTopic.help=Un topic \u00e8 simile a una cartella in un file system e gli eventi sono i file contenuti in quella cartella. +groupTopic.display=Topic su cui pubblicare e di cui essere abbonati per l'object class __GROUP__ +groupTopic.help=Un topic \u00e8 simile a una cartella in un file system e gli eventi sono i file contenuti in quella cartella. +allTopic.display=Topic su cui pubblicare e di cui essere abbonati per l'object class __ALL__ +allTopic.help=Un topic \u00e8 simile a una cartella in un file system e gli eventi sono i file contenuti in quella cartella. +consumerPollMillis.display=Numero massimo di millisecondi da bloccare durante il polling +consumerPollMillis.help=Il polling restituisce un risultato immediato se ci sono record disponibili o se la posizione supera i record di controllo; in caso contrario, attender\u00e0 il timeout trascorso: se il timeout scade, verr\u00e0 restituito un set di record vuoto. diff --git a/src/test/java/net/tirasa/connid/bundles/kafka/KafkaConnectorTests.java b/src/test/java/net/tirasa/connid/bundles/kafka/KafkaConnectorTests.java new file mode 100644 index 0000000..a60a66a --- /dev/null +++ b/src/test/java/net/tirasa/connid/bundles/kafka/KafkaConnectorTests.java @@ -0,0 +1,131 @@ +/** + * Copyright (C) 2024 ConnId (connid-dev@googlegroups.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.tirasa.connid.bundles.kafka; + +import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringSerializer; +import org.identityconnectors.framework.api.APIConfiguration; +import org.identityconnectors.framework.api.ConnectorFacade; +import org.identityconnectors.framework.api.ConnectorFacadeFactory; +import org.identityconnectors.framework.common.objects.Name; +import org.identityconnectors.framework.common.objects.ObjectClass; +import org.identityconnectors.framework.common.objects.OperationOptionsBuilder; +import org.identityconnectors.framework.common.objects.SyncDelta; +import org.identityconnectors.framework.common.objects.SyncToken; +import org.identityconnectors.framework.common.objects.Uid; +import org.identityconnectors.test.common.TestHelpers; +import org.junit.jupiter.api.Test; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.kafka.ConfluentKafkaContainer; +import org.testcontainers.utility.DockerImageName; + +@Testcontainers +class KafkaConnectorTests { + + @Container + static ConfluentKafkaContainer KAFKA_CONTAINER = new ConfluentKafkaContainer( + DockerImageName.parse("confluentinc/cp-kafka:7.7.1")); + + protected KafkaConfiguration newConfiguration() { + KafkaConfiguration config = new KafkaConfiguration(); + config.setBootstrapServers(KAFKA_CONTAINER.getBootstrapServers()); + config.setClientId("connid-client"); + config.setConsumerGroupId("connid-test"); + return config; + } + + protected ConnectorFacade newFacade() { + ConnectorFacadeFactory factory = ConnectorFacadeFactory.getInstance(); + APIConfiguration impl = TestHelpers.createTestConfiguration(KafkaConnector.class, newConfiguration()); + impl.getResultsHandlerConfiguration().setFilteredResultsHandlerInValidationMode(true); + return factory.newInstance(impl); + } + + @Test + void test() { + newFacade().test(); + } + + @Test + void create() { + Uid created = newFacade().create( + ObjectClass.ACCOUNT, + Set.of(new Name("testcreate")), + new OperationOptionsBuilder().build()); + assertNotNull(created); + } + + @Test + void update() { + Uid uid = new Uid("testupdate"); + Uid updated = newFacade().update( + ObjectClass.ACCOUNT, + uid, + Set.of(new Name("testupdate")), + new OperationOptionsBuilder().build()); + assertEquals(uid, updated); + } + + @Test + void delete() { + assertDoesNotThrow(() -> newFacade().delete( + ObjectClass.ACCOUNT, + new Uid("testdelete"), + new OperationOptionsBuilder().build())); + } + + @Test + void sync() throws InterruptedException, ExecutionException { + // create a producer and send a new event + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); + props.put(ProducerConfig.CLIENT_ID_CONFIG, getClass().getSimpleName()); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + + String value = "{\"key\":\"value\"}"; + try (KafkaProducer producer = new KafkaProducer<>(props)) { + producer.send(new ProducerRecord<>(ObjectClass.GROUP_NAME, value)).get(); + } + + // sync + List deltas = new ArrayList<>(); + await().atMost(5, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until(() -> { + newFacade().sync( + ObjectClass.GROUP, + new SyncToken(""), + deltas::add, + new OperationOptionsBuilder().build()); + return !deltas.isEmpty(); + }); + assertEquals(1, deltas.size()); + assertEquals(value, deltas.get(0).getObject().getAttributeByName("record.value").getValue().get(0)); + } +} diff --git a/src/test/resources/simplelogger.properties b/src/test/resources/simplelogger.properties new file mode 100644 index 0000000..f639e44 --- /dev/null +++ b/src/test/resources/simplelogger.properties @@ -0,0 +1,20 @@ +# +# Copyright (C) 2016 ConnId (connid-dev@googlegroups.com) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# See http://www.slf4j.org/api/org/slf4j/impl/SimpleLogger.html +# Possible values: "trace", "debug", "info", "warn", or "error" +org.slf4j.simpleLogger.defaultLogLevel=info +org.slf4j.simpleLogger.log.org.apache.kafka.clients=trace