diff --git a/src/main/java/net/tirasa/connid/bundles/kafka/KafkaConnector.java b/src/main/java/net/tirasa/connid/bundles/kafka/KafkaConnector.java index c86e58c..881c411 100644 --- a/src/main/java/net/tirasa/connid/bundles/kafka/KafkaConnector.java +++ b/src/main/java/net/tirasa/connid/bundles/kafka/KafkaConnector.java @@ -55,6 +55,8 @@ import org.identityconnectors.framework.common.objects.AttributeBuilder; import org.identityconnectors.framework.common.objects.AttributeUtil; import org.identityconnectors.framework.common.objects.ConnectorObjectBuilder; +import org.identityconnectors.framework.common.objects.LiveSyncDeltaBuilder; +import org.identityconnectors.framework.common.objects.LiveSyncResultsHandler; import org.identityconnectors.framework.common.objects.Name; import org.identityconnectors.framework.common.objects.ObjectClass; import org.identityconnectors.framework.common.objects.ObjectClassInfo; @@ -64,7 +66,6 @@ import org.identityconnectors.framework.common.objects.SyncDelta; import org.identityconnectors.framework.common.objects.SyncDeltaBuilder; import org.identityconnectors.framework.common.objects.SyncDeltaType; -import org.identityconnectors.framework.common.objects.SyncResultsHandler; import org.identityconnectors.framework.common.objects.SyncToken; import org.identityconnectors.framework.common.objects.Uid; import org.identityconnectors.framework.spi.Configuration; @@ -72,14 +73,14 @@ import org.identityconnectors.framework.spi.ConnectorClass; import org.identityconnectors.framework.spi.operations.CreateOp; import org.identityconnectors.framework.spi.operations.DeleteOp; +import org.identityconnectors.framework.spi.operations.LiveSyncOp; import org.identityconnectors.framework.spi.operations.SchemaOp; -import org.identityconnectors.framework.spi.operations.SyncOp; import org.identityconnectors.framework.spi.operations.TestOp; import org.identityconnectors.framework.spi.operations.UpdateOp; @ConnectorClass(configurationClass = KafkaConfiguration.class, displayNameKey = "kafka.connector.display") public class KafkaConnector - implements Connector, CreateOp, UpdateOp, DeleteOp, SchemaOp, SyncOp, TestOp { + implements Connector, CreateOp, UpdateOp, DeleteOp, SchemaOp, LiveSyncOp, TestOp { private static final Log LOG = Log.getLog(KafkaConnector.class); @@ -267,10 +268,9 @@ public void delete( } @Override - public void sync( + public void livesync( final ObjectClass objectClass, - final SyncToken token, - final SyncResultsHandler handler, + final LiveSyncResultsHandler handler, final OperationOptions options) { try (KafkaConsumer consumer = createConsumer(objectClass)) { @@ -281,26 +281,20 @@ public void sync( Map headers = new HashMap<>(); record.headers().forEach(header -> headers.put(header.key(), new String(header.value()))); - handler.handle(new SyncDeltaBuilder(). - setDeltaType(SyncDeltaType.CREATE_OR_UPDATE). + handler.handle(new LiveSyncDeltaBuilder(). setObjectClass(objectClass). setUid(uid). setObject(new ConnectorObjectBuilder(). addAttribute(new Name(uid.getUidValue())). + addAttribute(AttributeBuilder.build("record.timestamp", record.timestamp())). addAttribute(AttributeBuilder.build("record.headers", headers)). addAttribute(AttributeBuilder.build("record.value", record.value())). setUid(uid). build()). - setToken(new SyncToken(record.timestamp())). build()); }); } catch (Exception e) { throw new ConnectorException("While polling events from " + getTopic(objectClass), e); } } - - @Override - public SyncToken getLatestSyncToken(final ObjectClass objectClass) { - return new SyncToken(System.currentTimeMillis()); - } } diff --git a/src/test/java/net/tirasa/connid/bundles/kafka/KafkaConnectorTests.java b/src/test/java/net/tirasa/connid/bundles/kafka/KafkaConnectorTests.java index a60a66a..5136fe2 100644 --- a/src/test/java/net/tirasa/connid/bundles/kafka/KafkaConnectorTests.java +++ b/src/test/java/net/tirasa/connid/bundles/kafka/KafkaConnectorTests.java @@ -33,11 +33,10 @@ import org.identityconnectors.framework.api.APIConfiguration; import org.identityconnectors.framework.api.ConnectorFacade; import org.identityconnectors.framework.api.ConnectorFacadeFactory; +import org.identityconnectors.framework.common.objects.LiveSyncDelta; import org.identityconnectors.framework.common.objects.Name; import org.identityconnectors.framework.common.objects.ObjectClass; import org.identityconnectors.framework.common.objects.OperationOptionsBuilder; -import org.identityconnectors.framework.common.objects.SyncDelta; -import org.identityconnectors.framework.common.objects.SyncToken; import org.identityconnectors.framework.common.objects.Uid; import org.identityconnectors.test.common.TestHelpers; import org.junit.jupiter.api.Test; @@ -102,7 +101,7 @@ void delete() { } @Test - void sync() throws InterruptedException, ExecutionException { + void livesync() throws InterruptedException, ExecutionException { // create a producer and send a new event Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); @@ -115,12 +114,11 @@ void sync() throws InterruptedException, ExecutionException { producer.send(new ProducerRecord<>(ObjectClass.GROUP_NAME, value)).get(); } - // sync - List deltas = new ArrayList<>(); + // live sync + List deltas = new ArrayList<>(); await().atMost(5, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until(() -> { - newFacade().sync( + newFacade().livesync( ObjectClass.GROUP, - new SyncToken(""), deltas::add, new OperationOptionsBuilder().build()); return !deltas.isEmpty();