diff --git a/LICENSE b/LICENSE
index 261eeb9..8dada3e 100644
--- a/LICENSE
+++ b/LICENSE
@@ -178,7 +178,7 @@
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
- boilerplate notice, with the fields enclosed by brackets "[]"
+ boilerplate notice, with the fields enclosed by brackets "{}"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
@@ -186,7 +186,7 @@
same "printed page" as the copyright notice for easier
identification within third-party archives.
- Copyright [yyyy] [name of copyright owner]
+ Copyright {yyyy} {name of copyright owner}
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
diff --git a/NOTICE b/NOTICE
new file mode 100644
index 0000000..8fa8165
--- /dev/null
+++ b/NOTICE
@@ -0,0 +1,9 @@
+ConnId Kafka bundle
+Copyright 2024 ConnId
+
+This product includes software developed by:
+The ConnId project (https://connid.tirasa.net/).
+
+The following copyright notice(s) were affixed to portions of this code
+with which this file is now or was at one time distributed.
+
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..d54b1b8
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,344 @@
+
+
+
+
+ 4.0.0
+
+
+ net.tirasa.connid
+ connid
+ 1.6.0.0-SNAPSHOT
+
+
+ net.tirasa.connid.bundles
+ net.tirasa.connid.bundles.kafka
+
+ ConnId Bundles: Kafka
+
+ jar
+
+ http://connid.tirasa.net
+ 2024
+
+
+
+ The Apache Software License, Version 2.0
+ http://www.apache.org/licenses/LICENSE-2.0.txt
+ repo
+
+
+
+
+ scm:git:git@github.com:Tirasa/ConnIdKafkaBundle.git
+ scm:git:git@github.com:Tirasa/ConnIdKafkaBundle.git
+ scm:git:git@github.com:Tirasa/ConnIdKafkaBundle.git
+ HEAD
+
+
+
+ jira
+ https://connid.atlassian.net/browse/KAFKA
+
+
+
+ GitHub Workflow
+ https://github.com/Tirasa/ConnIdKafkaBundle/actions
+
+
+
+
+ connid-dev
+ connid-dev@googlegroups.com
+ http://groups.google.com/group/connid-dev
+
+
+ connid-users
+ connid-users@googlegroups.com
+ http://groups.google.com/group/connid-users
+
+
+
+
+ 1.6.0.0-SNAPSHOT
+ 3.9.0
+ 2.17.3
+ 1.20.3
+
+ 11
+ UTF-8
+
+
+
+
+ org.apache.kafka
+ kafka-clients
+ ${kafka.version}
+
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+ ${jackson.version}
+
+
+ com.fasterxml.jackson.datatype
+ jackson-datatype-jsr310
+ ${jackson.version}
+
+
+
+ net.tirasa.connid
+ connector-framework
+ ${connid.version}
+
+
+ net.tirasa.connid
+ connector-framework-internal
+ ${connid.version}
+
+
+
+
+ org.slf4j
+ slf4j-simple
+ 2.0.16
+ test
+
+
+ org.testcontainers
+ kafka
+ ${testcontainers.version}
+ test
+
+
+ org.testcontainers
+ junit-jupiter
+ ${testcontainers.version}
+ test
+
+
+ org.awaitility
+ awaitility
+ 4.2.2
+ test
+
+
+ net.tirasa.connid
+ connector-test-common
+ ${connid.version}
+ test
+
+
+ org.junit.jupiter
+ junit-jupiter
+ ${junit.version}
+ test
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ 3.13.0
+ false
+
+ ${targetJdk}
+ false
+ true
+ true
+ -Xlint:unchecked
+
+
+
+
+ org.gaul
+ modernizer-maven-plugin
+ 2.9.0
+
+ ${targetJdk}
+
+
+
+ modernizer-check
+ verify
+
+ modernizer
+
+
+
+
+
+
+ com.mycila
+ license-maven-plugin
+ false
+
+ true
+
+
+
+
+ org.apache.rat
+ apache-rat-plugin
+ 0.16.1
+
+
+ **/nbactions.xml
+ **/nb-configuration.xml
+ **/META-INF/cxf/**
+ **/META-INF/services/**
+ **/META-INF/MANIFEST.MF
+ **/*.csv
+ **/archetype-resources/**
+ **/AdminLTE*/**
+ **/goal.txt
+ **/rat.txt
+ **/build-copy-javadoc-files.xml
+ **/maven-eclipse.xml
+ **/*.iml
+ **/*.log
+ **/.externalToolBuilders/**
+ .git/**
+ .idea/**
+ **/.settings/**
+ **/.*
+ **/deb/control/conffiles
+ **/deb/control/control
+ **/*.lst
+ **/*.json
+ **/banner.txt
+ **/target/**
+
+
+
+
+ rat-check
+ verify
+
+ check
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-resources-plugin
+
+
+ copy-artifact-legal-files
+ process-resources
+
+ copy-resources
+
+
+ ${project.build.outputDirectory}/META-INF
+
+
+ .
+
+ LICENSE
+ NOTICE
+
+
+
+
+
+
+ copy-javadoc-legal-files
+ process-resources
+
+ copy-resources
+
+
+ ${project.build.directory}/reports/apidocs/META-INF
+
+
+ .
+
+ LICENSE
+ NOTICE
+
+
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-assembly-plugin
+ true
+
+ true
+
+ src/assemble/connector.xml
+
+
+ true
+
+ ${connid.version}
+ ${project.artifactId}
+ ${project.version}
+
+
+
+
+
+ make-assembly
+ package
+
+ single
+
+
+
+
+
+
+
+
+ src/main/resources
+
+
+ .
+ META-INF
+
+ LICENSE
+ NOTICE
+
+
+
+
+
+
+
+ sonatype
+ https://oss.sonatype.org/content/repositories/snapshots
+
+ false
+
+
+
+
+
+
diff --git a/src/assemble/connector.xml b/src/assemble/connector.xml
new file mode 100644
index 0000000..c59d570
--- /dev/null
+++ b/src/assemble/connector.xml
@@ -0,0 +1,51 @@
+
+
+
+
+ bundle
+
+
+ jar
+
+
+ false
+
+
+
+ ${project.build.directory}/classes
+
+
+
+
+
+
+ lib
+ false
+ false
+ runtime
+
+ net.tirasa.connid:connector-framework
+ net.tirasa.connid:connector-framework-internal
+
+
+
+
diff --git a/src/main/java/net/tirasa/connid/bundles/kafka/KafkaConfiguration.java b/src/main/java/net/tirasa/connid/bundles/kafka/KafkaConfiguration.java
new file mode 100644
index 0000000..7533f41
--- /dev/null
+++ b/src/main/java/net/tirasa/connid/bundles/kafka/KafkaConfiguration.java
@@ -0,0 +1,172 @@
+/**
+ * Copyright (C) 2024 ConnId (connid-dev@googlegroups.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package net.tirasa.connid.bundles.kafka;
+
+import net.tirasa.connid.bundles.kafka.serialization.SyncDeltaSerializer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.identityconnectors.common.StringUtil;
+import org.identityconnectors.framework.common.exceptions.ConfigurationException;
+import org.identityconnectors.framework.common.objects.ObjectClass;
+import org.identityconnectors.framework.spi.AbstractConfiguration;
+import org.identityconnectors.framework.spi.ConfigurationProperty;
+
+public class KafkaConfiguration extends AbstractConfiguration {
+
+ private String bootstrapServers;
+
+ private String clientId;
+
+ private String consumerGroupId;
+
+ private String autoOffsetReset = "earliest";
+
+ private String valueSerializerClassName = SyncDeltaSerializer.class.getName();
+
+ private String valueDeserializerClassName = StringDeserializer.class.getName();
+
+ private String accountTopic = ObjectClass.ACCOUNT_NAME;
+
+ private String groupTopic = ObjectClass.GROUP_NAME;
+
+ private String allTopic = ObjectClass.ALL_NAME;
+
+ private long consumerPollMillis = 100;
+
+ @ConfigurationProperty(displayMessageKey = "bootstrapServers.display",
+ helpMessageKey = "bootstrapServers.help", required = true, order = 1)
+ public String getBootstrapServers() {
+ return bootstrapServers;
+ }
+
+ public void setBootstrapServers(final String bootstrapServers) {
+ this.bootstrapServers = bootstrapServers;
+ }
+
+ @ConfigurationProperty(displayMessageKey = "clientId.display",
+ helpMessageKey = "clientId.help", required = true, order = 2)
+ public String getClientId() {
+ return clientId;
+ }
+
+ public void setClientId(final String clientId) {
+ this.clientId = clientId;
+ }
+
+ @ConfigurationProperty(displayMessageKey = "consumerGroupId.display",
+ helpMessageKey = "consumerGroupId.help", required = true, order = 3)
+ public String getConsumerGroupId() {
+ return consumerGroupId;
+ }
+
+ public void setConsumerGroupId(final String consumerGroupId) {
+ this.consumerGroupId = consumerGroupId;
+ }
+
+ @ConfigurationProperty(displayMessageKey = "autoOffsetReset.display",
+ helpMessageKey = "autoOffsetReset.help", required = true, order = 4,
+ allowedValues = { "earliest", "latest", "none" })
+ public String getAutoOffsetReset() {
+ return autoOffsetReset;
+ }
+
+ public void setAutoOffsetReset(final String autoOffsetReset) {
+ this.autoOffsetReset = autoOffsetReset;
+ }
+
+ @ConfigurationProperty(displayMessageKey = "valueSerializerClassName.display",
+ helpMessageKey = "valueSerializerClassName.help", required = true, order = 5)
+ public String getValueSerializerClassName() {
+ return valueSerializerClassName;
+ }
+
+ public void setValueSerializerClassName(final String valueSerializerClassName) {
+ this.valueSerializerClassName = valueSerializerClassName;
+ }
+
+ @ConfigurationProperty(displayMessageKey = "valueDeserializerClassName.display",
+ helpMessageKey = "valueDeserializerClassName.help", required = true, order = 6)
+ public String getValueDeserializerClassName() {
+ return valueDeserializerClassName;
+ }
+
+ public void setValueDeserializerClassName(final String valueDeserializerClassName) {
+ this.valueDeserializerClassName = valueDeserializerClassName;
+ }
+
+ @ConfigurationProperty(displayMessageKey = "accountTopic.display",
+ helpMessageKey = "accountTopic.help", order = 7)
+ public String getAccountTopic() {
+ return accountTopic;
+ }
+
+ public void setAccountTopic(final String accountTopic) {
+ this.accountTopic = accountTopic;
+ }
+
+ @ConfigurationProperty(displayMessageKey = "groupTopic.display",
+ helpMessageKey = "groupTopic.help", order = 8)
+ public String getGroupTopic() {
+ return groupTopic;
+ }
+
+ public void setGroupTopic(final String groupTopic) {
+ this.groupTopic = groupTopic;
+ }
+
+ @ConfigurationProperty(displayMessageKey = "allTopic.display",
+ helpMessageKey = "allTopic.help", order = 9)
+ public String getAllTopic() {
+ return allTopic;
+ }
+
+ public void setAllTopic(final String allTopic) {
+ this.allTopic = allTopic;
+ }
+
+ @ConfigurationProperty(displayMessageKey = "consumerPollMillis.display",
+ helpMessageKey = "consumerPollMillis.help", order = 10)
+ public long getConsumerPollMillis() {
+ return consumerPollMillis;
+ }
+
+ public void setConsumerPollMillis(final long consumerPollMillis) {
+ this.consumerPollMillis = consumerPollMillis;
+ }
+
+ @Override
+ public void validate() {
+ if (StringUtil.isBlank(bootstrapServers)) {
+ throw new ConfigurationException("Missing " + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
+ }
+ if (StringUtil.isBlank(clientId)) {
+ throw new ConfigurationException("Missing " + ProducerConfig.CLIENT_ID_CONFIG);
+ }
+ if (StringUtil.isBlank(consumerGroupId)) {
+ throw new ConfigurationException("Missing " + ConsumerConfig.GROUP_ID_CONFIG);
+ }
+ if (StringUtil.isBlank(autoOffsetReset)) {
+ throw new ConfigurationException("Missing " + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
+ }
+ if (StringUtil.isBlank(valueSerializerClassName)) {
+ throw new ConfigurationException("Missing " + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
+ }
+ if (StringUtil.isBlank(valueDeserializerClassName)) {
+ throw new ConfigurationException("Missing " + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
+ }
+ }
+}
diff --git a/src/main/java/net/tirasa/connid/bundles/kafka/KafkaConnector.java b/src/main/java/net/tirasa/connid/bundles/kafka/KafkaConnector.java
new file mode 100644
index 0000000..c86e58c
--- /dev/null
+++ b/src/main/java/net/tirasa/connid/bundles/kafka/KafkaConnector.java
@@ -0,0 +1,306 @@
+/**
+ * Copyright (C) 2024 ConnId (connid-dev@googlegroups.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package net.tirasa.connid.bundles.kafka;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
+import com.fasterxml.jackson.annotation.PropertyAccessor;
+import com.fasterxml.jackson.core.Version;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import net.tirasa.connid.bundles.kafka.serialization.AttributeDeserializer;
+import net.tirasa.connid.bundles.kafka.serialization.AttributeSerializer;
+import net.tirasa.connid.bundles.kafka.serialization.GuardedStringDeserializer;
+import net.tirasa.connid.bundles.kafka.serialization.GuardedStringSerializer;
+import net.tirasa.connid.bundles.kafka.serialization.SyncTokenDeserializer;
+import net.tirasa.connid.bundles.kafka.serialization.SyncTokenSerializer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.identityconnectors.common.logging.Log;
+import org.identityconnectors.common.security.GuardedString;
+import org.identityconnectors.framework.api.operations.APIOperation;
+import org.identityconnectors.framework.common.exceptions.ConnectionFailedException;
+import org.identityconnectors.framework.common.exceptions.ConnectorException;
+import org.identityconnectors.framework.common.exceptions.PreconditionFailedException;
+import org.identityconnectors.framework.common.objects.Attribute;
+import org.identityconnectors.framework.common.objects.AttributeBuilder;
+import org.identityconnectors.framework.common.objects.AttributeUtil;
+import org.identityconnectors.framework.common.objects.ConnectorObjectBuilder;
+import org.identityconnectors.framework.common.objects.Name;
+import org.identityconnectors.framework.common.objects.ObjectClass;
+import org.identityconnectors.framework.common.objects.ObjectClassInfo;
+import org.identityconnectors.framework.common.objects.OperationOptionInfo;
+import org.identityconnectors.framework.common.objects.OperationOptions;
+import org.identityconnectors.framework.common.objects.Schema;
+import org.identityconnectors.framework.common.objects.SyncDelta;
+import org.identityconnectors.framework.common.objects.SyncDeltaBuilder;
+import org.identityconnectors.framework.common.objects.SyncDeltaType;
+import org.identityconnectors.framework.common.objects.SyncResultsHandler;
+import org.identityconnectors.framework.common.objects.SyncToken;
+import org.identityconnectors.framework.common.objects.Uid;
+import org.identityconnectors.framework.spi.Configuration;
+import org.identityconnectors.framework.spi.Connector;
+import org.identityconnectors.framework.spi.ConnectorClass;
+import org.identityconnectors.framework.spi.operations.CreateOp;
+import org.identityconnectors.framework.spi.operations.DeleteOp;
+import org.identityconnectors.framework.spi.operations.SchemaOp;
+import org.identityconnectors.framework.spi.operations.SyncOp;
+import org.identityconnectors.framework.spi.operations.TestOp;
+import org.identityconnectors.framework.spi.operations.UpdateOp;
+
+@ConnectorClass(configurationClass = KafkaConfiguration.class, displayNameKey = "kafka.connector.display")
+public class KafkaConnector
+ implements Connector, CreateOp, UpdateOp, DeleteOp, SchemaOp, SyncOp, TestOp {
+
+ private static final Log LOG = Log.getLog(KafkaConnector.class);
+
+ public static final JsonMapper MAPPER;
+
+ static {
+ SimpleModule pojoModule = new SimpleModule("KafkaConnectorModule", new Version(1, 0, 0, null, null, null));
+ pojoModule.addSerializer(GuardedString.class, new GuardedStringSerializer());
+ pojoModule.addSerializer(Attribute.class, new AttributeSerializer());
+ pojoModule.addSerializer(SyncToken.class, new SyncTokenSerializer());
+ pojoModule.addDeserializer(GuardedString.class, new GuardedStringDeserializer());
+ pojoModule.addDeserializer(Attribute.class, new AttributeDeserializer());
+ pojoModule.addDeserializer(SyncToken.class, new SyncTokenDeserializer());
+
+ MAPPER = JsonMapper.builder().
+ addModule(pojoModule).
+ addModule(new JavaTimeModule()).
+ disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS).
+ visibility(PropertyAccessor.ALL, Visibility.NONE).
+ visibility(PropertyAccessor.FIELD, Visibility.ANY).
+ build();
+ }
+
+ private KafkaConfiguration configuration;
+
+ private KafkaProducer producer;
+
+ @Override
+ public KafkaConfiguration getConfiguration() {
+ return configuration;
+ }
+
+ @Override
+ public void init(final Configuration configuration) {
+ this.configuration = (KafkaConfiguration) configuration;
+ this.configuration.validate();
+
+ Properties props = new Properties();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.configuration.getBootstrapServers());
+ props.put(ProducerConfig.CLIENT_ID_CONFIG, this.configuration.getClientId());
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, this.configuration.getValueSerializerClassName());
+ try {
+ producer = new KafkaProducer<>(props);
+ } catch (KafkaException e) {
+ LOG.error(e, "While creating Kafka producer");
+ throw new ConnectionFailedException(e);
+ }
+
+ LOG.ok("Connector {0} successfully inited", getClass().getName());
+ }
+
+ @Override
+ public void dispose() {
+ try {
+ Optional.ofNullable(producer).ifPresent(KafkaProducer::close);
+ } catch (KafkaException e) {
+ LOG.error(e, "While closing Kafka producer");
+ throw new ConnectorException(e);
+ }
+ }
+
+ private KafkaConsumer createConsumer(final ObjectClass objectClass) {
+ Properties props = new Properties();
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.configuration.getBootstrapServers());
+ props.put(ConsumerConfig.CLIENT_ID_CONFIG, this.configuration.getClientId());
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, this.configuration.getConsumerGroupId());
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, this.configuration.getAutoOffsetReset());
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, this.configuration.getValueDeserializerClassName());
+
+ KafkaConsumer consumer = new KafkaConsumer<>(props);
+ consumer.subscribe(List.of(getTopic(objectClass)));
+ return consumer;
+ }
+
+ @Override
+ public void test() {
+ if (producer == null) {
+ throw new ConnectorException("No Kafka producer configured");
+ }
+ try {
+ producer.clientInstanceId(Duration.ofSeconds(10));
+ } catch (Exception e) {
+ LOG.error(e, "While testing Kafka producer");
+ throw new ConnectionFailedException(e);
+ }
+
+ try (KafkaConsumer consumer = createConsumer(ObjectClass.ACCOUNT)) {
+ consumer.clientInstanceId(Duration.ofSeconds(10));
+ } catch (Exception e) {
+ LOG.error(e, "While testing Kafka consumer");
+ throw new ConnectionFailedException(e);
+ }
+ }
+
+ @Override
+ public Schema schema() {
+ return new Schema(
+ Collections.emptySet(),
+ Collections.emptySet(),
+ Collections., Set>emptyMap(),
+ Collections., Set>emptyMap());
+ }
+
+ private String getTopic(final ObjectClass objectClass) {
+ if (ObjectClass.ACCOUNT.equals(objectClass)) {
+ return configuration.getAccountTopic();
+ }
+ if (ObjectClass.GROUP.equals(objectClass)) {
+ return configuration.getGroupTopic();
+ }
+ if (ObjectClass.ALL.equals(objectClass)) {
+ return configuration.getAllTopic();
+ }
+ throw new PreconditionFailedException("Unsupported object class: " + objectClass.getObjectClassValue());
+ }
+
+ @Override
+ public Uid create(
+ final ObjectClass objectClass,
+ final Set createAttributes,
+ final OperationOptions options) {
+
+ Uid uid = new Uid(Optional.ofNullable(AttributeUtil.getNameFromAttributes(createAttributes)).
+ orElseThrow(() -> new ConnectorException(Name.NAME + " not found in create attributes")).
+ getNameValue());
+
+ SyncDelta syncDelta = new SyncDeltaBuilder().
+ setDeltaType(SyncDeltaType.CREATE).
+ setObjectClass(objectClass).
+ setUid(uid).
+ setObject(new ConnectorObjectBuilder().addAttributes(createAttributes).setUid(uid).build()).
+ setToken(new SyncToken(System.currentTimeMillis())).
+ build();
+ try {
+ producer.send(new ProducerRecord<>(getTopic(objectClass), syncDelta));
+ } catch (Exception e) {
+ throw new ConnectorException("Could not send the create event to " + getTopic(objectClass), e);
+ }
+
+ return uid;
+ }
+
+ @Override
+ public Uid update(
+ final ObjectClass objectClass,
+ final Uid uid,
+ final Set replaceAttributes,
+ final OperationOptions options) {
+
+ SyncDelta syncDelta = new SyncDeltaBuilder().
+ setDeltaType(SyncDeltaType.UPDATE).
+ setObjectClass(objectClass).
+ setUid(uid).
+ setObject(new ConnectorObjectBuilder().addAttributes(replaceAttributes).setUid(uid).build()).
+ setToken(new SyncToken(System.currentTimeMillis())).
+ build();
+ try {
+ producer.send(new ProducerRecord<>(getTopic(objectClass), syncDelta));
+ } catch (Exception e) {
+ throw new ConnectorException("Could not send the update event to " + getTopic(objectClass), e);
+ }
+
+ return uid;
+ }
+
+ @Override
+ public void delete(
+ final ObjectClass objectClass,
+ final Uid uid,
+ final OperationOptions options) {
+
+ SyncDelta syncDelta = new SyncDeltaBuilder().
+ setDeltaType(SyncDeltaType.DELETE).
+ setObjectClass(objectClass).
+ setUid(uid).
+ setToken(new SyncToken(System.currentTimeMillis())).
+ build();
+ try {
+ producer.send(new ProducerRecord<>(getTopic(objectClass), syncDelta));
+ } catch (Exception e) {
+ throw new ConnectorException("Could not send the delete event to " + getTopic(objectClass), e);
+ }
+ }
+
+ @Override
+ public void sync(
+ final ObjectClass objectClass,
+ final SyncToken token,
+ final SyncResultsHandler handler,
+ final OperationOptions options) {
+
+ try (KafkaConsumer consumer = createConsumer(objectClass)) {
+ consumer.poll(Duration.ofMillis(configuration.getConsumerPollMillis())).forEach(record -> {
+ LOG.ok("Processing {0} for topic {1}", record.key(), record.topic());
+
+ Uid uid = new Uid(Optional.ofNullable(record.key()).orElseGet(() -> UUID.randomUUID().toString()));
+ Map headers = new HashMap<>();
+ record.headers().forEach(header -> headers.put(header.key(), new String(header.value())));
+
+ handler.handle(new SyncDeltaBuilder().
+ setDeltaType(SyncDeltaType.CREATE_OR_UPDATE).
+ setObjectClass(objectClass).
+ setUid(uid).
+ setObject(new ConnectorObjectBuilder().
+ addAttribute(new Name(uid.getUidValue())).
+ addAttribute(AttributeBuilder.build("record.headers", headers)).
+ addAttribute(AttributeBuilder.build("record.value", record.value())).
+ setUid(uid).
+ build()).
+ setToken(new SyncToken(record.timestamp())).
+ build());
+ });
+ } catch (Exception e) {
+ throw new ConnectorException("While polling events from " + getTopic(objectClass), e);
+ }
+ }
+
+ @Override
+ public SyncToken getLatestSyncToken(final ObjectClass objectClass) {
+ return new SyncToken(System.currentTimeMillis());
+ }
+}
diff --git a/src/main/java/net/tirasa/connid/bundles/kafka/serialization/AbstractValueDeserializer.java b/src/main/java/net/tirasa/connid/bundles/kafka/serialization/AbstractValueDeserializer.java
new file mode 100644
index 0000000..ca97a91
--- /dev/null
+++ b/src/main/java/net/tirasa/connid/bundles/kafka/serialization/AbstractValueDeserializer.java
@@ -0,0 +1,64 @@
+/**
+ * Copyright (C) 2024 ConnId (connid-dev@googlegroups.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package net.tirasa.connid.bundles.kafka.serialization;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+import org.identityconnectors.common.StringUtil;
+import org.identityconnectors.common.security.GuardedString;
+
+public abstract class AbstractValueDeserializer extends JsonDeserializer {
+
+ protected List