Skip to content

Commit

Permalink
Merge pull request #768 from FgForrest/767-make-fssync-configurable
Browse files Browse the repository at this point in the history
feat: Make fsSync configurable
  • Loading branch information
novoj authored Jan 4, 2025
2 parents 308fc28 + c5c1f4c commit df5b29f
Show file tree
Hide file tree
Showing 19 changed files with 185 additions and 94 deletions.
9 changes: 9 additions & 0 deletions documentation/user/en/operate/configure.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ storage: # [see Storage configuration](
waitOnCloseSeconds: 60
outputBufferSize: 4MB
maxOpenedReadHandles: 12
syncWrites: true
computeCRC32C: true
minimalActiveRecordShare: 0.5
fileSizeCompactionThresholdBytes: 100MB
Expand Down Expand Up @@ -490,6 +491,14 @@ This section contains configuration options for the storage layer of the databas
[MacOS](https://gist.github.com/tombigel/d503800a282fcadbee14b537735d202c)
</Note>
</dd>
<dt>syncWrites</dt>
<dd>
<p>**Default:** `true`</p>
<p>Determines whether the storage layer forces the operating system to flush the internal buffers to disk at
regular "safe points" or not. The default is true, so that data is not lost in the event of a power failure.
There are situations where disabling this feature can improve performance and the client can accept the risk
of data loss (e.g. when running automated tests, etc.).</p>
</dd>
<dt>computeCRC32C</dt>
<dd>
<p>**Default:** `true`</p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* | __/\ V /| | || (_| | |_| | |_) |
* \___| \_/ |_|\__\__,_|____/|____/
*
* Copyright (c) 2023-2024
* Copyright (c) 2023-2025
*
* Licensed under the Business Source License, Version 1.1 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -52,7 +52,13 @@
* purposes. The size of the buffer limits the maximum size of an individual record in the
* key/value data store.
* @param maxOpenedReadHandles Maximum number of {@link java.io.InputStream} instances that may be opened simultaneously on the offset index file.
* @param computeCRC32C Contains setting that determined whether CRC32C checksums will be computed for written
* @param syncWrites Determines whether the storage layer forces the operating system to flush
* the internal buffers to disk at regular "safe points" or not. The default
* is true, so that data is not lost in the event of a power failure. There
* are situations where disabling this feature can improve performance and
* the client can accept the risk of data loss (e.g. when running automated
* tests, etc.).
* @param computeCRC32C Determines whether CRC32C checksums will be computed for written
* records and also whether the CRC32C checksum will be checked on record read.
* @param minimalActiveRecordShare Minimal share of active records in the file. If the share is lower, the file will
* be compacted.
Expand All @@ -79,6 +85,7 @@ public record StorageOptions(
long waitOnCloseSeconds,
int outputBufferSize,
int maxOpenedReadHandles,
boolean syncWrites,
boolean computeCRC32C,
double minimalActiveRecordShare,
long fileSizeCompactionThresholdBytes,
Expand All @@ -93,6 +100,7 @@ public record StorageOptions(
public static final int DEFAULT_LOCK_TIMEOUT_SECONDS = 5;
public static final int DEFAULT_WAIT_ON_CLOSE_SECONDS = 5;
public static final int DEFAULT_MAX_OPENED_READ_HANDLES = Runtime.getRuntime().availableProcessors();
public static final boolean DEFAULT_SYNC_WRITES = true;
public static final boolean DEFAULT_COMPUTE_CRC = true;
public static final double DEFAULT_MINIMAL_ACTIVE_RECORD_SHARE = 0.5;
public static final long DEFAULT_MINIMAL_FILE_SIZE_COMPACTION_THRESHOLD = 104_857_600L; // 100MB
Expand All @@ -103,12 +111,14 @@ public record StorageOptions(
/**
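* Factory method intended to be used only in tests. It creates options that place the data and export
* directories under the system temporary directory ({@code java.io.tmpdir}) and disables synchronized writes.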
*/
@Nonnull
public static StorageOptions temporary() {
return new StorageOptions(
Path.of(System.getProperty("java.io.tmpdir"), "evita/data"),
Path.of(System.getProperty("java.io.tmpdir"), "evita/export"),
5, 5, DEFAULT_OUTPUT_BUFFER_SIZE,
Runtime.getRuntime().availableProcessors(),
false,
true,
DEFAULT_MINIMAL_ACTIVE_RECORD_SHARE,
DEFAULT_MINIMAL_FILE_SIZE_COMPACTION_THRESHOLD,
Expand All @@ -121,13 +131,15 @@ public static StorageOptions temporary() {
/**
* Creates a new {@link StorageOptions.Builder} for the storage options. Using the builder instead of
* the constructor is recommended to avoid binary compatibility problems in the future.
*
* @return new builder instance
*/
@Nonnull
public static StorageOptions.Builder builder() {
return new StorageOptions.Builder();
}

/**
* Creates a new {@link StorageOptions.Builder} based on the passed storage options, so that individual
* settings can be altered before a new instance is built. Using the builder instead of the constructor
* is recommended to avoid binary compatibility problems in the future.
*
* @param storageOptions existing options handed over to the builder as its starting point
* @return new builder instance
*/
@Nonnull
public static StorageOptions.Builder builder(@Nonnull StorageOptions storageOptions) {
return new StorageOptions.Builder(storageOptions);
}
Expand All @@ -140,6 +152,7 @@ public StorageOptions() {
DEFAULT_WAIT_ON_CLOSE_SECONDS,
DEFAULT_OUTPUT_BUFFER_SIZE,
DEFAULT_MAX_OPENED_READ_HANDLES,
DEFAULT_SYNC_WRITES,
DEFAULT_COMPUTE_CRC,
DEFAULT_MINIMAL_ACTIVE_RECORD_SHARE,
DEFAULT_MINIMAL_FILE_SIZE_COMPACTION_THRESHOLD,
Expand All @@ -156,6 +169,7 @@ public StorageOptions(
long waitOnCloseSeconds,
int outputBufferSize,
int maxOpenedReadHandles,
boolean syncWrites,
boolean computeCRC32C,
double minimalActiveRecordShare,
long fileSizeCompactionThresholdBytes,
Expand All @@ -169,6 +183,7 @@ public StorageOptions(
this.waitOnCloseSeconds = waitOnCloseSeconds;
this.outputBufferSize = outputBufferSize;
this.maxOpenedReadHandles = maxOpenedReadHandles;
this.syncWrites = syncWrites;
this.computeCRC32C = computeCRC32C;
this.minimalActiveRecordShare = minimalActiveRecordShare;
this.fileSizeCompactionThresholdBytes = fileSizeCompactionThresholdBytes;
Expand All @@ -188,6 +203,7 @@ public static class Builder {
private long waitOnCloseSeconds = DEFAULT_WAIT_ON_CLOSE_SECONDS;
private int outputBufferSize = DEFAULT_OUTPUT_BUFFER_SIZE;
private int maxOpenedReadHandles = DEFAULT_MAX_OPENED_READ_HANDLES;
private boolean syncWrites = DEFAULT_SYNC_WRITES;
private boolean computeCRC32C = DEFAULT_COMPUTE_CRC;
private double minimalActiveRecordShare = DEFAULT_MINIMAL_ACTIVE_RECORD_SHARE;
private long fileSizeCompactionThresholdBytes = DEFAULT_MINIMAL_FILE_SIZE_COMPACTION_THRESHOLD;
Expand All @@ -205,6 +221,7 @@ public static class Builder {
this.waitOnCloseSeconds = storageOptions.waitOnCloseSeconds;
this.outputBufferSize = storageOptions.outputBufferSize;
this.maxOpenedReadHandles = storageOptions.maxOpenedReadHandles;
this.syncWrites = storageOptions.syncWrites;
this.computeCRC32C = storageOptions.computeCRC32C;
this.minimalActiveRecordShare = storageOptions.minimalActiveRecordShare;
this.fileSizeCompactionThresholdBytes = storageOptions.fileSizeCompactionThresholdBytes;
Expand Down Expand Up @@ -251,6 +268,12 @@ public Builder maxOpenedReadHandles(int maxOpenedReadHandles) {
return this;
}

/**
* Sets whether the storage layer forces the operating system to flush the internal buffers to disk
* at regular "safe points". Disabling it can improve performance when the client can accept the risk
* of data loss (e.g. when running automated tests).
*
* @param syncWrites true to force synchronized writes, false to skip them
* @return this builder, to allow call chaining
*/
@Nonnull
public Builder syncWrites(boolean syncWrites) {
this.syncWrites = syncWrites;
return this;
}

@Nonnull
public Builder computeCRC32(boolean computeCRC32) {
this.computeCRC32C = computeCRC32;
Expand Down Expand Up @@ -296,6 +319,7 @@ public StorageOptions build() {
waitOnCloseSeconds,
outputBufferSize,
maxOpenedReadHandles,
syncWrites,
computeCRC32C,
minimalActiveRecordShare,
fileSizeCompactionThresholdBytes,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* | __/\ V /| | || (_| | |_| | |_) |
* \___| \_/ |_|\__\__,_|____/|____/
*
* Copyright (c) 2023-2024
* Copyright (c) 2023-2025
*
* Licensed under the Business Source License, Version 1.1 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -1610,7 +1610,10 @@ private Map<Long, List<EntityContract>> appendWal(
UUID.randomUUID(),
KryoFactory.createKryo(WalKryoConfigurer.INSTANCE),
new WriteOnlyOffHeapWithFileBackupHandle(
isolatedWalFilePath, this.observableOutputKeeper, offHeapMemoryManager
isolatedWalFilePath,
false,
this.observableOutputKeeper,
offHeapMemoryManager
)
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* | __/\ V /| | || (_| | |_| | |_) |
* \___| \_/ |_|\__\__,_|____/|____/
*
* Copyright (c) 2023-2024
* Copyright (c) 2023-2025
*
* Licensed under the Business Source License, Version 1.1 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -159,6 +159,7 @@ class DefaultCatalogPersistenceServiceTest implements EvitaTestSupport {
);
private final WriteOnlyOffHeapWithFileBackupHandle writeHandle = new WriteOnlyOffHeapWithFileBackupHandle(
getTestDirectory().resolve(transactionId.toString()),
false,
observableOutputKeeper,
new OffHeapMemoryManager(TEST_CATALOG, 512, 1)
);
Expand Down Expand Up @@ -248,6 +249,7 @@ protected Kryo create() {
catalogName,
FileType.CATALOG,
catalogName,
false,
catalogFilePath,
observableOutputKeeper
),
Expand Down Expand Up @@ -796,7 +798,7 @@ private StorageOptions getStorageOptions() {
getTestDirectory().resolve(DIR_DEFAULT_CATALOG_PERSISTENCE_SERVICE_TEST),
60, 60,
StorageOptions.DEFAULT_OUTPUT_BUFFER_SIZE, 1,
true, 1.0, 0L, false,
false, true, 1.0, 0L, false,
Long.MAX_VALUE, Long.MAX_VALUE
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* | __/\ V /| | || (_| | |_| | |_) |
* \___| \_/ |_|\__\__,_|____/|____/
*
* Copyright (c) 2024
* Copyright (c) 2024-2025
*
* Licensed under the Business Source License, Version 1.1 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -87,6 +87,7 @@ class DefaultIsolatedWalServiceTest implements EvitaTestSupport {
);
private final WriteOnlyOffHeapWithFileBackupHandle writeHandle = new WriteOnlyOffHeapWithFileBackupHandle(
getTestDirectory().resolve(transactionId.toString()),
false,
observableOutputKeeper,
new OffHeapMemoryManager(TEST_CATALOG, 512, 1)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* | __/\ V /| | || (_| | |_| | |_) |
* \___| \_/ |_|\__\__,_|____/|____/
*
* Copyright (c) 2023-2024
* Copyright (c) 2023-2025
*
* Licensed under the Business Source License, Version 1.1 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -41,7 +41,7 @@ class OffsetIndexSerializationServiceTest {
void shouldComputeExpectedRecordCountProperly() {
final StorageOptions testOptions = new StorageOptions(
Path.of(""), Path.of(""), 1, 0, 55, 1,
false, 1.0, 0L, false, Long.MAX_VALUE, Long.MAX_VALUE
false, false, 1.0, 0L, false, Long.MAX_VALUE, Long.MAX_VALUE
);
assertEquals(new OffsetIndexSerializationService.ExpectedCounts(0, 1), OffsetIndexSerializationService.computeExpectedRecordCount(testOptions, 0));
assertEquals(new OffsetIndexSerializationService.ExpectedCounts(1, 1), OffsetIndexSerializationService.computeExpectedRecordCount(testOptions, 1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* | __/\ V /| | || (_| | |_| | |_) |
* \___| \_/ |_|\__\__,_|____/|____/
*
* Copyright (c) 2023-2024
* Copyright (c) 2023-2025
*
* Licensed under the Business Source License, Version 1.1 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -69,6 +69,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.IntFunction;
import java.util.function.Supplier;

import static io.evitadb.store.offsetIndex.OffsetIndexSerializationService.computeExpectedRecordCount;
import static io.evitadb.test.TestConstants.LONG_RUNNING_TEST;
Expand Down Expand Up @@ -199,7 +200,7 @@ void shouldCopySnapshotOfTheBigFileOffsetIndexAndReconstruct() {
),
limitedBufferOptions,
offsetIndexRecordTypeRegistry,
new WriteOnlyFileHandle(targetFile, observableOutputKeeper),
new WriteOnlyFileHandle(targetFile, false, observableOutputKeeper),
nonFlushedBlock -> {},
oldestRecordTimestamp -> {}
);
Expand All @@ -221,7 +222,7 @@ void shouldCopySnapshotOfTheBigFileOffsetIndexAndReconstruct() {
),
limitedBufferOptions,
offsetIndexRecordTypeRegistry,
new WriteOnlyFileHandle(targetFile, observableOutputKeeper),
new WriteOnlyFileHandle(targetFile, false, observableOutputKeeper),
nonFlushedBlock -> {},
oldestRecordTimestamp -> {}
);
Expand All @@ -242,7 +243,7 @@ void shouldCopySnapshotOfTheBigFileOffsetIndexAndReconstruct() {
snapshotBootstrapDescriptor,
limitedBufferOptions,
offsetIndexRecordTypeRegistry,
new WriteOnlyFileHandle(snapshotPath, observableOutputKeeper),
new WriteOnlyFileHandle(snapshotPath, false, observableOutputKeeper),
nonFlushedBlock -> {},
oldestRecordTimestamp -> {}
);
Expand Down Expand Up @@ -284,7 +285,7 @@ void shouldRemoveRecord() {
),
options,
offsetIndexRecordTypeRegistry,
new WriteOnlyFileHandle(targetFile, observableOutputKeeper),
new WriteOnlyFileHandle(targetFile, false, observableOutputKeeper),
nonFlushedBlock -> {},
oldestRecordTimestamp -> {}
);
Expand Down Expand Up @@ -328,7 +329,7 @@ void shouldReadBinaryRecordAndDeserializeManually() {
),
options,
offsetIndexRecordTypeRegistry,
new WriteOnlyFileHandle(targetFile, observableOutputKeeper),
new WriteOnlyFileHandle(targetFile, false, observableOutputKeeper),
nonFlushedBlock -> {},
oldestRecordTimestamp -> {}
);
Expand Down Expand Up @@ -384,7 +385,7 @@ void shouldReadSingleRecordAndUsingManualDeserialization() {
i
);

final EntityBodyStoragePart entityBody = OffsetIndex.readSingleRecord(
final Supplier<EntityBodyStoragePart> entityBodySupplier = () -> OffsetIndex.readSingleRecord(
targetFile,
offsetIndexDescriptor.fileLocation(),
key,
Expand All @@ -395,8 +396,9 @@ void shouldReadSingleRecordAndUsingManualDeserialization() {
.orElse(null)
);
if (i < recordCount * (iterationCount - 1) && i % recordCount < removedRecords && i % recordCount > 0) {
assertNull(entityBody);
assertThrows(NullPointerException.class, entityBodySupplier::get);
} else {
final EntityBodyStoragePart entityBody = entityBodySupplier.get();
assertNotNull(entityBody);
assertEquals(
new EntityBodyStoragePart(i),
Expand All @@ -418,7 +420,7 @@ void shouldRefuseOperationAfterClose() {
),
options,
offsetIndexRecordTypeRegistry,
new WriteOnlyFileHandle(targetFile, observableOutputKeeper),
new WriteOnlyFileHandle(targetFile, false, observableOutputKeeper),
nonFlushedBlock -> {},
oldestRecordTimestamp -> {}
);
Expand Down Expand Up @@ -448,7 +450,7 @@ void generationalProofTest(GenerationalTestInput input) {
),
buildOptionsWithLimitedBuffer(),
offsetIndexRecordTypeRegistry,
new WriteOnlyFileHandle(targetFile, observableOutputKeeper),
new WriteOnlyFileHandle(targetFile, false, observableOutputKeeper),
nonFlushedBlock -> {},
oldestRecordTimestamp -> {}
)
Expand Down Expand Up @@ -522,7 +524,7 @@ void generationalProofTest(GenerationalTestInput input) {
),
options,
offsetIndexRecordTypeRegistry,
new WriteOnlyFileHandle(currentFilePath.get(), observableOutputKeeper),
new WriteOnlyFileHandle(currentFilePath.get(), false, observableOutputKeeper),
nonFlushedBlock -> {},
oldestRecordTimestamp -> {}
);
Expand Down Expand Up @@ -610,7 +612,7 @@ void generationalProofTest(GenerationalTestInput input) {
),
options,
offsetIndexRecordTypeRegistry,
new WriteOnlyFileHandle(newPath, observableOutputKeeper),
new WriteOnlyFileHandle(newPath, false, observableOutputKeeper),
nonFlushedBlock -> {},
oldestRecordTimestamp -> {}
);
Expand Down Expand Up @@ -653,7 +655,7 @@ private InsertionOutput serializeAndReconstructBigFileOffsetIndex(
),
storageOptions,
offsetIndexRecordTypeRegistry,
new WriteOnlyFileHandle(targetFile, observableOutputKeeper),
new WriteOnlyFileHandle(targetFile, false, observableOutputKeeper),
nonFlushedBlock -> {},
oldestRecordTimestamp -> {}
);
Expand All @@ -675,7 +677,7 @@ private InsertionOutput serializeAndReconstructBigFileOffsetIndex(
),
storageOptions,
offsetIndexRecordTypeRegistry,
new WriteOnlyFileHandle(targetFile, observableOutputKeeper),
new WriteOnlyFileHandle(targetFile, false, observableOutputKeeper),
nonFlushedBlock -> {},
oldestRecordTimestamp -> {}
);
Expand Down Expand Up @@ -718,7 +720,7 @@ private InsertionOutput createRecordsInFileOffsetIndex(
),
options,
offsetIndexRecordTypeRegistry,
new WriteOnlyFileHandle(targetFile, observableOutputKeeper),
new WriteOnlyFileHandle(targetFile, false, observableOutputKeeper),
nonFlushedBlock -> {},
oldestRecordTimestamp -> {}
);
Expand Down
Loading

0 comments on commit df5b29f

Please sign in to comment.