Skip to content

Commit

Permalink
Feat:externalize partition filter in ES query
Browse files (browse the repository at this point in the history)
  • Loading branch information
mbarbet committed Jun 3, 2024
1 parent 72fa2b0 commit 65abba7
Show file tree
Hide file tree
Showing 14 changed files with 111 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public class ElasticFluidSearch extends FluidSearchService {
private int elasticMaxPrecisionThreshold;
private SearchRequest.Builder requestBuilder;
private BoolQuery.Builder boolQueryBuilder;
private BoolQuery.Builder boolPartitionQueryBuilder;


public ElasticFluidSearch(CollectionReference collectionReference, int elasticMaxPrecisionThreshold) {
Expand All @@ -73,6 +74,8 @@ public ElasticFluidSearch(CollectionReference collectionReference, int elasticMa
.index(collectionReference.params.indexName)
.trackTotalHits(b -> b.enabled(true));
boolQueryBuilder = new BoolQuery.Builder();
boolPartitionQueryBuilder = new BoolQuery.Builder();

}

public ElasticClient getClient() {
Expand All @@ -87,6 +90,10 @@ public ElasticFluidSearch setClient(ElasticClient client) {
/**
 * Gives access to the boolean query builder held by this search.
 *
 * @return the current {@code boolQueryBuilder} of this instance
 */
public BoolQuery.Builder getBoolQueryBuilder() {
    return this.boolQueryBuilder;
}
/**
 * Gives access to the boolean query builder dedicated to partition filters.
 *
 * @return the current {@code boolPartitionQueryBuilder} of this instance
 */
public BoolQuery.Builder getBoolPartitionQueryBuilder() {
    return this.boolPartitionQueryBuilder;
}


public SearchResponse<Map> exec() throws ArlasException {
Pair<String[], String[]> includeExclude = computeIncludeExclude(false);
Expand All @@ -98,15 +105,15 @@ public SearchResponse<Map> exec() throws ArlasException {
.excludes(Arrays.asList(includeExclude.getRight()))
)
)
.query(boolQueryBuilder.build()._toQuery())
.query(boolQueryBuilder.must(boolPartitionQueryBuilder.build()._toQuery()).build()._toQuery())
.build();

// https://www.elastic.co/guide/en/elasticsearch/client/java-api-client/current/reading.html#_reading_raw_json
return client.search(request);
}

@Override
public FluidSearchService filter(MultiValueFilter<Expression> f, String dateFormat, Boolean rightHand )throws ArlasException {
public FluidSearchService filter(MultiValueFilter<Expression> f, String dateFormat, Boolean rightHand) throws ArlasException {
List<Query> queries = new ArrayList<>();
for (Expression fFilter : f) {
queries.add(filter(fFilter, dateFormat, rightHand));
Expand All @@ -115,6 +122,37 @@ public FluidSearchService filter(MultiValueFilter<Expression> f, String dateForm
return this;
}

/**
 * Builds the partition query from a list of partition filters and registers it on
 * {@code boolPartitionQueryBuilder}.
 * <p>
 * The resulting structure is an OR of (AND of OR):
 * <ul>
 *   <li>each {@link Filter} of the list is one alternative (OR);</li>
 *   <li>inside a filter, every {@code f} and {@code q} multi-value filter must match (AND);</li>
 *   <li>inside a multi-value filter, at least one value must match (OR).</li>
 * </ul>
 * Only the {@code f} and {@code q} parts of each filter are read here.
 * A {@code null} list leaves the search untouched, and {@code null} entries are skipped
 * instead of failing with a {@link NullPointerException}.
 *
 * @param filters the partition filters to combine; may be {@code null}
 * @return this search, to allow chaining
 * @throws ArlasException propagated from the conversion of an expression or of a {@code q} value
 */
@Override
public FluidSearchService partitionFilter(List<Filter> filters) throws ArlasException {
    if (filters == null) {
        return this;
    }
    List<Query> alternatives = new ArrayList<>();
    for (Filter partition : filters) {
        // Callers may validate a list that still contains null entries; ignore those entries.
        if (partition != null) {
            alternatives.add(toPartitionQuery(partition));
        }
    }
    // OR OF (AND OF OR)
    boolPartitionQueryBuilder = boolPartitionQueryBuilder.should(alternatives).minimumShouldMatch("1");
    return this;
}

/** Converts one partition filter into the AND of its {@code f} and {@code q} multi-value clauses. */
private Query toPartitionQuery(Filter partition) throws ArlasException {
    BoolQuery.Builder andBuilder = new BoolQuery.Builder();
    if (partition.f != null) {
        for (MultiValueFilter<Expression> f : partition.f) {
            // OR level: at least one expression of the multi-value filter must match
            List<Query> anyOf = new ArrayList<>();
            for (Expression expression : f) {
                anyOf.add(filter(expression, partition.dateformat, partition.righthand));
            }
            andBuilder = andBuilder.filter(QueryBuilders.bool().should(anyOf).minimumShouldMatch("1").build()._toQuery());
        }
    }
    if (partition.q != null) {
        for (MultiValueFilter<String> q : partition.q) {
            BoolQuery.Builder orBoolQueryBuilder = getFilterQBuilder(q);
            andBuilder = andBuilder.filter(orBoolQueryBuilder.build()._toQuery());
        }
    }
    return andBuilder.build()._toQuery();
}

private Query filter(Expression expression, String dateFormat, Boolean rightHand) throws ArlasException {
BoolQuery.Builder ret = new BoolQuery.Builder();
if (StringUtil.isNullOrEmpty(expression.field) || expression.op == null || StringUtil.isNullOrEmpty(expression.value)) {
Expand Down Expand Up @@ -287,6 +325,12 @@ protected RangeQuery.Builder getRangeQueryBuilder(String field, String value, St

/**
 * Adds a {@code q} multi-value filter to the boolean query of this search as a filter clause.
 *
 * @param q the {@code q} values to convert through {@code getFilterQBuilder}
 * @return this search, to allow chaining
 * @throws ArlasException propagated from the conversion of the {@code q} values
 */
@Override
public FluidSearchService filterQ(MultiValueFilter<String> q) throws ArlasException {
    Query qQuery = getFilterQBuilder(q).build()._toQuery();
    boolQueryBuilder = boolQueryBuilder.filter(qQuery);
    return this;
}

private BoolQuery.Builder getFilterQBuilder(MultiValueFilter<String> q) throws InvalidParameterException {
BoolQuery.Builder orBoolQueryBuilder = new BoolQuery.Builder().minimumShouldMatch("1");
for (String qFilter : q) {
String[] operands = qFilter.split(":",2);
Expand All @@ -300,8 +344,7 @@ public FluidSearchService filterQ(MultiValueFilter<String> q) throws ArlasExcept
throw new InvalidParameterException(INVALID_Q_FILTER);
}
}
boolQueryBuilder = boolQueryBuilder.filter(orBoolQueryBuilder.build()._toQuery());
return this;
return orBoolQueryBuilder;
}

public List<Query> filterPWithin(String field, String pwithinFilter) throws ArlasException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@
package io.arlas.server.core.model.request;


import java.util.List;

/**
 * Request model exposing its parts as public fields.
 */
public class Request {
/**
 * List of partition filters attached to the request; may be {@code null}.
 * NOTE(review): callers appear to populate this from a partition-filter header and the
 * entries appear to be alternatives combined with OR — confirm against the search services.
 */
public List<Filter> partitionFilter;
/** Filter carried by the request; may be {@code null}. */
public Filter filter;
/** Form part of the request. NOTE(review): semantics not visible here — confirm. */
public Form form;
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,13 @@ public void setValidGeoFilters(CollectionReference collectionReference, Request
if (request != null && request.filter != null) {
request.filter = ParamsParser.getFilterWithValidGeos(collectionReference, request.filter);
}
if (request != null && request.partitionFilter != null) {
List<Filter> newFilters = new ArrayList<>();
for (Filter f : request.partitionFilter){
newFilters.add(ParamsParser.getFilterWithValidGeos(collectionReference, f));
}
request.partitionFilter = newFilters;
}
}

public Map<String, Object> flat(AggregationResponse element,
Expand Down Expand Up @@ -154,6 +161,23 @@ public void applyFilter(Filter filter, FluidSearchService fluidSearch) throws Ar
}
}

/**
 * Validates a list of partition filters and registers them on the given search.
 * <p>
 * Each non-null filter is checked with {@code CheckParams.checkFilter}. When a filter has a
 * non-empty {@code f} part, a {@code dateformat} is only accepted if that part queries a date
 * field. {@code null} entries are ignored and are not forwarded to the search, so that the
 * search implementation never receives an entry that skipped validation.
 * A {@code null} list is a no-op.
 *
 * @param filters     the partition filters to apply; may be {@code null}
 * @param fluidSearch the search on which the partition filters are registered
 * @throws ArlasException if a filter is invalid, if {@code dateformat} is set while no date
 *                        field is queried, or if the search rejects the filters
 */
public void applyPartitionFilter(List<Filter> filters, FluidSearchService fluidSearch) throws ArlasException {
    if (filters == null) {
        return;
    }
    List<Filter> validated = new ArrayList<>(filters.size());
    for (Filter f : filters) {
        if (f == null) {
            continue;
        }
        CheckParams.checkFilter(f);
        if (f.f != null && !f.f.isEmpty()) {
            CollectionReference collectionReference = fluidSearch.getCollectionReference();
            if (!filterFHasDateQuery(f, collectionReference) && !StringUtil.isNullOrEmpty(f.dateformat)) {
                throw new BadRequestException("dateformat is specified but no date field is queried in f filter (gt, lt, gte, lte or range operations)");
            }
        }
        validated.add(f);
    }
    // The returned search is not needed here; the filters are registered by the call itself.
    fluidSearch.partitionFilter(validated);
}

/**
* This method checks whether in all the expressions of the filter `f`, a date field has been queried using `lte`, `gte`, `lt`, `gt` or `range` operations
* **/
Expand Down Expand Up @@ -333,7 +357,7 @@ public AggregationResponse aggregate(MixedRequest request,
FluidSearchService fluidSearch = getFluidSearch(collectionReference);
applyFilter(collectionReference.params.filter, fluidSearch);
applyFilter(request.basicRequest.filter, fluidSearch);
applyFilter(request.headerRequest.filter, fluidSearch);
applyPartitionFilter(request.headerRequest.partitionFilter, fluidSearch);
List<Aggregation> aggregations = ((AggregationsRequest) request.basicRequest).aggregations;
if (aggregations != null && !aggregations.isEmpty()) {
fluidSearch.aggregate(aggregations, isGeoAggregation);
Expand All @@ -347,7 +371,7 @@ public ComputationResponse compute(MixedRequest request,
FluidSearchService fluidSearch = getFluidSearch(collectionReference);
applyFilter(collectionReference.params.filter, fluidSearch);
applyFilter(request.basicRequest.filter, fluidSearch);
applyFilter(request.headerRequest.filter, fluidSearch);
applyPartitionFilter(request.headerRequest.partitionFilter, fluidSearch);
String field = ((ComputationRequest)request.basicRequest).field;
ComputationEnum metric = ((ComputationRequest)request.basicRequest).metric;
int precisionThreshold = ((ComputationRequest)request.basicRequest).precisionThreshold;
Expand All @@ -360,7 +384,7 @@ public Hits count(MixedRequest request,
FluidSearchService fluidSearch = getFluidSearch(collectionReference);
applyFilter(collectionReference.params.filter, fluidSearch);
applyFilter(request.basicRequest.filter, fluidSearch);
applyFilter(request.headerRequest.filter, fluidSearch);
applyPartitionFilter(request.headerRequest.partitionFilter, fluidSearch);
return count(collectionReference, fluidSearch);
}

Expand All @@ -374,7 +398,7 @@ protected FluidSearchService getSearchRequest(MixedRequest request, CollectionRe
FluidSearchService fluidSearch = getFluidSearch(collectionReference);
applyFilter(collectionReference.params.filter, fluidSearch);
applyFilter(request.basicRequest.filter, fluidSearch);
applyFilter(request.headerRequest.filter, fluidSearch);
applyPartitionFilter(request.headerRequest.partitionFilter, fluidSearch);
paginate(((Search) request.basicRequest).page, collectionReference, fluidSearch);
applyProjection(((Search) request.basicRequest).projection, fluidSearch, request.columnFilter, collectionReference);
return fluidSearch;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,7 @@
import io.arlas.server.core.model.CollectionReference;
import io.arlas.server.core.model.enumerations.AggregationTypeEnum;
import io.arlas.server.core.model.enumerations.ComputationEnum;
import io.arlas.server.core.model.request.Aggregation;
import io.arlas.server.core.model.request.Expression;
import io.arlas.server.core.model.request.MultiValueFilter;
import io.arlas.server.core.model.request.Page;
import io.arlas.server.core.model.request.*;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -193,6 +190,8 @@ public static String getAggregationName(String aggName) {

abstract public FluidSearchService filter(MultiValueFilter<Expression> f, String dateFormat, Boolean rightHand) throws ArlasException;

/**
 * Registers a list of partition filters on this search.
 * NOTE(review): the entries are presumably alternatives combined with OR — confirm that
 * every implementation applies the same combination.
 *
 * @param filters the partition filters to register
 * @return a search service, to allow chaining
 * @throws ArlasException if the filters cannot be applied
 */
public abstract FluidSearchService partitionFilter(List<Filter> filters)throws ArlasException;

abstract public FluidSearchService filterQ(MultiValueFilter<String> q) throws ArlasException;

abstract public FluidSearchService sort(String sort) throws ArlasException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ public static String getValidAggregationFormat(String aggFormat) {
return Objects.requireNonNullElse(aggFormat, "yyyy-MM-dd-HH:mm:ss");
}

public static Filter getFilter(CollectionReference collectionReference, String serializedFilter) throws InvalidParameterException {
public static List<Filter> getPartitionFilter(CollectionReference collectionReference, String serializedFilter) throws InvalidParameterException {
if (serializedFilter != null) {
List<Filter> fList;
String sf = "[" + serializedFilter + "]";
Expand All @@ -278,19 +278,7 @@ public static Filter getFilter(CollectionReference collectionReference, String s
throw new InvalidParameterException(INVALID_FILTER + ": '" + sf + "'", ex);
}
}
// if not null and parsing ok then we have at least one filter
if (fList != null && fList.size() > 0) {
Filter retFilter = fList.get(0);
if (retFilter.righthand == null) {
retFilter.righthand = Boolean.TRUE;
}

for (int i = 1; i < fList.size(); i++) {
// for now, a list of partition filters is combined with OR. TODO: support more complex combination
retFilter.f.get(0).addAll(fList.get(i).f.get(0));
}
return retFilter;
}
return fList;
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ public Response aggregate(
ColumnFilterUtil.assertRequestAllowed(Optional.ofNullable(columnFilter), collectionReference, aggregationsRequest);

AggregationsRequest aggregationsRequestHeader = new AggregationsRequest();
aggregationsRequestHeader.filter = ParamsParser.getFilter(collectionReference, partitionFilter);
aggregationsRequestHeader.partitionFilter = ParamsParser.getPartitionFilter(collectionReference, partitionFilter);
MixedRequest request = new MixedRequest();
request.basicRequest = aggregationsRequest;
exploreService.setValidGeoFilters(collectionReference, aggregationsRequestHeader);
Expand Down Expand Up @@ -235,7 +235,7 @@ public Response aggregatePost(
}

AggregationsRequest aggregationsRequestHeader = new AggregationsRequest();
aggregationsRequestHeader.filter = ParamsParser.getFilter(collectionReference, partitionFilter);
aggregationsRequestHeader.partitionFilter = ParamsParser.getPartitionFilter(collectionReference, partitionFilter);
MixedRequest request = new MixedRequest();
exploreService.setValidGeoFilters(collectionReference, aggregationsRequest);
exploreService.setValidGeoFilters(collectionReference, aggregationsRequestHeader);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -683,7 +683,7 @@ public Response geoaggregatePost(
}

AggregationsRequest aggregationsRequestHeader = new AggregationsRequest();
aggregationsRequestHeader.filter = ParamsParser.getFilter(collectionReference, partitionFilter);
aggregationsRequestHeader.partitionFilter = ParamsParser.getPartitionFilter(collectionReference, partitionFilter);
MixedRequest request = new MixedRequest();
exploreService.setValidGeoFilters(collectionReference, aggregationRequest);
exploreService.setValidGeoFilters(collectionReference, aggregationsRequestHeader);
Expand Down Expand Up @@ -765,7 +765,7 @@ public Response shapeaggregatePost(
}

AggregationsRequest aggregationsRequestHeader = new AggregationsRequest();
aggregationsRequestHeader.filter = ParamsParser.getFilter(collectionReference, partitionFilter);
aggregationsRequestHeader.partitionFilter = ParamsParser.getPartitionFilter(collectionReference, partitionFilter);
MixedRequest request = new MixedRequest();
exploreService.setValidGeoFilters(collectionReference, aggregationRequest);
exploreService.setValidGeoFilters(collectionReference, aggregationsRequestHeader);
Expand Down Expand Up @@ -799,7 +799,7 @@ private MixedRequest getGeoaggregateRequest(CollectionReference collectionRefere
ColumnFilterUtil.assertRequestAllowed(columnFilter, collectionReference, aggregationsRequest);

AggregationsRequest aggregationsRequestHeader = new AggregationsRequest();
aggregationsRequestHeader.filter = ParamsParser.getFilter(collectionReference, partitionFilter);
aggregationsRequestHeader.partitionFilter = ParamsParser.getPartitionFilter(collectionReference, partitionFilter);
MixedRequest request = new MixedRequest();
request.basicRequest = aggregationsRequest;
exploreService.setValidGeoFilters(collectionReference, aggregationsRequestHeader);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ public Response compute(
ColumnFilterUtil.assertRequestAllowed(Optional.ofNullable(columnFilter), collectionReference, computationRequest);

ComputationRequest computationRequestHeader = new ComputationRequest();
computationRequestHeader.filter = ParamsParser.getFilter(collectionReference, partitionFilter);
computationRequestHeader.partitionFilter = ParamsParser.getPartitionFilter(collectionReference, partitionFilter);
exploreService.setValidGeoFilters(collectionReference, computationRequestHeader);
MixedRequest request = new MixedRequest();
request.basicRequest = computationRequest;
Expand Down Expand Up @@ -233,7 +233,7 @@ public Response computePost(
throw new NotFoundException(collection);
}
ComputationRequest computationRequestHeader = new ComputationRequest();
computationRequestHeader.filter = ParamsParser.getFilter(collectionReference, partitionFilter);
computationRequestHeader.partitionFilter = ParamsParser.getPartitionFilter(collectionReference, partitionFilter);
MixedRequest request = new MixedRequest();
exploreService.setValidGeoFilters(collectionReference, computationRequest);
exploreService.setValidGeoFilters(collectionReference, computationRequestHeader);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ public Response count(
MixedRequest request = new MixedRequest();
request.basicRequest = count;
Count countHeader = new Count();
countHeader.filter = ParamsParser.getFilter(collectionReference, partitionfilter);
countHeader.partitionFilter = ParamsParser.getPartitionFilter(collectionReference, partitionfilter);
exploreService.setValidGeoFilters(collectionReference, countHeader);
request.headerRequest = countHeader;
request.columnFilter = ColumnFilterUtil.getCollectionRelatedColumnFilter(Optional.ofNullable(columnFilter), collectionReference);
Expand Down Expand Up @@ -213,7 +213,7 @@ public Response countPost(

request.basicRequest = count;
Count countHeader = new Count();
countHeader.filter = ParamsParser.getFilter(collectionReference, partitionfilter);
countHeader.partitionFilter = ParamsParser.getPartitionFilter(collectionReference, partitionfilter);
exploreService.setValidGeoFilters(collectionReference, countHeader);
request.headerRequest = countHeader;
request.columnFilter = ColumnFilterUtil.getCollectionRelatedColumnFilter(Optional.ofNullable(columnFilter), collectionReference);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,7 @@ public Response geosearchPost(
CheckParams.checkReturnedGeometries(collectionReference, includes, excludes, search.returned_geometries);

Search searchHeader = new Search();
searchHeader.filter = ParamsParser.getFilter(collectionReference, partitionFilter);
searchHeader.partitionFilter = ParamsParser.getPartitionFilter(collectionReference, partitionFilter);
exploreService.setValidGeoFilters(collectionReference, search);
exploreService.setValidGeoFilters(collectionReference, searchHeader);

Expand Down Expand Up @@ -644,7 +644,7 @@ public Response shapesearchPost(
CheckParams.checkReturnedGeometries(collectionReference, includes, excludes, search.returned_geometries);

Search searchHeader = new Search();
searchHeader.filter = ParamsParser.getFilter(collectionReference, partitionFilter);
searchHeader.partitionFilter = ParamsParser.getPartitionFilter(collectionReference, partitionFilter);

exploreService.setValidGeoFilters(collectionReference, search);
exploreService.setValidGeoFilters(collectionReference, searchHeader);
Expand Down Expand Up @@ -689,7 +689,7 @@ private Response geosearch(CollectionReference collectionReference, Filter filte
search.projection = ParamsParser.enrichIncludes(search.projection, returned_geometries);

Search searchHeader = new Search();
searchHeader.filter = ParamsParser.getFilter(collectionReference, partitionFilter);
searchHeader.partitionFilter = ParamsParser.getPartitionFilter(collectionReference, partitionFilter);
MixedRequest request = new MixedRequest();
request.basicRequest = search;
exploreService.setValidGeoFilters(collectionReference, searchHeader);
Expand Down
Loading

0 comments on commit 65abba7

Please sign in to comment.