Skip to content

Commit

Permalink
Fix flush error due to compression ratio (#12953)
Browse files Browse the repository at this point in the history
  • Loading branch information
HTHou committed Jul 19, 2024
1 parent 4500875 commit a1be25e
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,18 @@ void restore() throws IOException {
int maxRatioIndex = 0;
for (int i = 0; i < ratioFiles.length; i++) {
String[] fileNameArray = ratioFiles[i].getName().split("-");
long diskSize = Long.parseLong(fileNameArray[2]);
if (diskSize > totalDiskSize) {
totalMemorySize = new AtomicLong(Long.parseLong(fileNameArray[1]));
totalDiskSize = diskSize;
maxRatioIndex = i;
// fileNameArray.length != 3 means the compression ratio may be negative, ignore it
if (fileNameArray.length == 3) {
try {
long diskSize = Long.parseLong(fileNameArray[2]);
if (diskSize > totalDiskSize) {
totalMemorySize = new AtomicLong(Long.parseLong(fileNameArray[1]));
totalDiskSize = diskSize;
maxRatioIndex = i;
}
} catch (NumberFormatException ignore) {
// ignore illegal compression file name
}
}
}
LOGGER.debug(
Expand All @@ -165,11 +172,18 @@ void restore() throws IOException {
totalDiskSize = 1;
for (int i = 0; i < ratioFilesBeforeV121.length; i++) {
String[] fileNameArray = ratioFilesBeforeV121[i].getName().split("-");
double currentCompressRatio =
Double.parseDouble(fileNameArray[1]) / Double.parseDouble(fileNameArray[2]);
if (getRatio() < currentCompressRatio) {
totalMemorySize = new AtomicLong((long) currentCompressRatio);
maxRatioIndex = i;
// fileNameArray.length != 3 means the compression ratio may be negative, ignore it
if (fileNameArray.length == 3) {
try {
double currentCompressRatio =
Double.parseDouble(fileNameArray[1]) / Double.parseDouble(fileNameArray[2]);
if (getRatio() < currentCompressRatio) {
totalMemorySize = new AtomicLong((long) currentCompressRatio);
maxRatioIndex = i;
}
} catch (NumberFormatException ignore) {
// ignore illegal compression file name
}
}
}
deleteRedundantFilesByIndex(ratioFilesBeforeV121, maxRatioIndex);
Expand Down Expand Up @@ -201,10 +215,6 @@ void reset() throws IOException {
totalDiskSize = 0L;
}

/**
 * Subtracts {@code size} bytes from the tracked total memory size, compensating for
 * records that were counted twice (duplicated timestamps skipped during flush).
 *
 * @param size number of bytes to remove from the running memory total
 */
public static void decreaseDuplicatedMemorySize(long size) {
  // Atomic decrement; the previous value returned by getAndAdd is not needed.
  totalMemorySize.getAndAdd(-size);
}

/** Returns the process-wide singleton {@link CompressionRatio} instance. */
// NOTE(review): looks like the initialization-on-demand holder idiom (lazy,
// thread-safe via class loading) — the CompressionRatioHolder class is not
// visible in this view, so confirm before relying on that guarantee.
public static CompressionRatio getInstance() {
return CompressionRatioHolder.INSTANCE;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@

package org.apache.iotdb.db.storageengine.dataregion.memtable;

import org.apache.iotdb.db.storageengine.dataregion.flush.CompressionRatio;
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils;
import org.apache.iotdb.db.utils.MemUtils;
import org.apache.iotdb.db.utils.datastructure.AlignedTVList;
import org.apache.iotdb.db.utils.datastructure.TVList;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
Expand Down Expand Up @@ -389,17 +387,6 @@ public void encode(IChunkWriter chunkWriter) {
list.getValueIndex(sortedRowIndex);
}
if (timeDuplicateInfo[sortedRowIndex]) {
if (!list.isNullValue(list.getValueIndex(sortedRowIndex), columnIndex)) {
long recordSize =
MemUtils.getRecordSize(
tsDataType,
tsDataType == TSDataType.TEXT
? list.getBinaryByValueIndex(
list.getValueIndex(sortedRowIndex), columnIndex)
: null,
true);
CompressionRatio.decreaseDuplicatedMemorySize(recordSize);
}
continue;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@

import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.storageengine.dataregion.flush.CompressionRatio;
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
import org.apache.iotdb.db.utils.MemUtils;
import org.apache.iotdb.db.utils.datastructure.TVList;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
Expand Down Expand Up @@ -332,12 +330,6 @@ public void encode(IChunkWriter chunkWriter) {

// skip duplicated data
if ((sortedRowIndex + 1 < list.rowCount() && (time == list.getTime(sortedRowIndex + 1)))) {
long recordSize =
MemUtils.getRecordSize(
tsDataType,
tsDataType == TSDataType.TEXT ? list.getBinary(sortedRowIndex) : null,
true);
CompressionRatio.decreaseDuplicatedMemorySize(recordSize);
continue;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,4 +104,40 @@ public void testRestore() throws IOException {
// largest diskSize to the memory
assertEquals(2, compressionRatio.getRatio(), 0.1);
}

@Test
public void testRestoreIllegal1() throws IOException {
  // One legal ratio file: memory size 10, disk size 50 -> expected ratio 10 / 50 = 0.2.
  File legalRatioFile =
      new File(
          directory,
          String.format(Locale.ENGLISH, CompressionRatio.RATIO_FILE_PATH_FORMAT, 10, 50));
  Files.createFile(legalRatioFile.toPath());

  // One illegal ratio file with a negative memory size; restore() must ignore it.
  File illegalRatioFile =
      new File(
          directory,
          String.format(Locale.ENGLISH, CompressionRatio.RATIO_FILE_PATH_FORMAT, -1000, 100));
  Files.createFile(illegalRatioFile.toPath());

  compressionRatio.restore();

  // if multiple files exist in the system due to some exceptions, restore the file with the
  // largest diskSize to the memory
  assertEquals(0.2, compressionRatio.getRatio(), 0.1);
}

@Test
public void testRestoreIllegal2() throws IOException {
  // Only an illegal ratio file (negative memory size) exists; restore() must skip it.
  File illegalRatioFile =
      new File(
          directory,
          String.format(Locale.ENGLISH, CompressionRatio.RATIO_FILE_PATH_FORMAT, -1000, 100));
  Files.createFile(illegalRatioFile.toPath());

  compressionRatio.restore();

  // if compression ratio from file is negative, assume the compression ratio is 0 / 0 = NaN
  assertEquals(Double.NaN, compressionRatio.getRatio(), 0.1);
}
}

0 comments on commit a1be25e

Please sign in to comment.