From b2a4f7cda93a85b3e31ce04de17950a79941e5a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Novotn=C3=BD?= Date: Thu, 6 Feb 2025 16:37:37 +0100 Subject: [PATCH] fix: updating catalog name on catalog rename --- .../main/java/io/evitadb/core/Catalog.java | 2 + .../core/traffic/TrafficRecordingEngine.java | 44 ++++++++++++++++--- .../DefaultCatalogPersistenceServiceTest.java | 18 ++++++-- .../evitadb/store/traffic/DiskRingBuffer.java | 5 ++- 4 files changed, 57 insertions(+), 12 deletions(-) diff --git a/evita_engine/src/main/java/io/evitadb/core/Catalog.java b/evita_engine/src/main/java/io/evitadb/core/Catalog.java index 6878219f5..fe2c5608b 100644 --- a/evita_engine/src/main/java/io/evitadb/core/Catalog.java +++ b/evita_engine/src/main/java/io/evitadb/core/Catalog.java @@ -558,6 +558,8 @@ public Catalog( previousCatalogVersion.getInternalSchema(), this.entitySchemaAccessor ); + + this.trafficRecordingEngine.updateCatalogName(catalogSchema.getName()); this.schema = new TransactionalReference<>(new CatalogSchemaDecorator(catalogSchema)); this.dataStoreBuffer = catalogState == CatalogState.WARMING_UP ? new WarmUpDataStoreMemoryBuffer(storagePartPersistenceService) : diff --git a/evita_engine/src/main/java/io/evitadb/core/traffic/TrafficRecordingEngine.java b/evita_engine/src/main/java/io/evitadb/core/traffic/TrafficRecordingEngine.java index e9e74b3f6..630bc7ee0 100644 --- a/evita_engine/src/main/java/io/evitadb/core/traffic/TrafficRecordingEngine.java +++ b/evita_engine/src/main/java/io/evitadb/core/traffic/TrafficRecordingEngine.java @@ -85,12 +85,11 @@ * @author Jan Novotný (novotny@fg.cz), FG Forrest a.s. (c) 2024 */ @Slf4j -@RequiredArgsConstructor public class TrafficRecordingEngine implements TrafficRecordingReader { public static final String LABEL_TRACE_ID = "trace-id"; public static final String LABEL_CLIENT_ID = "client-id"; public static final String LABEL_IP_ADDRESS = "ip-address"; - private final String catalogName; + private final AtomicReference catalogName; private final StorageOptions storageOptions; @Getter private final TrafficRecordingOptions trafficOptions; private final ExportFileService exportFileService; @@ -151,20 +150,51 @@ public TrafficRecordingEngine( @Nonnull ExportFileService exportFileService, @Nonnull Scheduler scheduler ) { - this.catalogName = catalogName; + this.catalogName = new AtomicReference<>(catalogName); this.storageOptions = configuration.storage(); this.trafficOptions = configuration.server().trafficRecording(); this.exportFileService = exportFileService; this.scheduler = scheduler; - if (configuration.server().trafficRecording().enabled()) { + this.tracingContext = tracingContext; + initializeTrafficRecorder(catalogName); + } + + /** + * Initializes the traffic recorder for the specified catalog. The traffic recorder can be enabled or disabled + * based on the configuration provided. When enabled, a specific traffic recorder instance is initialized; + * otherwise, a no-operation (NoOp) traffic recorder is set. + * + * @param catalogName The name of the catalog for which the traffic recorder should be initialized. + * This parameter must not be null. + */ + private void initializeTrafficRecorder(@Nonnull String catalogName) { + final TrafficRecorder existingTrafficRecorder = this.trafficRecorder.get(); + if (existingTrafficRecorder != null) { + IOUtils.closeQuietly(existingTrafficRecorder::close); + } + if (this.trafficOptions.enabled()) { final TrafficRecorder trafficRecorderInstance = getRichTrafficRecorderIfPossible( - this.catalogName, this.exportFileService, this.scheduler, this.storageOptions, this.trafficOptions + catalogName, this.exportFileService, this.scheduler, this.storageOptions, this.trafficOptions ); this.trafficRecorder.set(trafficRecorderInstance); } else { this.trafficRecorder.set(NoOpTrafficRecorder.INSTANCE); } - this.tracingContext = tracingContext; + } + + /** + * Updates the catalog name and initializes the traffic recorder with the new catalog name + * if the provided name differs from the current one. + * + * @param catalogName the new name of the catalog. This value must not be null. + */ + public void updateCatalogName(@Nonnull String catalogName) { + this.catalogName.getAndUpdate(previous -> { + if (!Objects.equals(previous, catalogName)) { + initializeTrafficRecorder(catalogName); + } + return catalogName; + }); } /** @@ -181,7 +211,7 @@ public void startRecording(int samplingRate, @Nullable SessionSink sessionSink) final TrafficRecorder defaultTrafficRecorder = this.trafficRecorder.get(); if (defaultTrafficRecorder instanceof NoOpTrafficRecorder) { final TrafficRecorder richTrafficRecorderInstance = getRichTrafficRecorderIfPossible( - this.catalogName, this.exportFileService, this.scheduler, this.storageOptions, this.trafficOptions + this.catalogName.get(), this.exportFileService, this.scheduler, this.storageOptions, this.trafficOptions ); this.suppressedTrafficRecorder.set(defaultTrafficRecorder); this.trafficRecorder.set(richTrafficRecorderInstance); diff --git a/evita_functional_tests/src/test/java/io/evitadb/store/catalog/DefaultCatalogPersistenceServiceTest.java b/evita_functional_tests/src/test/java/io/evitadb/store/catalog/DefaultCatalogPersistenceServiceTest.java index 66146c295..788f29eee 100644 --- a/evita_functional_tests/src/test/java/io/evitadb/store/catalog/DefaultCatalogPersistenceServiceTest.java +++ b/evita_functional_tests/src/test/java/io/evitadb/store/catalog/DefaultCatalogPersistenceServiceTest.java @@ -27,6 +27,8 @@ import com.esotericsoftware.kryo.util.Pool; import io.evitadb.api.CatalogContract; import io.evitadb.api.CatalogState; +import io.evitadb.api.configuration.EvitaConfiguration; +import io.evitadb.api.configuration.ServerOptions; import io.evitadb.api.configuration.StorageOptions; import io.evitadb.api.configuration.ThreadPoolOptions; import io.evitadb.api.configuration.TrafficRecordingOptions; @@ -205,11 +207,19 @@ private static void trimAndCheck( private static TrafficRecordingEngine createTrafficRecordingEngine(@Nonnull SealedCatalogSchema catalogSchema) { return new TrafficRecordingEngine( catalogSchema.getName(), - StorageOptions.builder().build(), - TrafficRecordingOptions.builder().build(), + DefaultTracingContext.INSTANCE, + EvitaConfiguration.builder() + .storage(StorageOptions.builder().build()) + .server( + ServerOptions.builder() + .trafficRecording( + TrafficRecordingOptions.builder() + .build() + ).build() + ) + .build(), Mockito.mock(ExportFileService.class), - Mockito.mock(Scheduler.class), - DefaultTracingContext.INSTANCE + Mockito.mock(Scheduler.class) ); } diff --git a/evita_store/evita_store_server/src/main/java/io/evitadb/store/traffic/DiskRingBuffer.java b/evita_store/evita_store_server/src/main/java/io/evitadb/store/traffic/DiskRingBuffer.java index a51028f58..c7bd61e0d 100644 --- a/evita_store/evita_store_server/src/main/java/io/evitadb/store/traffic/DiskRingBuffer.java +++ b/evita_store/evita_store_server/src/main/java/io/evitadb/store/traffic/DiskRingBuffer.java @@ -544,7 +544,10 @@ public void indexData( diskBufferFileReadInputStream, reader, e -> { - throw new IndexNotReady(0); + // session would be invalid, remove it from the index + index.removeSession(sessionLocation.sequenceOrder()); + log.error("Error while reading session records: {}", e.getMessage()); + return null; } ) .forEach(tr -> index.indexRecording(sessionLocation.sequenceOrder(), tr));