Skip to content

Commit

Permalink
fix(search): temporal attribute metadata extraction for ATTRIBUTE_UPD…
Browse files Browse the repository at this point in the history
…ATE events #390 (#391)

Co-authored-by: Benoit Orihuela <benoit.orihuela@eglobalmark.com>
  • Loading branch information
HoucemKacem and bobeal authored May 4, 2021
1 parent 4695e87 commit 46a9d53
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 43 deletions.
4 changes: 3 additions & 1 deletion search-service/config/detekt/baseline.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@
<SmellBaseline>
<ManuallySuppressedIssues></ManuallySuppressedIssues>
<CurrentIssues>
<ID>LargeClass:EntityEventListenerServiceTest.kt$EntityEventListenerServiceTest</ID>
<ID>LargeClass:TemporalEntityHandlerTests.kt$TemporalEntityHandlerTests</ID>
<ID>LongMethod:ParameterizedTests.kt$ParameterizedTests.Companion$@JvmStatic fun rawResultsProvider(): Stream&lt;Arguments&gt;</ID>
<ID>LongParameterList:AttributeInstance.kt$AttributeInstance.Companion$( temporalEntityAttribute: UUID, instanceId: URI? = null, observedAt: ZonedDateTime, value: String? = null, measuredValue: Double? = null, payload: Map&lt;String, Any&gt; )</ID>
<ID>LongParameterList:EntityEventListenerService.kt$EntityEventListenerService$( entityId: URI, expandedAttributeName: String, datasetId: URI?, attributeValuesNode: JsonNode, updatedEntity: String, contexts: List&lt;String&gt; )</ID>
<ID>MaxLineLength:WebSecurityTestConfig.kt$WebSecurityTestConfig : WebSecurityConfig</ID>
<ID>ReturnCount:EntityEventListenerService.kt$EntityEventListenerService$internal fun toTemporalAttributeMedata(jsonNode: JsonNode): Validated&lt;String, AttributeMetadata&gt;</ID>
<ID>ReturnCount:EntityEventListenerService.kt$EntityEventListenerService$internal fun toTemporalAttributeMetadata(jsonNode: JsonNode): Validated&lt;String, AttributeMetadata&gt;</ID>
<ID>ReturnCount:TemporalEntityAttributeService.kt$TemporalEntityAttributeService$internal fun toTemporalAttributeMetadata( ngsiLdAttributeInstance: NgsiLdAttributeInstance ): Validated&lt;String, AttributeMetadata&gt;</ID>
<ID>ThrowsCount:TemporalEntityHandler.kt$internal fun buildTemporalQuery(params: MultiValueMap&lt;String, String&gt;, contextLink: String): TemporalQuery</ID>
<ID>UtilityClassWithPublicConstructor:ParameterizedTests.kt$ParameterizedTests</ID>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import com.egm.stellio.shared.util.RECEIVED_NON_PARSEABLE_ENTITY
import com.egm.stellio.shared.util.extractAttributeInstanceFromCompactedEntity
import com.egm.stellio.shared.util.toUri
import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.node.ObjectNode
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import org.slf4j.LoggerFactory
import org.springframework.kafka.annotation.KafkaListener
Expand Down Expand Up @@ -107,6 +108,7 @@ class EntityEventListenerService(
handleAttributeUpdate(
attributeReplaceEvent.entityId,
expandedAttributeName,
attributeReplaceEvent.datasetId,
attributeNode,
attributeReplaceEvent.updatedEntity,
attributeReplaceEvent.contexts
Expand All @@ -120,6 +122,7 @@ class EntityEventListenerService(
handleAttributeUpdate(
attributeUpdateEvent.entityId,
expandedAttributeName,
attributeUpdateEvent.datasetId,
attributeNode,
attributeUpdateEvent.updatedEntity,
attributeUpdateEvent.contexts
Expand All @@ -129,21 +132,37 @@ class EntityEventListenerService(
private fun handleAttributeUpdate(
entityId: URI,
expandedAttributeName: String,
datasetId: URI?,
attributeValuesNode: JsonNode,
updatedEntity: String,
contexts: List<String>
) {
// TODO add missing checks:
// - existence of temporal entity attribute
when (val extractedAttributeMetadata = toTemporalAttributeMedata(attributeValuesNode)) {

// return early to avoid extra processing if the attribute is not a temporal one
if (!attributeValuesNode.has("observedAt")) {
logger.info("Ignoring append event for $attributeValuesNode, it has no observedAt information")
return
}
val compactedJsonLdEntity = addContextsToEntity(JsonUtils.deserializeObject(updatedEntity), contexts)
val attributeInstancePayload = extractAttributeInstanceFromCompactedEntity(
compactedJsonLdEntity,
compactTerm(expandedAttributeName, contexts),
datasetId
)
// Since ATTRIBUTE_UPDATE events payload may not contain the attribute type
if (!attributeValuesNode.has("type")) {
(attributeValuesNode as ObjectNode).put("type", attributeInstancePayload["type"] as String)
}

when (val extractedAttributeMetadata = toTemporalAttributeMetadata(attributeValuesNode)) {
is Invalid -> {
logger.info(extractedAttributeMetadata.e)
return
}
is Valid -> {
val attributeMetadata = extractedAttributeMetadata.a
val compactedJsonLdEntity = addContextsToEntity(JsonUtils.deserializeObject(updatedEntity), contexts)

temporalEntityAttributeService.getForEntityAndAttribute(
entityId, expandedAttributeName, attributeMetadata.datasetId
).zipWhen {
Expand All @@ -152,12 +171,7 @@ class EntityEventListenerService(
observedAt = attributeMetadata.observedAt,
value = attributeMetadata.value,
measuredValue = attributeMetadata.measuredValue,
payload =
extractAttributeInstanceFromCompactedEntity(
compactedJsonLdEntity,
compactTerm(expandedAttributeName, contexts),
attributeMetadata.datasetId
)
payload = attributeInstancePayload
)
attributeInstanceService.create(attributeInstance)
.then(
Expand All @@ -182,7 +196,7 @@ class EntityEventListenerService(
updatedEntity: String,
contexts: List<String>
) {
when (val extractedAttributeMetadata = toTemporalAttributeMedata(attributeValuesNode)) {
when (val extractedAttributeMetadata = toTemporalAttributeMetadata(attributeValuesNode)) {
is Invalid -> {
logger.info(extractedAttributeMetadata.e)
return
Expand Down Expand Up @@ -229,10 +243,7 @@ class EntityEventListenerService(
}
}

internal fun toTemporalAttributeMedata(jsonNode: JsonNode): Validated<String, AttributeMetadata> {
if (!jsonNode.has("observedAt")) {
return "Ignoring append event for $jsonNode, it has no observedAt information".invalid()
}
internal fun toTemporalAttributeMetadata(jsonNode: JsonNode): Validated<String, AttributeMetadata> {
val attributeTypeAsText = jsonNode["type"].asText()
val attributeType = kotlin.runCatching {
TemporalEntityAttribute.AttributeType.valueOf(attributeTypeAsText)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,31 +96,6 @@ class EntityEventListenerServiceTest {
}
""".trimIndent()

@Test
fun `it should return an invalid result if the attribute does not have an observedAt information`() {
val operationPayload =
"""
{
"type":"Property",
"value":33869
}
""".trimIndent()
val jsonNode = jacksonObjectMapper().readTree(operationPayload)
val result = entityEventListenerService.toTemporalAttributeMedata(jsonNode)
result.bimap(
{
assertEquals(
"Ignoring append event for {\"type\":\"Property\",\"value\":33869}, " +
"it has no observedAt information",
it
)
},
{
fail<String>("Expecting an invalid result, got a valid one: $it")
}
)
}

@Test
fun `it should return an invalid result if the attribute has an unsupported type`() {
val operationPayload =
Expand All @@ -135,7 +110,7 @@ class EntityEventListenerServiceTest {
}
""".trimIndent()
val jsonNode = jacksonObjectMapper().readTree(operationPayload)
val result = entityEventListenerService.toTemporalAttributeMedata(jsonNode)
val result = entityEventListenerService.toTemporalAttributeMetadata(jsonNode)
result.bimap(
{
assertEquals(
Expand All @@ -159,7 +134,7 @@ class EntityEventListenerServiceTest {
}
""".trimIndent()
val jsonNode = jacksonObjectMapper().readTree(operationPayload)
val result = entityEventListenerService.toTemporalAttributeMedata(jsonNode)
val result = entityEventListenerService.toTemporalAttributeMetadata(jsonNode)
result.bimap(
{
assertEquals(
Expand All @@ -186,7 +161,7 @@ class EntityEventListenerServiceTest {
}
""".trimIndent()
val jsonNode = jacksonObjectMapper().readTree(operationPayload)
val result = entityEventListenerService.toTemporalAttributeMedata(jsonNode)
val result = entityEventListenerService.toTemporalAttributeMetadata(jsonNode)
result.bimap(
{
fail<String>("Expecting a valid result, got an invalid one: $it")
Expand Down Expand Up @@ -554,6 +529,31 @@ class EntityEventListenerServiceTest {
verifyAndConfirmMockForMeasuredValue(temporalEntityAttributeUuid)
}

@Test
fun `it should create an attribute instance for ATTRIBUTE_UPDATE events with minimal fragment`() {
val eventPayload =
"""
{
\"value\":33869,
\"observedAt\":\"$observedAt\"
}
""".trimIndent()
val content = prepareAttributeEventPayload(
EventsType.ATTRIBUTE_UPDATE,
eventPayload,
updatedEntityNumericValue
)
val temporalEntityAttributeUuid = UUID.randomUUID()

every { temporalEntityAttributeService.getForEntityAndAttribute(any(), any()) } returns Mono.just(
temporalEntityAttributeUuid
)

entityEventListenerService.processMessage(content)

verifyAndConfirmMockForMeasuredValue(temporalEntityAttributeUuid)
}

@Test
fun `it should create an attribute instance for a relationship for ATTRIBUTE_UPDATE event`() {
val eventPayload =
Expand Down

0 comments on commit 46a9d53

Please sign in to comment.