Skip to content

Commit

Permalink
Merge pull request #703 from FgForrest/700-allow-extending-readiness-…
Browse files Browse the repository at this point in the history
…timeouts

700 allow extending readiness timeouts
  • Loading branch information
novoj authored Oct 21, 2024
2 parents e518164 + 98d4cba commit 2495774
Show file tree
Hide file tree
Showing 50 changed files with 797 additions and 327 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ public EntityDecorator(
entity.hierarchyPredicate,
entity.attributePredicate,
entity.associatedDataPredicate,
entity.referencePredicate,
entity.referencePredicate.createRicherCopyWith(referenceFetcher.getEnvelopingEntityRequest()),
entity.pricePredicate,
entity.alignedNow,
referenceFetcher
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* | __/\ V /| | || (_| | |_| | |_) |
* \___| \_/ |_|\__\__,_|____/|____/
*
* Copyright (c) 2023
* Copyright (c) 2023-2024
*
* Licensed under the Business Source License, Version 1.1 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -24,6 +24,7 @@
package io.evitadb.api.requestResponse.data.structure;

import io.evitadb.api.EntityCollectionContract;
import io.evitadb.api.requestResponse.EvitaRequest;
import io.evitadb.api.requestResponse.data.EntityClassifierWithParent;
import io.evitadb.api.requestResponse.data.ReferenceContract;
import io.evitadb.api.requestResponse.data.SealedEntity;
Expand Down Expand Up @@ -56,6 +57,12 @@ public <T extends SealedEntity> List<T> initReferenceIndex(@Nonnull List<T> enti
return entities;
}

@Nonnull
@Override
public EvitaRequest getEnvelopingEntityRequest() {
throw new UnsupportedOperationException("No implementation");
}

@Nullable
@Override
public Function<Integer, EntityClassifierWithParent> getParentEntityFetcher() {
Expand Down Expand Up @@ -172,4 +179,13 @@ public BiPredicate<Integer, ReferenceDecorator> getEntityFilter(@Nonnull Referen
@Nullable
BiPredicate<Integer, ReferenceDecorator> getEntityFilter(@Nonnull ReferenceSchemaContract referenceSchema);

/**
* Returns evita request that should be used to fetch top-level (enveloping) entity. The request may contain
* extended requirements so that the comparators have all the necessary data.
*
* @return request that should be used to fetch top-level entity
*/
@Nonnull
EvitaRequest getEnvelopingEntityRequest();

}
45 changes: 23 additions & 22 deletions evita_common/src/main/java/io/evitadb/utils/NetworkUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.security.NoSuchAlgorithmException;
import java.security.cert.X509Certificate;
import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
Expand All @@ -63,12 +64,7 @@
*/
@Slf4j
public class NetworkUtils {
/**
* This shouldn't be changed - only in tests which needs to extend this timeout for slower machines runnning
* parallel tests and squeezing the resources.
*/
public static int DEFAULT_CLIENT_TIMEOUT = 1000;
private static OkHttpClient HTTP_CLIENT;
private static Map<Long, OkHttpClient> HTTP_CLIENT = CollectionUtils.createConcurrentHashMap(8);

/**
* Returns human comprehensible host name of the given host.
Expand Down Expand Up @@ -107,12 +103,13 @@ public static String getLocalHostName() {
*/
public static boolean isReachable(
@Nonnull String url,
long timeoutInMillis,
@Nullable Consumer<String> errorConsumer,
@Nullable Consumer<String> timeoutConsumer
) {
try {
try (
final Response response = getHttpClient().newCall(
final Response response = getHttpClient(timeoutInMillis).newCall(
new Builder()
.url(url)
.get()
Expand Down Expand Up @@ -144,6 +141,7 @@ public static boolean isReachable(
* @param method HTTP method to use
* @param contentType content type to use
* @param body body to send
* @param timeoutInMillis timeout in milliseconds
* @return the content of the URL as a string or empty optional if the URL is not reachable or
* does not return any content
*/
Expand All @@ -153,6 +151,7 @@ public static Optional<String> fetchContent(
@Nullable String method,
@Nonnull String contentType,
@Nullable String body,
long timeoutInMillis,
@Nullable Consumer<String> errorConsumer,
@Nullable Consumer<String> timeoutConsumer
) {
Expand All @@ -161,7 +160,7 @@ public static Optional<String> fetchContent(
.map(theBody -> RequestBody.create(theBody, MediaType.parse(contentType)))
.orElse(null);
try (
final Response response = getHttpClient().newCall(
final Response response = getHttpClient(timeoutInMillis).newCall(
new Request.Builder()
.url(url)
.addHeader("Accept", contentType)
Expand Down Expand Up @@ -231,26 +230,28 @@ private static String readBodyString(@Nonnull Response response) {
* @return the HTTP client instance
*/
@Nonnull
private static OkHttpClient getHttpClient() {
if (HTTP_CLIENT == null) {
try {
// Get a new SSL context
final SSLContext sc = SSLContext.getInstance("TLSv1.3");
sc.init(null, new TrustManager[]{TrustAllX509TrustManager.INSTANCE}, new java.security.SecureRandom());
private static OkHttpClient getHttpClient(long timeoutInMillis) {
try {
// Get a new SSL context
final SSLContext sc = SSLContext.getInstance("TLSv1.3");
sc.init(null, new TrustManager[]{TrustAllX509TrustManager.INSTANCE}, new java.security.SecureRandom());

HTTP_CLIENT = new OkHttpClient.Builder()
return HTTP_CLIENT.computeIfAbsent(
timeoutInMillis,
(t) -> new OkHttpClient.Builder()
.hostnameVerifier((hostname, session) -> true)
.sslSocketFactory(sc.getSocketFactory(), TrustAllX509TrustManager.INSTANCE)
.protocols(Arrays.asList(Protocol.HTTP_1_1, Protocol.HTTP_2))
.readTimeout(DEFAULT_CLIENT_TIMEOUT, TimeUnit.MILLISECONDS)
.callTimeout(DEFAULT_CLIENT_TIMEOUT, TimeUnit.MILLISECONDS)
.readTimeout(timeoutInMillis, TimeUnit.MILLISECONDS)
.callTimeout(timeoutInMillis, TimeUnit.MILLISECONDS)
.connectTimeout(timeoutInMillis, TimeUnit.MILLISECONDS)
.writeTimeout(timeoutInMillis, TimeUnit.MILLISECONDS)
.connectionPool(new ConnectionPool(0, 1, TimeUnit.MILLISECONDS))
.build();
} catch (NoSuchAlgorithmException | KeyManagementException e) {
throw new IllegalStateException("Failed to create HTTP client", e);
}
.build()
);
} catch (NoSuchAlgorithmException | KeyManagementException e) {
throw new IllegalStateException("Failed to create HTTP client", e);
}
return HTTP_CLIENT;
}

/**
Expand Down
11 changes: 4 additions & 7 deletions evita_engine/src/main/java/io/evitadb/core/EntityCollection.java
Original file line number Diff line number Diff line change
Expand Up @@ -719,18 +719,15 @@ public SealedEntity[] deleteEntitiesAndReturnThem(@Nonnull EvitaRequest evitaReq
.mapToInt(EntityCollection::getPrimaryKey)
.toArray();

final List<ServerEntityDecorator> removedEntities = new ArrayList<>(entitiesToRemove.length);
final List<SealedEntity> removedEntities = new ArrayList<>(entitiesToRemove.length);
for (int entityToRemove : entitiesToRemove) {
removedEntities.add(
wrapToDecorator(evitaRequest, deleteEntityInternal(entityToRemove, evitaRequest), false)
);
}

final ReferenceFetcher referenceFetcher = createReferenceFetcher(evitaRequest, session);
return referenceFetcher.initReferenceIndex(removedEntities, this)
.stream()
.map(it -> applyReferenceFetcherInternal(it, referenceFetcher))
.toArray(SealedEntity[]::new);
return applyReferenceFetcher(removedEntities, referenceFetcher).toArray(SealedEntity[]::new);

}

Expand Down Expand Up @@ -1066,8 +1063,8 @@ public List<SealedEntity> applyReferenceFetcher(
if (referenceFetcher == ReferenceFetcher.NO_IMPLEMENTATION) {
return sealedEntities;
} else {
referenceFetcher.initReferenceIndex(sealedEntities, this);
return sealedEntities.stream()
return referenceFetcher.initReferenceIndex(sealedEntities, this)
.stream()
.map(it -> applyReferenceFetcherInternal((ServerEntityDecorator) it, referenceFetcher))
.map(SealedEntity.class::cast)
.toList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.evitadb.core.query.algebra.prefetch.PrefetchFactory;
import io.evitadb.core.query.algebra.prefetch.PrefetchFormulaVisitor;
import io.evitadb.core.query.extraResult.ExtraResultProducer;
import io.evitadb.core.query.fetch.FetchRequirementCollector;
import io.evitadb.core.query.filter.FilterByVisitor;
import io.evitadb.core.query.indexSelection.TargetIndexes;
import io.evitadb.core.query.sort.NoSorter;
Expand All @@ -51,7 +52,7 @@
* be used for filtering/sorting instead of accessing the indexes.
*/
@RequiredArgsConstructor
public class QueryPlanBuilder implements PrefetchRequirementCollector {
public class QueryPlanBuilder implements FetchRequirementCollector {
/**
* Reference to the query context that allows to access entity bodies, indexes, original request and much more.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import io.evitadb.core.query.algebra.base.EmptyFormula;
import io.evitadb.core.query.algebra.utils.FormulaFactory;
import io.evitadb.core.query.extraResult.translator.hierarchyStatistics.AbstractHierarchyTranslator.TraversalDirection;
import io.evitadb.core.query.fetch.DefaultPrefetchRequirementCollector;
import io.evitadb.core.query.filter.FilterByVisitor;
import io.evitadb.core.query.filter.FilterByVisitor.ProcessingScope;
import io.evitadb.core.query.indexSelection.TargetIndexes;
Expand All @@ -75,6 +76,7 @@
import io.evitadb.core.query.sort.Sorter;
import io.evitadb.core.query.sort.attribute.translator.EntityNestedQueryComparator;
import io.evitadb.dataType.array.CompositeIntArray;
import io.evitadb.exception.GenericEvitaInternalError;
import io.evitadb.index.GlobalEntityIndex;
import io.evitadb.index.ReducedEntityIndex;
import io.evitadb.index.bitmap.ArrayBitmap;
Expand Down Expand Up @@ -168,6 +170,12 @@ public class ReferencedEntityFetcher implements ReferenceFetcher {
* is either {@link EntityReferenceWithParent} if bodies were not requested, or full {@link SealedEntity} otherwise.
*/
@Nullable private IntObjectMap<EntityClassifierWithParent> parentEntities;
/**
* This request is used to extend the original request on top-level entity. It solves the scenario, when the nested
* references are ordered by reference attribute. In that case we need to extend the original request with additional
* requirements to fetch the reference attribute for ordering comparator.
*/
private EvitaRequest envelopingEntityRequest;

/**
* Utility function that fetches and returns filtered map of {@link SealedEntity} indexed by their primary key
Expand Down Expand Up @@ -811,7 +819,7 @@ public <T extends SealedEntity> T initReferenceIndex(@Nonnull T entity, @Nonnull
);
}
// prefetch the entities
prefetchEntities(
this.envelopingEntityRequest = prefetchEntities(
this.requirementContext,
this.defaultRequirementContext,
this.executionContext,
Expand Down Expand Up @@ -901,7 +909,7 @@ public <T extends SealedEntity> List<T> initReferenceIndex(@Nonnull List<T> enti
);

// prefetch the entities
prefetchEntities(
this.envelopingEntityRequest = prefetchEntities(
this.requirementContext,
this.defaultRequirementContext,
this.executionContext,
Expand Down Expand Up @@ -931,6 +939,16 @@ public <T extends SealedEntity> List<T> initReferenceIndex(@Nonnull List<T> enti
return richEnoughEntities;
}

@Nonnull
@Override
public EvitaRequest getEnvelopingEntityRequest() {
Assert.isPremiseValid(
this.envelopingEntityRequest != null,
() -> new GenericEvitaInternalError("Enveloping entity request must be initialized before it's accessed.")
);
return this.envelopingEntityRequest;
}

@Nullable
@Override
public Function<Integer, EntityClassifierWithParent> getParentEntityFetcher() {
Expand Down Expand Up @@ -1000,7 +1018,8 @@ public BiPredicate<Integer, ReferenceDecorator> getEntityFilter(@Nonnull Referen
* @param referencedEntityGroupIdsFormula the formula containing superset of all possible referenced entity groups
* @param entityPrimaryKey the array of top entity primary keys for which the references are being fetched
*/
private void prefetchEntities(
@Nonnull
private EvitaRequest prefetchEntities(
@Nonnull Map<String, RequirementContext> requirementContext,
@Nullable RequirementContext defaultRequirementContext,
@Nonnull QueryExecutionContext executionContext,
Expand All @@ -1024,6 +1043,7 @@ private void prefetchEntities(
)
);

final DefaultPrefetchRequirementCollector globalPrefetchCollector = new DefaultPrefetchRequirementCollector(null);
this.fetchedEntities = collectedRequirements
.filter(it -> it.getValue().requiresInit())
.collect(
Expand All @@ -1033,9 +1053,17 @@ private void prefetchEntities(
final String referenceName = it.getKey();
final RequirementContext requirements = it.getValue();
final ReferenceSchemaContract referenceSchema = entitySchema.getReferenceOrThrowException(referenceName);
final DefaultPrefetchRequirementCollector localPrefetchCollector = new DefaultPrefetchRequirementCollector(requirements.entityFetch());

final Optional<OrderingDescriptor> orderingDescriptor = ofNullable(requirements.orderBy())
.map(ob -> ReferenceOrderByVisitor.getComparator(executionContext.getQueryContext(), ob, entitySchema, referenceSchema));
.map(ob -> ReferenceOrderByVisitor.getComparator(
executionContext.getQueryContext(),
localPrefetchCollector,
ob,
entitySchema,
referenceSchema
)
);

final ValidEntityToReferenceMapping validityMapping = new ValidEntityToReferenceMapping(entityPrimaryKey.length);

Expand Down Expand Up @@ -1098,14 +1126,17 @@ private void prefetchEntities(
)
);

if (requirements.entityFetch() != null && !ArrayUtils.isEmpty(filteredReferencedEntityIds)) {
final EntityFetch entityFetch = localPrefetchCollector.getEntityFetch();
if (entityFetch != null && !ArrayUtils.isEmpty(filteredReferencedEntityIds)) {
// collect global requirements
globalPrefetchCollector.addRequirementToPrefetch(entityFetch.getRequirements());
// if so, fetch them
entityIndex = fetchReferencedEntities(
executionContext,
referenceSchema,
referenceSchema.getReferencedEntityType(),
pk -> existingEntityRetriever.getExistingEntity(referenceName, pk),
requirements.entityFetch(),
entityFetch,
filteredReferencedEntityIds
);
} else {
Expand All @@ -1128,6 +1159,10 @@ private void prefetchEntities(
}
)
);

return ofNullable(globalPrefetchCollector.getEntityFetch())
.map(it -> executionContext.getEvitaRequest().deriveCopyWith(executionContext.getSchema().getName(), it))
.orElse(executionContext.getEvitaRequest());
}

/**
Expand Down Expand Up @@ -1524,4 +1559,5 @@ public Map<Integer, Entity> get() {
return memoizedResult;
}
}

}
Loading

0 comments on commit 2495774

Please sign in to comment.