Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/temporal pagination #1179

Closed
wants to merge 14 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,25 @@ import java.time.ZonedDateTime

const val WHOLE_TIME_RANGE_DURATION = "PT0S"

const val TIMEREL_PARAM = "timerel" // do i put them in TemporalQuery companion object?
const val TIMEAT_PARAM = "timeAt"
const val ENDTIMEAT_PARAM = "endTimeAt"
const val AGGRPERIODDURATION_PARAM = "aggrPeriodDuration"
const val AGGRMETHODS_PARAM = "aggrMethods"
const val LASTN_PARAM = "lastN"
const val TIMEPROPERTY_PARAM = "timeproperty"

data class TemporalQuery(
val timerel: Timerel? = null,
val timeAt: ZonedDateTime? = null,
val endTimeAt: ZonedDateTime? = null,
val aggrPeriodDuration: String? = null,
val aggrMethods: List<Aggregate>? = null,
val lastN: Int? = null,
val asLastN: Boolean,
val limit: Int,
val timeproperty: AttributeInstance.TemporalProperty = AttributeInstance.TemporalProperty.OBSERVED_AT
) {

enum class Timerel {
BEFORE,
AFTER,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ class ScopeService(
): Either<APIException, List<ScopeInstanceResult>> {
val temporalQuery = temporalEntitiesQuery.temporalQuery
val sqlQueryBuilder = StringBuilder()

val limit = temporalQuery.limit
sqlQueryBuilder.append(composeSearchSelectStatement(temporalEntitiesQuery, origin))

sqlQueryBuilder.append(
Expand All @@ -106,17 +106,21 @@ class ScopeService(
TemporalQuery.Timerel.BETWEEN -> sqlQueryBuilder.append(
" AND time > '${temporalQuery.timeAt}' AND time < '${temporalQuery.endTimeAt}'"
)

null -> Unit
}

if (temporalEntitiesQuery.isAggregatedWithDefinedDuration())
sqlQueryBuilder.append(" GROUP BY entity_id, start")
else if (temporalEntitiesQuery.withAggregatedValues)
sqlQueryBuilder.append(" GROUP BY entity_id")
if (temporalQuery.lastN != null)
// in order to get last instances, need to order by time desc
if (temporalQuery.asLastN)
// in order to get first or last instances, need to order by time
// final ascending ordering of instances is done in query service
sqlQueryBuilder.append(" ORDER BY start DESC LIMIT ${temporalQuery.lastN}")
sqlQueryBuilder.append(" ORDER BY start DESC")
else sqlQueryBuilder.append(" ORDER BY start ASC")

sqlQueryBuilder.append(" LIMIT $limit")

return databaseClient.sql(sqlQueryBuilder.toString())
.bind("entities_ids", entitiesIds)
Expand Down Expand Up @@ -149,11 +153,13 @@ class ScopeService(
} else
"SELECT entity_id, min(time) as start, max(time) as end, $allAggregates "
}

temporalEntitiesQuery.temporalQuery.timeproperty == TemporalProperty.OBSERVED_AT -> {
"""
SELECT entity_id, ARRAY(SELECT jsonb_array_elements_text(value)) as value, time as start
"""
}

else -> {
"""
SELECT entity_id, ARRAY(SELECT jsonb_array_elements_text(value)) as value, time as start, sub
Expand Down Expand Up @@ -245,19 +251,22 @@ class ScopeService(
)
)
}

OperationType.APPEND_ATTRIBUTES, OperationType.MERGE_ENTITY -> {
val newScopes = (currentScopes ?: emptyList()).toSet().plus(scopes).toList()
val newPayload = newScopes.map { mapOf(JsonLdUtils.JSONLD_VALUE to it) }
val updatedPayload = currentPayload.replaceScopeValue(newPayload)
Pair(newScopes, updatedPayload)
}

OperationType.APPEND_ATTRIBUTES_OVERWRITE_ALLOWED,
OperationType.MERGE_ENTITY_OVERWRITE_ALLOWED,
OperationType.PARTIAL_ATTRIBUTE_UPDATE,
OperationType.REPLACE_ATTRIBUTE -> {
val updatedPayload = currentPayload.replaceScopeValue(expandedAttributeInstances)
Pair(scopes, updatedPayload)
}

else -> Pair(null, Json.of("{}"))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,19 @@ import arrow.core.right
import arrow.fx.coroutines.parMap
import com.egm.stellio.search.model.*
import com.egm.stellio.search.model.AggregatedAttributeInstanceResult.AggregateResult
import com.egm.stellio.search.model.TemporalQuery.Timerel
import com.egm.stellio.search.util.*
import com.egm.stellio.shared.model.*
import com.egm.stellio.shared.util.*
import com.egm.stellio.shared.util.INCONSISTENT_VALUES_IN_AGGREGATION_MESSAGE
import com.egm.stellio.shared.util.attributeOrInstanceNotFoundMessage
import com.egm.stellio.shared.util.ngsiLdDateTime
import org.springframework.r2dbc.core.DatabaseClient
import org.springframework.r2dbc.core.bind
import org.springframework.stereotype.Service
import org.springframework.transaction.annotation.Transactional
import java.net.URI
import java.time.ZonedDateTime
import java.util.UUID
import java.util.*

@Service
class AttributeInstanceService(
Expand Down Expand Up @@ -109,7 +112,7 @@ class AttributeInstanceService(
return create(attributeInstance)
}

suspend fun search(
suspend fun search( // que pour les tests?
temporalEntitiesQuery: TemporalEntitiesQuery,
temporalEntityAttribute: TemporalEntityAttribute,
origin: ZonedDateTime? = null
Expand Down Expand Up @@ -146,11 +149,12 @@ class AttributeInstanceService(
)

when (temporalQuery.timerel) {
TemporalQuery.Timerel.BEFORE -> sqlQueryBuilder.append(" AND time < '${temporalQuery.timeAt}'")
TemporalQuery.Timerel.AFTER -> sqlQueryBuilder.append(" AND time > '${temporalQuery.timeAt}'")
TemporalQuery.Timerel.BETWEEN -> sqlQueryBuilder.append(
Timerel.BEFORE -> sqlQueryBuilder.append(" AND time < '${temporalQuery.timeAt}'")
Timerel.AFTER -> sqlQueryBuilder.append(" AND time > '${temporalQuery.timeAt}'")
Timerel.BETWEEN -> sqlQueryBuilder.append(
" AND time > '${temporalQuery.timeAt}' AND time < '${temporalQuery.endTimeAt}'"
)

null -> Unit
}

Expand All @@ -159,10 +163,13 @@ class AttributeInstanceService(
else if (temporalEntitiesQuery.withAggregatedValues)
sqlQueryBuilder.append(" GROUP BY temporal_entity_attribute")

if (temporalQuery.lastN != null)
// in order to get last instances, need to order by time desc
if (temporalQuery.asLastN)
// in order to get first or last instances, need to order by time
// final ascending ordering of instances is done in query service
sqlQueryBuilder.append(" ORDER BY start DESC LIMIT ${temporalQuery.lastN}")
sqlQueryBuilder.append(" ORDER BY start DESC")
else sqlQueryBuilder.append(" ORDER BY start ASC")

sqlQueryBuilder.append(" LIMIT ${temporalQuery.limit}")

val finalTemporalQuery = composeFinalTemporalQuery(temporalEntityAttributes, sqlQueryBuilder.toString())

Expand Down Expand Up @@ -194,6 +201,7 @@ class AttributeInstanceService(
JOIN LATERAL (
$aiLateralQuery
) ai_limited ON true;

""".trimIndent()
}

Expand All @@ -218,6 +226,7 @@ class AttributeInstanceService(
} else
"SELECT temporal_entity_attribute, min(time) as start, max(time) as end, $allAggregates "
}

else -> {
val valueColumn = when (temporalEntityAttributes[0].attributeValueType) {
TemporalEntityAttribute.AttributeValueType.NUMBER -> "measured_value as value"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,8 @@ class QueryService(
.groupBy {
it.attributeValueType
}.mapValues {
attributeInstanceService.search(temporalEntitiesQuery, it.value, origin).bind()
attributeInstanceService.search(temporalEntitiesQuery, temporalEntityAttributes = it.value, origin)
.bind()
}
.mapValues {
// when retrieved from DB, values of geo-properties are encoded in WKT and won't be automatically
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,16 @@ package com.egm.stellio.search.util
import arrow.core.*
import arrow.core.raise.either
import com.egm.stellio.search.model.*
import com.egm.stellio.search.model.TemporalQuery.*
import com.egm.stellio.shared.config.ApplicationProperties
import com.egm.stellio.shared.model.APIException
import com.egm.stellio.shared.model.BadRequestDataException
import com.egm.stellio.shared.model.TooManyResultsException
import com.egm.stellio.shared.util.*
import org.springframework.util.MultiValueMap
import org.springframework.util.MultiValueMapAdapter
import java.time.ZonedDateTime
import java.util.Optional
import java.util.*

fun composeEntitiesQuery(
defaultPagination: ApplicationProperties.Pagination,
Expand Down Expand Up @@ -129,7 +131,8 @@ fun composeTemporalEntitiesQuery(
Optional.ofNullable(requestParams.getFirst(QUERY_PARAM_OPTIONS)),
OptionsParamValue.AGGREGATED_VALUES
)
val temporalQuery = buildTemporalQuery(requestParams, inQueryEntities, withAggregatedValues).bind()
val temporalQuery =
buildTemporalQuery(requestParams, defaultPagination, inQueryEntities, withAggregatedValues).bind()

TemporalEntitiesQuery(
entitiesQuery = entitiesQuery,
Expand Down Expand Up @@ -168,15 +171,20 @@ fun composeTemporalEntitiesQueryFromPostRequest(
)

val temporalParams = mapOf(
"timerel" to listOf(query.temporalQ?.timerel),
"timeAt" to listOf(query.temporalQ?.timeAt),
"endTimeAt" to listOf(query.temporalQ?.endTimeAt),
"aggrPeriodDuration" to listOf(query.temporalQ?.aggrPeriodDuration),
"aggrMethods" to query.temporalQ?.aggrMethods,
"lastN" to listOf(query.temporalQ?.lastN.toString()),
"timeproperty" to listOf(query.temporalQ?.timeproperty)
TIMEREL_PARAM to listOf(query.temporalQ?.timerel),
TIMEAT_PARAM to listOf(query.temporalQ?.timeAt),
ENDTIMEAT_PARAM to listOf(query.temporalQ?.endTimeAt),
AGGRPERIODDURATION_PARAM to listOf(query.temporalQ?.aggrPeriodDuration),
AGGRMETHODS_PARAM to query.temporalQ?.aggrMethods,
LASTN_PARAM to listOf(query.temporalQ?.lastN.toString()),
TIMEPROPERTY_PARAM to listOf(query.temporalQ?.timeproperty)
)
val temporalQuery = buildTemporalQuery(MultiValueMapAdapter(temporalParams), true, withAggregatedValues).bind()
val temporalQuery = buildTemporalQuery(
MultiValueMapAdapter(temporalParams),
defaultPagination,
true,
withAggregatedValues,
).bind()

TemporalEntitiesQuery(
entitiesQuery = entitiesQuery,
Expand All @@ -187,27 +195,26 @@ fun composeTemporalEntitiesQueryFromPostRequest(
)
}

@SuppressWarnings("ReturnCount")
fun buildTemporalQuery(
params: MultiValueMap<String, String>,
pagination: ApplicationProperties.Pagination,
inQueryEntities: Boolean = false,
withAggregatedValues: Boolean = false
withAggregatedValues: Boolean = false,
): Either<APIException, TemporalQuery> {
val timerelParam = params.getFirst("timerel")
val timeAtParam = params.getFirst("timeAt")
val endTimeAtParam = params.getFirst("endTimeAt")
val timerelParam = params.getFirst(TIMEREL_PARAM)
val timeAtParam = params.getFirst(TIMEAT_PARAM)
val endTimeAtParam = params.getFirst(ENDTIMEAT_PARAM)
val aggrPeriodDurationParam =
if (withAggregatedValues)
params.getFirst("aggrPeriodDuration") ?: "PT0S"
params.getFirst(AGGRPERIODDURATION_PARAM) ?: WHOLE_TIME_RANGE_DURATION
else null
val aggrMethodsParam = params.getFirst("aggrMethods")
val lastNParam = params.getFirst("lastN")
val timeproperty = params.getFirst("timeproperty")?.let {
val aggrMethodsParam = params.getFirst(AGGRMETHODS_PARAM)
val lastNParam = params.getFirst(LASTN_PARAM)
val timeproperty = params.getFirst(TIMEPROPERTY_PARAM)?.let {
AttributeInstance.TemporalProperty.forPropertyName(it)
} ?: AttributeInstance.TemporalProperty.OBSERVED_AT

if (timerelParam == "between" && endTimeAtParam == null)
return BadRequestDataException("'endTimeAt' request parameter is mandatory if 'timerel' is 'between'").left()

val endTimeAt = endTimeAtParam?.parseTimeParameter("'endTimeAt' parameter is not a valid date")
?.getOrElse {
return BadRequestDataException(it).left()
Expand All @@ -216,29 +223,42 @@ fun buildTemporalQuery(
val (timerel, timeAt) = buildTimerelAndTime(timerelParam, timeAtParam, inQueryEntities).getOrElse {
return BadRequestDataException(it).left()
}

if (timerel == Timerel.BETWEEN && endTimeAtParam == null)
return BadRequestDataException("'endTimeAt' request parameter is mandatory if 'timerel' is 'between'").left()

if (withAggregatedValues && aggrMethodsParam == null)
return BadRequestDataException("'aggrMethods' is mandatory if 'aggregatedValues' option is specified").left()

val aggregate = aggrMethodsParam?.split(",")?.map {
if (TemporalQuery.Aggregate.isSupportedAggregate(it))
TemporalQuery.Aggregate.forMethod(it)!!
if (Aggregate.isSupportedAggregate(it))
Aggregate.forMethod(it)!!
else
return BadRequestDataException(
"'$it' is not a recognized aggregation method for 'aggrMethods' parameter"
).left()
}

val lastN = lastNParam?.toIntOrNull()?.let {
if (it >= 1) it else null
if (it > pagination.temporalLimitMax) return TooManyResultsException(
"You asked for the $it last temporal entities, but the supported maximum limit is ${
pagination.temporalLimitMax
}"
).left()
else if (it >= 1) it
else null
}
val limit = lastN ?: pagination.temporalLimitDefault
val asLastN = lastN != null

return TemporalQuery(
timerel = timerel,
timeAt = timeAt,
endTimeAt = endTimeAt,
aggrPeriodDuration = aggrPeriodDurationParam,
aggrMethods = aggregate,
lastN = lastN,
limit = limit,
asLastN = asLastN,
timeproperty = timeproperty
).right()
}
Expand All @@ -247,13 +267,13 @@ fun buildTimerelAndTime(
timerelParam: String?,
timeAtParam: String?,
inQueryEntities: Boolean
): Either<String, Pair<TemporalQuery.Timerel?, ZonedDateTime?>> =
): Either<String, Pair<Timerel?, ZonedDateTime?>> =
// when querying a specific temporal entity, timeAt and timerel are optional
if (timerelParam == null && timeAtParam == null && !inQueryEntities) {
Pair(null, null).right()
} else if (timerelParam != null && timeAtParam != null) {
val timeRelResult = try {
TemporalQuery.Timerel.valueOf(timerelParam.uppercase()).right()
Timerel.valueOf(timerelParam.uppercase()).right()
} catch (e: IllegalArgumentException) {
"'timerel' is not valid, it should be one of 'before', 'between', or 'after'".left()
}
Expand Down
Loading
Loading