From c5c1f4c825ec8a60ed1591a18ab877e5b292b7d1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Novotn=C3=BD?= Date: Sat, 4 Jan 2025 20:19:29 +0100 Subject: [PATCH] feat: Make fsSync configurable Explicit file system synchronisation (java.io.FileDescriptor#sync) is an expensive operation that is not necessary in all scenarios. It ensures that committed data is safely written to persistent storage, but in integration testing, for example, this kind of guarantee is useless. In an integration test suite of 1400 E2E tests that use evitaDB heavily, disabling this explicit synchronisation resulted in 25% better performance (shorter test suite duration), which is a significant improvement. It is therefore a good idea to make this fsync a configurable option that is enabled by default but can be disabled. Refs: #767 --- documentation/user/en/operate/configure.md | 9 +++ .../api/configuration/StorageOptions.java | 28 ++++++- .../api/EvitaTransactionalFunctionalTest.java | 7 +- .../DefaultCatalogPersistenceServiceTest.java | 6 +- .../DefaultIsolatedWalServiceTest.java | 3 +- .../OffsetIndexSerializationServiceTest.java | 4 +- .../store/offsetIndex/OffsetIndexTest.java | 32 ++++---- ...teOnlyOffHeapWithFileBackupHandleTest.java | 12 +-- .../CatalogWriteAheadLogIntegrationTest.java | 4 +- .../main/resources/evita-configuration.yaml | 1 + .../store/offsetIndex/OffsetIndex.java | 80 +++++++++++-------- .../offsetIndex/io/WriteOnlyFileHandle.java | 22 +++-- .../WriteOnlyOffHeapWithFileBackupHandle.java | 22 +++-- .../offsetIndex/model/VersionedValue.java | 6 +- ...setIndexStoragePartPersistenceService.java | 26 +++--- .../DefaultCatalogPersistenceService.java | 9 ++- ...ultEntityCollectionPersistenceService.java | 4 +- ...actionalStoragePartPersistenceService.java | 3 +- .../main/resources/evita-configuration.yaml | 1 + 19 files changed, 185 insertions(+), 94 deletions(-) diff --git a/documentation/user/en/operate/configure.md b/documentation/user/en/operate/configure.md index 
94fa5dcf9a..cde850600b 100644 --- a/documentation/user/en/operate/configure.md +++ b/documentation/user/en/operate/configure.md @@ -41,6 +41,7 @@ storage: # [see Storage configuration]( waitOnCloseSeconds: 60 outputBufferSize: 4MB maxOpenedReadHandles: 12 + syncWrites: true computeCRC32C: true minimalActiveRecordShare: 0.5 fileSizeCompactionThresholdBytes: 100MB @@ -490,6 +491,14 @@ This section contains configuration options for the storage layer of the databas [MacOS](https://gist.github.com/tombigel/d503800a282fcadbee14b537735d202c) +
syncWrites
+
+

**Default:** `true`

+

Determines whether the storage layer forces the operating system to flush its internal buffers to disk at + regular "safe points". The default is `true`, so that committed data is not lost in the event of a power failure. + Disabling this option can improve performance in situations where the client can accept the risk + of data loss (e.g. when running automated tests).

+
computeCRC32C

**Default:** `true`

diff --git a/evita_api/src/main/java/io/evitadb/api/configuration/StorageOptions.java b/evita_api/src/main/java/io/evitadb/api/configuration/StorageOptions.java index b8684cbbb2..54958d72fa 100644 --- a/evita_api/src/main/java/io/evitadb/api/configuration/StorageOptions.java +++ b/evita_api/src/main/java/io/evitadb/api/configuration/StorageOptions.java @@ -6,7 +6,7 @@ * | __/\ V /| | || (_| | |_| | |_) | * \___| \_/ |_|\__\__,_|____/|____/ * - * Copyright (c) 2023-2024 + * Copyright (c) 2023-2025 * * Licensed under the Business Source License, Version 1.1 (the "License"); * you may not use this file except in compliance with the License. @@ -52,7 +52,13 @@ * purposes. The size of the buffer limits the maximum size of an individual record in the * key/value data store. * @param maxOpenedReadHandles Maximum number of simultaneously opened {@link java.io.InputStream} to file offset index file. - * @param computeCRC32C Contains setting that determined whether CRC32C checksums will be computed for written + * @param syncWrites Determines whether the storage layer forces the operating system to flush + * the internal buffers to disk at regular "safe points" or not. The default + * is true, so that data is not lost in the event of a power failure. There + * are situations where disabling this feature can improve performance and + * the client can accept the risk of data loss (e.g. when running automated + * tests, etc.). + * @param computeCRC32C Determines whether CRC32C checksums will be computed for written * records and also whether the CRC32C checksum will be checked on record read. * @param minimalActiveRecordShare Minimal share of active records in the file. If the share is lower, the file will * be compacted. 
@@ -79,6 +85,7 @@ public record StorageOptions( long waitOnCloseSeconds, int outputBufferSize, int maxOpenedReadHandles, + boolean syncWrites, boolean computeCRC32C, double minimalActiveRecordShare, long fileSizeCompactionThresholdBytes, @@ -93,6 +100,7 @@ public record StorageOptions( public static final int DEFAULT_LOCK_TIMEOUT_SECONDS = 5; public static final int DEFAULT_WAIT_ON_CLOSE_SECONDS = 5; public static final int DEFAULT_MAX_OPENED_READ_HANDLES = Runtime.getRuntime().availableProcessors(); + public static final boolean DEFAULT_SYNC_WRITES = true; public static final boolean DEFAULT_COMPUTE_CRC = true; public static final double DEFAULT_MINIMAL_ACTIVE_RECORD_SHARE = 0.5; public static final long DEFAULT_MINIMAL_FILE_SIZE_COMPACTION_THRESHOLD = 104_857_600L; // 100MB @@ -103,12 +111,14 @@ public record StorageOptions( /** * Builder method is planned to be used only in tests. */ + @Nonnull public static StorageOptions temporary() { return new StorageOptions( Path.of(System.getProperty("java.io.tmpdir"), "evita/data"), Path.of(System.getProperty("java.io.tmpdir"), "evita/export"), 5, 5, DEFAULT_OUTPUT_BUFFER_SIZE, Runtime.getRuntime().availableProcessors(), + false, true, DEFAULT_MINIMAL_ACTIVE_RECORD_SHARE, DEFAULT_MINIMAL_FILE_SIZE_COMPACTION_THRESHOLD, @@ -121,6 +131,7 @@ public static StorageOptions temporary() { /** * Builder for the storage options. Recommended to use to avoid binary compatibility problems in the future. */ + @Nonnull public static StorageOptions.Builder builder() { return new StorageOptions.Builder(); } @@ -128,6 +139,7 @@ public static StorageOptions.Builder builder() { /** * Builder for the storage options. Recommended to use to avoid binary compatibility problems in the future. 
*/ + @Nonnull public static StorageOptions.Builder builder(@Nonnull StorageOptions storageOptions) { return new StorageOptions.Builder(storageOptions); } @@ -140,6 +152,7 @@ public StorageOptions() { DEFAULT_WAIT_ON_CLOSE_SECONDS, DEFAULT_OUTPUT_BUFFER_SIZE, DEFAULT_MAX_OPENED_READ_HANDLES, + DEFAULT_SYNC_WRITES, DEFAULT_COMPUTE_CRC, DEFAULT_MINIMAL_ACTIVE_RECORD_SHARE, DEFAULT_MINIMAL_FILE_SIZE_COMPACTION_THRESHOLD, @@ -156,6 +169,7 @@ public StorageOptions( long waitOnCloseSeconds, int outputBufferSize, int maxOpenedReadHandles, + boolean syncWrites, boolean computeCRC32C, double minimalActiveRecordShare, long fileSizeCompactionThresholdBytes, @@ -169,6 +183,7 @@ public StorageOptions( this.waitOnCloseSeconds = waitOnCloseSeconds; this.outputBufferSize = outputBufferSize; this.maxOpenedReadHandles = maxOpenedReadHandles; + this.syncWrites = syncWrites; this.computeCRC32C = computeCRC32C; this.minimalActiveRecordShare = minimalActiveRecordShare; this.fileSizeCompactionThresholdBytes = fileSizeCompactionThresholdBytes; @@ -188,6 +203,7 @@ public static class Builder { private long waitOnCloseSeconds = DEFAULT_WAIT_ON_CLOSE_SECONDS; private int outputBufferSize = DEFAULT_OUTPUT_BUFFER_SIZE; private int maxOpenedReadHandles = DEFAULT_MAX_OPENED_READ_HANDLES; + private boolean syncWrites = DEFAULT_SYNC_WRITES; private boolean computeCRC32C = DEFAULT_COMPUTE_CRC; private double minimalActiveRecordShare = DEFAULT_MINIMAL_ACTIVE_RECORD_SHARE; private long fileSizeCompactionThresholdBytes = DEFAULT_MINIMAL_FILE_SIZE_COMPACTION_THRESHOLD; @@ -205,6 +221,7 @@ public static class Builder { this.waitOnCloseSeconds = storageOptions.waitOnCloseSeconds; this.outputBufferSize = storageOptions.outputBufferSize; this.maxOpenedReadHandles = storageOptions.maxOpenedReadHandles; + this.syncWrites = storageOptions.syncWrites; this.computeCRC32C = storageOptions.computeCRC32C; this.minimalActiveRecordShare = storageOptions.minimalActiveRecordShare; this.fileSizeCompactionThresholdBytes 
= storageOptions.fileSizeCompactionThresholdBytes; @@ -251,6 +268,12 @@ public Builder maxOpenedReadHandles(int maxOpenedReadHandles) { return this; } + @Nonnull + public Builder syncWrites(boolean syncWrites) { + this.syncWrites = syncWrites; + return this; + } + @Nonnull public Builder computeCRC32(boolean computeCRC32) { this.computeCRC32C = computeCRC32; @@ -296,6 +319,7 @@ public StorageOptions build() { waitOnCloseSeconds, outputBufferSize, maxOpenedReadHandles, + syncWrites, computeCRC32C, minimalActiveRecordShare, fileSizeCompactionThresholdBytes, diff --git a/evita_functional_tests/src/test/java/io/evitadb/api/EvitaTransactionalFunctionalTest.java b/evita_functional_tests/src/test/java/io/evitadb/api/EvitaTransactionalFunctionalTest.java index 4d7df761a7..f53d50f72b 100644 --- a/evita_functional_tests/src/test/java/io/evitadb/api/EvitaTransactionalFunctionalTest.java +++ b/evita_functional_tests/src/test/java/io/evitadb/api/EvitaTransactionalFunctionalTest.java @@ -6,7 +6,7 @@ * | __/\ V /| | || (_| | |_| | |_) | * \___| \_/ |_|\__\__,_|____/|____/ * - * Copyright (c) 2023-2024 + * Copyright (c) 2023-2025 * * Licensed under the Business Source License, Version 1.1 (the "License"); * you may not use this file except in compliance with the License. 
@@ -1610,7 +1610,10 @@ private Map> appendWal( UUID.randomUUID(), KryoFactory.createKryo(WalKryoConfigurer.INSTANCE), new WriteOnlyOffHeapWithFileBackupHandle( - isolatedWalFilePath, this.observableOutputKeeper, offHeapMemoryManager + isolatedWalFilePath, + false, + this.observableOutputKeeper, + offHeapMemoryManager ) ); diff --git a/evita_functional_tests/src/test/java/io/evitadb/store/catalog/DefaultCatalogPersistenceServiceTest.java b/evita_functional_tests/src/test/java/io/evitadb/store/catalog/DefaultCatalogPersistenceServiceTest.java index 219176fee1..d107b5421d 100644 --- a/evita_functional_tests/src/test/java/io/evitadb/store/catalog/DefaultCatalogPersistenceServiceTest.java +++ b/evita_functional_tests/src/test/java/io/evitadb/store/catalog/DefaultCatalogPersistenceServiceTest.java @@ -6,7 +6,7 @@ * | __/\ V /| | || (_| | |_| | |_) | * \___| \_/ |_|\__\__,_|____/|____/ * - * Copyright (c) 2023-2024 + * Copyright (c) 2023-2025 * * Licensed under the Business Source License, Version 1.1 (the "License"); * you may not use this file except in compliance with the License. 
@@ -159,6 +159,7 @@ class DefaultCatalogPersistenceServiceTest implements EvitaTestSupport { ); private final WriteOnlyOffHeapWithFileBackupHandle writeHandle = new WriteOnlyOffHeapWithFileBackupHandle( getTestDirectory().resolve(transactionId.toString()), + false, observableOutputKeeper, new OffHeapMemoryManager(TEST_CATALOG, 512, 1) ); @@ -248,6 +249,7 @@ protected Kryo create() { catalogName, FileType.CATALOG, catalogName, + false, catalogFilePath, observableOutputKeeper ), @@ -796,7 +798,7 @@ private StorageOptions getStorageOptions() { getTestDirectory().resolve(DIR_DEFAULT_CATALOG_PERSISTENCE_SERVICE_TEST), 60, 60, StorageOptions.DEFAULT_OUTPUT_BUFFER_SIZE, 1, - true, 1.0, 0L, false, + false, true, 1.0, 0L, false, Long.MAX_VALUE, Long.MAX_VALUE ); } diff --git a/evita_functional_tests/src/test/java/io/evitadb/store/catalog/DefaultIsolatedWalServiceTest.java b/evita_functional_tests/src/test/java/io/evitadb/store/catalog/DefaultIsolatedWalServiceTest.java index f7bd95d3e0..d9d39e6748 100644 --- a/evita_functional_tests/src/test/java/io/evitadb/store/catalog/DefaultIsolatedWalServiceTest.java +++ b/evita_functional_tests/src/test/java/io/evitadb/store/catalog/DefaultIsolatedWalServiceTest.java @@ -6,7 +6,7 @@ * | __/\ V /| | || (_| | |_| | |_) | * \___| \_/ |_|\__\__,_|____/|____/ * - * Copyright (c) 2024 + * Copyright (c) 2024-2025 * * Licensed under the Business Source License, Version 1.1 (the "License"); * you may not use this file except in compliance with the License. 
@@ -87,6 +87,7 @@ class DefaultIsolatedWalServiceTest implements EvitaTestSupport { ); private final WriteOnlyOffHeapWithFileBackupHandle writeHandle = new WriteOnlyOffHeapWithFileBackupHandle( getTestDirectory().resolve(transactionId.toString()), + false, observableOutputKeeper, new OffHeapMemoryManager(TEST_CATALOG, 512, 1) ); diff --git a/evita_functional_tests/src/test/java/io/evitadb/store/offsetIndex/OffsetIndexSerializationServiceTest.java b/evita_functional_tests/src/test/java/io/evitadb/store/offsetIndex/OffsetIndexSerializationServiceTest.java index 19a836df48..3d6127a08f 100644 --- a/evita_functional_tests/src/test/java/io/evitadb/store/offsetIndex/OffsetIndexSerializationServiceTest.java +++ b/evita_functional_tests/src/test/java/io/evitadb/store/offsetIndex/OffsetIndexSerializationServiceTest.java @@ -6,7 +6,7 @@ * | __/\ V /| | || (_| | |_| | |_) | * \___| \_/ |_|\__\__,_|____/|____/ * - * Copyright (c) 2023-2024 + * Copyright (c) 2023-2025 * * Licensed under the Business Source License, Version 1.1 (the "License"); * you may not use this file except in compliance with the License. 
@@ -41,7 +41,7 @@ class OffsetIndexSerializationServiceTest { void shouldComputeExpectedRecordCountProperly() { final StorageOptions testOptions = new StorageOptions( Path.of(""), Path.of(""), 1, 0, 55, 1, - false, 1.0, 0L, false, Long.MAX_VALUE, Long.MAX_VALUE + false, false, 1.0, 0L, false, Long.MAX_VALUE, Long.MAX_VALUE ); assertEquals(new OffsetIndexSerializationService.ExpectedCounts(0, 1), OffsetIndexSerializationService.computeExpectedRecordCount(testOptions, 0)); assertEquals(new OffsetIndexSerializationService.ExpectedCounts(1, 1), OffsetIndexSerializationService.computeExpectedRecordCount(testOptions, 1)); diff --git a/evita_functional_tests/src/test/java/io/evitadb/store/offsetIndex/OffsetIndexTest.java b/evita_functional_tests/src/test/java/io/evitadb/store/offsetIndex/OffsetIndexTest.java index f9535b00e0..afa0974900 100644 --- a/evita_functional_tests/src/test/java/io/evitadb/store/offsetIndex/OffsetIndexTest.java +++ b/evita_functional_tests/src/test/java/io/evitadb/store/offsetIndex/OffsetIndexTest.java @@ -6,7 +6,7 @@ * | __/\ V /| | || (_| | |_| | |_) | * \___| \_/ |_|\__\__,_|____/|____/ * - * Copyright (c) 2023-2024 + * Copyright (c) 2023-2025 * * Licensed under the Business Source License, Version 1.1 (the "License"); * you may not use this file except in compliance with the License. 
@@ -69,6 +69,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.IntFunction; +import java.util.function.Supplier; import static io.evitadb.store.offsetIndex.OffsetIndexSerializationService.computeExpectedRecordCount; import static io.evitadb.test.TestConstants.LONG_RUNNING_TEST; @@ -199,7 +200,7 @@ void shouldCopySnapshotOfTheBigFileOffsetIndexAndReconstruct() { ), limitedBufferOptions, offsetIndexRecordTypeRegistry, - new WriteOnlyFileHandle(targetFile, observableOutputKeeper), + new WriteOnlyFileHandle(targetFile, false, observableOutputKeeper), nonFlushedBlock -> {}, oldestRecordTimestamp -> {} ); @@ -221,7 +222,7 @@ void shouldCopySnapshotOfTheBigFileOffsetIndexAndReconstruct() { ), limitedBufferOptions, offsetIndexRecordTypeRegistry, - new WriteOnlyFileHandle(targetFile, observableOutputKeeper), + new WriteOnlyFileHandle(targetFile, false, observableOutputKeeper), nonFlushedBlock -> {}, oldestRecordTimestamp -> {} ); @@ -242,7 +243,7 @@ void shouldCopySnapshotOfTheBigFileOffsetIndexAndReconstruct() { snapshotBootstrapDescriptor, limitedBufferOptions, offsetIndexRecordTypeRegistry, - new WriteOnlyFileHandle(snapshotPath, observableOutputKeeper), + new WriteOnlyFileHandle(snapshotPath, false, observableOutputKeeper), nonFlushedBlock -> {}, oldestRecordTimestamp -> {} ); @@ -284,7 +285,7 @@ void shouldRemoveRecord() { ), options, offsetIndexRecordTypeRegistry, - new WriteOnlyFileHandle(targetFile, observableOutputKeeper), + new WriteOnlyFileHandle(targetFile, false, observableOutputKeeper), nonFlushedBlock -> {}, oldestRecordTimestamp -> {} ); @@ -328,7 +329,7 @@ void shouldReadBinaryRecordAndDeserializeManually() { ), options, offsetIndexRecordTypeRegistry, - new WriteOnlyFileHandle(targetFile, observableOutputKeeper), + new WriteOnlyFileHandle(targetFile, false, observableOutputKeeper), nonFlushedBlock -> {}, oldestRecordTimestamp -> {} ); @@ -384,7 +385,7 @@ void 
shouldReadSingleRecordAndUsingManualDeserialization() { i ); - final EntityBodyStoragePart entityBody = OffsetIndex.readSingleRecord( + final Supplier entityBodySupplier = () -> OffsetIndex.readSingleRecord( targetFile, offsetIndexDescriptor.fileLocation(), key, @@ -395,8 +396,9 @@ void shouldReadSingleRecordAndUsingManualDeserialization() { .orElse(null) ); if (i < recordCount * (iterationCount - 1) && i % recordCount < removedRecords && i % recordCount > 0) { - assertNull(entityBody); + assertThrows(NullPointerException.class, entityBodySupplier::get); } else { + final EntityBodyStoragePart entityBody = entityBodySupplier.get(); assertNotNull(entityBody); assertEquals( new EntityBodyStoragePart(i), @@ -418,7 +420,7 @@ void shouldRefuseOperationAfterClose() { ), options, offsetIndexRecordTypeRegistry, - new WriteOnlyFileHandle(targetFile, observableOutputKeeper), + new WriteOnlyFileHandle(targetFile, false, observableOutputKeeper), nonFlushedBlock -> {}, oldestRecordTimestamp -> {} ); @@ -448,7 +450,7 @@ void generationalProofTest(GenerationalTestInput input) { ), buildOptionsWithLimitedBuffer(), offsetIndexRecordTypeRegistry, - new WriteOnlyFileHandle(targetFile, observableOutputKeeper), + new WriteOnlyFileHandle(targetFile, false, observableOutputKeeper), nonFlushedBlock -> {}, oldestRecordTimestamp -> {} ) @@ -522,7 +524,7 @@ void generationalProofTest(GenerationalTestInput input) { ), options, offsetIndexRecordTypeRegistry, - new WriteOnlyFileHandle(currentFilePath.get(), observableOutputKeeper), + new WriteOnlyFileHandle(currentFilePath.get(), false, observableOutputKeeper), nonFlushedBlock -> {}, oldestRecordTimestamp -> {} ); @@ -610,7 +612,7 @@ void generationalProofTest(GenerationalTestInput input) { ), options, offsetIndexRecordTypeRegistry, - new WriteOnlyFileHandle(newPath, observableOutputKeeper), + new WriteOnlyFileHandle(newPath, false, observableOutputKeeper), nonFlushedBlock -> {}, oldestRecordTimestamp -> {} ); @@ -653,7 +655,7 @@ private 
InsertionOutput serializeAndReconstructBigFileOffsetIndex( ), storageOptions, offsetIndexRecordTypeRegistry, - new WriteOnlyFileHandle(targetFile, observableOutputKeeper), + new WriteOnlyFileHandle(targetFile, false, observableOutputKeeper), nonFlushedBlock -> {}, oldestRecordTimestamp -> {} ); @@ -675,7 +677,7 @@ private InsertionOutput serializeAndReconstructBigFileOffsetIndex( ), storageOptions, offsetIndexRecordTypeRegistry, - new WriteOnlyFileHandle(targetFile, observableOutputKeeper), + new WriteOnlyFileHandle(targetFile, false, observableOutputKeeper), nonFlushedBlock -> {}, oldestRecordTimestamp -> {} ); @@ -718,7 +720,7 @@ private InsertionOutput createRecordsInFileOffsetIndex( ), options, offsetIndexRecordTypeRegistry, - new WriteOnlyFileHandle(targetFile, observableOutputKeeper), + new WriteOnlyFileHandle(targetFile, false, observableOutputKeeper), nonFlushedBlock -> {}, oldestRecordTimestamp -> {} ); diff --git a/evita_functional_tests/src/test/java/io/evitadb/store/offsetIndex/io/WriteOnlyOffHeapWithFileBackupHandleTest.java b/evita_functional_tests/src/test/java/io/evitadb/store/offsetIndex/io/WriteOnlyOffHeapWithFileBackupHandleTest.java index a8a8d35138..c4e559f220 100644 --- a/evita_functional_tests/src/test/java/io/evitadb/store/offsetIndex/io/WriteOnlyOffHeapWithFileBackupHandleTest.java +++ b/evita_functional_tests/src/test/java/io/evitadb/store/offsetIndex/io/WriteOnlyOffHeapWithFileBackupHandleTest.java @@ -6,7 +6,7 @@ * | __/\ V /| | || (_| | |_| | |_) | * \___| \_/ |_|\__\__,_|____/|____/ * - * Copyright (c) 2023-2024 + * Copyright (c) 2023-2025 * * Licensed under the Business Source License, Version 1.1 (the "License"); * you may not use this file except in compliance with the License. 
@@ -59,7 +59,7 @@ void shouldWriteDataToOffHeapChunk() { try ( final OffHeapMemoryManager memoryManager = new OffHeapMemoryManager(TEST_CATALOG, 32, 1); final WriteOnlyOffHeapWithFileBackupHandle writeHandle = new WriteOnlyOffHeapWithFileBackupHandle( - targetDirectory.resolve(UUIDUtil.randomUUID() + ".tmp"), outputKeeper, memoryManager + targetDirectory.resolve(UUIDUtil.randomUUID() + ".tmp"), false, outputKeeper, memoryManager ) ) { writeHandle.checkAndExecuteAndSync( @@ -85,7 +85,7 @@ void shouldWriteLargeDataFirstToOffHeapChunkThatAutomaticallySwitchesToTemporary try ( final OffHeapMemoryManager memoryManager = new OffHeapMemoryManager(TEST_CATALOG, 32, 1); final WriteOnlyOffHeapWithFileBackupHandle writeHandle = new WriteOnlyOffHeapWithFileBackupHandle( - targetDirectory.resolve(UUIDUtil.randomUUID() + ".tmp"), outputKeeper, memoryManager + targetDirectory.resolve(UUIDUtil.randomUUID() + ".tmp"), false, outputKeeper, memoryManager ) ) { for (int i = 0; i < 5; i++) { @@ -116,7 +116,7 @@ void shouldWriteLargeDataFirstToOffHeapChunkThatAutomaticallySwitchesToTemporary try ( final OffHeapMemoryManager memoryManager = new OffHeapMemoryManager(TEST_CATALOG, 32, 1); final WriteOnlyOffHeapWithFileBackupHandle writeHandle = new WriteOnlyOffHeapWithFileBackupHandle( - targetDirectory.resolve(UUIDUtil.randomUUID() + ".tmp"), outputKeeper, memoryManager + targetDirectory.resolve(UUIDUtil.randomUUID() + ".tmp"), false, outputKeeper, memoryManager ) ) { for (int i = 0; i < 5; i++) { @@ -151,7 +151,7 @@ void shouldStartDirectlyWithFileBackupIfThereIsNoFreeMemoryRegionAvailable() { try ( final OffHeapMemoryManager memoryManager = new OffHeapMemoryManager(TEST_CATALOG, 32, 1); final WriteOnlyOffHeapWithFileBackupHandle realMemoryHandle = new WriteOnlyOffHeapWithFileBackupHandle( - targetDirectory.resolve(UUIDUtil.randomUUID() + ".tmp"), outputKeeper, memoryManager + targetDirectory.resolve(UUIDUtil.randomUUID() + ".tmp"), false, outputKeeper, memoryManager ) ) { // we need to 
write at least one byte to the real memory handle to force the memory manager @@ -164,7 +164,7 @@ void shouldStartDirectlyWithFileBackupIfThereIsNoFreeMemoryRegionAvailable() { // because there is only one region available - this will force the handle to use the file backup immediately try ( final WriteOnlyOffHeapWithFileBackupHandle forcedFileHandle = new WriteOnlyOffHeapWithFileBackupHandle( - targetDirectory.resolve(UUIDUtil.randomUUID() + ".tmp"), outputKeeper, memoryManager + targetDirectory.resolve(UUIDUtil.randomUUID() + ".tmp"), false, outputKeeper, memoryManager ) ) { for (int i = 0; i < 5; i++) { diff --git a/evita_functional_tests/src/test/java/io/evitadb/store/wal/CatalogWriteAheadLogIntegrationTest.java b/evita_functional_tests/src/test/java/io/evitadb/store/wal/CatalogWriteAheadLogIntegrationTest.java index f8650cf794..b0043faaef 100644 --- a/evita_functional_tests/src/test/java/io/evitadb/store/wal/CatalogWriteAheadLogIntegrationTest.java +++ b/evita_functional_tests/src/test/java/io/evitadb/store/wal/CatalogWriteAheadLogIntegrationTest.java @@ -6,7 +6,7 @@ * | __/\ V /| | || (_| | |_| | |_) | * \___| \_/ |_|\__\__,_|____/|____/ * - * Copyright (c) 2024 + * Copyright (c) 2024-2025 * * Licensed under the Business Source License, Version 1.1 (the "License"); * you may not use this file except in compliance with the License. 
@@ -333,7 +333,7 @@ private Map> writeWal(@Nonnull OffHeapMemoryManager offHeap UUID.randomUUID(), KryoFactory.createKryo(WalKryoConfigurer.INSTANCE), new WriteOnlyOffHeapWithFileBackupHandle( - isolatedWalFilePath, observableOutputKeeper, offHeapMemoryManager + isolatedWalFilePath, false, observableOutputKeeper, offHeapMemoryManager ) ); diff --git a/evita_server/src/main/resources/evita-configuration.yaml b/evita_server/src/main/resources/evita-configuration.yaml index 038b8446b2..3d720e2abd 100644 --- a/evita_server/src/main/resources/evita-configuration.yaml +++ b/evita_server/src/main/resources/evita-configuration.yaml @@ -29,6 +29,7 @@ storage: waitOnCloseSeconds: ${storage.waitOnCloseSeconds:60} outputBufferSize: ${storage.outputBufferSize:4MB} maxOpenedReadHandles: ${storage.maxOpenedReadHandles:12} + syncWrites: ${storage.syncWrites:true} computeCRC32C: ${storage.computeCRC32C:true} minimalActiveRecordShare: ${storage.minimalActiveRecordShare:0.5} fileSizeCompactionThresholdBytes: ${storage.fileSizeCompactionThresholdBytes:100M} diff --git a/evita_store/evita_store_key_value/src/main/java/io/evitadb/store/offsetIndex/OffsetIndex.java b/evita_store/evita_store_key_value/src/main/java/io/evitadb/store/offsetIndex/OffsetIndex.java index 2737d77998..63971e7a76 100644 --- a/evita_store/evita_store_key_value/src/main/java/io/evitadb/store/offsetIndex/OffsetIndex.java +++ b/evita_store/evita_store_key_value/src/main/java/io/evitadb/store/offsetIndex/OffsetIndex.java @@ -6,7 +6,7 @@ * | __/\ V /| | || (_| | |_| | |_) | * \___| \_/ |_|\__\__,_|____/|____/ * - * Copyright (c) 2023-2024 + * Copyright (c) 2023-2025 * * Licensed under the Business Source License, Version 1.1 (the "License"); * you may not use this file except in compliance with the License. @@ -230,7 +230,7 @@ public class OffsetIndex { * @param The type of the storage part. 
* @return deserialized storage part or null if the record was not found */ - @Nullable + @Nonnull public static T readSingleRecord( @Nonnull Path filePath, @Nonnull FileLocation fileLocation, @@ -251,7 +251,9 @@ public static T readSingleRecord( fileLocation, filteringOffsetIndexBuilder ); - return storagePartReader.apply(filteringOffsetIndexBuilder, input); + return Objects.requireNonNull( + storagePartReader.apply(filteringOffsetIndexBuilder, input) + ); } catch (FileNotFoundException e) { throw new UnexpectedIOException( "Cannot create read offset file index from file `" + filePath + "`!", @@ -1649,7 +1651,7 @@ public int countDifference(long catalogVersion) { // scan non-flushed values final ConcurrentHashMap nvValues = this.nonFlushedValues; final long[] nv = this.nonFlushedVersions; - if (nv != null) { + if (nv != null && nvValues != null) { int index = Arrays.binarySearch(nv, catalogVersion); if (index != -1) { final int startIndex = index >= 0 ? index : -index - 1; @@ -1673,7 +1675,7 @@ public int countDifference(long catalogVersion) { } if (hv != null) { int index = Arrays.binarySearch(hv, catalogVersion); - if (index != -1) { + if (index != -1 && hvValues != null) { final int startIndex = index >= 0 ? index : -index - 1; for (int ix = hv.length - 1; ix > startIndex && ix >= 0; ix--) { final PastMemory differenceSet = hvValues.get(hv[ix]); @@ -1701,7 +1703,7 @@ public int countDifference(long catalogVersion, byte recordTypeId) { final long[] nv = this.nonFlushedVersions; if (nv != null) { int index = Arrays.binarySearch(nv, catalogVersion); - if (index != -1) { + if (index != -1 && nvValues != null) { final int startIndex = index >= 0 ? 
index : -index - 1; for (int ix = nv.length - 1; ix >= startIndex && ix >= 0; ix--) { final NonFlushedValueSet nonFlushedValueSet = nvValues.get(nv[ix]); @@ -1721,7 +1723,7 @@ public int countDifference(long catalogVersion, byte recordTypeId) { } finally { this.lock.unlock(); } - if (hv != null) { + if (hv != null && hvValues != null) { int index = Arrays.binarySearch(hv, catalogVersion); if (index != -1) { final int startIndex = index >= 0 ? index : -index - 1; @@ -1747,7 +1749,7 @@ public int countDifference(long catalogVersion, byte recordTypeId) { public Optional getNonFlushedValueIfVersionMatches(long catalogVersion, @Nonnull RecordKey key) { final ConcurrentHashMap nvSet = this.nonFlushedValues; final long[] nv = this.nonFlushedVersions; - if (nv != null) { + if (nv != null && nvSet != null) { int index = Arrays.binarySearch(nv, catalogVersion); final int startIndex = index >= 0 ? index : -index - 2; if (startIndex >= 0) { @@ -1784,7 +1786,8 @@ public OptionalLong getLastNonFlushedCatalogVersionIfExists() { * @return true if there are non-flushed values, false otherwise */ public boolean hasValuesToFlush() { - return !(nonFlushedValues == null || nonFlushedValues.isEmpty()); + final ConcurrentHashMap nvSet = this.nonFlushedValues; + return nvSet != null && !nvSet.isEmpty(); } /** @@ -1815,6 +1818,7 @@ public Optional getVolatileValueInformation(long catal final long examinedVersion = hv[ix]; final PastMemory pastMemory = hvValues.get(examinedVersion); if (pastMemory.getRemovedKeys().contains(key)) { + //noinspection DataFlowIssue addedInFuture = false; } if (pastMemory.getAddedKeys().contains(key) && examinedVersion != catalogVersion) { @@ -1871,7 +1875,7 @@ public void removeValue(long catalogVersion, @Nonnull RecordKey key, @Nonnull Fi public Collection getNonFlushedEntriesToPromote(long catalogVersion) { final ConcurrentHashMap nvSet = this.nonFlushedValues; final long[] nv = this.nonFlushedVersions; - if (nv != null) { + if (nv != null && nvSet != null) { 
Assert.isPremiseValid( catalogVersion >= nv[nv.length - 1], "Catalog version is expected to be at least " + nv[nv.length - 1] + "!" @@ -1912,13 +1916,14 @@ public void recordHistoricalVersions( if (versionToPurge > -1) { try { this.lock.lock(); - if (this.historicalVersions != null) { - final long[] versionsToPurge = this.historicalVersions; + final long[] versionsToPurge = this.historicalVersions; + final ConcurrentHashMap theVolatileValues = this.volatileValues; + if (versionsToPurge != null && theVolatileValues != null) { int index = Arrays.binarySearch(versionsToPurge, versionToPurge); final int startIndex = index >= 0 ? index : -index - 2; if (index != -1) { for (int ix = startIndex; ix >= 0; ix--) { - this.volatileValues.remove(versionsToPurge[ix]); + theVolatileValues.remove(versionsToPurge[ix]); } } this.historicalVersions = Arrays.copyOfRange(versionsToPurge, startIndex + 1, versionsToPurge.length); @@ -1933,16 +1938,18 @@ public void recordHistoricalVersions( final long catalogVersion = valuesToPromote.getCatalogVersion(); try { this.lock.lock(); - if (this.historicalVersions == null) { + final long[] hv = this.historicalVersions; + final ConcurrentHashMap theVolatileValues = this.volatileValues; + if (hv == null || theVolatileValues == null) { + final ConcurrentHashMap newVolatileValues = CollectionUtils.createConcurrentHashMap(16); + newVolatileValues.put(catalogVersion, valuesToPromote.createFrom(keyToLocations)); this.historicalVersions = new long[]{catalogVersion}; - this.volatileValues = CollectionUtils.createConcurrentHashMap(16); - this.volatileValues.put(catalogVersion, valuesToPromote.createFrom(keyToLocations)); + this.volatileValues = newVolatileValues; } else { - this.volatileValues.compute( + theVolatileValues.compute( catalogVersion, (key, value) -> { if (value == null) { - final long[] hv = this.historicalVersions; this.historicalVersions = ArrayUtils.insertLongIntoOrderedArray(catalogVersion, hv); return 
valuesToPromote.createFrom(keyToLocations); } else { @@ -2041,14 +2048,17 @@ public Optional getOldestRecordKeptTimestamp() { */ public boolean contains(@Nonnull RecordKey key) { final long[] nv = this.nonFlushedVersions; - for (int i = nv.length - 1; i >= 0; i--) { - long nonFlushedVersion = nv[i]; - final NonFlushedValueSet nfSet = this.nonFlushedValues.get(nonFlushedVersion); - if (nfSet != null) { - if (nfSet.removedKeys.contains(key)) { - return false; - } else if (nfSet.addedKeys.contains(key)) { - return true; + final ConcurrentHashMap theNonVlushedValues = this.nonFlushedValues; + if (nv != null && theNonVlushedValues != null) { + for (int i = nv.length - 1; i >= 0; i--) { + long nonFlushedVersion = nv[i]; + final NonFlushedValueSet nfSet = theNonVlushedValues.get(nonFlushedVersion); + if (nfSet != null) { + if (nfSet.removedKeys.contains(key)) { + return false; + } else if (nfSet.addedKeys.contains(key)) { + return true; + } } } } @@ -2063,17 +2073,19 @@ public boolean contains(@Nonnull RecordKey key) { */ @Nonnull private NonFlushedValueSet getNonFlushedValues(long catalogVersion) { - if (this.nonFlushedVersions == null) { - this.nonFlushedValues = CollectionUtils.createConcurrentHashMap(16); + final long[] nv = this.nonFlushedVersions; + final ConcurrentHashMap theNonFlushedValues = this.nonFlushedValues; + if (nv == null || theNonFlushedValues == null) { + final ConcurrentHashMap newNonFlushedValues = CollectionUtils.createConcurrentHashMap(16); + final NonFlushedValueSet nvSet = new NonFlushedValueSet(catalogVersion, this::notifySizeIncrease); + newNonFlushedValues.put(catalogVersion, nvSet); + this.nonFlushedValues = newNonFlushedValues; this.nonFlushedVersions = new long[]{catalogVersion}; - final NonFlushedValueSet nv = new NonFlushedValueSet(catalogVersion, this::notifySizeIncrease); - this.nonFlushedValues.put(catalogVersion, nv); - return nv; + return nvSet; } else { - return this.nonFlushedValues.computeIfAbsent( + return 
theNonFlushedValues.computeIfAbsent( catalogVersion, cv -> { - final long[] nv = this.nonFlushedVersions; final long lastCatalogVersion = nv[nv.length - 1]; Assert.isPremiseValid( lastCatalogVersion == -1 || lastCatalogVersion <= catalogVersion, @@ -2253,7 +2265,7 @@ private record NonFlushedValuesWithFileLocation( * @param addedInFuture true if the value was added in future versions */ protected record VolatileValueInformation( - @Nonnull VersionedValue versionedValue, + @Nullable VersionedValue versionedValue, boolean removed, boolean addedInFuture ) { diff --git a/evita_store/evita_store_key_value/src/main/java/io/evitadb/store/offsetIndex/io/WriteOnlyFileHandle.java b/evita_store/evita_store_key_value/src/main/java/io/evitadb/store/offsetIndex/io/WriteOnlyFileHandle.java index bee77eba33..85d16972cb 100644 --- a/evita_store/evita_store_key_value/src/main/java/io/evitadb/store/offsetIndex/io/WriteOnlyFileHandle.java +++ b/evita_store/evita_store_key_value/src/main/java/io/evitadb/store/offsetIndex/io/WriteOnlyFileHandle.java @@ -6,7 +6,7 @@ * | __/\ V /| | || (_| | |_| | |_) | * \___| \_/ |_|\__\__,_|____/|____/ * - * Copyright (c) 2023-2024 + * Copyright (c) 2023-2025 * * Licensed under the Business Source License, Version 1.1 (the "License"); * you may not use this file except in compliance with the License. @@ -98,6 +98,11 @@ public class WriteOnlyFileHandle implements WriteOnlyHandle { * If a thread cannot acquire the lock within this time, a StorageException is thrown. */ private final long lockTimeoutSeconds; + /** + * Execute fsync when asked. When set to false, methods simply flush the buffers, but doesn't explicitly sync + * the data on the persistent storage - leaving it to the OS to decide when to do so. + */ + private final boolean fsSync; /** * The path to the target file that this handle is associated with. * This handle provides write-only access to the file at this path. 
@@ -177,11 +182,13 @@ static File getTargetFile(@Nonnull Path filePath) { * @param os The observable output stream to synchronize. * @throws SyncFailedException if the synchronization operation failed. */ - private static void doSync(@Nonnull ObservableOutput os) { + private static void doSync(@Nonnull ObservableOutput os, boolean fsSync) { // execute fsync so that data are really stored to the disk try { os.flush(); - os.getOutputStream().getFD().sync(); + if (fsSync) { + os.getOutputStream().getFD().sync(); + } } catch (IOException e) { throw new SyncFailedException(e); } @@ -189,15 +196,17 @@ private static void doSync(@Nonnull ObservableOutput os) { public WriteOnlyFileHandle( @Nonnull Path targetFile, + boolean fsSync, @Nonnull ObservableOutputKeeper observableOutputKeeper ) { - this(null, null, null, targetFile, observableOutputKeeper); + this(null, null, null, fsSync, targetFile, observableOutputKeeper); } public WriteOnlyFileHandle( @Nullable String catalogName, @Nullable FileType fileType, @Nullable String logicalName, + boolean fsSync, @Nonnull Path targetFile, @Nonnull ObservableOutputKeeper observableOutputKeeper ) { @@ -205,6 +214,7 @@ public WriteOnlyFileHandle( this.fileType = fileType; this.logicalName = logicalName; this.lockTimeoutSeconds = observableOutputKeeper.getLockTimeoutSeconds(); + this.fsSync = fsSync; this.targetFile = targetFile; Assert.isPremiseValid(getTargetFile(targetFile) != null, "Target file should be created or exception thrown!"); this.observableOutputKeeper = observableOutputKeeper; @@ -243,7 +253,7 @@ public void checkAndExecuteAndSync(@Nonnull String operation, @Nonnull Runnable OUTPUT_FACTORY, observableOutput -> { logic.accept(observableOutput); - doSync(observableOutput); + doSync(observableOutput, this.fsSync); } ); return; @@ -269,7 +279,7 @@ public T checkAndExecuteAndSync(@Nonnull String operation, @Nonnull Runna OUTPUT_FACTORY, observableOutput -> { final S result = logic.apply(observableOutput); - 
doSync(observableOutput); + doSync(observableOutput, this.fsSync); return postExecutionLogic.apply(observableOutput, result); } ); diff --git a/evita_store/evita_store_key_value/src/main/java/io/evitadb/store/offsetIndex/io/WriteOnlyOffHeapWithFileBackupHandle.java b/evita_store/evita_store_key_value/src/main/java/io/evitadb/store/offsetIndex/io/WriteOnlyOffHeapWithFileBackupHandle.java index 43fde3e5bf..39c63ed5ca 100644 --- a/evita_store/evita_store_key_value/src/main/java/io/evitadb/store/offsetIndex/io/WriteOnlyOffHeapWithFileBackupHandle.java +++ b/evita_store/evita_store_key_value/src/main/java/io/evitadb/store/offsetIndex/io/WriteOnlyOffHeapWithFileBackupHandle.java @@ -6,7 +6,7 @@ * | __/\ V /| | || (_| | |_| | |_) | * \___| \_/ |_|\__\__,_|____/|____/ * - * Copyright (c) 2023-2024 + * Copyright (c) 2023-2025 * * Licensed under the Business Source License, Version 1.1 (the "License"); * you may not use this file except in compliance with the License. @@ -37,6 +37,7 @@ import io.evitadb.utils.Assert; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; import java.io.FileOutputStream; import java.io.IOException; @@ -89,11 +90,16 @@ public class WriteOnlyOffHeapWithFileBackupHandle implements WriteOnlyHandle { /** * OutputStream that is used to write data to the off-heap memory. */ - private ObservableOutput offHeapMemoryOutput; + @Nullable private ObservableOutput offHeapMemoryOutput; /** * OutputStream that is used to write data to the file. */ - private ObservableOutput fileOutput; + @Nullable private ObservableOutput fileOutput; + /** + * Execute fsync when asked. When set to false, methods simply flush the buffers, but don't explicitly sync + * the data to the persistent storage - leaving it to the OS to decide when to do so. + */ + private final boolean fsSync; /** * Contains the information about the last end byte of fully written record. 
*/ @@ -105,11 +111,11 @@ public class WriteOnlyOffHeapWithFileBackupHandle implements WriteOnlyHandle { * @param os The observable output stream to synchronize. * @throws SyncFailedException if the synchronization operation failed. */ - private static void doSync(@Nonnull ObservableOutput os) { + private static void doSync(@Nonnull ObservableOutput os, boolean fsSync) { // execute fsync so that data are really stored to the disk try { os.flush(); - if (os.getOutputStream() instanceof FileOutputStream fileOutputStream) { + if (fsSync && os.getOutputStream() instanceof FileOutputStream fileOutputStream) { fileOutputStream.getFD().sync(); } } catch (IOException e) { @@ -119,11 +125,13 @@ private static void doSync(@Nonnull ObservableOutput os) { public WriteOnlyOffHeapWithFileBackupHandle( @Nonnull Path targetFile, + boolean fsSync, @Nonnull ObservableOutputKeeper observableOutputKeeper, @Nonnull OffHeapMemoryManager offHeapMemoryManager ) { - this.offHeapMemoryManager = offHeapMemoryManager; this.targetFile = targetFile; + this.fsSync = fsSync; + this.offHeapMemoryManager = offHeapMemoryManager; this.observableOutputKeeper = observableOutputKeeper; } @@ -333,7 +341,7 @@ private T executeLogic( ) { final T result = logic.apply(output); if (sync) { - doSync(output); + doSync(output, this.fsSync); } // update the last consistent written position lastConsistentWrittenPosition = Math.toIntExact(output.total()); diff --git a/evita_store/evita_store_key_value/src/main/java/io/evitadb/store/offsetIndex/model/VersionedValue.java b/evita_store/evita_store_key_value/src/main/java/io/evitadb/store/offsetIndex/model/VersionedValue.java index 8b62714a79..adf821d209 100644 --- a/evita_store/evita_store_key_value/src/main/java/io/evitadb/store/offsetIndex/model/VersionedValue.java +++ b/evita_store/evita_store_key_value/src/main/java/io/evitadb/store/offsetIndex/model/VersionedValue.java @@ -6,7 +6,7 @@ * | __/\ V /| | || (_| | |_| | |_) | * \___| \_/ |_|\__\__,_|____/|____/ * - * 
Copyright (c) 2023-2024 + * Copyright (c) 2023-2025 * * Licensed under the Business Source License, Version 1.1 (the "License"); * you may not use this file except in compliance with the License. @@ -27,7 +27,7 @@ import io.evitadb.store.offsetIndex.OffsetIndex; import io.evitadb.utils.MemoryMeasuringConstants; -import javax.annotation.Nullable; +import javax.annotation.Nonnull; import java.io.Serial; import java.io.Serializable; @@ -43,7 +43,7 @@ public record VersionedValue( long primaryKey, byte recordType, - @Nullable FileLocation fileLocation + @Nonnull FileLocation fileLocation ) implements Serializable { @Serial private static final long serialVersionUID = -4467999274212489366L; public static final long MEMORY_SIZE = 2 * MemoryMeasuringConstants.OBJECT_HEADER_SIZE + diff --git a/evita_store/evita_store_server/src/main/java/io/evitadb/store/catalog/CatalogOffsetIndexStoragePartPersistenceService.java b/evita_store/evita_store_server/src/main/java/io/evitadb/store/catalog/CatalogOffsetIndexStoragePartPersistenceService.java index 3ce15dfd61..441c51810c 100644 --- a/evita_store/evita_store_server/src/main/java/io/evitadb/store/catalog/CatalogOffsetIndexStoragePartPersistenceService.java +++ b/evita_store/evita_store_server/src/main/java/io/evitadb/store/catalog/CatalogOffsetIndexStoragePartPersistenceService.java @@ -6,7 +6,7 @@ * | __/\ V /| | || (_| | |_| | |_) | * \___| \_/ |_|\__\__,_|____/|____/ * - * Copyright (c) 2024 + * Copyright (c) 2024-2025 * * Licensed under the Business Source License, Version 1.1 (the "License"); * you may not use this file except in compliance with the License. 
@@ -57,6 +57,7 @@ import java.nio.file.Path; import java.time.OffsetDateTime; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; @@ -172,6 +173,7 @@ public static CatalogOffsetIndexStoragePartPersistenceService create( catalogName, FileType.CATALOG, catalogName, + storageOptions.syncWrites(), catalogFilePath, observableOutputKeeper ), @@ -273,10 +275,12 @@ static OffsetIndexDescriptor loadOffsetIndexDescriptor( final Kryo kryo = KryoFactory.createKryo( SharedClassesConfigurer.INSTANCE.andThen(CatalogHeaderKryoConfigurer.INSTANCE) ); - final CatalogHeader theCatalogHeader = StorageRecord.read( - theInput, catalogHeaderLocation, - (input, recordLength) -> kryo.readObject(input, CatalogHeader.class) - ).payload(); + final CatalogHeader theCatalogHeader = Objects.requireNonNull( + StorageRecord.read( + theInput, catalogHeaderLocation, + (input, recordLength) -> kryo.readObject(input, CatalogHeader.class) + ).payload() + ); catalogHeaderConsumer.accept(theCatalogHeader); return new OffsetIndexDescriptor( @@ -335,6 +339,7 @@ private static OffsetIndex loadOffsetIndex( catalogName, FileType.CATALOG, catalogName, + storageOptions.syncWrites(), catalogFilePath, observableOutputKeeper ), @@ -357,6 +362,7 @@ private static OffsetIndex loadOffsetIndex( catalogName, FileType.CATALOG, catalogName, + storageOptions.syncWrites(), catalogFilePath, observableOutputKeeper ), @@ -372,7 +378,7 @@ private static OffsetIndex loadOffsetIndex( private CatalogOffsetIndexStoragePartPersistenceService( long catalogVersion, - @Nullable CatalogHeader catalogHeader, + @Nonnull CatalogHeader catalogHeader, @Nonnull TransactionOptions transactionOptions, @Nonnull OffsetIndex offsetIndex, @Nonnull OffHeapMemoryManager offHeapMemoryManager, @@ -396,10 +402,12 @@ private CatalogOffsetIndexStoragePartPersistenceService( @Nonnull @Override public CatalogHeader getCatalogHeader(long catalogVersion) { - if 
(currentCatalogHeader == null) { - currentCatalogHeader = offsetIndex.get(catalogVersion, 1L, CatalogHeader.class); + if (this.currentCatalogHeader == null) { + this.currentCatalogHeader = Objects.requireNonNull( + this.offsetIndex.get(catalogVersion, 1L, CatalogHeader.class) + ); } - return currentCatalogHeader; + return this.currentCatalogHeader; } @Override diff --git a/evita_store/evita_store_server/src/main/java/io/evitadb/store/catalog/DefaultCatalogPersistenceService.java b/evita_store/evita_store_server/src/main/java/io/evitadb/store/catalog/DefaultCatalogPersistenceService.java index 79e2096e39..d1e0d5ebb5 100644 --- a/evita_store/evita_store_server/src/main/java/io/evitadb/store/catalog/DefaultCatalogPersistenceService.java +++ b/evita_store/evita_store_server/src/main/java/io/evitadb/store/catalog/DefaultCatalogPersistenceService.java @@ -6,7 +6,7 @@ * | __/\ V /| | || (_| | |_| | |_) | * \___| \_/ |_|\__\__,_|____/|____/ * - * Copyright (c) 2023-2024 + * Copyright (c) 2023-2025 * * Licensed under the Business Source License, Version 1.1 (the "License"); * you may not use this file except in compliance with the License. 
@@ -858,6 +858,7 @@ public DefaultCatalogPersistenceService( this.catalogName, FileType.CATALOG, this.catalogName, + storageOptions.syncWrites(), this.catalogStoragePath.resolve(getCatalogBootstrapFileName(catalogName)), this.observableOutputKeeper ) @@ -940,6 +941,7 @@ public DefaultCatalogPersistenceService( this.catalogName, FileType.CATALOG, this.catalogName, + storageOptions.syncWrites(), this.catalogStoragePath.resolve(getCatalogBootstrapFileName(catalogName)), this.observableOutputKeeper ) @@ -1442,6 +1444,7 @@ public IsolatedWalPersistenceService createIsolatedWalPersistenceService(@Nonnul this.transactionOptions.transactionWorkDirectory() .resolve(transactionId.toString()) .resolve(transactionId + ".wal"), + this.storageOptions.syncWrites(), this.observableOutputKeeper, this.offHeapMemoryManager ) @@ -1614,6 +1617,7 @@ public CatalogPersistenceService replaceWith( catalogNameToBeReplaced, FileType.CATALOG, catalogNameToBeReplaced, + storageOptions.syncWrites(), newPath.resolve(getCatalogBootstrapFileName(catalogNameToBeReplaced)), this.observableOutputKeeper ), @@ -2221,6 +2225,7 @@ private CatalogBootstrap writeCatalogBootstrap( originalBootstrapHandle, new WriteOnlyFileHandle( originalBootstrapHandle.getTargetFile(), + storageOptions.syncWrites(), this.observableOutputKeeper ) ), @@ -2275,6 +2280,7 @@ void trimBootstrapFile(long catalogVersion) { originalBootstrapHandle, new WriteOnlyFileHandle( originalBootstrapHandle.getTargetFile(), + storageOptions.syncWrites(), this.observableOutputKeeper ) ), @@ -2498,6 +2504,7 @@ private WriteOnlyFileHandle createNewBootstrapTempWriteHandle(@Nonnull String ne // create new file and replace the former one with it return new WriteOnlyFileHandle( Files.createTempFile(CatalogPersistenceService.getCatalogBootstrapFileName(newCatalogName), ".tmp"), + storageOptions.syncWrites(), this.observableOutputKeeper ); } catch (IOException e) { diff --git 
a/evita_store/evita_store_server/src/main/java/io/evitadb/store/catalog/DefaultEntityCollectionPersistenceService.java b/evita_store/evita_store_server/src/main/java/io/evitadb/store/catalog/DefaultEntityCollectionPersistenceService.java index 5aaeae4f5d..ea7422c59e 100644 --- a/evita_store/evita_store_server/src/main/java/io/evitadb/store/catalog/DefaultEntityCollectionPersistenceService.java +++ b/evita_store/evita_store_server/src/main/java/io/evitadb/store/catalog/DefaultEntityCollectionPersistenceService.java @@ -6,7 +6,7 @@ * | __/\ V /| | || (_| | |_| | |_) | * \___| \_/ |_|\__\__,_|____/|____/ * - * Copyright (c) 2023-2024 + * Copyright (c) 2023-2025 * * Licensed under the Business Source License, Version 1.1 (the "License"); * you may not use this file except in compliance with the License. @@ -763,6 +763,7 @@ public DefaultEntityCollectionPersistenceService( catalogName, FileType.ENTITY_COLLECTION, this.entityCollectionFileReference.entityType(), + storageOptions.syncWrites(), this.entityCollectionFile, observableOutputKeeper ), @@ -817,6 +818,7 @@ public DefaultEntityCollectionPersistenceService( catalogName, FileType.ENTITY_COLLECTION, this.entityCollectionFileReference.entityType(), + storageOptions.syncWrites(), this.entityCollectionFile, this.observableOutputKeeper ), diff --git a/evita_store/evita_store_server/src/main/java/io/evitadb/store/wal/TransactionalStoragePartPersistenceService.java b/evita_store/evita_store_server/src/main/java/io/evitadb/store/wal/TransactionalStoragePartPersistenceService.java index b2efebdf7e..da305a31ea 100644 --- a/evita_store/evita_store_server/src/main/java/io/evitadb/store/wal/TransactionalStoragePartPersistenceService.java +++ b/evita_store/evita_store_server/src/main/java/io/evitadb/store/wal/TransactionalStoragePartPersistenceService.java @@ -6,7 +6,7 @@ * | __/\ V /| | || (_| | |_| | |_) | * \___| \_/ |_|\__\__,_|____/|____/ * - * Copyright (c) 2023-2024 + * Copyright (c) 2023-2025 * * Licensed under the 
Business Source License, Version 1.1 (the "License"); * you may not use this file except in compliance with the License. @@ -107,6 +107,7 @@ public TransactionalStoragePartPersistenceService( offsetIndexRecordTypeRegistry, new WriteOnlyOffHeapWithFileBackupHandle( this.targetFile, + storageOptions.syncWrites(), observableOutputKeeper, offHeapMemoryManager ), diff --git a/evita_test_support/src/main/resources/evita-configuration.yaml b/evita_test_support/src/main/resources/evita-configuration.yaml index e92bdbb840..54f0ba10c0 100644 --- a/evita_test_support/src/main/resources/evita-configuration.yaml +++ b/evita_test_support/src/main/resources/evita-configuration.yaml @@ -29,6 +29,7 @@ storage: waitOnCloseSeconds: ${storage.waitOnCloseSeconds:60} outputBufferSize: ${storage.outputBufferSize:4MB} maxOpenedReadHandles: ${storage.maxOpenedReadHandles:12} + syncWrites: ${storage.syncWrites:true} computeCRC32C: ${storage.computeCRC32C:true} minimalActiveRecordShare: ${storage.minimalActiveRecordShare:0.5} fileSizeCompactionThresholdBytes: ${storage.fileSizeCompactionThresholdBytes:100M}