Skip to content

Commit

Permalink
Implementing live sync
Browse files Browse the repository at this point in the history
  • Loading branch information
ilgrosso committed Nov 20, 2024
1 parent 07979bc commit a86ee41
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 21 deletions.
22 changes: 8 additions & 14 deletions src/main/java/net/tirasa/connid/bundles/kafka/KafkaConnector.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@
import org.identityconnectors.framework.common.objects.AttributeBuilder;
import org.identityconnectors.framework.common.objects.AttributeUtil;
import org.identityconnectors.framework.common.objects.ConnectorObjectBuilder;
import org.identityconnectors.framework.common.objects.LiveSyncDeltaBuilder;
import org.identityconnectors.framework.common.objects.LiveSyncResultsHandler;
import org.identityconnectors.framework.common.objects.Name;
import org.identityconnectors.framework.common.objects.ObjectClass;
import org.identityconnectors.framework.common.objects.ObjectClassInfo;
Expand All @@ -64,22 +66,21 @@
import org.identityconnectors.framework.common.objects.SyncDelta;
import org.identityconnectors.framework.common.objects.SyncDeltaBuilder;
import org.identityconnectors.framework.common.objects.SyncDeltaType;
import org.identityconnectors.framework.common.objects.SyncResultsHandler;
import org.identityconnectors.framework.common.objects.SyncToken;
import org.identityconnectors.framework.common.objects.Uid;
import org.identityconnectors.framework.spi.Configuration;
import org.identityconnectors.framework.spi.Connector;
import org.identityconnectors.framework.spi.ConnectorClass;
import org.identityconnectors.framework.spi.operations.CreateOp;
import org.identityconnectors.framework.spi.operations.DeleteOp;
import org.identityconnectors.framework.spi.operations.LiveSyncOp;
import org.identityconnectors.framework.spi.operations.SchemaOp;
import org.identityconnectors.framework.spi.operations.SyncOp;
import org.identityconnectors.framework.spi.operations.TestOp;
import org.identityconnectors.framework.spi.operations.UpdateOp;

@ConnectorClass(configurationClass = KafkaConfiguration.class, displayNameKey = "kafka.connector.display")
public class KafkaConnector
implements Connector, CreateOp, UpdateOp, DeleteOp, SchemaOp, SyncOp, TestOp {
implements Connector, CreateOp, UpdateOp, DeleteOp, SchemaOp, LiveSyncOp, TestOp {

private static final Log LOG = Log.getLog(KafkaConnector.class);

Expand Down Expand Up @@ -267,10 +268,9 @@ public void delete(
}

@Override
public void sync(
public void livesync(
final ObjectClass objectClass,
final SyncToken token,
final SyncResultsHandler handler,
final LiveSyncResultsHandler handler,
final OperationOptions options) {

try (KafkaConsumer<String, Object> consumer = createConsumer(objectClass)) {
Expand All @@ -281,26 +281,20 @@ public void sync(
Map<String, String> headers = new HashMap<>();
record.headers().forEach(header -> headers.put(header.key(), new String(header.value())));

handler.handle(new SyncDeltaBuilder().
setDeltaType(SyncDeltaType.CREATE_OR_UPDATE).
handler.handle(new LiveSyncDeltaBuilder().
setObjectClass(objectClass).
setUid(uid).
setObject(new ConnectorObjectBuilder().
addAttribute(new Name(uid.getUidValue())).
addAttribute(AttributeBuilder.build("record.timestamp", record.timestamp())).
addAttribute(AttributeBuilder.build("record.headers", headers)).
addAttribute(AttributeBuilder.build("record.value", record.value())).
setUid(uid).
build()).
setToken(new SyncToken(record.timestamp())).
build());
});
} catch (Exception e) {
throw new ConnectorException("While polling events from " + getTopic(objectClass), e);
}
}

@Override
public SyncToken getLatestSyncToken(final ObjectClass objectClass) {
return new SyncToken(System.currentTimeMillis());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,10 @@
import org.identityconnectors.framework.api.APIConfiguration;
import org.identityconnectors.framework.api.ConnectorFacade;
import org.identityconnectors.framework.api.ConnectorFacadeFactory;
import org.identityconnectors.framework.common.objects.LiveSyncDelta;
import org.identityconnectors.framework.common.objects.Name;
import org.identityconnectors.framework.common.objects.ObjectClass;
import org.identityconnectors.framework.common.objects.OperationOptionsBuilder;
import org.identityconnectors.framework.common.objects.SyncDelta;
import org.identityconnectors.framework.common.objects.SyncToken;
import org.identityconnectors.framework.common.objects.Uid;
import org.identityconnectors.test.common.TestHelpers;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -102,7 +101,7 @@ void delete() {
}

@Test
void sync() throws InterruptedException, ExecutionException {
void livesync() throws InterruptedException, ExecutionException {
// create a producer and send a new event
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
Expand All @@ -115,12 +114,11 @@ void sync() throws InterruptedException, ExecutionException {
producer.send(new ProducerRecord<>(ObjectClass.GROUP_NAME, value)).get();
}

// sync
List<SyncDelta> deltas = new ArrayList<>();
// live sync
List<LiveSyncDelta> deltas = new ArrayList<>();
await().atMost(5, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until(() -> {
newFacade().sync(
newFacade().livesync(
ObjectClass.GROUP,
new SyncToken(""),
deltas::add,
new OperationOptionsBuilder().build());
return !deltas.isEmpty();
Expand Down

0 comments on commit a86ee41

Please sign in to comment.