diff --git a/src/main/java/net/tirasa/connid/bundles/kafka/KafkaConnector.java b/src/main/java/net/tirasa/connid/bundles/kafka/KafkaConnector.java index 8fcf33f..4db7a99 100644 --- a/src/main/java/net/tirasa/connid/bundles/kafka/KafkaConnector.java +++ b/src/main/java/net/tirasa/connid/bundles/kafka/KafkaConnector.java @@ -54,7 +54,6 @@ import org.identityconnectors.framework.common.exceptions.ConnectorException; import org.identityconnectors.framework.common.exceptions.PreconditionFailedException; import org.identityconnectors.framework.common.objects.Attribute; -import org.identityconnectors.framework.common.objects.AttributeBuilder; import org.identityconnectors.framework.common.objects.AttributeUtil; import org.identityconnectors.framework.common.objects.ConnectorObject; import org.identityconnectors.framework.common.objects.ConnectorObjectBuilder; @@ -72,8 +71,8 @@ import org.identityconnectors.framework.common.objects.SyncToken; import org.identityconnectors.framework.common.objects.Uid; import org.identityconnectors.framework.spi.Configuration; -import org.identityconnectors.framework.spi.Connector; import org.identityconnectors.framework.spi.ConnectorClass; +import org.identityconnectors.framework.spi.PoolableConnector; import org.identityconnectors.framework.spi.operations.CreateOp; import org.identityconnectors.framework.spi.operations.DeleteOp; import org.identityconnectors.framework.spi.operations.LiveSyncOp; @@ -83,7 +82,7 @@ @ConnectorClass(configurationClass = KafkaConfiguration.class, displayNameKey = "kafka.connector.display") public class KafkaConnector - implements Connector, CreateOp, UpdateOp, DeleteOp, SchemaOp, LiveSyncOp, TestOp { + implements PoolableConnector, CreateOp, UpdateOp, DeleteOp, SchemaOp, LiveSyncOp, TestOp { private static final Log LOG = Log.getLog(KafkaConnector.class); @@ -182,6 +181,11 @@ public void test() { } } + @Override + public void checkAlive() { + test(); + } + @Override public Schema schema() { return new Schema( @@ -286,18 +290,23 @@ public void livesync( Map headers = new HashMap<>(); record.headers().forEach(header -> headers.put(header.key(), new String(header.value()))); - handler.handle(new LiveSyncDeltaBuilder(). - setObjectClass(objectClass). - setUid(uid). - setObject(new ConnectorObjectBuilder(). - addAttribute(new Name(uid.getUidValue())). - addAttribute(AttributeBuilder.build("record.timestamp", record.timestamp())). - addAttribute(AttributeBuilder.build("record.headers", headers)). - addAttribute(AttributeBuilder.build("record.value", record.value())). - setUid(uid). - build()). - build()); + try { + ConnectorObjectBuilder object = new ConnectorObjectBuilder(). + setObjectClass(objectClass). + setUid(uid). + addAttribute(new Name(uid.getUidValue())). + addAttribute("record.timestamp", record.timestamp()). + addAttribute("record.value", record.value()); + if (!headers.isEmpty()) { + object.addAttribute("record.headers", headers); + } + + handler.handle(new LiveSyncDeltaBuilder().setObject(object.build()).build()); + } catch (Exception e) { + LOG.error(e, "While processing {0}", record); + } }); + consumer.commitSync(); } catch (Exception e) { throw new ConnectorException("While polling events from " + getTopic(objectClass), e); } diff --git a/src/test/java/net/tirasa/connid/bundles/kafka/KafkaConnectorTests.java b/src/test/java/net/tirasa/connid/bundles/kafka/KafkaConnectorTests.java index 40f43cb..7bebc8b 100644 --- a/src/test/java/net/tirasa/connid/bundles/kafka/KafkaConnectorTests.java +++ b/src/test/java/net/tirasa/connid/bundles/kafka/KafkaConnectorTests.java @@ -19,6 +19,7 @@ import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.ArrayList; @@ -132,6 +133,8 @@ void livesync() throws InterruptedException, ExecutionException { return !deltas.isEmpty(); }); assertEquals(1, deltas.size()); + assertNotNull(deltas.get(0).getObject().getAttributeByName("record.timestamp").getValue().get(0)); + assertNull(deltas.get(0).getObject().getAttributeByName("record.headers")); assertEquals(value, deltas.get(0).getObject().getAttributeByName("record.value").getValue().get(0)); } }