diff --git a/documentation/user/en/operate/configure.md b/documentation/user/en/operate/configure.md index 94fa5dcf9a..cde850600b 100644 --- a/documentation/user/en/operate/configure.md +++ b/documentation/user/en/operate/configure.md @@ -41,6 +41,7 @@ storage: # [see Storage configuration]( waitOnCloseSeconds: 60 outputBufferSize: 4MB maxOpenedReadHandles: 12 + syncWrites: true computeCRC32C: true minimalActiveRecordShare: 0.5 fileSizeCompactionThresholdBytes: 100MB @@ -490,6 +491,14 @@ This section contains configuration options for the storage layer of the databas [MacOS](https://gist.github.com/tombigel/d503800a282fcadbee14b537735d202c) +
syncWrites
+
+

**Default:** `true`

+

Determines whether the storage layer forces the operating system to flush the internal buffers to disk at + regular "safe points" or not. The default is true, so that data is not lost in the event of a power failure. + There are situations where disabling this feature can improve performance and the client can accept the risk + of data loss (e.g. when running automated tests, etc.).

+
computeCRC32C

**Default:** `true`

diff --git a/evita_api/src/main/java/io/evitadb/api/configuration/StorageOptions.java b/evita_api/src/main/java/io/evitadb/api/configuration/StorageOptions.java index b8684cbbb2..54958d72fa 100644 --- a/evita_api/src/main/java/io/evitadb/api/configuration/StorageOptions.java +++ b/evita_api/src/main/java/io/evitadb/api/configuration/StorageOptions.java @@ -6,7 +6,7 @@ * | __/\ V /| | || (_| | |_| | |_) | * \___| \_/ |_|\__\__,_|____/|____/ * - * Copyright (c) 2023-2024 + * Copyright (c) 2023-2025 * * Licensed under the Business Source License, Version 1.1 (the "License"); * you may not use this file except in compliance with the License. @@ -52,7 +52,13 @@ * purposes. The size of the buffer limits the maximum size of an individual record in the * key/value data store. * @param maxOpenedReadHandles Maximum number of simultaneously opened {@link java.io.InputStream} to file offset index file. - * @param computeCRC32C Contains setting that determined whether CRC32C checksums will be computed for written + * @param syncWrites Determines whether the storage layer forces the operating system to flush + * the internal buffers to disk at regular "safe points" or not. The default + * is true, so that data is not lost in the event of a power failure. There + * are situations where disabling this feature can improve performance and + * the client can accept the risk of data loss (e.g. when running automated + * tests, etc.). + * @param computeCRC32C Determines whether CRC32C checksums will be computed for written * records and also whether the CRC32C checksum will be checked on record read. * @param minimalActiveRecordShare Minimal share of active records in the file. If the share is lower, the file will * be compacted. @@ -79,6 +85,7 @@ public record StorageOptions( long waitOnCloseSeconds, int outputBufferSize, int maxOpenedReadHandles, + boolean syncWrites, boolean computeCRC32C, double minimalActiveRecordShare, long fileSizeCompactionThresholdBytes, @@ -93,6 +100,7 @@ public record StorageOptions( public static final int DEFAULT_LOCK_TIMEOUT_SECONDS = 5; public static final int DEFAULT_WAIT_ON_CLOSE_SECONDS = 5; public static final int DEFAULT_MAX_OPENED_READ_HANDLES = Runtime.getRuntime().availableProcessors(); + public static final boolean DEFAULT_SYNC_WRITES = true; public static final boolean DEFAULT_COMPUTE_CRC = true; public static final double DEFAULT_MINIMAL_ACTIVE_RECORD_SHARE = 0.5; public static final long DEFAULT_MINIMAL_FILE_SIZE_COMPACTION_THRESHOLD = 104_857_600L; // 100MB @@ -103,12 +111,14 @@ public record StorageOptions( /** * Builder method is planned to be used only in tests. */ + @Nonnull public static StorageOptions temporary() { return new StorageOptions( Path.of(System.getProperty("java.io.tmpdir"), "evita/data"), Path.of(System.getProperty("java.io.tmpdir"), "evita/export"), 5, 5, DEFAULT_OUTPUT_BUFFER_SIZE, Runtime.getRuntime().availableProcessors(), + false, true, DEFAULT_MINIMAL_ACTIVE_RECORD_SHARE, DEFAULT_MINIMAL_FILE_SIZE_COMPACTION_THRESHOLD, @@ -121,6 +131,7 @@ public static StorageOptions temporary() { /** * Builder for the storage options. Recommended to use to avoid binary compatibility problems in the future. */ + @Nonnull public static StorageOptions.Builder builder() { return new StorageOptions.Builder(); } @@ -128,6 +139,7 @@ public static StorageOptions.Builder builder() { /** * Builder for the storage options. Recommended to use to avoid binary compatibility problems in the future. */ + @Nonnull public static StorageOptions.Builder builder(@Nonnull StorageOptions storageOptions) { return new StorageOptions.Builder(storageOptions); } @@ -140,6 +152,7 @@ public StorageOptions() { DEFAULT_WAIT_ON_CLOSE_SECONDS, DEFAULT_OUTPUT_BUFFER_SIZE, DEFAULT_MAX_OPENED_READ_HANDLES, + DEFAULT_SYNC_WRITES, DEFAULT_COMPUTE_CRC, DEFAULT_MINIMAL_ACTIVE_RECORD_SHARE, DEFAULT_MINIMAL_FILE_SIZE_COMPACTION_THRESHOLD, @@ -156,6 +169,7 @@ public StorageOptions( long waitOnCloseSeconds, int outputBufferSize, int maxOpenedReadHandles, + boolean syncWrites, boolean computeCRC32C, double minimalActiveRecordShare, long fileSizeCompactionThresholdBytes, @@ -169,6 +183,7 @@ public StorageOptions( this.waitOnCloseSeconds = waitOnCloseSeconds; this.outputBufferSize = outputBufferSize; this.maxOpenedReadHandles = maxOpenedReadHandles; + this.syncWrites = syncWrites; this.computeCRC32C = computeCRC32C; this.minimalActiveRecordShare = minimalActiveRecordShare; this.fileSizeCompactionThresholdBytes = fileSizeCompactionThresholdBytes; @@ -188,6 +203,7 @@ public static class Builder { private long waitOnCloseSeconds = DEFAULT_WAIT_ON_CLOSE_SECONDS; private int outputBufferSize = DEFAULT_OUTPUT_BUFFER_SIZE; private int maxOpenedReadHandles = DEFAULT_MAX_OPENED_READ_HANDLES; + private boolean syncWrites = DEFAULT_SYNC_WRITES; private boolean computeCRC32C = DEFAULT_COMPUTE_CRC; private double minimalActiveRecordShare = DEFAULT_MINIMAL_ACTIVE_RECORD_SHARE; private long fileSizeCompactionThresholdBytes = DEFAULT_MINIMAL_FILE_SIZE_COMPACTION_THRESHOLD; @@ -205,6 +221,7 @@ public static class Builder { this.waitOnCloseSeconds = storageOptions.waitOnCloseSeconds; this.outputBufferSize = storageOptions.outputBufferSize; this.maxOpenedReadHandles = storageOptions.maxOpenedReadHandles; + this.syncWrites = storageOptions.syncWrites; this.computeCRC32C = storageOptions.computeCRC32C; this.minimalActiveRecordShare = storageOptions.minimalActiveRecordShare; this.fileSizeCompactionThresholdBytes = storageOptions.fileSizeCompactionThresholdBytes; @@ -251,6 +268,12 @@ public Builder maxOpenedReadHandles(int maxOpenedReadHandles) { return this; } + @Nonnull + public Builder syncWrites(boolean syncWrites) { + this.syncWrites = syncWrites; + return this; + } + @Nonnull public Builder computeCRC32(boolean computeCRC32) { this.computeCRC32C = computeCRC32; @@ -296,6 +319,7 @@ public StorageOptions build() { waitOnCloseSeconds, outputBufferSize, maxOpenedReadHandles, + syncWrites, computeCRC32C, minimalActiveRecordShare, fileSizeCompactionThresholdBytes, diff --git a/evita_functional_tests/src/test/java/io/evitadb/api/EvitaTransactionalFunctionalTest.java b/evita_functional_tests/src/test/java/io/evitadb/api/EvitaTransactionalFunctionalTest.java index 4d7df761a7..f53d50f72b 100644 --- a/evita_functional_tests/src/test/java/io/evitadb/api/EvitaTransactionalFunctionalTest.java +++ b/evita_functional_tests/src/test/java/io/evitadb/api/EvitaTransactionalFunctionalTest.java @@ -6,7 +6,7 @@ * | __/\ V /| | || (_| | |_| | |_) | * \___| \_/ |_|\__\__,_|____/|____/ * - * Copyright (c) 2023-2024 + * Copyright (c) 2023-2025 * * Licensed under the Business Source License, Version 1.1 (the "License"); * you may not use this file except in compliance with the License. @@ -1610,7 +1610,10 @@ private Map> appendWal( UUID.randomUUID(), KryoFactory.createKryo(WalKryoConfigurer.INSTANCE), new WriteOnlyOffHeapWithFileBackupHandle( - isolatedWalFilePath, this.observableOutputKeeper, offHeapMemoryManager + isolatedWalFilePath, + false, + this.observableOutputKeeper, + offHeapMemoryManager ) ); diff --git a/evita_functional_tests/src/test/java/io/evitadb/store/catalog/DefaultCatalogPersistenceServiceTest.java b/evita_functional_tests/src/test/java/io/evitadb/store/catalog/DefaultCatalogPersistenceServiceTest.java index 219176fee1..d107b5421d 100644 --- a/evita_functional_tests/src/test/java/io/evitadb/store/catalog/DefaultCatalogPersistenceServiceTest.java +++ b/evita_functional_tests/src/test/java/io/evitadb/store/catalog/DefaultCatalogPersistenceServiceTest.java @@ -6,7 +6,7 @@ * | __/\ V /| | || (_| | |_| | |_) | * \___| \_/ |_|\__\__,_|____/|____/ * - * Copyright (c) 2023-2024 + * Copyright (c) 2023-2025 * * Licensed under the Business Source License, Version 1.1 (the "License"); * you may not use this file except in compliance with the License. @@ -159,6 +159,7 @@ class DefaultCatalogPersistenceServiceTest implements EvitaTestSupport { ); private final WriteOnlyOffHeapWithFileBackupHandle writeHandle = new WriteOnlyOffHeapWithFileBackupHandle( getTestDirectory().resolve(transactionId.toString()), + false, observableOutputKeeper, new OffHeapMemoryManager(TEST_CATALOG, 512, 1) ); @@ -248,6 +249,7 @@ protected Kryo create() { catalogName, FileType.CATALOG, catalogName, + false, catalogFilePath, observableOutputKeeper ), @@ -796,7 +798,7 @@ private StorageOptions getStorageOptions() { getTestDirectory().resolve(DIR_DEFAULT_CATALOG_PERSISTENCE_SERVICE_TEST), 60, 60, StorageOptions.DEFAULT_OUTPUT_BUFFER_SIZE, 1, - true, 1.0, 0L, false, + false, true, 1.0, 0L, false, Long.MAX_VALUE, Long.MAX_VALUE ); } diff --git a/evita_functional_tests/src/test/java/io/evitadb/store/catalog/DefaultIsolatedWalServiceTest.java b/evita_functional_tests/src/test/java/io/evitadb/store/catalog/DefaultIsolatedWalServiceTest.java index f7bd95d3e0..d9d39e6748 100644 --- a/evita_functional_tests/src/test/java/io/evitadb/store/catalog/DefaultIsolatedWalServiceTest.java +++ b/evita_functional_tests/src/test/java/io/evitadb/store/catalog/DefaultIsolatedWalServiceTest.java @@ -6,7 +6,7 @@ * | __/\ V /| | || (_| | |_| | |_) | * \___| \_/ |_|\__\__,_|____/|____/ * - * Copyright (c) 2024 + * Copyright (c) 2024-2025 * * Licensed under the Business Source License, Version 1.1 (the "License"); * you may not use this file except in compliance with the License. @@ -87,6 +87,7 @@ class DefaultIsolatedWalServiceTest implements EvitaTestSupport { ); private final WriteOnlyOffHeapWithFileBackupHandle writeHandle = new WriteOnlyOffHeapWithFileBackupHandle( getTestDirectory().resolve(transactionId.toString()), + false, observableOutputKeeper, new OffHeapMemoryManager(TEST_CATALOG, 512, 1) ); diff --git a/evita_functional_tests/src/test/java/io/evitadb/store/offsetIndex/OffsetIndexSerializationServiceTest.java b/evita_functional_tests/src/test/java/io/evitadb/store/offsetIndex/OffsetIndexSerializationServiceTest.java index 19a836df48..3d6127a08f 100644 --- a/evita_functional_tests/src/test/java/io/evitadb/store/offsetIndex/OffsetIndexSerializationServiceTest.java +++ b/evita_functional_tests/src/test/java/io/evitadb/store/offsetIndex/OffsetIndexSerializationServiceTest.java @@ -6,7 +6,7 @@ * | __/\ V /| | || (_| | |_| | |_) | * \___| \_/ |_|\__\__,_|____/|____/ * - * Copyright (c) 2023-2024 + * Copyright (c) 2023-2025 * * Licensed under the Business Source License, Version 1.1 (the "License"); * you may not use this file except in compliance with the License. @@ -41,7 +41,7 @@ class OffsetIndexSerializationServiceTest { void shouldComputeExpectedRecordCountProperly() { final StorageOptions testOptions = new StorageOptions( Path.of(""), Path.of(""), 1, 0, 55, 1, - false, 1.0, 0L, false, Long.MAX_VALUE, Long.MAX_VALUE + false, false, 1.0, 0L, false, Long.MAX_VALUE, Long.MAX_VALUE ); assertEquals(new OffsetIndexSerializationService.ExpectedCounts(0, 1), OffsetIndexSerializationService.computeExpectedRecordCount(testOptions, 0)); assertEquals(new OffsetIndexSerializationService.ExpectedCounts(1, 1), OffsetIndexSerializationService.computeExpectedRecordCount(testOptions, 1)); diff --git a/evita_functional_tests/src/test/java/io/evitadb/store/offsetIndex/OffsetIndexTest.java b/evita_functional_tests/src/test/java/io/evitadb/store/offsetIndex/OffsetIndexTest.java index f9535b00e0..afa0974900 100644 --- a/evita_functional_tests/src/test/java/io/evitadb/store/offsetIndex/OffsetIndexTest.java +++ b/evita_functional_tests/src/test/java/io/evitadb/store/offsetIndex/OffsetIndexTest.java @@ -6,7 +6,7 @@ * | __/\ V /| | || (_| | |_| | |_) | * \___| \_/ |_|\__\__,_|____/|____/ * - * Copyright (c) 2023-2024 + * Copyright (c) 2023-2025 * * Licensed under the Business Source License, Version 1.1 (the "License"); * you may not use this file except in compliance with the License. @@ -69,6 +69,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.IntFunction; +import java.util.function.Supplier; import static io.evitadb.store.offsetIndex.OffsetIndexSerializationService.computeExpectedRecordCount; import static io.evitadb.test.TestConstants.LONG_RUNNING_TEST; @@ -199,7 +200,7 @@ void shouldCopySnapshotOfTheBigFileOffsetIndexAndReconstruct() { ), limitedBufferOptions, offsetIndexRecordTypeRegistry, - new WriteOnlyFileHandle(targetFile, observableOutputKeeper), + new WriteOnlyFileHandle(targetFile, false, observableOutputKeeper), nonFlushedBlock -> {}, oldestRecordTimestamp -> {} ); @@ -221,7 +222,7 @@ void shouldCopySnapshotOfTheBigFileOffsetIndexAndReconstruct() { ), limitedBufferOptions, offsetIndexRecordTypeRegistry, - new WriteOnlyFileHandle(targetFile, observableOutputKeeper), + new WriteOnlyFileHandle(targetFile, false, observableOutputKeeper), nonFlushedBlock -> {}, oldestRecordTimestamp -> {} ); @@ -242,7 +243,7 @@ void shouldCopySnapshotOfTheBigFileOffsetIndexAndReconstruct() { snapshotBootstrapDescriptor, limitedBufferOptions, offsetIndexRecordTypeRegistry, - new WriteOnlyFileHandle(snapshotPath, observableOutputKeeper), + new WriteOnlyFileHandle(snapshotPath, false, observableOutputKeeper), nonFlushedBlock -> {}, oldestRecordTimestamp -> {} ); @@ -284,7 +285,7 @@ void shouldRemoveRecord() { ), options, offsetIndexRecordTypeRegistry, - new WriteOnlyFileHandle(targetFile, observableOutputKeeper), + new WriteOnlyFileHandle(targetFile, false, observableOutputKeeper), nonFlushedBlock -> {}, oldestRecordTimestamp -> {} ); @@ -328,7 +329,7 @@ void shouldReadBinaryRecordAndDeserializeManually() { ), options, offsetIndexRecordTypeRegistry, - new WriteOnlyFileHandle(targetFile, observableOutputKeeper), + new WriteOnlyFileHandle(targetFile, false, observableOutputKeeper), nonFlushedBlock -> {}, oldestRecordTimestamp -> {} ); @@ -384,7 +385,7 @@ void shouldReadSingleRecordAndUsingManualDeserialization() { i ); - final EntityBodyStoragePart entityBody = OffsetIndex.readSingleRecord( + final Supplier entityBodySupplier = () -> OffsetIndex.readSingleRecord( targetFile, offsetIndexDescriptor.fileLocation(), key, @@ -395,8 +396,9 @@ void shouldReadSingleRecordAndUsingManualDeserialization() { .orElse(null) ); if (i < recordCount * (iterationCount - 1) && i % recordCount < removedRecords && i % recordCount > 0) { - assertNull(entityBody); + assertThrows(NullPointerException.class, entityBodySupplier::get); } else { + final EntityBodyStoragePart entityBody = entityBodySupplier.get(); assertNotNull(entityBody); assertEquals( new EntityBodyStoragePart(i), @@ -418,7 +420,7 @@ void shouldRefuseOperationAfterClose() { ), options, offsetIndexRecordTypeRegistry, - new WriteOnlyFileHandle(targetFile, observableOutputKeeper), + new WriteOnlyFileHandle(targetFile, false, observableOutputKeeper), nonFlushedBlock -> {}, oldestRecordTimestamp -> {} ); @@ -448,7 +450,7 @@ void generationalProofTest(GenerationalTestInput input) { ), buildOptionsWithLimitedBuffer(), offsetIndexRecordTypeRegistry, - new WriteOnlyFileHandle(targetFile, observableOutputKeeper), + new WriteOnlyFileHandle(targetFile, false, observableOutputKeeper), nonFlushedBlock -> {}, oldestRecordTimestamp -> {} ) @@ -522,7 +524,7 @@ void generationalProofTest(GenerationalTestInput input) { ), options, offsetIndexRecordTypeRegistry, - new WriteOnlyFileHandle(currentFilePath.get(), observableOutputKeeper), + new WriteOnlyFileHandle(currentFilePath.get(), false, observableOutputKeeper), nonFlushedBlock -> {}, oldestRecordTimestamp -> {} ); @@ -610,7 +612,7 @@ void generationalProofTest(GenerationalTestInput input) { ), options, offsetIndexRecordTypeRegistry, - new WriteOnlyFileHandle(newPath, observableOutputKeeper), + new WriteOnlyFileHandle(newPath, false, observableOutputKeeper), nonFlushedBlock -> {}, oldestRecordTimestamp -> {} ); @@ -653,7 +655,7 @@ private InsertionOutput serializeAndReconstructBigFileOffsetIndex( ), storageOptions, offsetIndexRecordTypeRegistry, - new WriteOnlyFileHandle(targetFile, observableOutputKeeper), + new WriteOnlyFileHandle(targetFile, false, observableOutputKeeper), nonFlushedBlock -> {}, oldestRecordTimestamp -> {} ); @@ -675,7 +677,7 @@ private InsertionOutput serializeAndReconstructBigFileOffsetIndex( ), storageOptions, offsetIndexRecordTypeRegistry, - new WriteOnlyFileHandle(targetFile, observableOutputKeeper), + new WriteOnlyFileHandle(targetFile, false, observableOutputKeeper), nonFlushedBlock -> {}, oldestRecordTimestamp -> {} ); @@ -718,7 +720,7 @@ private InsertionOutput createRecordsInFileOffsetIndex( ), options, offsetIndexRecordTypeRegistry, - new WriteOnlyFileHandle(targetFile, observableOutputKeeper), + new WriteOnlyFileHandle(targetFile, false, observableOutputKeeper), nonFlushedBlock -> {}, oldestRecordTimestamp -> {} ); diff --git a/evita_functional_tests/src/test/java/io/evitadb/store/offsetIndex/io/WriteOnlyOffHeapWithFileBackupHandleTest.java b/evita_functional_tests/src/test/java/io/evitadb/store/offsetIndex/io/WriteOnlyOffHeapWithFileBackupHandleTest.java index a8a8d35138..c4e559f220 100644 --- a/evita_functional_tests/src/test/java/io/evitadb/store/offsetIndex/io/WriteOnlyOffHeapWithFileBackupHandleTest.java +++ b/evita_functional_tests/src/test/java/io/evitadb/store/offsetIndex/io/WriteOnlyOffHeapWithFileBackupHandleTest.java @@ -6,7 +6,7 @@ * | __/\ V /| | || (_| | |_| | |_) | * \___| \_/ |_|\__\__,_|____/|____/ * - * Copyright (c) 2023-2024 + * Copyright (c) 2023-2025 * * Licensed under the Business Source License, Version 1.1 (the "License"); * you may not use this file except in compliance with the License. @@ -59,7 +59,7 @@ void shouldWriteDataToOffHeapChunk() { try ( final OffHeapMemoryManager memoryManager = new OffHeapMemoryManager(TEST_CATALOG, 32, 1); final WriteOnlyOffHeapWithFileBackupHandle writeHandle = new WriteOnlyOffHeapWithFileBackupHandle( - targetDirectory.resolve(UUIDUtil.randomUUID() + ".tmp"), outputKeeper, memoryManager + targetDirectory.resolve(UUIDUtil.randomUUID() + ".tmp"), false, outputKeeper, memoryManager ) ) { writeHandle.checkAndExecuteAndSync( @@ -85,7 +85,7 @@ void shouldWriteLargeDataFirstToOffHeapChunkThatAutomaticallySwitchesToTemporary try ( final OffHeapMemoryManager memoryManager = new OffHeapMemoryManager(TEST_CATALOG, 32, 1); final WriteOnlyOffHeapWithFileBackupHandle writeHandle = new WriteOnlyOffHeapWithFileBackupHandle( - targetDirectory.resolve(UUIDUtil.randomUUID() + ".tmp"), outputKeeper, memoryManager + targetDirectory.resolve(UUIDUtil.randomUUID() + ".tmp"), false, outputKeeper, memoryManager ) ) { for (int i = 0; i < 5; i++) { @@ -116,7 +116,7 @@ void shouldWriteLargeDataFirstToOffHeapChunkThatAutomaticallySwitchesToTemporary try ( final OffHeapMemoryManager memoryManager = new OffHeapMemoryManager(TEST_CATALOG, 32, 1); final WriteOnlyOffHeapWithFileBackupHandle writeHandle = new WriteOnlyOffHeapWithFileBackupHandle( - targetDirectory.resolve(UUIDUtil.randomUUID() + ".tmp"), outputKeeper, memoryManager + targetDirectory.resolve(UUIDUtil.randomUUID() + ".tmp"), false, outputKeeper, memoryManager ) ) { for (int i = 0; i < 5; i++) { @@ -151,7 +151,7 @@ void shouldStartDirectlyWithFileBackupIfThereIsNoFreeMemoryRegionAvailable() { try ( final OffHeapMemoryManager memoryManager = new OffHeapMemoryManager(TEST_CATALOG, 32, 1); final WriteOnlyOffHeapWithFileBackupHandle realMemoryHandle = new WriteOnlyOffHeapWithFileBackupHandle( - targetDirectory.resolve(UUIDUtil.randomUUID() + ".tmp"), outputKeeper, memoryManager + targetDirectory.resolve(UUIDUtil.randomUUID() + ".tmp"), false, outputKeeper, memoryManager ) ) { // we need to write at least one byte to the real memory handle to force the memory manager @@ -164,7 +164,7 @@ void shouldStartDirectlyWithFileBackupIfThereIsNoFreeMemoryRegionAvailable() { // because there is only one region available - this will force the handle to use the file backup immediately try ( final WriteOnlyOffHeapWithFileBackupHandle forcedFileHandle = new WriteOnlyOffHeapWithFileBackupHandle( - targetDirectory.resolve(UUIDUtil.randomUUID() + ".tmp"), outputKeeper, memoryManager + targetDirectory.resolve(UUIDUtil.randomUUID() + ".tmp"), false, outputKeeper, memoryManager ) ) { for (int i = 0; i < 5; i++) { diff --git a/evita_functional_tests/src/test/java/io/evitadb/store/wal/CatalogWriteAheadLogIntegrationTest.java b/evita_functional_tests/src/test/java/io/evitadb/store/wal/CatalogWriteAheadLogIntegrationTest.java index f8650cf794..b0043faaef 100644 --- a/evita_functional_tests/src/test/java/io/evitadb/store/wal/CatalogWriteAheadLogIntegrationTest.java +++ b/evita_functional_tests/src/test/java/io/evitadb/store/wal/CatalogWriteAheadLogIntegrationTest.java @@ -6,7 +6,7 @@ * | __/\ V /| | || (_| | |_| | |_) | * \___| \_/ |_|\__\__,_|____/|____/ * - * Copyright (c) 2024 + * Copyright (c) 2024-2025 * * Licensed under the Business Source License, Version 1.1 (the "License"); * you may not use this file except in compliance with the License. @@ -333,7 +333,7 @@ private Map> writeWal(@Nonnull OffHeapMemoryManager offHeap UUID.randomUUID(), KryoFactory.createKryo(WalKryoConfigurer.INSTANCE), new WriteOnlyOffHeapWithFileBackupHandle( - isolatedWalFilePath, observableOutputKeeper, offHeapMemoryManager + isolatedWalFilePath, false, observableOutputKeeper, offHeapMemoryManager ) ); diff --git a/evita_server/src/main/resources/evita-configuration.yaml b/evita_server/src/main/resources/evita-configuration.yaml index 038b8446b2..3d720e2abd 100644 --- a/evita_server/src/main/resources/evita-configuration.yaml +++ b/evita_server/src/main/resources/evita-configuration.yaml @@ -29,6 +29,7 @@ storage: waitOnCloseSeconds: ${storage.waitOnCloseSeconds:60} outputBufferSize: ${storage.outputBufferSize:4MB} maxOpenedReadHandles: ${storage.maxOpenedReadHandles:12} + syncWrites: ${storage.syncWrites:true} computeCRC32C: ${storage.computeCRC32C:true} minimalActiveRecordShare: ${storage.minimalActiveRecordShare:0.5} fileSizeCompactionThresholdBytes: ${storage.fileSizeCompactionThresholdBytes:100M} diff --git a/evita_store/evita_store_key_value/src/main/java/io/evitadb/store/offsetIndex/OffsetIndex.java b/evita_store/evita_store_key_value/src/main/java/io/evitadb/store/offsetIndex/OffsetIndex.java index 2737d77998..63971e7a76 100644 --- a/evita_store/evita_store_key_value/src/main/java/io/evitadb/store/offsetIndex/OffsetIndex.java +++ b/evita_store/evita_store_key_value/src/main/java/io/evitadb/store/offsetIndex/OffsetIndex.java @@ -6,7 +6,7 @@ * | __/\ V /| | || (_| | |_| | |_) | * \___| \_/ |_|\__\__,_|____/|____/ * - * Copyright (c) 2023-2024 + * Copyright (c) 2023-2025 * * Licensed under the Business Source License, Version 1.1 (the "License"); * you may not use this file except in compliance with the License. @@ -230,7 +230,7 @@ public class OffsetIndex { * @param The type of the storage part. * @return deserialized storage part or null if the record was not found */ - @Nullable + @Nonnull public static T readSingleRecord( @Nonnull Path filePath, @Nonnull FileLocation fileLocation, @@ -251,7 +251,9 @@ public static T readSingleRecord( fileLocation, filteringOffsetIndexBuilder ); - return storagePartReader.apply(filteringOffsetIndexBuilder, input); + return Objects.requireNonNull( + storagePartReader.apply(filteringOffsetIndexBuilder, input) + ); } catch (FileNotFoundException e) { throw new UnexpectedIOException( "Cannot create read offset file index from file `" + filePath + "`!", @@ -1649,7 +1651,7 @@ public int countDifference(long catalogVersion) { // scan non-flushed values final ConcurrentHashMap nvValues = this.nonFlushedValues; final long[] nv = this.nonFlushedVersions; - if (nv != null) { + if (nv != null && nvValues != null) { int index = Arrays.binarySearch(nv, catalogVersion); if (index != -1) { final int startIndex = index >= 0 ? index : -index - 1; @@ -1673,7 +1675,7 @@ public int countDifference(long catalogVersion) { } if (hv != null) { int index = Arrays.binarySearch(hv, catalogVersion); - if (index != -1) { + if (index != -1 && hvValues != null) { final int startIndex = index >= 0 ? index : -index - 1; for (int ix = hv.length - 1; ix > startIndex && ix >= 0; ix--) { final PastMemory differenceSet = hvValues.get(hv[ix]); @@ -1701,7 +1703,7 @@ public int countDifference(long catalogVersion, byte recordTypeId) { final long[] nv = this.nonFlushedVersions; if (nv != null) { int index = Arrays.binarySearch(nv, catalogVersion); - if (index != -1) { + if (index != -1 && nvValues != null) { final int startIndex = index >= 0 ? index : -index - 1; for (int ix = nv.length - 1; ix >= startIndex && ix >= 0; ix--) { final NonFlushedValueSet nonFlushedValueSet = nvValues.get(nv[ix]); @@ -1721,7 +1723,7 @@ public int countDifference(long catalogVersion, byte recordTypeId) { } finally { this.lock.unlock(); } - if (hv != null) { + if (hv != null && hvValues != null) { int index = Arrays.binarySearch(hv, catalogVersion); if (index != -1) { final int startIndex = index >= 0 ? index : -index - 1; @@ -1747,7 +1749,7 @@ public int countDifference(long catalogVersion, byte recordTypeId) { public Optional getNonFlushedValueIfVersionMatches(long catalogVersion, @Nonnull RecordKey key) { final ConcurrentHashMap nvSet = this.nonFlushedValues; final long[] nv = this.nonFlushedVersions; - if (nv != null) { + if (nv != null && nvSet != null) { int index = Arrays.binarySearch(nv, catalogVersion); final int startIndex = index >= 0 ? index : -index - 2; if (startIndex >= 0) { @@ -1784,7 +1786,8 @@ public OptionalLong getLastNonFlushedCatalogVersionIfExists() { * @return true if there are non-flushed values, false otherwise */ public boolean hasValuesToFlush() { - return !(nonFlushedValues == null || nonFlushedValues.isEmpty()); + final ConcurrentHashMap nvSet = this.nonFlushedValues; + return nvSet != null && !nvSet.isEmpty(); } /** @@ -1815,6 +1818,7 @@ public Optional getVolatileValueInformation(long catal final long examinedVersion = hv[ix]; final PastMemory pastMemory = hvValues.get(examinedVersion); if (pastMemory.getRemovedKeys().contains(key)) { + //noinspection DataFlowIssue addedInFuture = false; } if (pastMemory.getAddedKeys().contains(key) && examinedVersion != catalogVersion) { @@ -1871,7 +1875,7 @@ public void removeValue(long catalogVersion, @Nonnull RecordKey key, @Nonnull Fi public Collection getNonFlushedEntriesToPromote(long catalogVersion) { final ConcurrentHashMap nvSet = this.nonFlushedValues; final long[] nv = this.nonFlushedVersions; - if (nv != null) { + if (nv != null && nvSet != null) { Assert.isPremiseValid( catalogVersion >= nv[nv.length - 1], "Catalog version is expected to be at least " + nv[nv.length - 1] + "!" @@ -1912,13 +1916,14 @@ public void recordHistoricalVersions( if (versionToPurge > -1) { try { this.lock.lock(); - if (this.historicalVersions != null) { - final long[] versionsToPurge = this.historicalVersions; + final long[] versionsToPurge = this.historicalVersions; + final ConcurrentHashMap theVolatileValues = this.volatileValues; + if (versionsToPurge != null && theVolatileValues != null) { int index = Arrays.binarySearch(versionsToPurge, versionToPurge); final int startIndex = index >= 0 ? index : -index - 2; if (index != -1) { for (int ix = startIndex; ix >= 0; ix--) { - this.volatileValues.remove(versionsToPurge[ix]); + theVolatileValues.remove(versionsToPurge[ix]); } } this.historicalVersions = Arrays.copyOfRange(versionsToPurge, startIndex + 1, versionsToPurge.length); @@ -1933,16 +1938,18 @@ public void recordHistoricalVersions( final long catalogVersion = valuesToPromote.getCatalogVersion(); try { this.lock.lock(); - if (this.historicalVersions == null) { + final long[] hv = this.historicalVersions; + final ConcurrentHashMap theVolatileValues = this.volatileValues; + if (hv == null || theVolatileValues == null) { + final ConcurrentHashMap newVolatileValues = CollectionUtils.createConcurrentHashMap(16); + newVolatileValues.put(catalogVersion, valuesToPromote.createFrom(keyToLocations)); this.historicalVersions = new long[]{catalogVersion}; - this.volatileValues = CollectionUtils.createConcurrentHashMap(16); - this.volatileValues.put(catalogVersion, valuesToPromote.createFrom(keyToLocations)); + this.volatileValues = newVolatileValues; } else { - this.volatileValues.compute( + theVolatileValues.compute( catalogVersion, (key, value) -> { if (value == null) { - final long[] hv = this.historicalVersions; this.historicalVersions = ArrayUtils.insertLongIntoOrderedArray(catalogVersion, hv); return valuesToPromote.createFrom(keyToLocations); } else { @@ -2041,14 +2048,17 @@ public Optional getOldestRecordKeptTimestamp() { */ public boolean contains(@Nonnull RecordKey key) { final long[] nv = this.nonFlushedVersions; - for (int i = nv.length - 1; i >= 0; i--) { - long nonFlushedVersion = nv[i]; - final NonFlushedValueSet nfSet = this.nonFlushedValues.get(nonFlushedVersion); - if (nfSet != null) { - if (nfSet.removedKeys.contains(key)) { - return false; - } else if (nfSet.addedKeys.contains(key)) { - return true; + final ConcurrentHashMap theNonVlushedValues = this.nonFlushedValues; + if (nv != null && theNonVlushedValues != null) { + for (int i = nv.length - 1; i >= 0; i--) { + long nonFlushedVersion = nv[i]; + final NonFlushedValueSet nfSet = theNonVlushedValues.get(nonFlushedVersion); + if (nfSet != null) { + if (nfSet.removedKeys.contains(key)) { + return false; + } else if (nfSet.addedKeys.contains(key)) { + return true; + } } } } @@ -2063,17 +2073,19 @@ public boolean contains(@Nonnull RecordKey key) { */ @Nonnull private NonFlushedValueSet getNonFlushedValues(long catalogVersion) { - if (this.nonFlushedVersions == null) { - this.nonFlushedValues = CollectionUtils.createConcurrentHashMap(16); + final long[] nv = this.nonFlushedVersions; + final ConcurrentHashMap theNonFlushedValues = this.nonFlushedValues; + if (nv == null || theNonFlushedValues == null) { + final ConcurrentHashMap newNonFlushedValues = CollectionUtils.createConcurrentHashMap(16); + final NonFlushedValueSet nvSet = new NonFlushedValueSet(catalogVersion, this::notifySizeIncrease); + newNonFlushedValues.put(catalogVersion, nvSet); + this.nonFlushedValues = newNonFlushedValues; this.nonFlushedVersions = new long[]{catalogVersion}; - final NonFlushedValueSet nv = new NonFlushedValueSet(catalogVersion, this::notifySizeIncrease); - this.nonFlushedValues.put(catalogVersion, nv); - return nv; + return nvSet; } else { - return this.nonFlushedValues.computeIfAbsent( + return theNonFlushedValues.computeIfAbsent( catalogVersion, cv -> { - final long[] nv = this.nonFlushedVersions; final long lastCatalogVersion = nv[nv.length - 1]; Assert.isPremiseValid( lastCatalogVersion == -1 || lastCatalogVersion <= catalogVersion, @@ -2253,7 +2265,7 @@ private record NonFlushedValuesWithFileLocation( * @param addedInFuture true if the value was added in future versions */ protected record VolatileValueInformation( - @Nonnull VersionedValue versionedValue, + @Nullable VersionedValue versionedValue, boolean removed, boolean addedInFuture ) { diff --git a/evita_store/evita_store_key_value/src/main/java/io/evitadb/store/offsetIndex/io/WriteOnlyFileHandle.java b/evita_store/evita_store_key_value/src/main/java/io/evitadb/store/offsetIndex/io/WriteOnlyFileHandle.java index bee77eba33..85d16972cb 100644 --- a/evita_store/evita_store_key_value/src/main/java/io/evitadb/store/offsetIndex/io/WriteOnlyFileHandle.java +++ b/evita_store/evita_store_key_value/src/main/java/io/evitadb/store/offsetIndex/io/WriteOnlyFileHandle.java @@ -6,7 +6,7 @@ * | __/\ V /| | || (_| | |_| | |_) | * \___| \_/ |_|\__\__,_|____/|____/ * - * Copyright (c) 2023-2024 + * Copyright (c) 2023-2025 * * Licensed under the Business Source License, Version 1.1 (the "License"); * you may not use this file except in compliance with the License. @@ -98,6 +98,11 @@ public class WriteOnlyFileHandle implements WriteOnlyHandle { * If a thread cannot acquire the lock within this time, a StorageException is thrown. */ private final long lockTimeoutSeconds; + /** + * Execute fsync when asked. When set to false, methods simply flush the buffers, but doesn't explicitly sync + * the data on the persistent storage - leaving it to the OS to decide when to do so. + */ + private final boolean fsSync; /** * The path to the target file that this handle is associated with. * This handle provides write-only access to the file at this path. @@ -177,11 +182,13 @@ static File getTargetFile(@Nonnull Path filePath) { * @param os The observable output stream to synchronize. * @throws SyncFailedException if the synchronization operation failed. */ - private static void doSync(@Nonnull ObservableOutput os) { + private static void doSync(@Nonnull ObservableOutput os, boolean fsSync) { // execute fsync so that data are really stored to the disk try { os.flush(); - os.getOutputStream().getFD().sync(); + if (fsSync) { + os.getOutputStream().getFD().sync(); + } } catch (IOException e) { throw new SyncFailedException(e); } @@ -189,15 +196,17 @@ private static void doSync(@Nonnull ObservableOutput os) { public WriteOnlyFileHandle( @Nonnull Path targetFile, + boolean fsSync, @Nonnull ObservableOutputKeeper observableOutputKeeper ) { - this(null, null, null, targetFile, observableOutputKeeper); + this(null, null, null, fsSync, targetFile, observableOutputKeeper); } public WriteOnlyFileHandle( @Nullable String catalogName, @Nullable FileType fileType, @Nullable String logicalName, + boolean fsSync, @Nonnull Path targetFile, @Nonnull ObservableOutputKeeper observableOutputKeeper ) { @@ -205,6 +214,7 @@ public WriteOnlyFileHandle( this.fileType = fileType; this.logicalName = logicalName; this.lockTimeoutSeconds = observableOutputKeeper.getLockTimeoutSeconds(); + this.fsSync = fsSync; this.targetFile = targetFile; Assert.isPremiseValid(getTargetFile(targetFile) != null, "Target file should be created or exception thrown!"); this.observableOutputKeeper = observableOutputKeeper; @@ -243,7 +253,7 @@ public void checkAndExecuteAndSync(@Nonnull String operation, @Nonnull Runnable OUTPUT_FACTORY, observableOutput -> { logic.accept(observableOutput); - doSync(observableOutput); + doSync(observableOutput, this.fsSync); } ); return; @@ -269,7 +279,7 @@ public T checkAndExecuteAndSync(@Nonnull String operation, @Nonnull Runna OUTPUT_FACTORY, observableOutput -> { final S result = logic.apply(observableOutput); - doSync(observableOutput); + doSync(observableOutput, this.fsSync); return postExecutionLogic.apply(observableOutput, result); } ); diff --git a/evita_store/evita_store_key_value/src/main/java/io/evitadb/store/offsetIndex/io/WriteOnlyOffHeapWithFileBackupHandle.java b/evita_store/evita_store_key_value/src/main/java/io/evitadb/store/offsetIndex/io/WriteOnlyOffHeapWithFileBackupHandle.java index 43fde3e5bf..39c63ed5ca 100644 --- a/evita_store/evita_store_key_value/src/main/java/io/evitadb/store/offsetIndex/io/WriteOnlyOffHeapWithFileBackupHandle.java +++ b/evita_store/evita_store_key_value/src/main/java/io/evitadb/store/offsetIndex/io/WriteOnlyOffHeapWithFileBackupHandle.java @@ -6,7 +6,7 @@ * | __/\ V /| | || (_| | |_| | |_) | * \___| \_/ |_|\__\__,_|____/|____/ * - * Copyright (c) 2023-2024 + * Copyright (c) 2023-2025 * * Licensed under the Business Source License, Version 1.1 (the "License"); * you may not use this file except in compliance with the License. @@ -37,6 +37,7 @@ import io.evitadb.utils.Assert; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; import java.io.FileOutputStream; import java.io.IOException; @@ -89,11 +90,16 @@ public class WriteOnlyOffHeapWithFileBackupHandle implements WriteOnlyHandle { /** * OutputStream that is used to write data to the off-heap memory. */ - private ObservableOutput offHeapMemoryOutput; + @Nullable private ObservableOutput offHeapMemoryOutput; /** * OutputStream that is used to write data to the file. */ - private ObservableOutput fileOutput; + @Nullable private ObservableOutput fileOutput; + /** + * Execute fsync when asked. When set to false, methods simply flush the buffers, but doesn't explicitly sync + * the data on the persistent storage - leaving it to the OS to decide when to do so. + */ + private final boolean fsSync; /** * Contains the information about the last end byte of fully written record. */ @@ -105,11 +111,11 @@ public class WriteOnlyOffHeapWithFileBackupHandle implements WriteOnlyHandle { * @param os The observable output stream to synchronize. * @throws SyncFailedException if the synchronization operation failed. */ - private static void doSync(@Nonnull ObservableOutput os) { + private static void doSync(@Nonnull ObservableOutput os, boolean fsSync) { // execute fsync so that data are really stored to the disk try { os.flush(); - if (os.getOutputStream() instanceof FileOutputStream fileOutputStream) { + if (fsSync && os.getOutputStream() instanceof FileOutputStream fileOutputStream) { fileOutputStream.getFD().sync(); } } catch (IOException e) { @@ -119,11 +125,13 @@ private static void doSync(@Nonnull ObservableOutput os) { public WriteOnlyOffHeapWithFileBackupHandle( @Nonnull Path targetFile, + boolean fsSync, @Nonnull ObservableOutputKeeper observableOutputKeeper, @Nonnull OffHeapMemoryManager offHeapMemoryManager ) { - this.offHeapMemoryManager = offHeapMemoryManager; this.targetFile = targetFile; + this.fsSync = fsSync; + this.offHeapMemoryManager = offHeapMemoryManager; this.observableOutputKeeper = observableOutputKeeper; } @@ -333,7 +341,7 @@ private T executeLogic( ) { final T result = logic.apply(output); if (sync) { - doSync(output); + doSync(output, this.fsSync); } // update the last consistent written position lastConsistentWrittenPosition = Math.toIntExact(output.total()); diff --git a/evita_store/evita_store_key_value/src/main/java/io/evitadb/store/offsetIndex/model/VersionedValue.java b/evita_store/evita_store_key_value/src/main/java/io/evitadb/store/offsetIndex/model/VersionedValue.java index 8b62714a79..adf821d209 100644 --- a/evita_store/evita_store_key_value/src/main/java/io/evitadb/store/offsetIndex/model/VersionedValue.java +++ b/evita_store/evita_store_key_value/src/main/java/io/evitadb/store/offsetIndex/model/VersionedValue.java @@ -6,7 +6,7 @@ * | __/\ V /| | || (_| | |_| | |_) | * \___| \_/ |_|\__\__,_|____/|____/ * - * Copyright (c) 2023-2024 + * Copyright (c) 2023-2025 * * Licensed under the Business Source License, Version 1.1 (the "License"); * you may not use this file except in compliance with the License. @@ -27,7 +27,7 @@ import io.evitadb.store.offsetIndex.OffsetIndex; import io.evitadb.utils.MemoryMeasuringConstants; -import javax.annotation.Nullable; +import javax.annotation.Nonnull; import java.io.Serial; import java.io.Serializable; @@ -43,7 +43,7 @@ public record VersionedValue( long primaryKey, byte recordType, - @Nullable FileLocation fileLocation + @Nonnull FileLocation fileLocation ) implements Serializable { @Serial private static final long serialVersionUID = -4467999274212489366L; public static final long MEMORY_SIZE = 2 * MemoryMeasuringConstants.OBJECT_HEADER_SIZE + diff --git a/evita_store/evita_store_server/src/main/java/io/evitadb/store/catalog/CatalogOffsetIndexStoragePartPersistenceService.java b/evita_store/evita_store_server/src/main/java/io/evitadb/store/catalog/CatalogOffsetIndexStoragePartPersistenceService.java index 3ce15dfd61..441c51810c 100644 --- a/evita_store/evita_store_server/src/main/java/io/evitadb/store/catalog/CatalogOffsetIndexStoragePartPersistenceService.java +++ b/evita_store/evita_store_server/src/main/java/io/evitadb/store/catalog/CatalogOffsetIndexStoragePartPersistenceService.java @@ -6,7 +6,7 @@ * | __/\ V /| | || (_| | |_| | |_) | * \___| \_/ |_|\__\__,_|____/|____/ * - * Copyright (c) 2024 + * Copyright (c) 2024-2025 * * Licensed under the Business Source License, Version 1.1 (the "License"); * you may not use this file except in compliance with the License. @@ -57,6 +57,7 @@ import java.nio.file.Path; import java.time.OffsetDateTime; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; @@ -172,6 +173,7 @@ public static CatalogOffsetIndexStoragePartPersistenceService create( catalogName, FileType.CATALOG, catalogName, + storageOptions.syncWrites(), catalogFilePath, observableOutputKeeper ), @@ -273,10 +275,12 @@ static OffsetIndexDescriptor loadOffsetIndexDescriptor( final Kryo kryo = KryoFactory.createKryo( SharedClassesConfigurer.INSTANCE.andThen(CatalogHeaderKryoConfigurer.INSTANCE) ); - final CatalogHeader theCatalogHeader = StorageRecord.read( - theInput, catalogHeaderLocation, - (input, recordLength) -> kryo.readObject(input, CatalogHeader.class) - ).payload(); + final CatalogHeader theCatalogHeader = Objects.requireNonNull( + StorageRecord.read( + theInput, catalogHeaderLocation, + (input, recordLength) -> kryo.readObject(input, CatalogHeader.class) + ).payload() + ); catalogHeaderConsumer.accept(theCatalogHeader); return new OffsetIndexDescriptor( @@ -335,6 +339,7 @@ private static OffsetIndex loadOffsetIndex( catalogName, FileType.CATALOG, catalogName, + storageOptions.syncWrites(), catalogFilePath, observableOutputKeeper ), @@ -357,6 +362,7 @@ private static OffsetIndex loadOffsetIndex( catalogName, FileType.CATALOG, catalogName, + storageOptions.syncWrites(), catalogFilePath, observableOutputKeeper ), @@ -372,7 +378,7 @@ private static OffsetIndex loadOffsetIndex( private CatalogOffsetIndexStoragePartPersistenceService( long catalogVersion, - @Nullable CatalogHeader catalogHeader, + @Nonnull CatalogHeader catalogHeader, @Nonnull TransactionOptions transactionOptions, @Nonnull OffsetIndex offsetIndex, @Nonnull OffHeapMemoryManager offHeapMemoryManager, @@ -396,10 +402,12 @@ private CatalogOffsetIndexStoragePartPersistenceService( @Nonnull @Override public CatalogHeader getCatalogHeader(long catalogVersion) { - if (currentCatalogHeader == null) { - currentCatalogHeader = offsetIndex.get(catalogVersion, 1L, CatalogHeader.class); + if (this.currentCatalogHeader == null) { + this.currentCatalogHeader = Objects.requireNonNull( + this.offsetIndex.get(catalogVersion, 1L, CatalogHeader.class) + ); } - return currentCatalogHeader; + return this.currentCatalogHeader; } @Override diff --git a/evita_store/evita_store_server/src/main/java/io/evitadb/store/catalog/DefaultCatalogPersistenceService.java b/evita_store/evita_store_server/src/main/java/io/evitadb/store/catalog/DefaultCatalogPersistenceService.java index 79e2096e39..d1e0d5ebb5 100644 --- a/evita_store/evita_store_server/src/main/java/io/evitadb/store/catalog/DefaultCatalogPersistenceService.java +++ b/evita_store/evita_store_server/src/main/java/io/evitadb/store/catalog/DefaultCatalogPersistenceService.java @@ -6,7 +6,7 @@ * | __/\ V /| | || (_| | |_| | |_) | * \___| \_/ |_|\__\__,_|____/|____/ * - * Copyright (c) 2023-2024 + * Copyright (c) 2023-2025 * * Licensed under the Business Source License, Version 1.1 (the "License"); * you may not use this file except in compliance with the License. @@ -858,6 +858,7 @@ public DefaultCatalogPersistenceService( this.catalogName, FileType.CATALOG, this.catalogName, + storageOptions.syncWrites(), this.catalogStoragePath.resolve(getCatalogBootstrapFileName(catalogName)), this.observableOutputKeeper ) @@ -940,6 +941,7 @@ public DefaultCatalogPersistenceService( this.catalogName, FileType.CATALOG, this.catalogName, + storageOptions.syncWrites(), this.catalogStoragePath.resolve(getCatalogBootstrapFileName(catalogName)), this.observableOutputKeeper ) @@ -1442,6 +1444,7 @@ public IsolatedWalPersistenceService createIsolatedWalPersistenceService(@Nonnul this.transactionOptions.transactionWorkDirectory() .resolve(transactionId.toString()) .resolve(transactionId + ".wal"), + this.storageOptions.syncWrites(), this.observableOutputKeeper, this.offHeapMemoryManager ) @@ -1614,6 +1617,7 @@ public CatalogPersistenceService replaceWith( catalogNameToBeReplaced, FileType.CATALOG, catalogNameToBeReplaced, + storageOptions.syncWrites(), newPath.resolve(getCatalogBootstrapFileName(catalogNameToBeReplaced)), this.observableOutputKeeper ), @@ -2221,6 +2225,7 @@ private CatalogBootstrap writeCatalogBootstrap( originalBootstrapHandle, new WriteOnlyFileHandle( originalBootstrapHandle.getTargetFile(), + storageOptions.syncWrites(), this.observableOutputKeeper ) ), @@ -2275,6 +2280,7 @@ void trimBootstrapFile(long catalogVersion) { originalBootstrapHandle, new WriteOnlyFileHandle( originalBootstrapHandle.getTargetFile(), + storageOptions.syncWrites(), this.observableOutputKeeper ) ), @@ -2498,6 +2504,7 @@ private WriteOnlyFileHandle createNewBootstrapTempWriteHandle(@Nonnull String ne // create new file and replace the former one with it return new WriteOnlyFileHandle( Files.createTempFile(CatalogPersistenceService.getCatalogBootstrapFileName(newCatalogName), ".tmp"), + storageOptions.syncWrites(), this.observableOutputKeeper ); } catch (IOException e) { diff --git a/evita_store/evita_store_server/src/main/java/io/evitadb/store/catalog/DefaultEntityCollectionPersistenceService.java b/evita_store/evita_store_server/src/main/java/io/evitadb/store/catalog/DefaultEntityCollectionPersistenceService.java index 5aaeae4f5d..ea7422c59e 100644 --- a/evita_store/evita_store_server/src/main/java/io/evitadb/store/catalog/DefaultEntityCollectionPersistenceService.java +++ b/evita_store/evita_store_server/src/main/java/io/evitadb/store/catalog/DefaultEntityCollectionPersistenceService.java @@ -6,7 +6,7 @@ * | __/\ V /| | || (_| | |_| | |_) | * \___| \_/ |_|\__\__,_|____/|____/ * - * Copyright (c) 2023-2024 + * Copyright (c) 2023-2025 * * Licensed under the Business Source License, Version 1.1 (the "License"); * you may not use this file except in compliance with the License. @@ -763,6 +763,7 @@ public DefaultEntityCollectionPersistenceService( catalogName, FileType.ENTITY_COLLECTION, this.entityCollectionFileReference.entityType(), + storageOptions.syncWrites(), this.entityCollectionFile, observableOutputKeeper ), @@ -817,6 +818,7 @@ public DefaultEntityCollectionPersistenceService( catalogName, FileType.ENTITY_COLLECTION, this.entityCollectionFileReference.entityType(), + storageOptions.syncWrites(), this.entityCollectionFile, this.observableOutputKeeper ), diff --git a/evita_store/evita_store_server/src/main/java/io/evitadb/store/wal/TransactionalStoragePartPersistenceService.java b/evita_store/evita_store_server/src/main/java/io/evitadb/store/wal/TransactionalStoragePartPersistenceService.java index b2efebdf7e..da305a31ea 100644 --- a/evita_store/evita_store_server/src/main/java/io/evitadb/store/wal/TransactionalStoragePartPersistenceService.java +++ b/evita_store/evita_store_server/src/main/java/io/evitadb/store/wal/TransactionalStoragePartPersistenceService.java @@ -6,7 +6,7 @@ * | __/\ V /| | || (_| | |_| | |_) | * \___| \_/ |_|\__\__,_|____/|____/ * - * Copyright (c) 2023-2024 + * Copyright (c) 2023-2025 * * Licensed under the Business Source License, Version 1.1 (the "License"); * you may not use this file except in compliance with the License. @@ -107,6 +107,7 @@ public TransactionalStoragePartPersistenceService( offsetIndexRecordTypeRegistry, new WriteOnlyOffHeapWithFileBackupHandle( this.targetFile, + storageOptions.syncWrites(), observableOutputKeeper, offHeapMemoryManager ), diff --git a/evita_test_support/src/main/resources/evita-configuration.yaml b/evita_test_support/src/main/resources/evita-configuration.yaml index e92bdbb840..54f0ba10c0 100644 --- a/evita_test_support/src/main/resources/evita-configuration.yaml +++ b/evita_test_support/src/main/resources/evita-configuration.yaml @@ -29,6 +29,7 @@ storage: waitOnCloseSeconds: ${storage.waitOnCloseSeconds:60} outputBufferSize: ${storage.outputBufferSize:4MB} maxOpenedReadHandles: ${storage.maxOpenedReadHandles:12} + syncWrites: ${storage.syncWrites:true} computeCRC32C: ${storage.computeCRC32C:true} minimalActiveRecordShare: ${storage.minimalActiveRecordShare:0.5} fileSizeCompactionThresholdBytes: ${storage.fileSizeCompactionThresholdBytes:100M}