Skip to content

Commit

Permalink
Cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
ilgrosso committed Dec 18, 2024
1 parent d8721f4 commit 00211aa
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 14 deletions.
37 changes: 23 additions & 14 deletions src/main/java/net/tirasa/connid/bundles/kafka/KafkaConnector.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import org.identityconnectors.framework.common.exceptions.ConnectorException;
import org.identityconnectors.framework.common.exceptions.PreconditionFailedException;
import org.identityconnectors.framework.common.objects.Attribute;
import org.identityconnectors.framework.common.objects.AttributeBuilder;
import org.identityconnectors.framework.common.objects.AttributeUtil;
import org.identityconnectors.framework.common.objects.ConnectorObject;
import org.identityconnectors.framework.common.objects.ConnectorObjectBuilder;
Expand All @@ -72,8 +71,8 @@
import org.identityconnectors.framework.common.objects.SyncToken;
import org.identityconnectors.framework.common.objects.Uid;
import org.identityconnectors.framework.spi.Configuration;
import org.identityconnectors.framework.spi.Connector;
import org.identityconnectors.framework.spi.ConnectorClass;
import org.identityconnectors.framework.spi.PoolableConnector;
import org.identityconnectors.framework.spi.operations.CreateOp;
import org.identityconnectors.framework.spi.operations.DeleteOp;
import org.identityconnectors.framework.spi.operations.LiveSyncOp;
Expand All @@ -83,7 +82,7 @@

@ConnectorClass(configurationClass = KafkaConfiguration.class, displayNameKey = "kafka.connector.display")
public class KafkaConnector
implements Connector, CreateOp, UpdateOp, DeleteOp, SchemaOp, LiveSyncOp, TestOp {
implements PoolableConnector, CreateOp, UpdateOp, DeleteOp, SchemaOp, LiveSyncOp, TestOp {

private static final Log LOG = Log.getLog(KafkaConnector.class);

Expand Down Expand Up @@ -182,6 +181,11 @@ public void test() {
}
}

@Override
public void checkAlive() {
test();
}

@Override
public Schema schema() {
return new Schema(
Expand Down Expand Up @@ -286,18 +290,23 @@ public void livesync(
Map<String, String> headers = new HashMap<>();
record.headers().forEach(header -> headers.put(header.key(), new String(header.value())));

handler.handle(new LiveSyncDeltaBuilder().
setObjectClass(objectClass).
setUid(uid).
setObject(new ConnectorObjectBuilder().
addAttribute(new Name(uid.getUidValue())).
addAttribute(AttributeBuilder.build("record.timestamp", record.timestamp())).
addAttribute(AttributeBuilder.build("record.headers", headers)).
addAttribute(AttributeBuilder.build("record.value", record.value())).
setUid(uid).
build()).
build());
try {
ConnectorObjectBuilder object = new ConnectorObjectBuilder().
setObjectClass(objectClass).
setUid(uid).
addAttribute(new Name(uid.getUidValue())).
addAttribute("record.timestamp", record.timestamp()).
addAttribute("record.value", record.value());
if (!headers.isEmpty()) {
object.addAttribute("record.headers", headers);
}

handler.handle(new LiveSyncDeltaBuilder().setObject(object.build()).build());
} catch (Exception e) {
LOG.error(e, "While processing {0}", record);
}
});
consumer.commitSync();
} catch (Exception e) {
throw new ConnectorException("While polling events from " + getTopic(objectClass), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.ArrayList;
Expand Down Expand Up @@ -132,6 +133,8 @@ void livesync() throws InterruptedException, ExecutionException {
return !deltas.isEmpty();
});
assertEquals(1, deltas.size());
assertNotNull(deltas.get(0).getObject().getAttributeByName("record.timestamp").getValue().get(0));
assertNull(deltas.get(0).getObject().getAttributeByName("record.headers"));
assertEquals(value, deltas.get(0).getObject().getAttributeByName("record.value").getValue().get(0));
}
}

0 comments on commit 00211aa

Please sign in to comment.