diff --git a/.idea/compiler.xml b/.idea/compiler.xml index 2feba9aff..dd66d168f 100644 --- a/.idea/compiler.xml +++ b/.idea/compiler.xml @@ -23,6 +23,11 @@ + + + + + @@ -34,27 +39,126 @@ - - - - - - - - - - - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/docker/Dockerfile b/docker/Dockerfile index b2726f4ca..a6179a440 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -16,7 +16,7 @@ ENV SYSTEM_API_PORT="5555" ## folder with configuration files, all of them are applied one on top of another in alphabetical order ENV EVITA_BIN_DIR="$EVITA_HOME/bin/" -ENV EVITA_CONFIG_DIR="$EVITA_HOME/conf/" +ENV EVITA_CONFIG_DIR="$EVITA_HOME/config/" ENV EVITA_STORAGE_DIR="$EVITA_HOME/data/" ENV EVITA_EXPORT_DIR="$EVITA_HOME/export/" ENV EVITA_CERTIFICATE_DIR="$EVITA_HOME/certificates/" diff --git a/evita_external_api/evita_external_api_core/src/main/java/io/evitadb/externalApi/api/system/model/HealthProblem.java b/evita_api/src/main/java/io/evitadb/api/observability/HealthProblem.java similarity index 97% rename from evita_external_api/evita_external_api_core/src/main/java/io/evitadb/externalApi/api/system/model/HealthProblem.java rename to evita_api/src/main/java/io/evitadb/api/observability/HealthProblem.java index 966dd7b22..d1c24ebd2 100644 --- a/evita_external_api/evita_external_api_core/src/main/java/io/evitadb/externalApi/api/system/model/HealthProblem.java +++ b/evita_api/src/main/java/io/evitadb/api/observability/HealthProblem.java @@ -21,7 +21,7 @@ * limitations under the License. */ -package io.evitadb.externalApi.api.system.model; +package io.evitadb.api.observability; /** * This enum represents the possible health problems that can be signaled by the server. diff --git a/evita_engine/src/main/java/io/evitadb/store/spi/model/storageParts/RemovedStoragePart.java b/evita_api/src/main/java/io/evitadb/api/observability/ReadinessState.java similarity index 54% rename from evita_engine/src/main/java/io/evitadb/store/spi/model/storageParts/RemovedStoragePart.java rename to evita_api/src/main/java/io/evitadb/api/observability/ReadinessState.java index 27f02e760..0254be081 100644 --- a/evita_engine/src/main/java/io/evitadb/store/spi/model/storageParts/RemovedStoragePart.java +++ b/evita_api/src/main/java/io/evitadb/api/observability/ReadinessState.java @@ -6,7 +6,7 @@ * | __/\ V /| | || (_| | |_| | |_) | * \___| \_/ |_|\__\__,_|____/|____/ * - * Copyright (c) 2023 + * Copyright (c) 2024 * * Licensed under the Business Source License, Version 1.1 (the "License"); * you may not use this file except in compliance with the License. @@ -21,28 +21,35 @@ * limitations under the License. */ -package io.evitadb.store.spi.model.storageParts; +package io.evitadb.api.observability; -import io.evitadb.store.model.StoragePart; -import io.evitadb.store.service.KeyCompressor; - -import javax.annotation.Nonnull; -import java.io.Serial; /** - * This class marks removed {@link StoragePart} in the transactional memory. + * Enum representing overall readiness state of the server. + * + * @author Jan Novotný (novotny@fg.cz), FG Forrest a.s. (c) 2024 */ -public class RemovedStoragePart implements StoragePart { - public static final RemovedStoragePart INSTANCE = new RemovedStoragePart(); - @Serial private static final long serialVersionUID = 2485318464734970542L; +public enum ReadinessState { - @Override - public Long getStoragePartPK() { - throw new UnsupportedOperationException(); - } + /** + * At least one API is not ready. + */ + STARTING, + /** + * All APIs are ready. + */ + READY, + /** + * At least one API that was ready is not ready anymore. + */ + STALLING, + /** + * Server is shutting down. None of the APIs are ready. + */ + SHUTDOWN, + /** + * Unknown state - cannot determine the state of the APIs (should not happen). + */ + UNKNOWN - @Override - public long computeUniquePartIdAndSet(@Nonnull KeyCompressor keyCompressor) { - throw new UnsupportedOperationException(); - } } diff --git a/evita_api/src/main/java/module-info.java b/evita_api/src/main/java/module-info.java index deeb58def..da9049a2c 100644 --- a/evita_api/src/main/java/module-info.java +++ b/evita_api/src/main/java/module-info.java @@ -21,13 +21,11 @@ * limitations under the License. */ -import io.evitadb.api.observability.trace.TracingContext; - /** * Module contains external API of the evitaDB. */ module evita.api { - uses TracingContext; + uses io.evitadb.api.observability.trace.TracingContext; opens io.evitadb.api.configuration to com.fasterxml.jackson.databind; opens io.evitadb.api.requestResponse.extraResult to com.graphqljava; @@ -67,6 +65,7 @@ exports io.evitadb.api.requestResponse.schema.mutation.sortableAttributeCompound; exports io.evitadb.api.requestResponse.data.annotation; exports io.evitadb.api.requestResponse.transaction; + exports io.evitadb.api.observability; exports io.evitadb.api.observability.trace; exports io.evitadb.api.observability.annotation; diff --git a/evita_engine/src/main/java/io/evitadb/core/Catalog.java b/evita_engine/src/main/java/io/evitadb/core/Catalog.java index 83e1a0802..58d52bbd6 100644 --- a/evita_engine/src/main/java/io/evitadb/core/Catalog.java +++ b/evita_engine/src/main/java/io/evitadb/core/Catalog.java @@ -72,6 +72,7 @@ import io.evitadb.api.requestResponse.system.TimeFlow; import io.evitadb.api.requestResponse.transaction.TransactionMutation; import io.evitadb.api.task.ServerTask; +import io.evitadb.core.EntityCollection.EntityCollectionHeaderWithCollection; import io.evitadb.core.async.ObservableExecutorService; import io.evitadb.core.async.Scheduler; import io.evitadb.core.buffer.DataStoreChanges; @@ -849,7 +850,7 @@ public boolean goLive() { return true; } finally { - goingLive.set(false); + this.goingLive.set(false); } } @@ -920,7 +921,11 @@ public void terminate() { // in warmup state try to persist all changes in volatile memory if (warmingUpState) { final long lastSeenVersion = entityCollection.getVersion(); - entityHeaders.add(entityCollection.flush()); + entityHeaders.add( + updateIndexIfNecessary( + entityCollection.flush() + ) + ); changeOccurred = changeOccurred || entityCollection.getVersion() != lastSeenVersion; } // in all states terminate collection operations @@ -933,7 +938,7 @@ public void terminate() { this.persistenceService.storeHeader( this.catalogId, getCatalogState(), - this.versionId.getId(), + getVersion(), this.entityTypeSequence.get(), null, entityHeaders, @@ -1193,7 +1198,7 @@ public void flush(long catalogVersion, @Nonnull TransactionMutation lastProcesse if (changeOccurred) { this.persistenceService.flushTrappedUpdates( catalogVersion, - this.dataStoreBuffer.getTrappedIndexChanges() + this.dataStoreBuffer.getTrappedChanges() ); this.persistenceService.storeHeader( this.catalogId, @@ -1311,14 +1316,23 @@ void flush() { final List entityHeaders = new ArrayList<>(this.entityCollections.size()); for (EntityCollection entityCollection : this.entityCollections.values()) { final long lastSeenVersion = entityCollection.getVersion(); - entityHeaders.add(entityCollection.flush()); + entityHeaders.add( + updateIndexIfNecessary( + entityCollection.flush() + ) + ); changeOccurred = changeOccurred || entityCollection.getVersion() != lastSeenVersion; } if (changeOccurred) { this.persistenceService.flushTrappedUpdates( 0L, - this.dataStoreBuffer.getTrappedIndexChanges() + this.dataStoreBuffer.getTrappedChanges() + ); + final CatalogHeader catalogHeader = this.persistenceService.getCatalogHeader(0L); + Assert.isPremiseValid( + catalogHeader != null && catalogHeader.catalogState() == CatalogState.WARMING_UP, + "Catalog header is expected to be present in the storage in WARMING_UP flag!" ); this.persistenceService.storeHeader( this.catalogId, @@ -1333,6 +1347,27 @@ void flush() { } } + /** + * Method transparently updates the contents of {@link #entityCollections} map with the new collection, if the + * passed {@link EntityCollectionHeaderWithCollection} contains a different collection than the one stored in + * the index. + * + * @param flushResult The result containing the header and the entity collection to potentially update. + * @return The entity collection header from the flush result. + */ + @Nonnull + private EntityCollectionHeader updateIndexIfNecessary( + @Nonnull EntityCollectionHeaderWithCollection flushResult + ) { + final EntityCollectionHeader header = flushResult.header(); + this.entityCollections.computeIfPresent( + header.entityType(), + (entityType, entityCollection) -> entityCollection == flushResult.collection() ? + entityCollection : flushResult.collection() + ); + return header; + } + /* PRIVATE METHODS */ @@ -1484,7 +1519,11 @@ private CatalogSchemaContract removeEntitySchema( final long catalogVersion = getVersion(); this.persistenceService.deleteEntityCollection( catalogVersion, - catalogVersion > 0L ? collectionToRemove.flush(catalogVersion) : collectionToRemove.flush() + catalogVersion > 0L ? + collectionToRemove.flush(catalogVersion) : + updateIndexIfNecessary( + collectionToRemove.flush() + ) ); } final CatalogSchemaContract result; @@ -1591,7 +1630,9 @@ private void doReplaceEntityCollectionInternal( entityCollectionToBeReplacedWith.updateSchema(getSchema(), modifyEntitySchemaName); this.entityCollections.remove(entityCollectionNameToBeReplacedWith); if (!transactionOpen) { - entityCollectionToBeReplacedWith.flush(); + updateIndexIfNecessary( + entityCollectionToBeReplacedWith.flush() + ); final long catalogVersion = getVersion(); Assert.isPremiseValid(catalogVersion == 0L, "Catalog version is expected to be `0`!"); final EntityCollectionPersistenceService newPersistenceService = this.persistenceService.replaceCollectionWith( @@ -1611,7 +1652,9 @@ private void doReplaceEntityCollectionInternal( entityCollectionNameToBeReplacedWith, entityCollectionToBeReplacedWith ); if (schemaUpdated) { - otherCollection.flush(); + updateIndexIfNecessary( + otherCollection.flush() + ); } } // store catalog with a new file pointer diff --git a/evita_engine/src/main/java/io/evitadb/core/EntityCollection.java b/evita_engine/src/main/java/io/evitadb/core/EntityCollection.java index ba7c56429..e54c912c2 100644 --- a/evita_engine/src/main/java/io/evitadb/core/EntityCollection.java +++ b/evita_engine/src/main/java/io/evitadb/core/EntityCollection.java @@ -1156,16 +1156,28 @@ public EntitySchema getInternalSchema() { * is ignored when there are no changes present in {@link CatalogPersistenceService}. */ @Nonnull - public EntityCollectionHeader flush() { - this.persistenceService.flushTrappedUpdates(0L, this.dataStoreBuffer.getTrappedIndexChanges()); + public EntityCollectionHeaderWithCollection flush() { + this.persistenceService.flushTrappedUpdates(0L, this.dataStoreBuffer.getTrappedChanges()); return this.catalogPersistenceService.flush( 0L, this.headerInfoSupplier, this.persistenceService.getEntityCollectionHeader(), this.dataStoreBuffer ) - .map(EntityCollectionPersistenceService::getEntityCollectionHeader) - .orElseGet(this::getEntityCollectionHeader); + .map( + it -> { + final EntityCollectionHeader theHeader = it.getEntityCollectionHeader(); + return this.persistenceService == it ? + new EntityCollectionHeaderWithCollection(theHeader, this) : + new EntityCollectionHeaderWithCollection( + theHeader, + this.createCopyWithNewPersistenceService(theHeader.version(), CatalogState.WARMING_UP, it) + ); + } + ) + .orElseGet( + () -> new EntityCollectionHeaderWithCollection(this.getEntityCollectionHeader(), this) + ); } /** @@ -1299,7 +1311,11 @@ public EntityCollection createCopyWithMergedTransactionalMemory(@Nullable DataSt * @return a new EntityCollection object with the updated persistence service */ @Nonnull - public EntityCollection createCopyWithNewPersistenceService(long catalogVersion, @Nonnull CatalogState catalogState, @Nonnull EntityCollectionPersistenceService newPersistenceService) { + public EntityCollection createCopyWithNewPersistenceService( + long catalogVersion, + @Nonnull CatalogState catalogState, + @Nonnull EntityCollectionPersistenceService newPersistenceService + ) { final EntityCollection entityCollection = new EntityCollection( catalogVersion, catalogState, @@ -1385,7 +1401,7 @@ EntityCollectionHeader getEntityCollectionHeader() { */ @Nonnull EntityCollectionHeader flush(long catalogVersion) { - this.persistenceService.flushTrappedUpdates(catalogVersion, this.dataStoreBuffer.getTrappedIndexChanges()); + this.persistenceService.flushTrappedUpdates(catalogVersion, this.dataStoreBuffer.getTrappedChanges()); return this.catalogPersistenceService.flush( catalogVersion, this.headerInfoSupplier, @@ -2054,4 +2070,14 @@ public List getIndexKeys() { .collect(Collectors.toList()); } } + + /** + * The EntityCollectionHeaderWithCollection record encapsulates both an EntityCollectionHeader and an EntityCollection. + * It's used to detect whether it's needed to replace collection instance in the catalog index. + */ + public record EntityCollectionHeaderWithCollection( + @Nonnull EntityCollectionHeader header, + @Nonnull EntityCollection collection + ) {} + } diff --git a/evita_engine/src/main/java/io/evitadb/core/LocalMutationExecutorCollector.java b/evita_engine/src/main/java/io/evitadb/core/LocalMutationExecutorCollector.java index 3212a10c9..1966a8192 100644 --- a/evita_engine/src/main/java/io/evitadb/core/LocalMutationExecutorCollector.java +++ b/evita_engine/src/main/java/io/evitadb/core/LocalMutationExecutorCollector.java @@ -155,6 +155,11 @@ public EntityWithFetchCount execute( // are automatically generated when top level mutation is applied (replayed) if (level == 0) { this.entityMutations.add(entityMutation); + // root level changes are applied immediately + changeCollector.setTrapChanges(false); + } else { + // while implicit mutations are trapped in memory and stored on next flush + changeCollector.setTrapChanges(true); } // apply mutations using applicators diff --git a/evita_engine/src/main/java/io/evitadb/core/buffer/DataStoreChanges.java b/evita_engine/src/main/java/io/evitadb/core/buffer/DataStoreChanges.java index a1fbc0c91..665668de8 100644 --- a/evita_engine/src/main/java/io/evitadb/core/buffer/DataStoreChanges.java +++ b/evita_engine/src/main/java/io/evitadb/core/buffer/DataStoreChanges.java @@ -23,57 +23,310 @@ package io.evitadb.core.buffer; +import com.carrotsearch.hppc.LongObjectHashMap; +import com.carrotsearch.hppc.LongObjectMap; +import com.carrotsearch.hppc.cursors.LongObjectCursor; +import io.evitadb.core.EntityCollection; +import io.evitadb.index.EntityIndex; +import io.evitadb.index.Index; +import io.evitadb.index.IndexKey; import io.evitadb.store.model.StoragePart; import io.evitadb.store.service.KeyCompressor; import io.evitadb.store.spi.StoragePartPersistenceService; +import io.evitadb.store.spi.model.storageParts.StoragePartKey; import javax.annotation.Nonnull; import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; +import java.io.Serial; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import static java.util.Optional.ofNullable; /** * This class is used as transactional memory of the {@link DataStoreChanges} and stores the changes of the storage * keys directly to the target {@link StoragePartPersistenceService}, but traps the changes in the indexes in the memory - * buffer. + * buffer. It provides methods to get, create, remove, and track modifications to indexes. The changes are cached in + * memory and can be persisted to the storage using the {@link #popTrappedUpdates()} method. + * + * This mechanism allows to buffer frequent changes in indexes whose persistence is costly and flush the changes once in + * a while to the persistent storage. * - * @see DataStoreIndexMemoryBuffer * @author Jan Novotný (novotny@fg.cz), FG Forrest a.s. (c) 2021 + * @see DataStoreMemoryBuffer */ @NotThreadSafe -public class DataStoreChanges extends DataStoreIndexMemoryBuffer { +public class DataStoreChanges { + /** + * This map contains index of "dirty" entity indexes - i.e. subset of {@link EntityCollection indexes} that were + * modified and not yet persisted. + */ + private Map> dirtyEntityIndexes = new HashMap<>(64); /** * Contains reference to the I/O service, that allows reading/writing records to the persistent storage. */ - @Nonnull private final StoragePartPersistenceService persistenceService; + @Nonnull private StoragePartPersistenceService persistenceService; + /** + * This map contains index of "dirty" storage parts - i.e. subset of {@link StoragePart storage parts} that were + * modified and not yet persisted. Usually the storage parts are stored directly in the persistent storage but + * the issue #689 revealed that it's beneficial to + * store some of them in memory and flush them once in a while to the persistent storage. + */ + private Map, LongObjectMap> trappedChanges; public DataStoreChanges(@Nonnull StoragePartPersistenceService persistenceService) { this.persistenceService = persistenceService; } + /** + * Allows exchanging the persistence service for this memory buffer in case of internal store compaction. + * + * @param persistenceService the persistence service to be used for storing data + */ + public void setPersistenceService(@Nonnull StoragePartPersistenceService persistenceService) { + this.persistenceService = persistenceService; + } + + /** + * Returns set containing {@link StoragePartKey keys} that lead to the data structures in memory that were modified + * (are dirty) and needs to be persisted into the persistent storage. This is performance optimization that minimizes + * I/O operations for frequently changed data structures such as indexes and these are stored once in a while in + * the moments when it has a sense. + */ + @Nonnull + public Stream popTrappedUpdates() { + final Map> theDirtyEntityIndexes = this.dirtyEntityIndexes; + this.dirtyEntityIndexes = new HashMap<>(64); + final Stream dirtyIndexesStream = theDirtyEntityIndexes + .values() + .stream() + .flatMap(it -> it.getModifiedStorageParts().stream()); + final Map, LongObjectMap> theTrappedChanges = this.trappedChanges; + this.trappedChanges = null; + return theTrappedChanges == null ? + dirtyIndexesStream : + Stream.concat( + theTrappedChanges.values() + .stream() + .flatMap(map -> StreamSupport.stream(map.values().spliterator(), false).map(it -> it.value)), + dirtyIndexesStream + ); + } + + /** + * Returns a KeyCompressor that contains indexes of keys assigned to key-comparable objects which are expensive + * to store redundantly during serialization. + * + * @return a read-only KeyCompressor instance to be used for key compression. + */ @Nonnull public KeyCompressor getReadOnlyKeyCompressor() { return this.persistenceService.getReadOnlyKeyCompressor(); } + /** + * Retrieves a storage part from the local trapped changes cache if available, otherwise fetches it from the persistence service. + * + * @param catalogVersion the current version of the catalog to read from + * @param primaryKey primary key of the storage part to retrieve + * @param containerType class type of the storage part container + * @param type of the storage part container + * @return the storage part if found, otherwise null + */ @Nullable public T getStoragePart(long catalogVersion, long primaryKey, @Nonnull Class containerType) { + if (this.trappedChanges != null) { + final LongObjectMap trappedChanges = this.trappedChanges.get(containerType); + if (trappedChanges != null) { + final StoragePart storagePart = trappedChanges.get(primaryKey); + if (storagePart != null) { + return storagePart instanceof RemovedStoragePart ? + null : + containerType.cast(storagePart); + } + } + } return this.persistenceService.getStoragePart(catalogVersion, primaryKey, containerType); } + /** + * Retrieves a storage part as a binary array. The storage part is first searched for in the local trapped changes + * cache. If found, it is serialized and returned unless it is a {@link RemovedStoragePart}; in which case, null is returned. + * If not found in the cache, it fetches the storage part from the persistence service and returns it as a binary array. + * + * @param catalogVersion the current version of the catalog to read from + * @param primaryKey primary key of the storage part to retrieve + * @param containerType class type of the storage part container + * @param type of the storage part container + * @return byte array representing the storage part if found, otherwise null + */ @Nullable public byte[] getStoragePartAsBinary(long catalogVersion, long primaryKey, @Nonnull Class containerType) { + if (this.trappedChanges != null) { + final LongObjectMap trappedChanges = this.trappedChanges.get(containerType); + if (trappedChanges != null) { + final StoragePart storagePart = trappedChanges.get(primaryKey); + if (storagePart != null) { + return storagePart instanceof RemovedStoragePart ? + null : + this.persistenceService.serializeStoragePart(storagePart); + } + } + } return this.persistenceService.getStoragePartAsBinary(catalogVersion, primaryKey, containerType); } + /** + * Removes a storage part identified by the given catalog version, primary key, and entity class. + * + * @param catalogVersion the version of the catalog to modify + * @param primaryKey the primary key of the storage part to remove + * @param entityClass the class type of the storage part to remove + * @param the type of the storage part + * @return true if the storage part was successfully removed, false otherwise + */ public boolean removeStoragePart(long catalogVersion, long primaryKey, @Nonnull Class entityClass) { + if (this.trappedChanges != null) { + ofNullable(this.trappedChanges.get(entityClass)).ifPresent(it -> it.remove(primaryKey)); + } return this.persistenceService.removeStoragePart(catalogVersion, primaryKey, entityClass); } + public boolean trapRemoveStoragePart(long catalogVersion, long primaryKey, @Nonnull Class entityClass) { + this.trappedChanges = this.trappedChanges == null ? new HashMap<>(64) : this.trappedChanges; + if (this.persistenceService.containsStoragePart(catalogVersion, primaryKey, entityClass)) { + this.trappedChanges.computeIfAbsent(entityClass, aClass -> new LongObjectHashMap<>(256)) + .put( + primaryKey, + new RemovedStoragePart(entityClass, primaryKey) + ); + return true; + } else { + return false; + } + } + + /** + * Stores the provided storage part and manages any trapped changes related to it. + * + * @param catalogVersion the current version of the catalog to write to + * @param value the storage part to store, must not be null + * @param the type of the storage part + */ public void putStoragePart(long catalogVersion, @Nonnull T value) { + if (this.trappedChanges != null) { + ofNullable(this.trappedChanges.get(value.getClass())).ifPresent(it -> it.remove(value.getStoragePartPK())); + } this.persistenceService.putStoragePart(catalogVersion, value); } + /** + * Adds the specified storage part to the local trapped changes cache. + * + * @param the type of the storage part + * @param value the storage part to be added, must not be null + */ + public void trapPutStoragePart(@Nonnull T value) { + this.trappedChanges = this.trappedChanges == null ? new HashMap<>(64) : this.trappedChanges; + final Long storagePartPK = value.getStoragePartPK(); + final Class containerType = value.getClass(); + this.trappedChanges.computeIfAbsent(containerType, aClass -> new LongObjectHashMap<>(256)) + .put(storagePartPK, value); + } + + /** + * Counts the total number of storage parts of a specific type in a catalog version, + * accounting for trapped changes such as insertions and removals. + * + * @param catalogVersion the version of the catalog to count storage parts from + * @param containerType the class type of the storage part containers to count + * @return the total number of storage parts, adjusted for trapped changes + */ public int countStorageParts(long catalogVersion, Class containerType) { - return this.persistenceService.countStorageParts(catalogVersion, containerType); + final int storedCount = this.persistenceService.countStorageParts(catalogVersion, containerType); + if (this.trappedChanges == null || this.trappedChanges.isEmpty()) { + return storedCount; + } else { + final LongObjectMap trappedChanges = this.trappedChanges.get(containerType); + if (trappedChanges == null) { + return storedCount; + } else { + int inserts = 0; + int removals = 0; + for (LongObjectCursor trappedChange : trappedChanges) { + if (trappedChange.value instanceof RemovedStoragePart) { + removals++; + } else if (!this.persistenceService.containsStoragePart(catalogVersion, trappedChange.key, containerType)) { + inserts++; + } + } + return storedCount + inserts - removals; + } + } + } + + /** + * Method checks and returns the requested index from the local "dirty" memory. If it isn't there, it's fetched + * using `accessorWhenMissing` lambda and stores into the "dirty" memory before returning. + */ + @Nonnull + public > I getOrCreateIndexForModification(@Nonnull IK indexKey, @Nonnull Function accessorWhenMissing) { + //noinspection unchecked,rawtypes + return (I) dirtyEntityIndexes.computeIfAbsent( + indexKey, (Function) accessorWhenMissing + ); + } + + /** + * Method checks and returns the requested index from the local "dirty" memory. If it isn't there, it's fetched + * using `accessorWhenMissing` and returned without adding to "dirty" memory. + */ + @Nullable + public > I getIndexIfExists(@Nonnull IK indexKey, @Nonnull Function accessorWhenMissing) { + //noinspection unchecked + return ofNullable((I)dirtyEntityIndexes.get(indexKey)) + .orElseGet(() -> accessorWhenMissing.apply(indexKey)); + } + + /** + * Removes {@link EntityIndex} from the change set. After removal (either successfully or unsuccessful) + * `removalPropagation` function is called to propagate deletion to the origin collection. + */ + @Nonnull + public > I removeIndex(@Nonnull IK entityIndexKey, @Nonnull Function removalPropagation) { + //noinspection unchecked + final I dirtyIndexesRemoval = (I) dirtyEntityIndexes.remove(entityIndexKey); + final I baseIndexesRemoval = removalPropagation.apply(entityIndexKey); + return ofNullable(dirtyIndexesRemoval).orElse(baseIndexesRemoval); } + + /** + * RemovedStoragePart is a specific implementation of the StoragePart interface which represents a part of storage + * that should be removed. + * + * @param containerType the type of the container that was removed + * @param storagePartPK the primary key of the storage part that was removed + */ + public record RemovedStoragePart( + @Nonnull Class containerType, + long storagePartPK + ) implements StoragePart { + @Serial private static final long serialVersionUID = -3939591252705809288L; + + @Nullable + @Override + public Long getStoragePartPK() { + return this.storagePartPK; + } + + @Override + public long computeUniquePartIdAndSet(@Nonnull KeyCompressor keyCompressor) { + return this.storagePartPK; + } + } + } diff --git a/evita_engine/src/main/java/io/evitadb/core/buffer/DataStoreIndexChanges.java b/evita_engine/src/main/java/io/evitadb/core/buffer/DataStoreIndexChanges.java deleted file mode 100644 index 8bd52c8ba..000000000 --- a/evita_engine/src/main/java/io/evitadb/core/buffer/DataStoreIndexChanges.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * - * _ _ ____ ____ - * _____ _(_) |_ __ _| _ \| __ ) - * / _ \ \ / / | __/ _` | | | | _ \ - * | __/\ V /| | || (_| | |_| | |_) | - * \___| \_/ |_|\__\__,_|____/|____/ - * - * Copyright (c) 2024 - * - * Licensed under the Business Source License, Version 1.1 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://github.com/FgForrest/evitaDB/blob/master/LICENSE - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.evitadb.core.buffer; - -import io.evitadb.index.EntityIndex; -import io.evitadb.index.Index; -import io.evitadb.index.IndexKey; -import io.evitadb.store.model.StoragePart; -import io.evitadb.store.spi.model.storageParts.StoragePartKey; - -import javax.annotation.Nonnull; -import javax.annotation.Nullable; -import java.util.function.Function; -import java.util.stream.Stream; - -/** - * This interface represents a collection of changes to the index data structures stored in memory. It provides methods7 - * to get, create, remove, and track modifications to indexes. The changes are cached in memory and can be persisted to - * the storage using the {@link #popTrappedUpdates()} method. - * - * This mechanism allows to buffer frequent changes in indexes whose persistence is costly and flush the changes once in - * a while to the persistent storage. - * - * @author Jan Novotný (novotny@fg.cz), FG Forrest a.s. (c) 2024 - */ -public interface DataStoreIndexChanges { - - /** - * Returns set containing {@link StoragePartKey keys} that lead to the data structures in memory that were modified - * (are dirty) and needs to be persisted into the persistent storage. This is performance optimization that minimizes - * I/O operations for frequently changed data structures such as indexes and these are stored once in a while in - * the moments when it has a sense. - */ - @Nonnull - Stream popTrappedUpdates(); - - /** - * Method checks and returns the requested index from the local "dirty" memory. If it isn't there, it's fetched - * using `accessorWhenMissing` lambda and stores into the "dirty" memory before returning. - */ - @Nonnull - > I getOrCreateIndexForModification(@Nonnull IK indexKey, @Nonnull Function accessorWhenMissing); - - /** - * Method checks and returns the requested index from the local "dirty" memory. If it isn't there, it's fetched - * using `accessorWhenMissing` and returned without adding to "dirty" memory. - */ - @Nullable - > I getIndexIfExists(@Nonnull IK indexKey, @Nonnull Function accessorWhenMissing); - - /** - * Removes {@link EntityIndex} from the change set. After removal (either successfully or unsuccessful) - * `removalPropagation` function is called to propagate deletion to the origin collection. - */ - @Nonnull - > I removeIndex(@Nonnull IK entityIndexKey, @Nonnull Function removalPropagation); - -} diff --git a/evita_engine/src/main/java/io/evitadb/core/buffer/DataStoreIndexMemoryBuffer.java b/evita_engine/src/main/java/io/evitadb/core/buffer/DataStoreIndexMemoryBuffer.java deleted file mode 100644 index adfffecbe..000000000 --- a/evita_engine/src/main/java/io/evitadb/core/buffer/DataStoreIndexMemoryBuffer.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * - * _ _ ____ ____ - * _____ _(_) |_ __ _| _ \| __ ) - * / _ \ \ / / | __/ _` | | | | _ \ - * | __/\ V /| | || (_| | |_| | |_) | - * \___| \_/ |_|\__\__,_|____/|____/ - * - * Copyright (c) 2023-2024 - * - * Licensed under the Business Source License, Version 1.1 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://github.com/FgForrest/evitaDB/blob/master/LICENSE - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.evitadb.core.buffer; - -import io.evitadb.core.EntityCollection; -import io.evitadb.index.Index; -import io.evitadb.index.IndexKey; -import io.evitadb.store.model.StoragePart; - -import javax.annotation.Nonnull; -import javax.annotation.Nullable; -import javax.annotation.concurrent.NotThreadSafe; -import java.util.HashMap; -import java.util.Map; -import java.util.function.Function; -import java.util.stream.Stream; - -import static java.util.Optional.ofNullable; - -/** - * This class contains all trapped changes in the local memory data structures. The reason for this is the cost of - * storing indexes with each change to persistent storage - so we collect them in memory first and store them once in - * {@link #popTrappedUpdates()} method. - */ -@NotThreadSafe -public class DataStoreIndexMemoryBuffer implements DataStoreIndexChanges { - /** - * This map contains index of "dirty" entity indexes - i.e. subset of {@link EntityCollection indexes} that were - * modified and not yet persisted. - */ - private Map> dirtyEntityIndexes = new HashMap<>(64); - - @Override - @Nonnull - public Stream popTrappedUpdates() { - final Map> dirtyEntityIndexes = this.dirtyEntityIndexes; - this.dirtyEntityIndexes = new HashMap<>(64); - return dirtyEntityIndexes - .values() - .stream() - .flatMap(it -> it.getModifiedStorageParts().stream()); - } - - @Override - @Nonnull - public > I getOrCreateIndexForModification(@Nonnull IK indexKey, @Nonnull Function accessorWhenMissing) { - //noinspection unchecked,rawtypes - return (I) dirtyEntityIndexes.computeIfAbsent( - indexKey, (Function) accessorWhenMissing - ); - } - - @Override - @Nullable - public > I getIndexIfExists(@Nonnull IK indexKey, @Nonnull Function accessorWhenMissing) { - //noinspection unchecked - return ofNullable((I)dirtyEntityIndexes.get(indexKey)) - .orElseGet(() -> accessorWhenMissing.apply(indexKey)); - } - - @Override - @Nonnull - public > I removeIndex(@Nonnull IK entityIndexKey, @Nonnull Function removalPropagation) { - //noinspection unchecked - final I dirtyIndexesRemoval = (I) dirtyEntityIndexes.remove(entityIndexKey); - final I baseIndexesRemoval = removalPropagation.apply(entityIndexKey); - return ofNullable(dirtyIndexesRemoval).orElse(baseIndexesRemoval); - } - -} diff --git a/evita_engine/src/main/java/io/evitadb/core/buffer/DataStoreMemoryBuffer.java b/evita_engine/src/main/java/io/evitadb/core/buffer/DataStoreMemoryBuffer.java index f234f4a50..5584c98ac 100644 --- a/evita_engine/src/main/java/io/evitadb/core/buffer/DataStoreMemoryBuffer.java +++ b/evita_engine/src/main/java/io/evitadb/core/buffer/DataStoreMemoryBuffer.java @@ -34,7 +34,7 @@ /** * DataStoreMemoryBuffer represents volatile temporal memory between the {@link EntityCollection} and persistent - * storage that keeps frequently changed data in the {@link DataStoreIndexMemoryBuffer}. + * storage that keeps frequently changed data in the {@link DataStoreMemoryBuffer}. * * @author Jan Novotný (novotny@fg.cz), FG Forrest a.s. (c) 2024 */ @@ -61,6 +61,12 @@ public interface DataStoreMemoryBuffer extends DataStoreReader { */ boolean removeByPrimaryKey(long catalogVersion, long primaryKey, @Nonnull Class entityClass); + /** + * Removes container from the target storage in the memory only. Changes are written at the moment when + * buffer is flushed and {@link #getTrappedChanges()} is called. + */ + boolean trapRemoveByPrimaryKey(long catalogVersion, long primaryKey, @Nonnull Class entityClass); + /** * Inserts or updates container in the target storage. If transaction is opened, the changes are written only in * the transactional layer and are not really written to the persistent storage. Changes are written at the moment @@ -68,8 +74,15 @@ public interface DataStoreMemoryBuffer extends DataStoreReader { */ void update(long catalogVersion, @Nonnull T value); + /** + * Inserts or updates container in the target storage in the memory only. Changes are written at the moment when + * buffer is flushed and {@link #getTrappedChanges()} is called. + */ + void trapUpdate(long catalogVersion, @Nonnull T value); + /** * Method returns current buffer with trapped changes. */ - DataStoreIndexChanges getTrappedIndexChanges(); + DataStoreChanges getTrappedChanges(); + } diff --git a/evita_engine/src/main/java/io/evitadb/core/buffer/TransactionalDataStoreMemoryBuffer.java b/evita_engine/src/main/java/io/evitadb/core/buffer/TransactionalDataStoreMemoryBuffer.java index 0c903077d..8ef5e9d91 100644 --- a/evita_engine/src/main/java/io/evitadb/core/buffer/TransactionalDataStoreMemoryBuffer.java +++ b/evita_engine/src/main/java/io/evitadb/core/buffer/TransactionalDataStoreMemoryBuffer.java @@ -44,7 +44,7 @@ /** * TransactionalDataStoreMemoryBuffer represents volatile temporal memory between the {@link EntityCollection} and persistent * storage that takes {@link io.evitadb.core.Transaction} into an account. Even if transactional memory is not available - * this buffer traps updates of certain objects in {@link DataStoreIndexMemoryBuffer} to avoid persistence of large + * this buffer traps updates of certain objects in {@link DataStoreMemoryBuffer} to avoid persistence of large * indexes with each update (which would drastically slow initial bulk database setup). * * All reads-writes are primarily targeting transactional memory if it's present for the current thread. If the value @@ -60,7 +60,7 @@ public class TransactionalDataStoreMemoryBuffer implements DataStoreMemoryBuffer /** * DTO contains all trapped changes in this memory buffer. */ - @Nonnull private final DataStoreIndexChanges dataStoreIndexChanges = new DataStoreIndexMemoryBuffer(); + @Nonnull private final DataStoreChanges dataStoreChanges; /** * Contains reference to the I/O service, that allows reading/writing records to the persistent storage. */ @@ -72,13 +72,14 @@ public TransactionalDataStoreMemoryBuffer( ) { this.transactionalMemoryDataSource = transactionalMemoryDataSource; this.persistenceService = persistenceService; + this.dataStoreChanges = new DataStoreChanges(persistenceService); } @Override public > I getOrCreateIndexForModification(@Nonnull IK entityIndexKey, @Nonnull Function accessorWhenMissing) { final DataStoreChanges layer = Transaction.getOrCreateTransactionalMemoryLayer(transactionalMemoryDataSource); if (layer == null) { - return dataStoreIndexChanges.getOrCreateIndexForModification(entityIndexKey, accessorWhenMissing); + return dataStoreChanges.getOrCreateIndexForModification(entityIndexKey, accessorWhenMissing); } else { return layer.getOrCreateIndexForModification(entityIndexKey, accessorWhenMissing); } @@ -88,7 +89,7 @@ public > I getOrCreateIndexForModificat public > I getIndexIfExists(@Nonnull IK entityIndexKey, @Nonnull Function accessorWhenMissing) { final DataStoreChanges layer = getTransactionalMemoryLayerIfExists(transactionalMemoryDataSource); if (layer == null) { - return dataStoreIndexChanges.getIndexIfExists(entityIndexKey, accessorWhenMissing); + return dataStoreChanges.getIndexIfExists(entityIndexKey, accessorWhenMissing); } else { return layer.getIndexIfExists(entityIndexKey, accessorWhenMissing); } @@ -98,7 +99,7 @@ public > I getIndexIfExists(@Nonnull IK public > I removeIndex(@Nonnull IK entityIndexKey, @Nonnull Function removalPropagation) { final DataStoreChanges layer = getTransactionalMemoryLayerIfExists(transactionalMemoryDataSource); if (layer == null) { - return dataStoreIndexChanges.removeIndex(entityIndexKey, removalPropagation); + return dataStoreChanges.removeIndex(entityIndexKey, removalPropagation); } else { return layer.removeIndex(entityIndexKey, removalPropagation); } @@ -193,11 +194,31 @@ public void update(long catalogVersion, @Nonnull T value } @Override - public DataStoreIndexChanges getTrappedIndexChanges() { + public boolean trapRemoveByPrimaryKey(long catalogVersion, long primaryKey, @Nonnull Class entityClass) { + final DataStoreChanges layer = Transaction.getOrCreateTransactionalMemoryLayer(transactionalMemoryDataSource); + if (layer == null) { + return this.dataStoreChanges.trapRemoveStoragePart(catalogVersion, primaryKey, entityClass); + } else { + return layer.trapRemoveStoragePart(catalogVersion, primaryKey, entityClass); + } + } + + @Override + public void trapUpdate(long catalogVersion, @Nonnull T value) { + final DataStoreChanges layer = Transaction.getOrCreateTransactionalMemoryLayer(transactionalMemoryDataSource); + if (layer == null) { + this.dataStoreChanges.trapPutStoragePart(value); + } else { + layer.trapPutStoragePart(value); + } + } + + @Override + public DataStoreChanges getTrappedChanges() { final DataStoreChanges layer = Transaction.getTransactionalMemoryLayerIfExists(transactionalMemoryDataSource); // return current transactional layer that contains trapped updates // or fallback to shared memory buffer with trapped updates - return Objects.requireNonNullElse(layer, this.dataStoreIndexChanges); + return Objects.requireNonNullElse(layer, this.dataStoreChanges); } } diff --git a/evita_engine/src/main/java/io/evitadb/core/buffer/WarmUpDataStoreMemoryBuffer.java b/evita_engine/src/main/java/io/evitadb/core/buffer/WarmUpDataStoreMemoryBuffer.java index 1e08ed489..468ec7e78 100644 --- a/evita_engine/src/main/java/io/evitadb/core/buffer/WarmUpDataStoreMemoryBuffer.java +++ b/evita_engine/src/main/java/io/evitadb/core/buffer/WarmUpDataStoreMemoryBuffer.java @@ -52,17 +52,12 @@ public class WarmUpDataStoreMemoryBuffer implements DataStoreMemoryBuffer { /** * DTO contains all trapped changes in this memory buffer. */ - @Nonnull private final DataStoreIndexChanges dataStoreIndexChanges = new DataStoreIndexMemoryBuffer(); - /** - * Contains reference to the I/O service, that allows reading/writing records to the persistent storage. - * This reference can be exchanged in case of internal store compaction. - */ - @Nonnull private StoragePartPersistenceService persistenceService; + @Nonnull private final DataStoreChanges dataStoreChanges; public WarmUpDataStoreMemoryBuffer( @Nonnull StoragePartPersistenceService persistenceService ) { - this.persistenceService = persistenceService; + this.dataStoreChanges = new DataStoreChanges(persistenceService); } /** @@ -71,53 +66,53 @@ public WarmUpDataStoreMemoryBuffer( * @param persistenceService new persistence service to be used */ public void setPersistenceService(@Nonnull StoragePartPersistenceService persistenceService) { - this.persistenceService = persistenceService; + this.dataStoreChanges.setPersistenceService(persistenceService); } @Override public > I getOrCreateIndexForModification(@Nonnull IK entityIndexKey, @Nonnull Function accessorWhenMissing) { - return dataStoreIndexChanges.getOrCreateIndexForModification(entityIndexKey, accessorWhenMissing); + return dataStoreChanges.getOrCreateIndexForModification(entityIndexKey, accessorWhenMissing); } @Override public > I getIndexIfExists(@Nonnull IK entityIndexKey, @Nonnull Function accessorWhenMissing) { - return dataStoreIndexChanges.getIndexIfExists(entityIndexKey, accessorWhenMissing); + return dataStoreChanges.getIndexIfExists(entityIndexKey, accessorWhenMissing); } @Override public > I removeIndex(@Nonnull IK entityIndexKey, @Nonnull Function removalPropagation) { - return dataStoreIndexChanges.removeIndex(entityIndexKey, removalPropagation); + return dataStoreChanges.removeIndex(entityIndexKey, removalPropagation); } @Override public int countStorageParts(long catalogVersion, @Nonnull Class containerType) { - return persistenceService.countStorageParts(catalogVersion, containerType); + return dataStoreChanges.countStorageParts(catalogVersion, containerType); } @Override @Nullable public T fetch(long catalogVersion, long primaryKey, @Nonnull Class containerType) { - return persistenceService.getStoragePart(catalogVersion, primaryKey, containerType); + return dataStoreChanges.getStoragePart(catalogVersion, primaryKey, containerType); } @Override @Nullable public byte[] fetchBinary(long catalogVersion, long primaryKey, @Nonnull Class containerType) { - return persistenceService.getStoragePartAsBinary(catalogVersion, primaryKey, containerType); + return dataStoreChanges.getStoragePartAsBinary(catalogVersion, primaryKey, containerType); } @Override @Nullable public > T fetch(long catalogVersion, @Nonnull U originalKey, @Nonnull Class containerType, @Nonnull BiFunction compressedKeyComputer) { final OptionalLong storagePartId = compressedKeyComputer.apply( - this.persistenceService.getReadOnlyKeyCompressor(), + this.dataStoreChanges.getReadOnlyKeyCompressor(), originalKey ); if (storagePartId.isEmpty()) { // key wasn't yet assigned return null; } else { - return this.persistenceService.getStoragePart(catalogVersion, storagePartId.getAsLong(), containerType); + return this.dataStoreChanges.getStoragePart(catalogVersion, storagePartId.getAsLong(), containerType); } } @@ -125,30 +120,40 @@ public > T fetch(long catalogVers @Nullable public > byte[] fetchBinary(long catalogVersion, @Nonnull U originalKey, @Nonnull Class containerType, @Nonnull BiFunction compressedKeyComputer) { final OptionalLong storagePartId = compressedKeyComputer.apply( - this.persistenceService.getReadOnlyKeyCompressor(), + this.dataStoreChanges.getReadOnlyKeyCompressor(), originalKey ); if (storagePartId.isEmpty()) { // key wasn't yet assigned return null; } else { - return this.persistenceService.getStoragePartAsBinary(catalogVersion, storagePartId.getAsLong(), containerType); + return this.dataStoreChanges.getStoragePartAsBinary(catalogVersion, storagePartId.getAsLong(), containerType); } } @Override public boolean removeByPrimaryKey(long catalogVersion, long primaryKey, @Nonnull Class entityClass) { - return this.persistenceService.removeStoragePart(catalogVersion, primaryKey, entityClass); + return this.dataStoreChanges.removeStoragePart(catalogVersion, primaryKey, entityClass); } @Override public void update(long catalogVersion, @Nonnull T value) { - this.persistenceService.putStoragePart(catalogVersion, value); + this.dataStoreChanges.putStoragePart(catalogVersion, value); + } + + @Override + public boolean trapRemoveByPrimaryKey(long catalogVersion, long primaryKey, @Nonnull Class entityClass) { + return this.dataStoreChanges.trapRemoveStoragePart(catalogVersion, primaryKey, entityClass); + } + + @Override + public void trapUpdate(long catalogVersion, @Nonnull T value) { + this.dataStoreChanges.trapPutStoragePart(value); } @Override - public DataStoreIndexChanges getTrappedIndexChanges() { - return this.dataStoreIndexChanges; + public DataStoreChanges getTrappedChanges() { + return this.dataStoreChanges; } } diff --git a/evita_engine/src/main/java/io/evitadb/index/attribute/FilterIndex.java b/evita_engine/src/main/java/io/evitadb/index/attribute/FilterIndex.java index 6ca22c182..1f1a2fb7d 100644 --- a/evita_engine/src/main/java/io/evitadb/index/attribute/FilterIndex.java +++ b/evita_engine/src/main/java/io/evitadb/index/attribute/FilterIndex.java @@ -213,9 +213,10 @@ public FilterIndex(@Nonnull AttributeKey attributeKey, @Nonnull Class attribu this.attributeKey = attributeKey; this.attributeType = attributeType; this.dirty = new TransactionalBoolean(); - this.rangeIndex = Range.class.isAssignableFrom(attributeType) ? new RangeIndex() : null; - this.comparator = getComparator(attributeKey, attributeType); - this.normalizer = getNormalizer(attributeType); + final Class plainType = attributeType.isArray() ? attributeType.getComponentType() : attributeType; + this.rangeIndex = Range.class.isAssignableFrom(plainType) ? new RangeIndex() : null; + this.comparator = getComparator(attributeKey, plainType); + this.normalizer = getNormalizer(plainType); this.invertedIndex = new InvertedIndex<>(this.comparator); } @@ -229,8 +230,9 @@ public > FilterIndex( this.attributeType = attributeType; this.dirty = new TransactionalBoolean(); this.rangeIndex = rangeIndex; - this.comparator = getComparator(attributeKey, attributeType); - this.normalizer = getNormalizer(attributeType); + final Class plainType = attributeType.isArray() ? attributeType.getComponentType() : attributeType; + this.comparator = getComparator(attributeKey, plainType); + this.normalizer = getNormalizer(plainType); this.invertedIndex = new InvertedIndex<>(valueToRecords, (Comparator) this.comparator); } @@ -246,8 +248,9 @@ public > FilterIndex( this.attributeType = attributeType; this.dirty = new TransactionalBoolean(); this.rangeIndex = rangeIndex; - this.comparator = getComparator(attributeKey, attributeType); - this.normalizer = getNormalizer(attributeType); + final Class plainType = attributeType.isArray() ? attributeType.getComponentType() : attributeType; + this.comparator = getComparator(attributeKey, plainType); + this.normalizer = getNormalizer(plainType); if (updateSortedValues) { if (this.normalizer != NO_NORMALIZATION) { for (int i = 0; i < valueToRecords.length; i++) { diff --git a/evita_engine/src/main/java/io/evitadb/index/mutation/storagePart/ContainerizedLocalMutationExecutor.java b/evita_engine/src/main/java/io/evitadb/index/mutation/storagePart/ContainerizedLocalMutationExecutor.java index c29605f1f..c3a1d158c 100644 --- a/evita_engine/src/main/java/io/evitadb/index/mutation/storagePart/ContainerizedLocalMutationExecutor.java +++ b/evita_engine/src/main/java/io/evitadb/index/mutation/storagePart/ContainerizedLocalMutationExecutor.java @@ -85,10 +85,13 @@ import io.evitadb.store.entity.model.entity.price.MinimalPriceInternalIdContainer; import io.evitadb.store.entity.model.entity.price.PriceInternalIdContainer; import io.evitadb.store.model.EntityStoragePart; +import io.evitadb.store.model.StoragePart; import io.evitadb.store.spi.model.storageParts.accessor.AbstractEntityStorageContainerAccessor; import io.evitadb.store.spi.model.storageParts.accessor.WritableEntityStorageContainerAccessor; import io.evitadb.utils.ArrayUtils; import io.evitadb.utils.Assert; +import lombok.Getter; +import lombok.Setter; import org.roaringbitmap.RoaringBitmap; import javax.annotation.Nonnull; @@ -98,6 +101,7 @@ import java.util.*; import java.util.PrimitiveIterator.OfInt; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; @@ -141,6 +145,7 @@ public final class ContainerizedLocalMutationExecutor extends AbstractEntityStor @Nonnull private final Function dataStoreReaderAccessor; private final boolean removeOnly; private final DataStoreMemoryBuffer dataStoreUpdater; + @Getter @Setter private boolean trapChanges; private EntityBodyStoragePart entityContainer; private PricesStoragePart pricesContainer; private ReferencesStoragePart initialReferencesStorageContainer; @@ -284,21 +289,30 @@ public void applyMutation(@Nonnull LocalMutation localMutation) { @Override public void commit() { + final BiConsumer remover = this.trapChanges ? + (catalogVersion, part) -> this.dataStoreUpdater.trapRemoveByPrimaryKey( + catalogVersion, + part.getStoragePartPK(), + part.getClass() + ) + : (catalogVersion, part) -> this.dataStoreUpdater.removeByPrimaryKey( + catalogVersion, + part.getStoragePartPK(), + part.getClass() + ); + final BiConsumer updater = this.trapChanges ? + this.dataStoreUpdater::trapUpdate : this.dataStoreUpdater::update; // now store all dirty containers getChangedEntityStorageParts() .forEach(part -> { if (part.isEmpty()) { - this.dataStoreUpdater.removeByPrimaryKey( - catalogVersion, - part.getStoragePartPK(), - part.getClass() - ); + remover.accept(catalogVersion, part); } else { Assert.isPremiseValid( !removeOnly, "Only removal operations are expected to happen!" ); - this.dataStoreUpdater.update(catalogVersion, part); + updater.accept(catalogVersion, part); } }); } diff --git a/evita_engine/src/main/java/io/evitadb/store/spi/CatalogPersistenceService.java b/evita_engine/src/main/java/io/evitadb/store/spi/CatalogPersistenceService.java index df14a8d23..84ec30571 100644 --- a/evita_engine/src/main/java/io/evitadb/store/spi/CatalogPersistenceService.java +++ b/evita_engine/src/main/java/io/evitadb/store/spi/CatalogPersistenceService.java @@ -38,7 +38,6 @@ import io.evitadb.api.task.ServerTask; import io.evitadb.core.Catalog; import io.evitadb.core.EntityCollection; -import io.evitadb.core.buffer.DataStoreIndexChanges; import io.evitadb.core.buffer.DataStoreMemoryBuffer; import io.evitadb.dataType.PaginatedList; import io.evitadb.exception.GenericEvitaInternalError; diff --git a/evita_engine/src/main/java/io/evitadb/store/spi/EntityCollectionPersistenceService.java b/evita_engine/src/main/java/io/evitadb/store/spi/EntityCollectionPersistenceService.java index 316d8c35d..3ee6f71cb 100644 --- a/evita_engine/src/main/java/io/evitadb/store/spi/EntityCollectionPersistenceService.java +++ b/evita_engine/src/main/java/io/evitadb/store/spi/EntityCollectionPersistenceService.java @@ -37,7 +37,7 @@ import io.evitadb.api.requestResponse.data.structure.predicate.ReferenceContractSerializablePredicate; import io.evitadb.api.requestResponse.schema.dto.EntitySchema; import io.evitadb.core.EntityCollection; -import io.evitadb.core.buffer.DataStoreIndexChanges; +import io.evitadb.core.buffer.DataStoreChanges; import io.evitadb.core.buffer.DataStoreReader; import io.evitadb.index.EntityIndex; import io.evitadb.store.model.EntityStoragePart; @@ -69,7 +69,7 @@ public non-sealed interface EntityCollectionPersistenceService extends Persisten /** * Returns current instance of {@link EntityCollectionHeader}. The header is initialized in the instance constructor - * and (because it's immutable) is exchanged with each {@link #flushTrappedUpdates(long, DataStoreIndexChanges)} + * and (because it's immutable) is exchanged with each {@link #flushTrappedUpdates(long, DataStoreChanges)} * method call. */ @Nonnull @@ -200,8 +200,8 @@ EntityIndex readEntityIndex( long getSizeOnDiskInBytes(); /** - * Closes the entity collection persistent storage. If you don't call {@link #flushTrappedUpdates(long, DataStoreIndexChanges)} - * or {@link #flushTrappedUpdates(long, DataStoreIndexChanges)} you'll lose the data in the buffers. + * Closes the entity collection persistent storage. If you don't call {@link #flushTrappedUpdates(long, DataStoreChanges)} + * or {@link #flushTrappedUpdates(long, DataStoreChanges)} you'll lose the data in the buffers. */ @Override void close(); diff --git a/evita_engine/src/main/java/io/evitadb/store/spi/PersistenceService.java b/evita_engine/src/main/java/io/evitadb/store/spi/PersistenceService.java index c7bbf9096..350bbdf79 100644 --- a/evita_engine/src/main/java/io/evitadb/store/spi/PersistenceService.java +++ b/evita_engine/src/main/java/io/evitadb/store/spi/PersistenceService.java @@ -23,7 +23,7 @@ package io.evitadb.store.spi; -import io.evitadb.core.buffer.DataStoreIndexChanges; +import io.evitadb.core.buffer.DataStoreChanges; import javax.annotation.Nonnull; import java.io.Closeable; @@ -46,7 +46,7 @@ sealed interface PersistenceService * Flushes all trapped memory data to the persistent storage. * This method doesn't take transactional memory into an account but only flushes changes for trapped updates. */ - void flushTrappedUpdates(long catalogVersion, @Nonnull DataStoreIndexChanges dataStoreIndexChanges); + void flushTrappedUpdates(long catalogVersion, @Nonnull DataStoreChanges dataStoreChanges); /** * Returns true if the persistence service is closed. diff --git a/evita_external_api/evita_external_api_core/src/main/java/io/evitadb/externalApi/api/system/ProbesProvider.java b/evita_external_api/evita_external_api_core/src/main/java/io/evitadb/externalApi/api/system/ProbesProvider.java index 244114a12..fffdcbf55 100644 --- a/evita_external_api/evita_external_api_core/src/main/java/io/evitadb/externalApi/api/system/ProbesProvider.java +++ b/evita_external_api/evita_external_api_core/src/main/java/io/evitadb/externalApi/api/system/ProbesProvider.java @@ -24,7 +24,8 @@ package io.evitadb.externalApi.api.system; import io.evitadb.api.EvitaContract; -import io.evitadb.externalApi.api.system.model.HealthProblem; +import io.evitadb.api.observability.HealthProblem; +import io.evitadb.api.observability.ReadinessState; import io.evitadb.externalApi.http.ExternalApiProviderRegistrar; import io.evitadb.externalApi.http.ExternalApiServer; @@ -94,32 +95,4 @@ record ApiState( boolean isReady ) {} - /** - * Enum representing overall readiness state of the server. - */ - enum ReadinessState { - - /** - * At least one API is not ready. - */ - STARTING, - /** - * All APIs are ready. - */ - READY, - /** - * At least one API that was ready is not ready anymore. - */ - STALLING, - /** - * Server is shutting down. None of the APIs are ready. - */ - SHUTDOWN, - /** - * Unknown state - cannot determine the state of the APIs (should not happen). - */ - UNKNOWN - - } - } diff --git a/evita_external_api/evita_external_api_grpc/client/src/main/java/io/evitadb/driver/EvitaClientSession.java b/evita_external_api/evita_external_api_grpc/client/src/main/java/io/evitadb/driver/EvitaClientSession.java index 5fbea1051..6fe19f296 100644 --- a/evita_external_api/evita_external_api_grpc/client/src/main/java/io/evitadb/driver/EvitaClientSession.java +++ b/evita_external_api/evita_external_api_grpc/client/src/main/java/io/evitadb/driver/EvitaClientSession.java @@ -1652,7 +1652,9 @@ private T executeWithBlockingEvitaSessionService( final Timeout timeout = callTimeout.peek(); try { return executeWithEvitaSessionService( - lambda, this.evitaSessionServiceFutureStub + lambda, this.evitaSessionServiceFutureStub.withDeadlineAfter( + timeout.timeout(), timeout.timeoutUnit() + ) ).get(timeout.timeout(), timeout.timeoutUnit()); } catch (ExecutionException e) { if (e.getCause() instanceof EvitaInvalidUsageException invalidUsageException) { diff --git a/evita_external_api/evita_external_api_grpc/server/src/main/java/io/evitadb/externalApi/grpc/services/EvitaManagementService.java b/evita_external_api/evita_external_api_grpc/server/src/main/java/io/evitadb/externalApi/grpc/services/EvitaManagementService.java index d8fbd4d02..39473d792 100644 --- a/evita_external_api/evita_external_api_grpc/server/src/main/java/io/evitadb/externalApi/grpc/services/EvitaManagementService.java +++ b/evita_external_api/evita_external_api_grpc/server/src/main/java/io/evitadb/externalApi/grpc/services/EvitaManagementService.java @@ -31,6 +31,7 @@ import io.evitadb.api.exception.FileForFetchNotFoundException; import io.evitadb.api.exception.ReadOnlyException; import io.evitadb.api.file.FileForFetch; +import io.evitadb.api.observability.ReadinessState; import io.evitadb.api.requestResponse.system.SystemStatus; import io.evitadb.api.task.Task; import io.evitadb.api.task.TaskStatus; @@ -44,7 +45,6 @@ import io.evitadb.externalApi.api.system.ProbesProvider; import io.evitadb.externalApi.api.system.ProbesProvider.ApiState; import io.evitadb.externalApi.api.system.ProbesProvider.Readiness; -import io.evitadb.externalApi.api.system.ProbesProvider.ReadinessState; import io.evitadb.externalApi.configuration.AbstractApiConfiguration; import io.evitadb.externalApi.grpc.constants.GrpcHeaders; import io.evitadb.externalApi.grpc.dataType.EvitaDataTypesConverter; diff --git a/evita_external_api/evita_external_api_grpc/shared/pom.xml b/evita_external_api/evita_external_api_grpc/shared/pom.xml index 6b052c6df..5f90184ff 100644 --- a/evita_external_api/evita_external_api_grpc/shared/pom.xml +++ b/evita_external_api/evita_external_api_grpc/shared/pom.xml @@ -147,11 +147,6 @@ $1package io.evitadb.externalApi.grpc.generated; evita_api ${project.parent.version} - - ${project.parent.groupId} - evita_external_api_core - ${project.parent.version} - ${project.parent.groupId} evita_query @@ -204,9 +199,5 @@ $1package io.evitadb.externalApi.grpc.generated; armeria ${armeria.version} - - io.evitadb - evita_external_api_core - diff --git a/evita_external_api/evita_external_api_grpc/shared/src/main/java/io/evitadb/externalApi/grpc/requestResponse/EvitaEnumConverter.java b/evita_external_api/evita_external_api_grpc/shared/src/main/java/io/evitadb/externalApi/grpc/requestResponse/EvitaEnumConverter.java index fb215955d..ba6d31550 100644 --- a/evita_external_api/evita_external_api_grpc/shared/src/main/java/io/evitadb/externalApi/grpc/requestResponse/EvitaEnumConverter.java +++ b/evita_external_api/evita_external_api_grpc/shared/src/main/java/io/evitadb/externalApi/grpc/requestResponse/EvitaEnumConverter.java @@ -25,6 +25,8 @@ import io.evitadb.api.CatalogState; import io.evitadb.api.TransactionContract.CommitBehavior; +import io.evitadb.api.observability.HealthProblem; +import io.evitadb.api.observability.ReadinessState; import io.evitadb.api.query.filter.AttributeSpecialValue; import io.evitadb.api.query.order.OrderDirection; import io.evitadb.api.query.require.EmptyHierarchicalEntityBehaviour; @@ -57,8 +59,6 @@ import io.evitadb.dataType.ContainerType; import io.evitadb.exception.EvitaInternalError; import io.evitadb.exception.GenericEvitaInternalError; -import io.evitadb.externalApi.api.system.ProbesProvider.ReadinessState; -import io.evitadb.externalApi.api.system.model.HealthProblem; import io.evitadb.externalApi.grpc.generated.*; import io.evitadb.utils.NamingConvention; import lombok.AccessLevel; diff --git a/evita_external_api/evita_external_api_grpc/shared/src/main/java/module-info.java b/evita_external_api/evita_external_api_grpc/shared/src/main/java/module-info.java index cd2cb336d..5d577ac5b 100644 --- a/evita_external_api/evita_external_api_grpc/shared/src/main/java/module-info.java +++ b/evita_external_api/evita_external_api_grpc/shared/src/main/java/module-info.java @@ -33,7 +33,6 @@ requires evita.common; requires evita.api; - requires evita.external.api.core; requires evita.query; requires com.google.common; requires io.grpc; diff --git a/evita_external_api/evita_external_api_observability/src/main/java/io/evitadb/externalApi/observability/metric/EvitaJfrEventRegistry.java b/evita_external_api/evita_external_api_observability/src/main/java/io/evitadb/externalApi/observability/metric/EvitaJfrEventRegistry.java index 0ede36b13..dc90a38b2 100644 --- a/evita_external_api/evita_external_api_observability/src/main/java/io/evitadb/externalApi/observability/metric/EvitaJfrEventRegistry.java +++ b/evita_external_api/evita_external_api_observability/src/main/java/io/evitadb/externalApi/observability/metric/EvitaJfrEventRegistry.java @@ -465,6 +465,19 @@ public class EvitaJfrEventRegistry { } ) ); + // Events related to method profiling + JDK_EVENT_GROUPS.put( + "MethodProfiling", + new JdkEventGroup( + "MethodProfiling", + "JDK - Method profiling", + "Events related to performance profiling.", + new String[]{ + "jdk.ExecutionSample", + "jdk.MethodProfiling" + } + ) + ); final String[] unknownJdkEvents = FlightRecorder.getFlightRecorder() .getEventTypes() diff --git a/evita_external_api/evita_external_api_observability/src/main/java/io/evitadb/externalApi/observability/metric/ObservabilityProbesDetector.java b/evita_external_api/evita_external_api_observability/src/main/java/io/evitadb/externalApi/observability/metric/ObservabilityProbesDetector.java index 3e5a2a867..7e1b0b922 100644 --- a/evita_external_api/evita_external_api_observability/src/main/java/io/evitadb/externalApi/observability/metric/ObservabilityProbesDetector.java +++ b/evita_external_api/evita_external_api_observability/src/main/java/io/evitadb/externalApi/observability/metric/ObservabilityProbesDetector.java @@ -24,10 +24,11 @@ package io.evitadb.externalApi.observability.metric; import io.evitadb.api.EvitaContract; +import io.evitadb.api.observability.HealthProblem; +import io.evitadb.api.observability.ReadinessState; import io.evitadb.core.Evita; import io.evitadb.core.async.ObservableExecutorService; import io.evitadb.externalApi.api.system.ProbesProvider; -import io.evitadb.externalApi.api.system.model.HealthProblem; import io.evitadb.externalApi.http.ExternalApiProvider; import io.evitadb.externalApi.http.ExternalApiProviderRegistrar; import io.evitadb.externalApi.http.ExternalApiServer; diff --git a/evita_external_api/evita_external_api_observability/src/main/java/io/evitadb/externalApi/observability/task/JfrRecorderTask.java b/evita_external_api/evita_external_api_observability/src/main/java/io/evitadb/externalApi/observability/task/JfrRecorderTask.java index 5c746b254..d45294314 100644 --- a/evita_external_api/evita_external_api_observability/src/main/java/io/evitadb/externalApi/observability/task/JfrRecorderTask.java +++ b/evita_external_api/evita_external_api_observability/src/main/java/io/evitadb/externalApi/observability/task/JfrRecorderTask.java @@ -33,7 +33,6 @@ import io.evitadb.externalApi.observability.exception.JfRException; import io.evitadb.externalApi.observability.metric.EvitaJfrEventRegistry; import io.evitadb.externalApi.observability.metric.EvitaJfrEventRegistry.EvitaEventGroup; -import io.evitadb.externalApi.observability.metric.EvitaJfrEventRegistry.JdkEventGroup; import io.evitadb.externalApi.observability.task.JfrRecorderTask.RecordingSettings; import io.evitadb.utils.StringUtils; import jdk.jfr.EventType; @@ -54,7 +53,6 @@ import java.util.Arrays; import java.util.HashSet; import java.util.Map; -import java.util.Map.Entry; import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; @@ -125,7 +123,7 @@ private void start() { final Set allowedEvents = new HashSet<>(Arrays.asList(settings.allowedEvents())); // first disable all JDK events, that are not wanted - disableUnwantedJdkEvents(allowedEvents); + enableJdkEvents(allowedEvents); // then register all custom event groups enableEvitaEvents(allowedEvents); @@ -190,18 +188,16 @@ protected FileForFetch stopInternal() { * Disables all JDK events that are not in the allowed set. * @param allowedEvents set of allowed events */ - private void disableUnwantedJdkEvents(@Nonnull Set allowedEvents) { - final Set disabledJdkEvents = EvitaJfrEventRegistry.getJdkEventGroups() + private void enableJdkEvents(@Nonnull Set allowedEvents) { + final Set unrolledAllowedEvents = EvitaJfrEventRegistry.getJdkEventGroups() .entrySet() .stream() - .filter(entry -> !allowedEvents.contains(entry.getKey())) - .map(Entry::getValue) - .map(JdkEventGroup::events) - .flatMap(Arrays::stream) + .filter(entry -> allowedEvents.contains(entry.getKey())) + .flatMap(entry -> Arrays.stream(entry.getValue().events())) .collect(Collectors.toSet()); for (EventType eventType : FlightRecorder.getFlightRecorder().getEventTypes()) { - if (disabledJdkEvents.contains(eventType.getName())) { - this.recording.disable(eventType.getName()); + if (unrolledAllowedEvents.contains(eventType.getName())) { + this.recording.enable(eventType.getName()); } } } diff --git a/evita_external_api/evita_external_api_system/src/main/java/io/evitadb/externalApi/system/SystemProviderRegistrar.java b/evita_external_api/evita_external_api_system/src/main/java/io/evitadb/externalApi/system/SystemProviderRegistrar.java index 775c2d0de..2e6ebd648 100644 --- a/evita_external_api/evita_external_api_system/src/main/java/io/evitadb/externalApi/system/SystemProviderRegistrar.java +++ b/evita_external_api/evita_external_api_system/src/main/java/io/evitadb/externalApi/system/SystemProviderRegistrar.java @@ -31,13 +31,13 @@ import com.linecorp.armeria.common.MediaType; import com.linecorp.armeria.server.HttpService; import com.linecorp.armeria.server.file.HttpFile; +import io.evitadb.api.observability.HealthProblem; +import io.evitadb.api.observability.ReadinessState; import io.evitadb.api.requestResponse.system.SystemStatus; import io.evitadb.core.Evita; import io.evitadb.exception.GenericEvitaInternalError; import io.evitadb.externalApi.api.system.ProbesProvider; import io.evitadb.externalApi.api.system.ProbesProvider.Readiness; -import io.evitadb.externalApi.api.system.ProbesProvider.ReadinessState; -import io.evitadb.externalApi.api.system.model.HealthProblem; import io.evitadb.externalApi.configuration.ApiOptions; import io.evitadb.externalApi.configuration.CertificatePath; import io.evitadb.externalApi.configuration.CertificateSettings; diff --git a/evita_functional_tests/src/test/java/io/evitadb/api/EntityByAttributeFilteringFunctionalTest.java b/evita_functional_tests/src/test/java/io/evitadb/api/EntityByAttributeFilteringFunctionalTest.java index ee60ba29f..fe3074265 100644 --- a/evita_functional_tests/src/test/java/io/evitadb/api/EntityByAttributeFilteringFunctionalTest.java +++ b/evita_functional_tests/src/test/java/io/evitadb/api/EntityByAttributeFilteringFunctionalTest.java @@ -2808,6 +2808,8 @@ void shouldReturnEntitiesByAttributeNumberInRangeUsingRanges(Evita evita, List Predecessor.HEAD + faker -> ReferencedEntityPredecessor.HEAD ) .build(); @@ -432,7 +432,7 @@ DataCarrier setUpMinimal(Evita evita) { Entities.PRODUCT, Cardinality.ONE_OR_MORE, whichIs -> whichIs.indexed() - .withAttribute(ATTRIBUTE_CATEGORY_ORDER, Predecessor.class, AttributeSchemaEditor::sortable) + .withAttribute(ATTRIBUTE_CATEGORY_ORDER, ReferencedEntityPredecessor.class, AttributeSchemaEditor::sortable) ); } ); @@ -481,7 +481,7 @@ DataCarrier setUpMinimal(Evita evita) { (reference, referencedProducts) -> { final int theIndex = ArrayUtils.indexOf(reference.getReferencedPrimaryKey(), referencedProducts); return theIndex == 0 ? - Predecessor.HEAD : new Predecessor(referencedProducts[theIndex - 1]); + ReferencedEntityPredecessor.HEAD : new ReferencedEntityPredecessor(referencedProducts[theIndex - 1]); } ); diff --git a/evita_functional_tests/src/test/java/io/evitadb/api/EvitaWarmUpTest.java b/evita_functional_tests/src/test/java/io/evitadb/api/EvitaWarmUpTest.java new file mode 100644 index 000000000..da1119a29 --- /dev/null +++ b/evita_functional_tests/src/test/java/io/evitadb/api/EvitaWarmUpTest.java @@ -0,0 +1,244 @@ +/* + * + * _ _ ____ ____ + * _____ _(_) |_ __ _| _ \| __ ) + * / _ \ \ / / | __/ _` | | | | _ \ + * | __/\ V /| | || (_| | |_| | |_) | + * \___| \_/ |_|\__\__,_|____/|____/ + * + * Copyright (c) 2023-2024 + * + * Licensed under the Business Source License, Version 1.1 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/FgForrest/evitaDB/blob/master/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.evitadb.api; + +import com.github.javafaker.Faker; +import io.evitadb.api.configuration.EvitaConfiguration; +import io.evitadb.api.configuration.ServerOptions; +import io.evitadb.api.configuration.StorageOptions; +import io.evitadb.api.configuration.ThreadPoolOptions; +import io.evitadb.api.query.order.OrderDirection; +import io.evitadb.api.requestResponse.schema.AttributeSchemaEditor; +import io.evitadb.api.requestResponse.schema.Cardinality; +import io.evitadb.api.requestResponse.schema.OrderBehaviour; +import io.evitadb.api.requestResponse.schema.SealedEntitySchema; +import io.evitadb.api.requestResponse.schema.SortableAttributeCompoundSchemaContract.AttributeElement; +import io.evitadb.core.Evita; +import io.evitadb.dataType.Predecessor; +import io.evitadb.test.Entities; +import io.evitadb.test.EvitaTestSupport; +import io.evitadb.test.generator.DataGenerator; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; + +import javax.annotation.Nonnull; +import java.nio.file.Path; +import java.util.function.BiFunction; + +import static io.evitadb.test.generator.DataGenerator.ATTRIBUTE_CODE; +import static io.evitadb.test.generator.DataGenerator.ATTRIBUTE_URL; + +/** + * This test contains various integration tests for {@link Evita}. + * + * @author Jan Novotný (novotny@fg.cz), FG Forrest a.s. (c) 2021 + */ +@Slf4j +class EvitaWarmUpTest implements EvitaTestSupport { + public static final String DIR_EVITA_TEST = "evitaTest"; + private static final String ATTRIBUTE_ORDER = "order"; + private static final String ATTRIBUTE_CATEGORY_ORDER = "categoryOrder"; + private static final String REFERENCE_CATEGORY_PRODUCTS = "products"; + private static final String ATTRIBUTE_CATEGORY_MARKET = "market"; + private static final String ATTRIBUTE_INCEPTION_YEAR = "inceptionYear"; + private static final String ATTRIBUTE_MARKET_INCEPTION_YEAR = "marketInceptionYear"; + private static final int SEED = 40; + private static final int PRODUCT_COUNT = 5_000; + private static final int CATEGORY_COUNT = 10; + private Evita evita; + + @BeforeEach + void setUp() { + cleanTestSubDirectoryWithRethrow(DIR_EVITA_TEST); + evita = new Evita( + getEvitaConfiguration() + ); + evita.defineCatalog(TEST_CATALOG); + } + + @AfterEach + void tearDown() { + evita.close(); + cleanTestSubDirectoryWithRethrow(DIR_EVITA_TEST); + } + + /** + * This test tries to simulate situation when there is a load of entities inserted with reference to a small amount + * of entities having reflected reference to that entity. This scenario tries to simulate problem that was documented + * in issue #689. + */ + @Tag(LONG_RUNNING_TEST) + @Test + void shouldGenerateLoadOfDataInWarmUpPhase() { + evita.defineCatalog(TEST_CATALOG); + evita.defineCatalog("otherCatalog"); + + evita.updateCatalog( + TEST_CATALOG, + session -> { + session.updateCatalogSchema( + session.getCatalogSchema() + .openForWrite() + .withAttribute(ATTRIBUTE_CODE, String.class, whichIs -> whichIs.sortable().uniqueGlobally().nullable()) + .withAttribute(ATTRIBUTE_URL, String.class, whichIs -> whichIs.localized().uniqueGlobally().nullable()) + ); + + final DataGenerator dataGenerator = new DataGenerator.Builder() + .registerValueGenerator( + Entities.PRODUCT, ATTRIBUTE_ORDER, + faker -> Predecessor.HEAD + ) + // we need to update the order in second pass + .registerValueGenerator( + Entities.PRODUCT, ATTRIBUTE_CATEGORY_ORDER, + faker -> Predecessor.HEAD + ) + .build(); + + final BiFunction randomEntityPicker = (entityType, faker) -> { + final int entityCount = session.getEntityCollectionSize(entityType); + final int primaryKey = entityCount == 0 ? 0 : faker.random().nextInt(1, entityCount); + return primaryKey == 0 ? null : primaryKey; + }; + + // we need to create category schema first + final SealedEntitySchema categorySchema = dataGenerator.getSampleCategorySchema( + session, + schemaBuilder -> { + schemaBuilder + .withReflectedReferenceToEntity( + REFERENCE_CATEGORY_PRODUCTS, + Entities.PRODUCT, + Entities.CATEGORY, + whichIs -> whichIs + .withAttributesInherited() + .withCardinality(Cardinality.ZERO_OR_MORE) + ); + } + ); + + // then the product schema + final SealedEntitySchema productSchema = dataGenerator.getSampleProductSchema( + session, + schemaBuilder -> { + schemaBuilder + .withAttribute( + ATTRIBUTE_ORDER, Predecessor.class, + AttributeSchemaEditor::sortable + ) + .withReferenceToEntity( + Entities.CATEGORY, + Entities.CATEGORY, + Cardinality.EXACTLY_ONE, + whichIs -> whichIs.indexed() + .withAttribute(ATTRIBUTE_CATEGORY_ORDER, Predecessor.class, AttributeSchemaEditor::sortable) + .withAttribute(ATTRIBUTE_CATEGORY_MARKET, String.class, thatIs -> thatIs.nullable().sortable()) + .withAttribute(ATTRIBUTE_INCEPTION_YEAR, String.class, thatIs -> thatIs.nullable().sortable()) + .withSortableAttributeCompound( + ATTRIBUTE_MARKET_INCEPTION_YEAR, + new AttributeElement(ATTRIBUTE_CATEGORY_MARKET, OrderDirection.ASC, OrderBehaviour.NULLS_LAST), + new AttributeElement(ATTRIBUTE_INCEPTION_YEAR, OrderDirection.DESC, OrderBehaviour.NULLS_LAST) + ) + ); + // we need only category references in this test + for (String referenceName : schemaBuilder.getReferences().keySet()) { + if (!referenceName.equals(Entities.CATEGORY)) { + schemaBuilder.withoutReferenceTo(referenceName); + } + } + } + ); + + // and now data for both of them (since they are intertwined via reflected reference) + dataGenerator.generateEntities( + categorySchema, + randomEntityPicker, + SEED + ) + .limit(CATEGORY_COUNT) + .forEach(session::upsertEntity); + + dataGenerator.generateEntities( + productSchema, + (s, faker) -> faker.random().nextInt(1, CATEGORY_COUNT + 1), + SEED + ) + .limit(PRODUCT_COUNT) + .forEach(session::upsertEntity); + + session.goLiveAndClose(); + } + ); + + log.info("Set-up completed"); + + evita.replaceCatalog(TEST_CATALOG, "otherCatalog"); + + evita.queryCatalog("otherCatalog", session -> { + Assertions.assertEquals(CatalogState.ALIVE, session.getCatalogState()); + }); + } + + @Nonnull + private EvitaConfiguration getEvitaConfiguration() { + return getEvitaConfiguration(-1); + } + + @Nonnull + private EvitaConfiguration getEvitaConfiguration(int inactivityTimeoutInSeconds) { + return EvitaConfiguration.builder() + .server( + ServerOptions.builder() + .serviceThreadPool( + ThreadPoolOptions.serviceThreadPoolBuilder() + .minThreadCount(1) + .maxThreadCount(1) + .queueSize(10_000) + .build() + ) + .closeSessionsAfterSecondsOfInactivity(inactivityTimeoutInSeconds) + .build() + ) + .storage( + StorageOptions.builder() + .storageDirectory(getEvitaTestDirectory()) + .exportDirectory(getEvitaTestDirectory()) + .timeTravelEnabled(false) + .fileSizeCompactionThresholdBytes(1_000_000) + .minimalActiveRecordShare(0.8) + .build() + ) + .build(); + } + + @Nonnull + private Path getEvitaTestDirectory() { + return getTestDirectory().resolve(DIR_EVITA_TEST); + } + +} diff --git a/evita_functional_tests/src/test/java/io/evitadb/store/catalog/DefaultCatalogPersistenceServiceTest.java b/evita_functional_tests/src/test/java/io/evitadb/store/catalog/DefaultCatalogPersistenceServiceTest.java index bc7e4bb83..e26da536a 100644 --- a/evita_functional_tests/src/test/java/io/evitadb/store/catalog/DefaultCatalogPersistenceServiceTest.java +++ b/evita_functional_tests/src/test/java/io/evitadb/store/catalog/DefaultCatalogPersistenceServiceTest.java @@ -287,9 +287,9 @@ void shouldSerializeAndDeserializeCatalogHeader() { ); final List entityHeaders = new ArrayList<>(3); - entityHeaders.add(productCollection.flush()); - entityHeaders.add(brandCollection.flush()); - entityHeaders.add(storeCollection.flush()); + entityHeaders.add(productCollection.flush().header()); + entityHeaders.add(brandCollection.flush().header()); + entityHeaders.add(storeCollection.flush().header()); // try to serialize ioService.storeHeader( @@ -726,9 +726,9 @@ private Path prepareInvalidCatalogContents() { CatalogState.WARMING_UP, 0L, 0, null, Arrays.asList( - productCollection.flush(), - brandCollection.flush(), - storeCollection.flush() + productCollection.flush().header(), + brandCollection.flush().header(), + storeCollection.flush().header() ), new WarmUpDataStoreMemoryBuffer(ioService.getStoragePartPersistenceService(0L)) ); diff --git a/evita_store/evita_store_key_value/src/main/java/io/evitadb/store/offsetIndex/OffsetIndexSerializationService.java b/evita_store/evita_store_key_value/src/main/java/io/evitadb/store/offsetIndex/OffsetIndexSerializationService.java index b10015df8..8a1626644 100644 --- a/evita_store/evita_store_key_value/src/main/java/io/evitadb/store/offsetIndex/OffsetIndexSerializationService.java +++ b/evita_store/evita_store_key_value/src/main/java/io/evitadb/store/offsetIndex/OffsetIndexSerializationService.java @@ -97,7 +97,7 @@ public static long countFileOffsetTableSize( @Nonnull StorageOptions storageOptions ) { final long estimatedSize = (long) recordCount * (MEM_TABLE_RECORD_SIZE); - return estimatedSize + (long) StorageRecord.getOverheadSize() * (PREVIOUS_MEM_TABLE_FRAGMENT_POINTER_SIZE + computeExpectedRecordCount(storageOptions, recordCount).fragments()); + return estimatedSize + ((long) StorageRecord.getOverheadSize() + PREVIOUS_MEM_TABLE_FRAGMENT_POINTER_SIZE) * (long) computeExpectedRecordCount(storageOptions, recordCount).fragments(); } /** diff --git a/evita_store/evita_store_server/src/main/java/io/evitadb/store/catalog/DefaultCatalogPersistenceService.java b/evita_store/evita_store_server/src/main/java/io/evitadb/store/catalog/DefaultCatalogPersistenceService.java index fe6c01cc8..1aa47ab15 100644 --- a/evita_store/evita_store_server/src/main/java/io/evitadb/store/catalog/DefaultCatalogPersistenceService.java +++ b/evita_store/evita_store_server/src/main/java/io/evitadb/store/catalog/DefaultCatalogPersistenceService.java @@ -49,7 +49,7 @@ import io.evitadb.core.CatalogVersionBeyondTheHorizonListener; import io.evitadb.core.EntityCollection; import io.evitadb.core.async.Scheduler; -import io.evitadb.core.buffer.DataStoreIndexChanges; +import io.evitadb.core.buffer.DataStoreChanges; import io.evitadb.core.buffer.DataStoreMemoryBuffer; import io.evitadb.core.buffer.WarmUpDataStoreMemoryBuffer; import io.evitadb.core.file.ExportFileService; @@ -266,6 +266,13 @@ protected Kryo create() { * storage part persistence service for a given catalog version. */ private long[] catalogPersistenceServiceVersions; + /** + * Contains information about cardinality of the warm-up version of the catalog - i.e. zero. This version is special + * in the sense, it may be used repeatedly (version doesn't increment with catalog flushes) and because the array + * {@link #catalogPersistenceServiceVersions} cannot contain multiple zeros, this counter is used to keep track of + * the number of times the zero version was used. + */ + private int warmUpVersionCardinality; /** * Contains the instance of {@link CatalogBootstrap} that contains the last bootstrap record that is currently used. */ @@ -862,6 +869,7 @@ public DefaultCatalogPersistenceService( ) ); this.catalogPersistenceServiceVersions = new long[]{catalogVersion}; + this.warmUpVersionCardinality = 1; if (lastCatalogBootstrap.fileLocation() == null) { this.bootstrapUsed = recordBootstrap(catalogVersion, this.catalogName, 0, null); @@ -945,6 +953,7 @@ public DefaultCatalogPersistenceService( catalogStoragePartPersistenceService ); this.catalogPersistenceServiceVersions = new long[]{catalogVersion}; + this.warmUpVersionCardinality = catalogVersion == 0 ? 1 : 0; final File restoreFlagFile = catalogStoragePath.resolve(CatalogPersistenceService.RESTORE_FLAG).toFile(); verifyCatalogNameMatches( @@ -1015,6 +1024,7 @@ private DefaultCatalogPersistenceService( catalogStoragePartPersistenceService ); this.catalogPersistenceServiceVersions = new long[]{catalogVersion}; + this.warmUpVersionCardinality = catalogVersion == 0 ? 1 : 0; final CatalogHeader catalogHeader = catalogStoragePartPersistenceService.getCatalogHeader(catalogVersion); this.entityCollectionPersistenceServices = CollectionUtils.createConcurrentHashMap( @@ -1300,7 +1310,8 @@ public Optional flush( long catalogVersion, @Nonnull HeaderInfoSupplier headerInfoSupplier, @Nonnull EntityCollectionHeader entityCollectionHeader, - @Nonnull DataStoreMemoryBuffer dataStoreBuffer) { + @Nonnull DataStoreMemoryBuffer dataStoreBuffer + ) { final CollectionFileReference collectionFileReference = new CollectionFileReference( entityCollectionHeader.entityType(), @@ -1912,9 +1923,9 @@ public boolean isNew() { } @Override - public void flushTrappedUpdates(long catalogVersion, @Nonnull DataStoreIndexChanges dataStoreIndexChanges) { + public void flushTrappedUpdates(long catalogVersion, @Nonnull DataStoreChanges dataStoreChanges) { // now store all the entity trapped updates - dataStoreIndexChanges.popTrappedUpdates() + dataStoreChanges.popTrappedUpdates() .forEach(it -> getStoragePartPersistenceService(catalogVersion).putStoragePart(catalogVersion, it)); } @@ -2058,11 +2069,23 @@ CatalogBootstrap recordBootstrap( nonFlushedBlock -> this.reportNonFlushedContents(catalogName, nonFlushedBlock), oldestRecordTimestamp -> DefaultCatalogPersistenceService.reportOldestHistoricalRecord(catalogName, oldestRecordTimestamp.orElse(null)) ); - this.catalogStoragePartPersistenceService.put( + final CatalogOffsetIndexStoragePartPersistenceService previousService = this.catalogStoragePartPersistenceService.put( catalogVersion, newPersistenceService ); - this.catalogPersistenceServiceVersions = ArrayUtils.insertLongIntoOrderedArray(catalogVersion, this.catalogPersistenceServiceVersions); + if (previousService != null) { + previousService.close(); + if (catalogVersion == 0) { + this.warmUpVersionCardinality++; + } else { + throw new GenericEvitaInternalError( + "Persistence storage instance is unexpectedly already registered!", + "Persistence storage instance for version `" + catalogVersion + "` is unexpectedly already registered!" + ); + } + } else { + this.catalogPersistenceServiceVersions = ArrayUtils.insertLongIntoOrderedArray(catalogVersion, this.catalogPersistenceServiceVersions); + } this.obsoleteFileMaintainer.removeFileWhenNotUsed( catalogVersion, @@ -2445,11 +2468,15 @@ private void removeCatalogPersistenceServiceForVersion(long catalogVersion) { lookupIndex >= 0 && lookupIndex < this.catalogPersistenceServiceVersions.length, () -> new GenericEvitaInternalError("Catalog version " + catalogVersion + " not found in the catalog persistence service versions!") ); - final long versionToRemove = this.catalogPersistenceServiceVersions[lookupIndex]; - this.catalogPersistenceServiceVersions = ArrayUtils.removeLongFromArrayOnIndex(this.catalogPersistenceServiceVersions, lookupIndex); - // remove the service and release its resources - final CatalogOffsetIndexStoragePartPersistenceService storageService = this.catalogStoragePartPersistenceService.remove(versionToRemove); - storageService.close(); + if (catalogVersion == 0 && this.warmUpVersionCardinality > 0) { + this.warmUpVersionCardinality--; + } else { + final long versionToRemove = this.catalogPersistenceServiceVersions[lookupIndex]; + this.catalogPersistenceServiceVersions = ArrayUtils.removeLongFromArrayOnIndex(this.catalogPersistenceServiceVersions, lookupIndex); + // remove the service and release its resources + final CatalogOffsetIndexStoragePartPersistenceService storageService = this.catalogStoragePartPersistenceService.remove(versionToRemove); + storageService.close(); + } } catch (InterruptedException e) { throw new GenericEvitaInternalError( "Failed to lock the catalog persistence service for catalog `" + this.catalogName + "`!", diff --git a/evita_store/evita_store_server/src/main/java/io/evitadb/store/catalog/DefaultEntityCollectionPersistenceService.java b/evita_store/evita_store_server/src/main/java/io/evitadb/store/catalog/DefaultEntityCollectionPersistenceService.java index 98f024309..15e4d0446 100644 --- a/evita_store/evita_store_server/src/main/java/io/evitadb/store/catalog/DefaultEntityCollectionPersistenceService.java +++ b/evita_store/evita_store_server/src/main/java/io/evitadb/store/catalog/DefaultEntityCollectionPersistenceService.java @@ -50,7 +50,8 @@ import io.evitadb.core.Catalog; import io.evitadb.core.CatalogVersionBeyondTheHorizonListener; import io.evitadb.core.EntityCollection; -import io.evitadb.core.buffer.DataStoreIndexChanges; +import io.evitadb.core.buffer.DataStoreChanges; +import io.evitadb.core.buffer.DataStoreChanges.RemovedStoragePart; import io.evitadb.core.buffer.DataStoreReader; import io.evitadb.core.metric.event.storage.DataFileCompactEvent; import io.evitadb.core.metric.event.storage.FileType; @@ -840,10 +841,18 @@ public boolean isNew() { } @Override - public void flushTrappedUpdates(long catalogVersion, @Nonnull DataStoreIndexChanges dataStoreIndexChanges) { + public void flushTrappedUpdates(long catalogVersion, @Nonnull DataStoreChanges dataStoreChanges) { // now store all entity trapped updates - dataStoreIndexChanges.popTrappedUpdates() - .forEach(it -> this.storagePartPersistenceService.putStoragePart(catalogVersion, it)); + dataStoreChanges.popTrappedUpdates() + .forEach(it -> { + if (it instanceof RemovedStoragePart removedStoragePart) { + this.storagePartPersistenceService.removeStoragePart( + catalogVersion, removedStoragePart.getStoragePartPK(), removedStoragePart.containerType() + ); + } else { + this.storagePartPersistenceService.putStoragePart(catalogVersion, it); + } + }); } @Override