Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support WAL Compression #12476

Merged
merged 32 commits into from
Jun 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
8c52f5c
enable wal compression
THUMarkLau Mar 28, 2024
693ffb7
fix bug
THUMarkLau Mar 30, 2024
e2e6bc1
fix compilation problem
THUMarkLau May 7, 2024
896a357
remove useless code
THUMarkLau May 7, 2024
ee3a64f
recover some code
THUMarkLau May 7, 2024
1316653
support compression type in WAL Compress Header
THUMarkLau May 7, 2024
2728057
support multi version WAL
THUMarkLau May 11, 2024
e70df23
edit configuration item
THUMarkLau May 11, 2024
2db27b1
add log for WAL size
THUMarkLau May 13, 2024
b505d31
temp for debug
THUMarkLau May 18, 2024
95c0fd5
fix bug
THUMarkLau May 19, 2024
23d1743
remove useless log
THUMarkLau May 19, 2024
8bedc72
remove one configuration
THUMarkLau May 20, 2024
69ebc89
use compression rate to update wal disk usage
THUMarkLau May 24, 2024
d6ca95b
fix ut
THUMarkLau May 24, 2024
4373559
fix test
THUMarkLau May 25, 2024
27a15c3
set default to uncompress
THUMarkLau May 26, 2024
cd8939f
fix wal ut
THUMarkLau May 27, 2024
610a2e2
optimize calculating of wal size
THUMarkLau May 27, 2024
8929d41
close wal file when the origin size of wal buffer is larger than thre…
THUMarkLau May 27, 2024
b2d667f
add the size of magic string
THUMarkLau May 30, 2024
3297ffb
may be fix the bug
THUMarkLau May 30, 2024
b365347
fix with comment
THUMarkLau May 31, 2024
443d1a0
edit with review
THUMarkLau Jun 9, 2024
130930e
fix test
THUMarkLau Jun 10, 2024
ad021a7
add test for wal compression
THUMarkLau Jun 11, 2024
964bf9b
add hot reload
THUMarkLau Jun 11, 2024
0f17197
clean the code to make it more readable
THUMarkLau Jun 11, 2024
e48b7db
reuse the byte buffer if possible
THUMarkLau Jun 12, 2024
76cc139
Indicate the encoding of String
THUMarkLau Jun 17, 2024
5814812
Edit according to comment
THUMarkLau Jun 17, 2024
d0b76f3
spotless
THUMarkLau Jun 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.tsfile.common.conf.TSFileDescriptor;
import org.apache.tsfile.common.constant.TsFileConstant;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.enums.CompressionType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.fileSystem.FSType;
import org.apache.tsfile.utils.FSUtils;
Expand Down Expand Up @@ -1137,6 +1138,8 @@ public class IoTDBConfig {
*/
private String RateLimiterType = "FixedIntervalRateLimiter";

private CompressionType WALCompressionAlgorithm = CompressionType.UNCOMPRESSED;

IoTDBConfig() {}

public int getMaxLogEntriesNumPerBatch() {
Expand Down Expand Up @@ -1881,7 +1884,7 @@ public long getWalFileSizeThresholdInByte() {
return walFileSizeThresholdInByte;
}

void setWalFileSizeThresholdInByte(long walFileSizeThresholdInByte) {
public void setWalFileSizeThresholdInByte(long walFileSizeThresholdInByte) {
this.walFileSizeThresholdInByte = walFileSizeThresholdInByte;
}

Expand Down Expand Up @@ -3984,4 +3987,12 @@ public TDataNodeLocation generateLocalDataNodeLocation() {
new TEndPoint(getInternalAddress(), getSchemaRegionConsensusPort()));
return result;
}

public CompressionType getWALCompressionAlgorithm() {
return WALCompressionAlgorithm;
}

public void setWALCompressionAlgorithm(CompressionType WALCompressionAlgorithm) {
this.WALCompressionAlgorithm = WALCompressionAlgorithm;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@

import org.apache.tsfile.common.conf.TSFileDescriptor;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.enums.CompressionType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.fileSystem.FSType;
import org.apache.tsfile.utils.FilePathUtils;
Expand Down Expand Up @@ -416,6 +417,10 @@ public void loadProperties(Properties properties) throws BadNodeUrlException, IO
properties.getProperty(
"io_task_queue_size_for_flushing",
Integer.toString(conf.getIoTaskQueueSizeForFlushing()))));
boolean enableWALCompression =
Boolean.parseBoolean(properties.getProperty("enable_wal_compression", "false"));
conf.setWALCompressionAlgorithm(
enableWALCompression ? CompressionType.LZ4 : CompressionType.UNCOMPRESSED);

conf.setCompactionScheduleIntervalInMs(
Long.parseLong(
Expand Down Expand Up @@ -1793,6 +1798,10 @@ public void loadHotModifiedProps(Properties properties) throws QueryProcessExcep
properties.getProperty(
"merge_threshold_of_explain_analyze",
String.valueOf(conf.getMergeThresholdOfExplainAnalyze()))));
boolean enableWALCompression =
Boolean.parseBoolean(properties.getProperty("enable_wal_compression", "false"));
conf.setWALCompressionAlgorithm(
enableWALCompression ? CompressionType.LZ4 : CompressionType.UNCOMPRESSED);

// update Consensus config
reloadConsensusProps(properties);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Arrays;

public abstract class AbstractNodeAllocationStrategy implements NodeAllocationStrategy {
Expand Down Expand Up @@ -72,8 +72,8 @@ protected IWALNode createWALNode(String identifier) {
protected IWALNode createWALNode(String identifier, String folder) {
try {
return new WALNode(identifier, folder);
} catch (FileNotFoundException e) {
logger.error("Fail to create wal node", e);
} catch (IOException e) {
logger.error("Meet exception when creating wal node", e);
return WALFakeNode.getFailureInstance(e);
}
}
Expand All @@ -82,7 +82,7 @@ protected IWALNode createWALNode(
String identifier, String folder, long startFileVersion, long startSearchIndex) {
try {
return new WALNode(identifier, folder, startFileVersion, startSearchIndex);
} catch (FileNotFoundException e) {
} catch (IOException e) {
OneSizeFitsQuorum marked this conversation as resolved.
Show resolved Hide resolved
logger.error("Fail to create wal node", e);
return WALFakeNode.getFailureInstance(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
Expand Down Expand Up @@ -58,7 +57,7 @@ public abstract class AbstractWALBuffer implements IWALBuffer {

protected AbstractWALBuffer(
String identifier, String logDirectory, long startFileVersion, long startSearchIndex)
throws FileNotFoundException {
throws IOException {
this.identifier = identifier;
this.logDirectory = logDirectory;
File logDirFile = SystemFileFactory.INSTANCE.getFile(logDirectory);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
Expand Down Expand Up @@ -119,7 +118,7 @@ public class WALBuffer extends AbstractWALBuffer {
// manage wal files which have MemTableIds
private final Map<Long, Set<Long>> memTableIdsOfWal = new ConcurrentHashMap<>();

public WALBuffer(String identifier, String logDirectory) throws FileNotFoundException {
public WALBuffer(String identifier, String logDirectory) throws IOException {
this(identifier, logDirectory, new CheckpointManager(identifier, logDirectory), 0, 0L);
}

Expand All @@ -129,7 +128,7 @@ public WALBuffer(
CheckpointManager checkpointManager,
long startFileVersion,
long startSearchIndex)
throws FileNotFoundException {
throws IOException {
super(identifier, logDirectory, startFileVersion, startSearchIndex);
this.checkpointManager = checkpointManager;
currentFileStatus = WALFileStatus.CONTAINS_NONE_SEARCH_INDEX;
Expand Down Expand Up @@ -521,8 +520,9 @@ public void run() {
forceFlag, syncingBuffer.position(), syncingBuffer.capacity(), usedRatio * 100);

// flush buffer to os
double compressionRatio = 1.0;
try {
currentWALFileWriter.write(syncingBuffer, info.metaData);
compressionRatio = currentWALFileWriter.write(syncingBuffer, info.metaData);
} catch (Throwable e) {
logger.error(
"Fail to sync wal node-{}'s buffer, change system mode to error.", identifier, e);
Expand All @@ -535,12 +535,14 @@ public void run() {
memTableIdsOfWal
.computeIfAbsent(currentWALFileVersion, memTableIds -> new HashSet<>())
.addAll(info.metaData.getMemTablesId());
checkpointManager.updateCostOfActiveMemTables(info.memTableId2WalDiskUsage);
checkpointManager.updateCostOfActiveMemTables(info.memTableId2WalDiskUsage, compressionRatio);

boolean forceSuccess = false;
// try to roll log writer
if (info.rollWALFileWriterListener != null
|| (forceFlag && currentWALFileWriter.size() >= config.getWalFileSizeThresholdInByte())) {
// TODO: Control the wal file by the number of WALEntry
|| (forceFlag
&& currentWALFileWriter.originalSize() >= config.getWalFileSizeThresholdInByte())) {
try {
rollLogWriter(searchIndex, currentWALFileWriter.getWalFileStatus());
forceSuccess = true;
Expand Down Expand Up @@ -582,7 +584,7 @@ public void run() {
position += fsyncListener.getWalEntryHandler().getSize();
}
}
lastFsyncPosition = currentWALFileWriter.size();
lastFsyncPosition = currentWALFileWriter.originalSize();
}
WRITING_METRICS.recordWALBufferEntriesCount(info.fsyncListeners.size());
WRITING_METRICS.recordSyncWALBufferCost(System.nanoTime() - startTime, forceFlag);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
Expand Down Expand Up @@ -78,7 +77,7 @@ public class CheckpointManager implements AutoCloseable {

// endregion

public CheckpointManager(String identifier, String logDirectory) throws FileNotFoundException {
public CheckpointManager(String identifier, String logDirectory) throws IOException {
this.identifier = identifier;
this.logDirectory = logDirectory;
File logDirFile = SystemFileFactory.INSTANCE.getFile(logDirectory);
Expand Down Expand Up @@ -345,12 +344,13 @@ public long getFirstValidWALVersionId() {
}

/** Update wal disk cost of active memTables. */
public void updateCostOfActiveMemTables(Map<Long, Long> memTableId2WalDiskUsage) {
public void updateCostOfActiveMemTables(
Map<Long, Long> memTableId2WalDiskUsage, double compressionRate) {
for (Map.Entry<Long, Long> memTableWalUsage : memTableId2WalDiskUsage.entrySet()) {
memTableId2Info.computeIfPresent(
memTableWalUsage.getKey(),
(k, v) -> {
v.addWalDiskUsage(memTableWalUsage.getValue());
v.addWalDiskUsage((long) (memTableWalUsage.getValue() * compressionRate));
jt2594838 marked this conversation as resolved.
Show resolved Hide resolved
return v;
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -47,8 +45,7 @@ public CheckpointReader(File logFile) {

private void init() {
checkpoints = new ArrayList<>();
try (DataInputStream logStream =
new DataInputStream(new BufferedInputStream(new FileInputStream(logFile)))) {
try (DataInputStream logStream = new DataInputStream(new WALInputStream(logFile))) {
maxMemTableId = logStream.readLong();
while (logStream.available() > 0) {
Checkpoint checkpoint = Checkpoint.deserialize(logStream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@
import org.apache.iotdb.db.storageengine.dataregion.wal.checkpoint.Checkpoint;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;

/** CheckpointWriter writes the binary {@link Checkpoint} into .checkpoint file. */
public class CheckpointWriter extends LogWriter {
public CheckpointWriter(File logFile) throws FileNotFoundException {
public CheckpointWriter(File logFile) throws IOException {
super(logFile);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ public interface ILogWriter extends Closeable {
*
* @param buffer content that have been converted to bytes
* @throws IOException if an I/O error occurs
* @return Compression rate of the buffer after compression
*/
void write(ByteBuffer buffer) throws IOException;
double write(ByteBuffer buffer) throws IOException;

/**
* Forces any updates to this file to be written to the storage device that contains it.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,23 @@

package org.apache.iotdb.db.storageengine.dataregion.wal.io;

import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry;
import org.apache.iotdb.db.storageengine.dataregion.wal.checkpoint.Checkpoint;

import org.apache.tsfile.compress.ICompressor;
import org.apache.tsfile.file.metadata.enums.CompressionType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.util.Objects;

/**
* LogWriter writes the binary logs into a file, including writing {@link WALEntry} into .wal file
Expand All @@ -43,23 +47,89 @@ public abstract class LogWriter implements ILogWriter {
protected final File logFile;
protected final FileOutputStream logStream;
protected final FileChannel logChannel;
protected long size;
protected long size = 0;
protected long originalSize = 0;
private final ByteBuffer headerBuffer = ByteBuffer.allocate(Integer.BYTES * 2 + 1);
private ICompressor compressor =
ICompressor.getCompressor(
IoTDBDescriptor.getInstance().getConfig().getWALCompressionAlgorithm());
private ByteBuffer compressedByteBuffer;
// Minimum size to compress, default is 32 KB
private static long minCompressionSize = 32 * 1024L;

protected LogWriter(File logFile) throws FileNotFoundException {
protected LogWriter(File logFile) throws IOException {
this.logFile = logFile;
this.logStream = new FileOutputStream(logFile, true);
this.logChannel = this.logStream.getChannel();
if (!logFile.exists() || logFile.length() == 0) {
this.logChannel.write(
ByteBuffer.wrap(WALWriter.MAGIC_STRING.getBytes(StandardCharsets.UTF_8)));
size += logChannel.position();
}
if (IoTDBDescriptor.getInstance().getConfig().getWALCompressionAlgorithm()
!= CompressionType.UNCOMPRESSED) {
// TODO: Use a dynamic strategy to enlarge the buffer size
compressedByteBuffer =
ByteBuffer.allocate(
compressor.getMaxBytesForCompression(
IoTDBDescriptor.getInstance().getConfig().getWalBufferSize()));
jt2594838 marked this conversation as resolved.
Show resolved Hide resolved
} else {
compressedByteBuffer = null;
}
}

@Override
public void write(ByteBuffer buffer) throws IOException {
size += buffer.position();
public double write(ByteBuffer buffer) throws IOException {
CompressionType compressionType =
IoTDBDescriptor.getInstance().getConfig().getWALCompressionAlgorithm();
int bufferSize = buffer.position();
if (bufferSize == 0) {
return 1.0;
}
originalSize += bufferSize;
buffer.flip();
boolean compressed = false;
int uncompressedSize = bufferSize;
if (compressionType != CompressionType.UNCOMPRESSED
/* Do not compress buffer that is less than min size */
&& bufferSize > minCompressionSize) {
if (Objects.isNull(compressedByteBuffer)) {
compressedByteBuffer =
ByteBuffer.allocate(
compressor.getMaxBytesForCompression(
IoTDBDescriptor.getInstance().getConfig().getWalBufferSize()));
}
compressedByteBuffer.clear();
if (compressor.getType() != compressionType) {
compressor = ICompressor.getCompressor(compressionType);
}
compressor.compress(buffer, compressedByteBuffer);
buffer = compressedByteBuffer;
bufferSize = buffer.position();
buffer.flip();
compressed = true;
}
size += bufferSize;
/*
Header structure:
[CompressionType(1 byte)][dataBufferSize(4 bytes)][uncompressedSize(4 bytes)]
*/
headerBuffer.clear();
headerBuffer.put(
compressed ? compressionType.serialize() : CompressionType.UNCOMPRESSED.serialize());
headerBuffer.putInt(bufferSize);
if (compressed) {
headerBuffer.putInt(uncompressedSize);
}
size += headerBuffer.position();
try {
headerBuffer.flip();
logChannel.write(headerBuffer);
logChannel.write(buffer);
} catch (ClosedChannelException e) {
logger.warn("Cannot write to {}", logFile, e);
}
return ((double) bufferSize / uncompressedSize);
}

@Override
Expand All @@ -79,6 +149,10 @@ public long size() {
return size;
}

public long originalSize() {
return originalSize;
}

@Override
public File getLogFile() {
return logFile;
Expand All @@ -97,4 +171,8 @@ public void close() throws IOException {
}
}
}

public long getOffset() throws IOException {
return logChannel.position();
}
}
Loading
Loading