Skip to content

Commit

Permalink
Load: Fix some issues during loading tsfiles of old version (#14649)
Browse files Browse the repository at this point in the history
1. ArrayDeviceTimeIndex does not update minStartTime and maxEndTime
2. Reading old version resource file with mods file may cause error
3. TsFile genrated by Load has wrong ModificationFile instance, which refer to the source file of Load.
4. Old mod file cannot be read by LoadTsFileTreeSchemaCache
  • Loading branch information
shuwenwei authored Jan 14, 2025
1 parent b4edfdf commit b51f63c
Show file tree
Hide file tree
Showing 17 changed files with 277 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ private void generateTEXT(final Tablet tablet, final int column, final int row)
public void generateDeletion(final String device, final int number)
throws IOException, IllegalPathException {
try (final ModificationFile modificationFile =
new ModificationFile(ModificationFile.getExclusiveMods(tsFile))) {
new ModificationFile(ModificationFile.getExclusiveMods(tsFile), false)) {
writer.flush();
final TreeSet<Long> timeSet = device2TimeSet.get(device);
if (timeSet.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.it;

import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.db.exception.DataRegionException;
import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
import org.apache.iotdb.db.storageengine.dataregion.modification.TreeDeletionEntry;
import org.apache.iotdb.db.storageengine.dataregion.modification.v1.Deletion;
import org.apache.iotdb.db.storageengine.dataregion.modification.v1.ModificationFileV1;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
import org.apache.iotdb.db.storageengine.dataregion.wal.recover.file.SealedTsFileRecoverPerformer;
import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.ClusterIT;
import org.apache.iotdb.itbase.category.LocalStandaloneIT;

import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.exception.write.WriteProcessException;
import org.apache.tsfile.write.TsFileWriter;
import org.apache.tsfile.write.record.Tablet;
import org.apache.tsfile.write.schema.MeasurementSchema;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collections;
import java.util.Objects;

@RunWith(IoTDBTestRunner.class)
@Category({LocalStandaloneIT.class, ClusterIT.class})
public class IoTDBLoadTsFileWithModIT {
private File tmpDir;

@Before
public void setUp() throws Exception {
tmpDir = new File(Files.createTempDirectory("load").toUri());
EnvFactory.getEnv().initClusterEnvironment();
}

@After
public void tearDown() throws Exception {
try {
for (final File file : Objects.requireNonNull(tmpDir.listFiles())) {
Files.delete(file.toPath());
}
Files.delete(tmpDir.toPath());
} finally {
EnvFactory.getEnv().cleanClusterEnvironment();
}
}

private void generateFileWithNewModFile()
throws IOException, WriteProcessException, IllegalPathException, DataRegionException {
TsFileResource resource = generateFile();
// write mods file
resource
.getExclusiveModFile()
.write(new TreeDeletionEntry(new MeasurementPath("root.test.d1.s1"), 1, 2));
resource.getExclusiveModFile().close();
}

private void generateFileWithOldModFile()
throws IOException, DataRegionException, WriteProcessException, IllegalPathException {
TsFileResource resource = generateFile();
ModificationFileV1 oldModFile = ModificationFileV1.getNormalMods(resource);
oldModFile.write(new Deletion(new MeasurementPath("root.test.d1.s1"), Long.MAX_VALUE, 1, 2));
oldModFile.close();
}

private TsFileResource generateFile()
throws WriteProcessException, IOException, DataRegionException {
File tsfile = new File(tmpDir, "1-1-0-0.tsfile");
try (TsFileWriter writer = new TsFileWriter(tsfile)) {
writer.registerAlignedTimeseries(
"root.test.d1",
Collections.singletonList(new MeasurementSchema("s1", TSDataType.BOOLEAN)));
Tablet tablet =
new Tablet(
"root.test.d1",
Collections.singletonList(new MeasurementSchema("s1", TSDataType.BOOLEAN)));
for (int i = 0; i < 5; i++) {
tablet.addTimestamp(i, i);
tablet.addValue(i, 0, true);
}
writer.writeTree(tablet);
}
// generate resource file
TsFileResource resource = new TsFileResource(tsfile);
try (SealedTsFileRecoverPerformer performer = new SealedTsFileRecoverPerformer(resource)) {
performer.recover();
}
resource.setStatusForTest(TsFileResourceStatus.NORMAL);
resource.deserialize();
return resource;
}

@Test
public void testWithNewModFile()
throws SQLException,
IOException,
DataRegionException,
WriteProcessException,
IllegalPathException {
generateFileWithNewModFile();
try (final Connection connection = EnvFactory.getEnv().getConnection();
final Statement statement = connection.createStatement()) {

statement.execute(String.format("load \'%s\'", tmpDir.getAbsolutePath()));

try (final ResultSet resultSet =
statement.executeQuery("select count(s1) as c from root.test.d1")) {
Assert.assertTrue(resultSet.next());
Assert.assertEquals(3, resultSet.getLong("c"));
}
}
}

@Test
public void testWithOldModFile()
throws SQLException,
IOException,
DataRegionException,
WriteProcessException,
IllegalPathException {
generateFileWithOldModFile();
try (final Connection connection = EnvFactory.getEnv().getConnection();
final Statement statement = connection.createStatement()) {

statement.execute(String.format("load \'%s\'", tmpDir.getAbsolutePath()));

try (final ResultSet resultSet =
statement.executeQuery("select count(s1) as c from root.test.d1")) {
Assert.assertTrue(resultSet.next());
Assert.assertEquals(3, resultSet.getLong("c"));
Assert.assertTrue(
new File(tmpDir, "1-1-0-0.tsfile" + ModificationFileV1.FILE_SUFFIX).exists());
Assert.assertFalse(
new File(tmpDir, "1-1-0-0.tsfile" + ModificationFile.FILE_SUFFIX).exists());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,17 @@ public void insertTabletWithStringValuesTest() {
while (dataSet.hasNext()) {
RowRecord rowRecord = dataSet.next();
List<Field> fields = rowRecord.getFields();
// this test may occasionally fail by IndexOutOfBoundsException
if (fields.size() != 7) {
SessionDataSet showTimeseriesDataSet =
session.executeQueryStatement("show timeseries root.sg1.d1.*");
LOGGER.error("show timeseries result:");
while (showTimeseriesDataSet.hasNext()) {
RowRecord row = showTimeseriesDataSet.next();
LOGGER.error(row.toString());
}
LOGGER.error("The number of fields is not correct. fields values: " + fields);
}
assertEquals(fields.get(5).getBinaryV(), fields.get(6).getBinaryV());
}
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.TableSchema;
import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry;
import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.FileTimeIndex;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ITimeIndex;
Expand Down Expand Up @@ -320,7 +321,7 @@ public void setCurrentModificationsAndTimeIndex(
TsFileResource resource, TsFileSequenceReader reader) throws IOException {
clearModificationsAndTimeIndex();

currentModifications = resource.getAllModEntries();
currentModifications = ModificationFile.readAllModifications(resource.getTsFile(), false);
for (final ModEntry modification : currentModifications) {
currentModificationsMemoryUsageSizeInBytes += modification.serializedSize();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.load.LoadRuntimeOutOfMemoryException;
import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry;
import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.FileTimeIndex;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ITimeIndex;
Expand Down Expand Up @@ -153,7 +154,7 @@ public void setCurrentModificationsAndTimeIndex(
TsFileResource resource, TsFileSequenceReader reader) throws IOException {
clearModificationsAndTimeIndex();

currentModifications = resource.getAllModEntries();
currentModifications = ModificationFile.readAllModifications(resource.getTsFile(), true);
for (final ModEntry modification : currentModifications) {
currentModificationsMemoryUsageSizeInBytes += modification.serializedSize();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3135,6 +3135,8 @@ private void loadModFile(
final File newTargetModFile = ModificationFile.getExclusiveMods(targetTsFile);
moveModFile(newModFileToLoad, newTargetModFile, deleteOriginFile);
}
// force update mod file metrics
tsFileResource.getExclusiveModFile();
}

@SuppressWarnings("java:S2139")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,6 @@ protected boolean doCompaction() {
logger.logSourceFile(unseqFileToInsert);
logger.logTargetFile(targetFile);
logger.force();
CompactionUtils.prepareCompactionModFiles(
Collections.singletonList(targetFile), Collections.singletonList(unseqFileToInsert));

prepareTargetFiles();

validateCompactionResult(
Expand Down
Loading

0 comments on commit b51f63c

Please sign in to comment.