Skip to content

Commit

Permalink
feat: untested ContextSource For query entities
Browse files Browse the repository at this point in the history
  • Loading branch information
thomasBousselin committed Dec 12, 2024
1 parent 2b6e2d7 commit 24c0cd4
Show file tree
Hide file tree
Showing 8 changed files with 226 additions and 58 deletions.
3 changes: 3 additions & 0 deletions search-service/config/detekt/baseline.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
<ID>LongMethod:AttributeInstanceService.kt$AttributeInstanceService$@Transactional suspend fun create(attributeInstance: AttributeInstance): Either&lt;APIException, Unit&gt;</ID>
<ID>LongMethod:EnabledAuthorizationServiceTests.kt$EnabledAuthorizationServiceTests$@Test fun `it should return serialized access control entities with other rigths if user is owner`()</ID>
<ID>LongMethod:EntityAccessControlHandler.kt$EntityAccessControlHandler$@PostMapping("/{subjectId}/attrs", consumes = [MediaType.APPLICATION_JSON_VALUE, JSON_LD_CONTENT_TYPE]) suspend fun addRightsOnEntities( @RequestHeader httpHeaders: HttpHeaders, @PathVariable subjectId: String, @RequestBody requestBody: Mono&lt;String&gt;, @AllowedParameters @RequestParam queryParams: MultiValueMap&lt;String, String&gt; ): ResponseEntity&lt;*&gt;</ID>
<ID>LongMethod:EntityHandler.kt$EntityHandler$@GetMapping("/{entityId}", produces = [APPLICATION_JSON_VALUE, JSON_LD_CONTENT_TYPE, GEO_JSON_CONTENT_TYPE]) suspend fun getByURI( @RequestHeader httpHeaders: HttpHeaders, @PathVariable entityId: URI, @AllowedParameters( implemented = [ QP.OPTIONS, QP.TYPE, QP.ATTRS, QP.GEOMETRY_PROPERTY, QP.LANG, QP.CONTAINED_BY, QP.JOIN, QP.JOIN_LEVEL, QP.DATASET_ID, ], notImplemented = [QP.FORMAT, QP.PICK, QP.OMIT, QP.ENTITY_MAP, QP.LOCAL, QP.VIA] ) @RequestParam queryParams: MultiValueMap&lt;String, String&gt; ): ResponseEntity&lt;*&gt;</ID>
<ID>LongMethod:EntityHandler.kt$EntityHandler$@GetMapping(produces = [APPLICATION_JSON_VALUE, JSON_LD_CONTENT_TYPE, GEO_JSON_CONTENT_TYPE]) suspend fun getEntities( @RequestHeader httpHeaders: HttpHeaders, @AllowedParameters( implemented = [ QP.OPTIONS, QP.COUNT, QP.OFFSET, QP.LIMIT, QP.ID, QP.TYPE, QP.ID_PATTERN, QP.ATTRS, QP.Q, QP.GEOMETRY, QP.GEOREL, QP.COORDINATES, QP.GEOPROPERTY, QP.GEOMETRY_PROPERTY, QP.LANG, QP.SCOPEQ, QP.CONTAINED_BY, QP.JOIN, QP.JOIN_LEVEL, QP.DATASET_ID, ], notImplemented = [QP.FORMAT, QP.PICK, QP.OMIT, QP.EXPAND_VALUES, QP.CSF, QP.ENTITY_MAP, QP.LOCAL, QP.VIA] ) @RequestParam queryParams: MultiValueMap&lt;String, String&gt; ): ResponseEntity&lt;*&gt;</ID>
<ID>LongMethod:LinkedEntityServiceTests.kt$LinkedEntityServiceTests$@Test fun `it should inline entities up to the asked 2nd level`()</ID>
<ID>LongMethod:PatchAttributeTests.kt$PatchAttributeTests.Companion$@JvmStatic fun mergePatchProvider(): Stream&lt;Arguments&gt;</ID>
<ID>LongMethod:PatchAttributeTests.kt$PatchAttributeTests.Companion$@JvmStatic fun partialUpdatePatchProvider(): Stream&lt;Arguments&gt;</ID>
Expand All @@ -30,6 +32,7 @@
<ID>LongParameterList:TemporalEntityHandler.kt$TemporalEntityHandler$( @RequestHeader httpHeaders: HttpHeaders, @PathVariable entityId: URI, @PathVariable attrId: String, @PathVariable instanceId: URI, @RequestBody requestBody: Mono&lt;String&gt;, @AllowedParameters(notImplemented = [QP.LOCAL, QP.VIA]) @RequestParam queryParams: MultiValueMap&lt;String, String&gt; )</ID>
<ID>LongParameterList:V0_29__JsonLd_migration.kt$V0_29__JsonLd_migration$( entityId: URI, attributeName: ExpandedTerm, datasetId: URI?, attributePayload: ExpandedAttributeInstance, ngsiLdAttributeInstance: NgsiLdAttributeInstance, defaultCreatedAt: ZonedDateTime )</ID>
<ID>NestedBlockDepth:V0_29__JsonLd_migration.kt$V0_29__JsonLd_migration$override fun migrate(context: Context)</ID>
<ID>SpacingBetweenPackageAndImports:ContextSourceCaller.kt$ </ID>
<ID>SwallowedException:TemporalQueryUtils.kt$e: IllegalArgumentException</ID>
</CurrentIssues>
</SmellBaseline>
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,20 @@ import java.net.URI

data class CSRFilters( // we should use a combination of EntitiesQuery TemporalQuery (when we implement all operations)
val ids: Set<URI> = emptySet(),
val type: String? = null,
val idPattern: String? = null,
val csf: String? = null,
) {
constructor(ids: Set<URI> = emptySet(), operations: List<Operation>) :
constructor(
ids: Set<URI> = emptySet(),
type: String? = null,
idPattern: String? = null,
operations: List<Operation>
) :
this(
ids = ids,
type = type,
idPattern = idPattern,
csf = operations.joinToString("|") { "${ContextSourceRegistration::operations.name}==${it.key}" }
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,28 +11,83 @@ import com.egm.stellio.search.csr.model.NGSILDWarning
import com.egm.stellio.search.csr.model.RevalidationFailedWarning
import com.egm.stellio.shared.model.CompactedEntity
import com.egm.stellio.shared.queryparameter.QueryParameter
import com.egm.stellio.shared.util.JsonUtils.deserializeAsList
import com.egm.stellio.shared.util.JsonUtils.deserializeAsMap
import com.egm.stellio.shared.util.RESULTS_COUNT_HEADER
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.core.codec.DecodingException
import org.springframework.http.HttpHeaders
import org.springframework.http.HttpMethod
import org.springframework.http.HttpStatus
import org.springframework.util.CollectionUtils
import org.springframework.util.MultiValueMap
import org.springframework.web.reactive.function.client.ClientResponse
import org.springframework.web.reactive.function.client.WebClient
import org.springframework.web.reactive.function.client.awaitBodyOrNull
import org.springframework.web.reactive.function.client.awaitExchange
import java.net.URI

typealias QueryEntityResponse = Pair<List<CompactedEntity>, Int?>
object ContextSourceCaller {
val logger: Logger = LoggerFactory.getLogger(javaClass)

suspend fun retrieveContextSourceEntity(
httpHeaders: HttpHeaders,
csr: ContextSourceRegistration,
id: URI,
params: MultiValueMap<String, String>
): Either<NGSILDWarning, CompactedEntity?> = either {
val path = "/ngsi-ld/v1/entities/$id"

return kotlin.runCatching {
getDistributedInformation(httpHeaders, csr, path, params).bind().first?.deserializeAsMap().right()
}.fold(
onSuccess = { it },
onFailure = { e ->
logger.warn("Error contacting CSR ${csr.id} at $path: ${e.message}")
logger.warn(e.stackTraceToString())
RevalidationFailedWarning(
"${csr.id} at $path returned badly formed data message: \"${e.cause}:${e.message}\"",
csr
).left()
}
)
}

suspend fun queryContextSourceEntities(
httpHeaders: HttpHeaders,
csr: ContextSourceRegistration,
count: Boolean,
params: MultiValueMap<String, String>
): Either<NGSILDWarning, QueryEntityResponse> = either {
val path = "/ngsi-ld/v1/entities"

return kotlin.runCatching {
getDistributedInformation(httpHeaders, csr, path, params).bind().let {
(response, headers) ->
(response?.deserializeAsList() ?: emptyList()) to
if (count) headers.header(RESULTS_COUNT_HEADER).first().toInt() else null
}
.right()
}.fold(
onSuccess = { it },
onFailure = { e ->

logger.warn("Error contacting CSR ${csr.id} at $path: ${e.message}")
logger.warn(e.stackTraceToString())
RevalidationFailedWarning(
"${csr.id} at $path returned badly formed data message: \"${e.cause}:${e.message}\"",
csr
).left()
}
)
}

suspend fun getDistributedInformation(
httpHeaders: HttpHeaders,
csr: ContextSourceRegistration,
path: String,
params: MultiValueMap<String, String>
): Either<NGSILDWarning, CompactedEntity?> = either {
): Either<NGSILDWarning, Pair<String?, ClientResponse.Headers>> = either {
val uri = URI("${csr.endpoint}$path")

val queryParams = CollectionUtils.toMultiValueMap(params.toMutableMap())
Expand All @@ -52,19 +107,19 @@ object ContextSourceCaller {
}
.header(HttpHeaders.LINK, httpHeaders.getFirst(HttpHeaders.LINK))
return runCatching {
val (statusCode, response) = request
val (statusCode, response, headers) = request
.awaitExchange { response ->
response.statusCode() to response.awaitBodyOrNull<CompactedEntity>()
Triple(response.statusCode(), response.awaitBodyOrNull<String>(), response.headers())
}
when {
statusCode.is2xxSuccessful -> {
logger.info("Successfully received data from CSR ${csr.id} at $uri")
response.right()
(response to headers).right()
}

statusCode.isSameCodeAs(HttpStatus.NOT_FOUND) -> {
logger.info("CSR returned 404 at $uri: $response")
null.right()
(null to headers).right()
}

else -> {
Expand All @@ -80,17 +135,11 @@ object ContextSourceCaller {
onFailure = { e ->
logger.warn("Error contacting CSR at $uri: ${e.message}")
logger.warn(e.stackTraceToString())
if (e is DecodingException) {
RevalidationFailedWarning(
"$uri returned badly formed data message: \"${e.cause}:${e.message}\"",
csr
)
} else {
MiscellaneousWarning(
"Error connecting to $uri message : \"${e.cause}:${e.message}\"",
csr
)
}.left()
MiscellaneousWarning(
"Error connecting to $uri message : \"${e.cause}:${e.message}\"",
csr
)
.left()
}
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,18 +262,33 @@ class ContextSourceRegistrationService(
${csrFilters.ids.joinToString(" OR ") { "'$it' ~ entity_info.idPattern" }}
)
""".trimIndent()
else "true"
else null

val typeFilter = if (!csrFilters.type.isNullOrBlank())
"""
(
entity_info.type is null OR
entity_info.type = '${csrFilters.type}'
)
""".trimIndent() else null

// we only filter on id since there is no easy way to know if two idPatterns overlap
// possible resources : https://meta.stackoverflow.com/questions/426313/canonical-for-overlapping-regex-questions
val idPatternFilter = if (!csrFilters.idPattern.isNullOrBlank())
"""
(
entity_info.id is null OR
entity_info.id ~ ('${csrFilters.idPattern}')
)
""".trimIndent()
else null

val csfFilter = if (csrFilters.csf != null && validationRegex.matches(csrFilters.csf)) {
val operations = operationRegex.toRegex().findAll(csrFilters.csf).map { it.groups[1]?.value }
"operations && ARRAY[${operations.joinToString(",") { "'$it'" }}]"
} else "true"
} else null

return """
$idFilter
AND
$csfFilter
""".trimMargin()
return listOfNotNull(idFilter, typeFilter, idPatternFilter, csfFilter).joinToString(" AND ")
}

private val rowToContextSourceRegistration: ((Map<String, Any>) -> ContextSourceRegistration) = { row ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,33 @@ import java.time.ZonedDateTime
import kotlin.random.Random.Default.nextBoolean

typealias CompactedEntityWithCSR = Pair<CompactedEntity, ContextSourceRegistration>
typealias CompactedEntitiesWithCSR = Pair<List<CompactedEntity>, ContextSourceRegistration>

typealias AttributeByDatasetId = Map<String?, CompactedAttributeInstance>
object ContextSourceUtils {

fun mergeEntitiesLists(
localEntities: List<CompactedEntity>,
remoteEntitiesWithCSR: List<CompactedEntitiesWithCSR>
): IorNel<NGSILDWarning, List<CompactedEntity>> {
val mergedEntityMap = localEntities.associateBy { it[JSONLD_ID_TERM] }.toMutableMap()

val warnings = remoteEntitiesWithCSR.mapNotNull { (entities, csr) ->
val test = either {
entities.forEach { entity ->
val id = entity[JSONLD_ID_TERM]
mergedEntityMap[id] = mergedEntityMap[id]
?.let { getMergeNewValues(it, entity, csr).bind() } ?: entity
}
null
}
test.leftOrNull()
}.toNonEmptyListOrNull()

val entities = mergedEntityMap.values.toList()
return if (warnings == null) Ior.Right(entities) else Ior.Both(warnings, entities)
}

fun mergeEntities(
localEntity: CompactedEntity?,
remoteEntitiesWithCSR: List<CompactedEntityWithCSR>
Expand Down Expand Up @@ -129,7 +153,7 @@ object ContextSourceUtils {
attribute.associateBy { it[NGSILD_DATASET_ID_TERM] as? String }.right()
}
else -> {
RevalidationFailedWarning(
RevalidationFailedWarning( // could be avoided if Json payload is validated beforehand
"The received payload is invalid. Attribute is nor List nor a Map : $attribute",
csr
).left()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,11 +194,11 @@ class EntityHandler(
@RequestHeader httpHeaders: HttpHeaders,
@AllowedParameters(
implemented = [
QP.OPTIONS, QP.FORMAT, QP.COUNT, QP.OFFSET, QP.LIMIT, QP.ID, QP.TYPE, QP.ID_PATTERN, QP.ATTRS, QP.Q,
QP.OPTIONS, QP.COUNT, QP.OFFSET, QP.LIMIT, QP.ID, QP.TYPE, QP.ID_PATTERN, QP.ATTRS, QP.Q,
QP.GEOMETRY, QP.GEOREL, QP.COORDINATES, QP.GEOPROPERTY, QP.GEOMETRY_PROPERTY,
QP.LANG, QP.SCOPEQ, QP.CONTAINED_BY, QP.JOIN, QP.JOIN_LEVEL, QP.DATASET_ID,
],
notImplemented = [QP.PICK, QP.OMIT, QP.EXPAND_VALUES, QP.CSF, QP.ENTITY_MAP, QP.LOCAL, QP.VIA]
notImplemented = [QP.FORMAT, QP.PICK, QP.OMIT, QP.EXPAND_VALUES, QP.CSF, QP.ENTITY_MAP, QP.LOCAL, QP.VIA]
)
@RequestParam queryParams: MultiValueMap<String, String>
): ResponseEntity<*> = either {
Expand All @@ -213,25 +213,76 @@ class EntityHandler(
).bind()
.validateMinimalQueryEntitiesParameters().bind()

val (entities, count) = entityQueryService.queryEntities(entitiesQuery, sub.getOrNull()).bind()
val csrFilters =
CSRFilters(
ids = entitiesQuery.ids,
idPattern = entitiesQuery.idPattern,
type = entitiesQuery.typeSelection,
operations = listOf(
Operation.QUERY_ENTITY,
Operation.FEDERATION_OPS,
Operation.RETRIEVE_ENTITY,
Operation.REDIRECTION_OPS
)
)

val matchingCSR = contextSourceRegistrationService.getContextSourceRegistrations(csrFilters)

val localResponse = either {
val (entities, localCount) = entityQueryService.queryEntities(entitiesQuery, sub.getOrNull()).bind()

val filteredEntities = entities.filterAttributes(entitiesQuery.attrs, entitiesQuery.datasetId)

val filteredEntities = entities.filterAttributes(entitiesQuery.attrs, entitiesQuery.datasetId)
val compactedEntities =
compactEntities(filteredEntities, contexts).let {
linkedEntityService.processLinkedEntities(it, entitiesQuery, sub.getOrNull()).bind()
}
compactedEntities to localCount
}

val compactedEntities =
compactEntities(filteredEntities, contexts).let {
linkedEntityService.processLinkedEntities(it, entitiesQuery, sub.getOrNull()).bind()
val (warnings, remoteEntitiesWithCSR, remoteCounts) = matchingCSR.parMap { csr ->
val response = ContextSourceCaller.queryContextSourceEntities(
httpHeaders,
csr,
count = entitiesQuery.paginationQuery.count,
queryParams
)
contextSourceRegistrationService.updateContextSourceStatus(csr, response.isRight())
response.map { (entities, count) -> Triple(entities, csr, count) }
}.separateEither()
.let { (warnings, response) ->
Triple(
warnings.toMutableList(),
response.map { (entities, csr, _) -> entities to csr },
response.map { (_, _, counts) -> counts }
)
}

// todo is it possible to have a non blocking error ? (like the 404 for the retrieve)
val (localEntities, localCount) = localResponse.bind()

val (mergeWarnings, mergedEntities) = ContextSourceUtils.mergeEntitiesLists(
localEntities,
remoteEntitiesWithCSR
).toPair()

mergeWarnings?.let { warnings.addAll(it) }

if (mergedEntities == null) {
val localError = localResponse.leftOrNull()
return localError!!.toErrorResponse().addWarnings(warnings)
}

val ngsiLdDataRepresentation = parseRepresentations(queryParams, mediaType)
buildQueryResponse(
compactedEntities.toFinalRepresentation(ngsiLdDataRepresentation),
count,
mergedEntities.toFinalRepresentation(ngsiLdDataRepresentation),
localCount,
"/ngsi-ld/v1/entities",
entitiesQuery.paginationQuery,
queryParams,
mediaType,
contexts
)
).addWarnings(warnings)
}.fold(
{ it.toErrorResponse() },
{ it }
Expand Down Expand Up @@ -264,7 +315,15 @@ class EntityHandler(
).bind()

val csrFilters =
CSRFilters(ids = setOf(entityId), operations = listOf(Operation.FEDERATION_OPS, Operation.RETRIEVE_ENTITY))
CSRFilters(
ids = setOf(entityId),
operations = listOf(
Operation.FEDERATION_OPS,
Operation.RETRIEVE_ENTITY,
Operation.RETRIEVE_ENTITY,
Operation.REDIRECTION_OPS
)
)

val matchingCSR = contextSourceRegistrationService.getContextSourceRegistrations(csrFilters)

Expand All @@ -280,10 +339,10 @@ class EntityHandler(

// we can add parMap(concurrency = X) if this trigger too much http connexion at the same time
val (warnings, remoteEntitiesWithCSR) = matchingCSR.parMap { csr ->
val response = ContextSourceCaller.getDistributedInformation(
val response = ContextSourceCaller.retrieveContextSourceEntity(
httpHeaders,
csr,
"/ngsi-ld/v1/entities/$entityId",
entityId,
queryParams
)
contextSourceRegistrationService.updateContextSourceStatus(csr, response.isRight())
Expand All @@ -293,7 +352,6 @@ class EntityHandler(
warnings.toMutableList() to maybeResponses.filterNotNull()
}

// we could simplify the code if we check the JsonPayload beforehand
val (mergeWarnings, mergedEntity) = ContextSourceUtils.mergeEntities(
localEntity.getOrNull(),
remoteEntitiesWithCSR
Expand Down
Loading

0 comments on commit 24c0cd4

Please sign in to comment.