Skip to content

Commit

Permalink
fix: updating catalog name on catalog rename
Browse files Browse the repository at this point in the history
  • Loading branch information
novoj committed Feb 6, 2025
1 parent 0704f03 commit b2a4f7c
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 12 deletions.
2 changes: 2 additions & 0 deletions evita_engine/src/main/java/io/evitadb/core/Catalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,8 @@ public Catalog(
previousCatalogVersion.getInternalSchema(),
this.entitySchemaAccessor
);

this.trafficRecordingEngine.updateCatalogName(catalogSchema.getName());
this.schema = new TransactionalReference<>(new CatalogSchemaDecorator(catalogSchema));
this.dataStoreBuffer = catalogState == CatalogState.WARMING_UP ?
new WarmUpDataStoreMemoryBuffer(storagePartPersistenceService) :
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,11 @@
* @author Jan Novotný (novotny@fg.cz), FG Forrest a.s. (c) 2024
*/
@Slf4j
@RequiredArgsConstructor
public class TrafficRecordingEngine implements TrafficRecordingReader {
public static final String LABEL_TRACE_ID = "trace-id";
public static final String LABEL_CLIENT_ID = "client-id";
public static final String LABEL_IP_ADDRESS = "ip-address";
private final String catalogName;
private final AtomicReference<String> catalogName;
private final StorageOptions storageOptions;
@Getter private final TrafficRecordingOptions trafficOptions;
private final ExportFileService exportFileService;
Expand Down Expand Up @@ -151,20 +150,51 @@ public TrafficRecordingEngine(
@Nonnull ExportFileService exportFileService,
@Nonnull Scheduler scheduler
) {
this.catalogName = catalogName;
this.catalogName = new AtomicReference<>(catalogName);
this.storageOptions = configuration.storage();
this.trafficOptions = configuration.server().trafficRecording();
this.exportFileService = exportFileService;
this.scheduler = scheduler;
if (configuration.server().trafficRecording().enabled()) {
this.tracingContext = tracingContext;
initializeTrafficRecorder(catalogName);
}

/**
* Initializes the traffic recorder for the specified catalog. The traffic recorder can be enabled or disabled
* based on the configuration provided. When enabled, a specific traffic recorder instance is initialized;
* otherwise, a no-operation (NoOp) traffic recorder is set.
*
* @param catalogName The name of the catalog for which the traffic recorder should be initialized.
* This parameter must not be null.
*/
private void initializeTrafficRecorder(@Nonnull String catalogName) {
final TrafficRecorder existingTrafficRecorder = this.trafficRecorder.get();
if (existingTrafficRecorder != null) {
IOUtils.closeQuietly(existingTrafficRecorder::close);
}
if (this.trafficOptions.enabled()) {
final TrafficRecorder trafficRecorderInstance = getRichTrafficRecorderIfPossible(
this.catalogName, this.exportFileService, this.scheduler, this.storageOptions, this.trafficOptions
catalogName, this.exportFileService, this.scheduler, this.storageOptions, this.trafficOptions
);
this.trafficRecorder.set(trafficRecorderInstance);
} else {
this.trafficRecorder.set(NoOpTrafficRecorder.INSTANCE);
}
this.tracingContext = tracingContext;
}

/**
* Updates the catalog name and initializes the traffic recorder with the new catalog name
* if the provided name differs from the current one.
*
* @param catalogName the new name of the catalog. This value must not be null.
*/
public void updateCatalogName(@Nonnull String catalogName) {
this.catalogName.getAndUpdate(previous -> {
if (!Objects.equals(previous, catalogName)) {
initializeTrafficRecorder(catalogName);
}
return catalogName;
});
}

/**
Expand All @@ -181,7 +211,7 @@ public void startRecording(int samplingRate, @Nullable SessionSink sessionSink)
final TrafficRecorder defaultTrafficRecorder = this.trafficRecorder.get();
if (defaultTrafficRecorder instanceof NoOpTrafficRecorder) {
final TrafficRecorder richTrafficRecorderInstance = getRichTrafficRecorderIfPossible(
this.catalogName, this.exportFileService, this.scheduler, this.storageOptions, this.trafficOptions
this.catalogName.get(), this.exportFileService, this.scheduler, this.storageOptions, this.trafficOptions
);
this.suppressedTrafficRecorder.set(defaultTrafficRecorder);
this.trafficRecorder.set(richTrafficRecorderInstance);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import com.esotericsoftware.kryo.util.Pool;
import io.evitadb.api.CatalogContract;
import io.evitadb.api.CatalogState;
import io.evitadb.api.configuration.EvitaConfiguration;
import io.evitadb.api.configuration.ServerOptions;
import io.evitadb.api.configuration.StorageOptions;
import io.evitadb.api.configuration.ThreadPoolOptions;
import io.evitadb.api.configuration.TrafficRecordingOptions;
Expand Down Expand Up @@ -205,11 +207,19 @@ private static void trimAndCheck(
private static TrafficRecordingEngine createTrafficRecordingEngine(@Nonnull SealedCatalogSchema catalogSchema) {
return new TrafficRecordingEngine(
catalogSchema.getName(),
StorageOptions.builder().build(),
TrafficRecordingOptions.builder().build(),
DefaultTracingContext.INSTANCE,
EvitaConfiguration.builder()
.storage(StorageOptions.builder().build())
.server(
ServerOptions.builder()
.trafficRecording(
TrafficRecordingOptions.builder()
.build()
).build()
)
.build(),
Mockito.mock(ExportFileService.class),
Mockito.mock(Scheduler.class),
DefaultTracingContext.INSTANCE
Mockito.mock(Scheduler.class)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,10 @@ public void indexData(
diskBufferFileReadInputStream,
reader,
e -> {
throw new IndexNotReady(0);
// session would be invalid, remove it from the index
index.removeSession(sessionLocation.sequenceOrder());
log.error("Error while reading session records: {}", e.getMessage());
return null;
}
)
.forEach(tr -> index.indexRecording(sessionLocation.sequenceOrder(), tr));
Expand Down

0 comments on commit b2a4f7c

Please sign in to comment.