diff --git a/src/main/java/net/tirasa/connid/bundles/kafka/KafkaConnector.java b/src/main/java/net/tirasa/connid/bundles/kafka/KafkaConnector.java index 197ae9b..8fcf33f 100644 --- a/src/main/java/net/tirasa/connid/bundles/kafka/KafkaConnector.java +++ b/src/main/java/net/tirasa/connid/bundles/kafka/KafkaConnector.java @@ -33,8 +33,10 @@ import java.util.UUID; import net.tirasa.connid.bundles.kafka.serialization.AttributeDeserializer; import net.tirasa.connid.bundles.kafka.serialization.AttributeSerializer; +import net.tirasa.connid.bundles.kafka.serialization.ConnectorObjectDeserializer; import net.tirasa.connid.bundles.kafka.serialization.GuardedStringDeserializer; import net.tirasa.connid.bundles.kafka.serialization.GuardedStringSerializer; +import net.tirasa.connid.bundles.kafka.serialization.SyncDeltaJacksonDeserializer; import net.tirasa.connid.bundles.kafka.serialization.SyncTokenDeserializer; import net.tirasa.connid.bundles.kafka.serialization.SyncTokenSerializer; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -54,6 +56,7 @@ import org.identityconnectors.framework.common.objects.Attribute; import org.identityconnectors.framework.common.objects.AttributeBuilder; import org.identityconnectors.framework.common.objects.AttributeUtil; +import org.identityconnectors.framework.common.objects.ConnectorObject; import org.identityconnectors.framework.common.objects.ConnectorObjectBuilder; import org.identityconnectors.framework.common.objects.LiveSyncDeltaBuilder; import org.identityconnectors.framework.common.objects.LiveSyncResultsHandler; @@ -94,6 +97,8 @@ public class KafkaConnector pojoModule.addDeserializer(GuardedString.class, new GuardedStringDeserializer()); pojoModule.addDeserializer(Attribute.class, new AttributeDeserializer()); pojoModule.addDeserializer(SyncToken.class, new SyncTokenDeserializer()); + pojoModule.addDeserializer(ConnectorObject.class, new ConnectorObjectDeserializer()); + pojoModule.addDeserializer(SyncDelta.class, new SyncDeltaJacksonDeserializer()); MAPPER = JsonMapper.builder(). addModule(pojoModule). diff --git a/src/main/java/net/tirasa/connid/bundles/kafka/serialization/ConnectorObjectDeserializer.java b/src/main/java/net/tirasa/connid/bundles/kafka/serialization/ConnectorObjectDeserializer.java new file mode 100644 index 0000000..d873933 --- /dev/null +++ b/src/main/java/net/tirasa/connid/bundles/kafka/serialization/ConnectorObjectDeserializer.java @@ -0,0 +1,58 @@ +/** + * Copyright (C) 2024 ConnId (connid-dev@googlegroups.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.tirasa.connid.bundles.kafka.serialization; + +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.node.ObjectNode; +import java.io.IOException; +import java.util.Map; +import org.identityconnectors.framework.common.objects.ConnectorObject; +import org.identityconnectors.framework.common.objects.ConnectorObjectBuilder; +import org.identityconnectors.framework.common.objects.ObjectClass; + +public class ConnectorObjectDeserializer extends JsonDeserializer { + + private static final AttributeDeserializer ATTR_DESERIALIZER = new AttributeDeserializer(); + + @Override + public ConnectorObject deserialize(final JsonParser jp, final DeserializationContext ctx) throws IOException { + ObjectNode tree = jp.readValueAsTree(); + + ConnectorObjectBuilder builder = new ConnectorObjectBuilder(); + + if (tree.has("objectClass")) { + builder.setObjectClass(new ObjectClass(tree.get("objectClass").get("type").asText())); + } + + if (tree.has("attributeMap")) { + JsonParser parser = tree.get("attributeMap").traverse(); + parser.setCodec(jp.getCodec()); + Map attributeMap = parser.readValueAs(new TypeReference>() { + }); + + for (ObjectNode attribute : attributeMap.values()) { + JsonParser p = attribute.traverse(); + p.setCodec(jp.getCodec()); + builder.addAttribute(ATTR_DESERIALIZER.deserialize(p, ctx)); + } + } + + return builder.build(); + } +} diff --git a/src/main/java/net/tirasa/connid/bundles/kafka/serialization/GuardedStringDeserializer.java b/src/main/java/net/tirasa/connid/bundles/kafka/serialization/GuardedStringDeserializer.java index e192689..2b60540 100644 --- a/src/main/java/net/tirasa/connid/bundles/kafka/serialization/GuardedStringDeserializer.java +++ b/src/main/java/net/tirasa/connid/bundles/kafka/serialization/GuardedStringDeserializer.java @@ -41,9 +41,7 @@ public class GuardedStringDeserializer extends JsonDeserializer { private static final String LOG_ERROR_MESSAGE = "Could not set field value to {}"; @Override - public GuardedString deserialize(final JsonParser jp, final DeserializationContext ctx) - throws IOException { - + public GuardedString deserialize(final JsonParser jp, final DeserializationContext ctx) throws IOException { ObjectNode tree = jp.readValueAsTree(); boolean readOnly = false; @@ -95,5 +93,4 @@ public GuardedString deserialize(final JsonParser jp, final DeserializationConte return dest; } - } diff --git a/src/main/java/net/tirasa/connid/bundles/kafka/serialization/SyncDeltaJacksonDeserializer.java b/src/main/java/net/tirasa/connid/bundles/kafka/serialization/SyncDeltaJacksonDeserializer.java new file mode 100644 index 0000000..da00807 --- /dev/null +++ b/src/main/java/net/tirasa/connid/bundles/kafka/serialization/SyncDeltaJacksonDeserializer.java @@ -0,0 +1,77 @@ +/** + * Copyright (C) 2024 ConnId (connid-dev@googlegroups.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.tirasa.connid.bundles.kafka.serialization; + +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.node.ObjectNode; +import java.io.IOException; +import org.identityconnectors.framework.common.objects.ObjectClass; +import org.identityconnectors.framework.common.objects.SyncDelta; +import org.identityconnectors.framework.common.objects.SyncDeltaBuilder; +import org.identityconnectors.framework.common.objects.SyncDeltaType; +import org.identityconnectors.framework.common.objects.Uid; + +public class SyncDeltaJacksonDeserializer extends JsonDeserializer { + + private static final AttributeDeserializer ATTR_DESERIALIZER = new AttributeDeserializer(); + + private static final SyncTokenDeserializer SYNC_TOKEN_DESERIALIZER = new SyncTokenDeserializer(); + + private static final ConnectorObjectDeserializer CONNECTOR_OBJECT_DESERIALIZER = new ConnectorObjectDeserializer(); + + @Override + public SyncDelta deserialize(final JsonParser jp, final DeserializationContext ctx) throws IOException { + ObjectNode tree = jp.readValueAsTree(); + + SyncDeltaBuilder builder = new SyncDeltaBuilder(); + + if (tree.has("objectClass")) { + builder.setObjectClass(new ObjectClass(tree.get("objectClass").get("type").asText())); + } + + if (tree.has("uid")) { + JsonParser parser = tree.get("uid").traverse(); + parser.setCodec(jp.getCodec()); + builder.setUid((Uid) ATTR_DESERIALIZER.deserialize(parser, ctx)); + } + + if (tree.has("previousUid") && !tree.get("previousUid").isNull()) { + JsonParser parser = tree.get("previousUid").traverse(); + parser.setCodec(jp.getCodec()); + builder.setPreviousUid((Uid) ATTR_DESERIALIZER.deserialize(parser, ctx)); + } + + if (tree.has("token")) { + JsonParser parser = tree.get("token").traverse(); + parser.setCodec(jp.getCodec()); + builder.setToken(SYNC_TOKEN_DESERIALIZER.deserialize(parser, ctx)); + } + + if (tree.has("deltaType")) { + builder.setDeltaType(SyncDeltaType.valueOf(tree.get("deltaType").asText())); + } + + if (tree.has("object")) { + JsonParser parser = tree.get("object").traverse(); + parser.setCodec(jp.getCodec()); + builder.setObject(CONNECTOR_OBJECT_DESERIALIZER.deserialize(parser, ctx)); + } + + return builder.build(); + } +} diff --git a/src/main/java/net/tirasa/connid/bundles/kafka/serialization/SyncTokenSerializer.java b/src/main/java/net/tirasa/connid/bundles/kafka/serialization/SyncTokenSerializer.java index 82f47d7..b92cb9b 100644 --- a/src/main/java/net/tirasa/connid/bundles/kafka/serialization/SyncTokenSerializer.java +++ b/src/main/java/net/tirasa/connid/bundles/kafka/serialization/SyncTokenSerializer.java @@ -53,5 +53,4 @@ public void serialize(final SyncToken source, final JsonGenerator jgen, final Se jgen.writeEndObject(); } - } diff --git a/src/test/java/net/tirasa/connid/bundles/kafka/serialization/SyncDeltaSerDesTests.java b/src/test/java/net/tirasa/connid/bundles/kafka/serialization/SyncDeltaSerDesTests.java new file mode 100644 index 0000000..90d83bb --- /dev/null +++ b/src/test/java/net/tirasa/connid/bundles/kafka/serialization/SyncDeltaSerDesTests.java @@ -0,0 +1,53 @@ +/** + * Copyright (C) 2024 ConnId (connid-dev@googlegroups.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.tirasa.connid.bundles.kafka.serialization; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.Set; +import java.util.UUID; +import org.identityconnectors.framework.common.objects.AttributeBuilder; +import org.identityconnectors.framework.common.objects.ConnectorObjectBuilder; +import org.identityconnectors.framework.common.objects.Name; +import org.identityconnectors.framework.common.objects.ObjectClass; +import org.identityconnectors.framework.common.objects.SyncDelta; +import org.identityconnectors.framework.common.objects.SyncDeltaBuilder; +import org.identityconnectors.framework.common.objects.SyncDeltaType; +import org.identityconnectors.framework.common.objects.SyncToken; +import org.identityconnectors.framework.common.objects.Uid; +import org.junit.jupiter.api.Test; + +class SyncDeltaSerDesTests { + + @Test + void serdes() { + Uid uid = new Uid(UUID.randomUUID().toString()); + SyncDelta syncDelta = new SyncDeltaBuilder(). + setDeltaType(SyncDeltaType.CREATE). + setObjectClass(ObjectClass.ACCOUNT). + setUid(uid). + setObject(new ConnectorObjectBuilder().addAttributes(Set.of( + new Name("name"), + AttributeBuilder.build("email", "connid-dev@googlegroups.com"))). + setUid(uid).build()). + setToken(new SyncToken(System.currentTimeMillis())). + build(); + + byte[] serialized = new SyncDeltaSerializer().serialize("topic", syncDelta); + SyncDelta deserialized = new SyncDeltaDeserializer().deserialize("topic", serialized); + assertEquals(syncDelta, deserialized); + } +}