From 99221da8ca8239251f4a57c649ee771c3cc32a29 Mon Sep 17 00:00:00 2001 From: Zikun Ma <55695098+DanielWang2035@users.noreply.github.com> Date: Tue, 24 Dec 2024 23:20:52 +0800 Subject: [PATCH] Load: detect region replica set changes due to Region Migration (#14104) --- .../org/apache/iotdb/db/conf/IoTDBConfig.java | 10 ++ .../apache/iotdb/db/conf/IoTDBDescriptor.java | 5 + .../{ => load}/LoadEmptyFileException.java | 2 +- .../{ => load}/LoadFileException.java | 2 +- .../{ => load}/LoadReadOnlyException.java | 2 +- .../LoadRuntimeOutOfMemoryException.java | 2 +- .../PartitionViolationException.java | 2 +- .../RegionReplicaSetChangedException.java | 36 +++++ .../legacy/loader/DeletionLoader.java | 2 +- .../protocol/legacy/loader/TsFileLoader.java | 2 +- .../pipeconsensus/PipeConsensusReceiver.java | 2 +- .../PipeStatementExceptionVisitor.java | 2 +- .../plan/analyze/load/LoadTsFileAnalyzer.java | 2 +- .../load/LoadTsFileTableSchemaCache.java | 2 +- .../load/LoadTsFileToTableModelAnalyzer.java | 2 +- .../load/LoadTsFileToTreeModelAnalyzer.java | 2 +- .../load/LoadTsFileTreeSchemaCache.java | 2 +- .../TreeSchemaAutoCreatorAndVerifier.java | 4 +- .../load/LoadTsFileDispatcherImpl.java | 2 +- .../scheduler/load/LoadTsFileScheduler.java | 134 ++++++++++++------ .../iotdb/db/storageengine/StorageEngine.java | 2 +- .../storageengine/dataregion/DataRegion.java | 2 +- .../dataregion/tsfile/TsFileResource.java | 2 +- .../timeindex/ArrayDeviceTimeIndex.java | 2 +- .../tsfile/timeindex/FileTimeIndex.java | 2 +- .../tsfile/timeindex/ITimeIndex.java | 2 +- .../storageengine/load/LoadTsFileManager.java | 2 +- .../LoadTsFileDataCacheMemoryBlock.java | 2 +- .../load/memory/LoadTsFileMemoryManager.java | 2 +- .../load/splitter/TsFileSplitter.java | 35 +++-- ...tchedCompactionWithTsFileSplitterTest.java | 15 +- 31 files changed, 202 insertions(+), 85 deletions(-) rename iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/{ => load}/LoadEmptyFileException.java (95%) rename iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/{ => load}/LoadFileException.java (96%) rename iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/{ => load}/LoadReadOnlyException.java (96%) rename iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/{ => load}/LoadRuntimeOutOfMemoryException.java (95%) rename iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/{ => load}/PartitionViolationException.java (96%) create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/load/RegionReplicaSetChangedException.java diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index ec8ba5d41aa9..9b26ca3da239 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -1176,6 +1176,8 @@ public class IoTDBConfig { private long loadCleanupTaskExecutionDelayTimeSeconds = 1800L; // 30 min + private int loadTsFileRetryCountOnRegionChange = 10; + private double loadWriteThroughputBytesPerSecond = -1; // Bytes/s private boolean loadActiveListeningEnable = true; @@ -4138,6 +4140,14 @@ public void setLoadCleanupTaskExecutionDelayTimeSeconds( this.loadCleanupTaskExecutionDelayTimeSeconds = loadCleanupTaskExecutionDelayTimeSeconds; } + public int getLoadTsFileRetryCountOnRegionChange() { + return loadTsFileRetryCountOnRegionChange; + } + + public void setLoadTsFileRetryCountOnRegionChange(int loadTsFileRetryCountOnRegionChange) { + this.loadTsFileRetryCountOnRegionChange = loadTsFileRetryCountOnRegionChange; + } + public double getLoadWriteThroughputBytesPerSecond() { return loadWriteThroughputBytesPerSecond; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index 7a5b947d3b08..a42bce1fdb4b 100755 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -2412,6 +2412,11 @@ private void loadLoadTsFileProps(TrimProperties properties) { properties.getProperty( "load_clean_up_task_execution_delay_time_seconds", String.valueOf(conf.getLoadCleanupTaskExecutionDelayTimeSeconds())))); + conf.setLoadTsFileRetryCountOnRegionChange( + Integer.parseInt( + properties.getProperty( + "load_tsfile_retry_count_on_region_change", + String.valueOf(conf.getLoadTsFileRetryCountOnRegionChange())))); conf.setLoadWriteThroughputBytesPerSecond( Double.parseDouble( properties.getProperty( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/LoadEmptyFileException.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/load/LoadEmptyFileException.java similarity index 95% rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/LoadEmptyFileException.java rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/load/LoadEmptyFileException.java index 59e8a1d13f0c..ea336c29f307 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/LoadEmptyFileException.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/load/LoadEmptyFileException.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.iotdb.db.exception; +package org.apache.iotdb.db.exception.load; public class LoadEmptyFileException extends LoadFileException { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/LoadFileException.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/load/LoadFileException.java similarity index 96% rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/LoadFileException.java rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/load/LoadFileException.java index 523044aab0ab..0d1c4c538820 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/LoadFileException.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/load/LoadFileException.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.iotdb.db.exception; +package org.apache.iotdb.db.exception.load; import org.apache.iotdb.commons.exception.IoTDBException; import org.apache.iotdb.rpc.TSStatusCode; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/LoadReadOnlyException.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/load/LoadReadOnlyException.java similarity index 96% rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/LoadReadOnlyException.java rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/load/LoadReadOnlyException.java index d2da6e70df4a..d10c85909290 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/LoadReadOnlyException.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/load/LoadReadOnlyException.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.iotdb.db.exception; +package org.apache.iotdb.db.exception.load; import org.apache.iotdb.commons.exception.IoTDBException; import org.apache.iotdb.rpc.TSStatusCode; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/LoadRuntimeOutOfMemoryException.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/load/LoadRuntimeOutOfMemoryException.java similarity index 95% rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/LoadRuntimeOutOfMemoryException.java rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/load/LoadRuntimeOutOfMemoryException.java index 050274e99408..fa31054c053d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/LoadRuntimeOutOfMemoryException.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/load/LoadRuntimeOutOfMemoryException.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.iotdb.db.exception; +package org.apache.iotdb.db.exception.load; public class LoadRuntimeOutOfMemoryException extends RuntimeException { public LoadRuntimeOutOfMemoryException(String message) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/PartitionViolationException.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/load/PartitionViolationException.java similarity index 96% rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/PartitionViolationException.java rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/load/PartitionViolationException.java index 8b4c8dfb8ab0..ffb47633e51e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/PartitionViolationException.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/load/PartitionViolationException.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.iotdb.db.exception; +package org.apache.iotdb.db.exception.load; public class PartitionViolationException extends LoadFileException { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/load/RegionReplicaSetChangedException.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/load/RegionReplicaSetChangedException.java new file mode 100644 index 000000000000..e362c0b66349 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/load/RegionReplicaSetChangedException.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.exception.load; + +import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; + +public class RegionReplicaSetChangedException extends LoadFileException { + + public RegionReplicaSetChangedException(TRegionReplicaSet original, TRegionReplicaSet current) { + super( + String.format( + "Region replica set changed from %s to %s during loading TsFile, maybe due to region migration", + original, current)); + } + + public RegionReplicaSetChangedException() { + super("Region replica set changed during loading TsFile, maybe due to region migration"); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/loader/DeletionLoader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/loader/DeletionLoader.java index 32f549866f20..46292b097463 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/loader/DeletionLoader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/loader/DeletionLoader.java @@ -22,7 +22,7 @@ import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.db.auth.AuthorityChecker; import org.apache.iotdb.db.conf.IoTDBDescriptor; -import org.apache.iotdb.db.exception.LoadFileException; +import org.apache.iotdb.db.exception.load.LoadFileException; import org.apache.iotdb.db.protocol.session.SessionManager; import org.apache.iotdb.db.queryengine.common.SessionInfo; import org.apache.iotdb.db.queryengine.plan.Coordinator; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/loader/TsFileLoader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/loader/TsFileLoader.java index 0e0612ce2ca9..c78ca0544b4c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/loader/TsFileLoader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/loader/TsFileLoader.java @@ -23,7 +23,7 @@ import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.auth.AuthorityChecker; import org.apache.iotdb.db.conf.IoTDBDescriptor; -import org.apache.iotdb.db.exception.LoadFileException; +import org.apache.iotdb.db.exception.load.LoadFileException; import org.apache.iotdb.db.protocol.session.SessionManager; import org.apache.iotdb.db.queryengine.common.SessionInfo; import org.apache.iotdb.db.queryengine.plan.Coordinator; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java index 32ead7587e37..6bf7b7b7f6e7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java @@ -41,7 +41,7 @@ import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.DiskSpaceInsufficientException; -import org.apache.iotdb.db.exception.LoadFileException; +import org.apache.iotdb.db.exception.load.LoadFileException; import org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusDeleteNodeReq; import org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTabletBinaryReq; import org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTabletInsertNodeReq; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementExceptionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementExceptionVisitor.java index bdb48a2ccbdf..8059ee639303 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementExceptionVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementExceptionVisitor.java @@ -22,7 +22,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.exception.IoTDBException; import org.apache.iotdb.commons.exception.MetadataException; -import org.apache.iotdb.db.exception.LoadRuntimeOutOfMemoryException; +import org.apache.iotdb.db.exception.load.LoadRuntimeOutOfMemoryException; import org.apache.iotdb.db.exception.sql.SemanticException; import org.apache.iotdb.db.exception.sql.StatementAnalyzeException; import org.apache.iotdb.db.queryengine.plan.statement.Statement; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java index 0d096dfe28eb..9cf39b42325e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java @@ -21,8 +21,8 @@ import org.apache.iotdb.commons.auth.AuthException; import org.apache.iotdb.commons.conf.CommonDescriptor; -import org.apache.iotdb.db.exception.LoadReadOnlyException; import org.apache.iotdb.db.exception.VerifyMetadataException; +import org.apache.iotdb.db.exception.load.LoadReadOnlyException; import org.apache.iotdb.db.exception.sql.SemanticException; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java index 127b619f06c6..b7070afea49f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java @@ -23,8 +23,8 @@ import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; -import org.apache.iotdb.db.exception.LoadRuntimeOutOfMemoryException; import org.apache.iotdb.db.exception.VerifyMetadataException; +import org.apache.iotdb.db.exception.load.LoadRuntimeOutOfMemoryException; import org.apache.iotdb.db.exception.sql.SemanticException; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileToTableModelAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileToTableModelAnalyzer.java index 2975365ec94f..5262a02af952 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileToTableModelAnalyzer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileToTableModelAnalyzer.java @@ -20,8 +20,8 @@ package org.apache.iotdb.db.queryengine.plan.analyze.load; import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema; -import org.apache.iotdb.db.exception.LoadEmptyFileException; import org.apache.iotdb.db.exception.VerifyMetadataException; +import org.apache.iotdb.db.exception.load.LoadEmptyFileException; import org.apache.iotdb.db.exception.sql.SemanticException; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileToTreeModelAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileToTreeModelAnalyzer.java index 76bac3bc26bc..fe91d00f8e1a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileToTreeModelAnalyzer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileToTreeModelAnalyzer.java @@ -21,7 +21,7 @@ import org.apache.iotdb.commons.auth.AuthException; import org.apache.iotdb.db.conf.IoTDBDescriptor; -import org.apache.iotdb.db.exception.LoadEmptyFileException; +import org.apache.iotdb.db.exception.load.LoadEmptyFileException; import org.apache.iotdb.db.exception.sql.SemanticException; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTreeSchemaCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTreeSchemaCache.java index f982a03affd8..ff9ea28224c4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTreeSchemaCache.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTreeSchemaCache.java @@ -23,7 +23,7 @@ import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; -import org.apache.iotdb.db.exception.LoadRuntimeOutOfMemoryException; +import org.apache.iotdb.db.exception.load.LoadRuntimeOutOfMemoryException; import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.FileTimeIndex; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/TreeSchemaAutoCreatorAndVerifier.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/TreeSchemaAutoCreatorAndVerifier.java index c36b7f0aaf36..67379c724efe 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/TreeSchemaAutoCreatorAndVerifier.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/TreeSchemaAutoCreatorAndVerifier.java @@ -34,9 +34,9 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowDatabaseResp; import org.apache.iotdb.db.auth.AuthorityChecker; import org.apache.iotdb.db.conf.IoTDBDescriptor; -import org.apache.iotdb.db.exception.LoadFileException; -import org.apache.iotdb.db.exception.LoadRuntimeOutOfMemoryException; import org.apache.iotdb.db.exception.VerifyMetadataException; +import org.apache.iotdb.db.exception.load.LoadFileException; +import org.apache.iotdb.db.exception.load.LoadRuntimeOutOfMemoryException; import org.apache.iotdb.db.exception.sql.SemanticException; import org.apache.iotdb.db.protocol.client.ConfigNodeClient; import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java index a0f676781378..5189e9d6f362 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java @@ -31,7 +31,7 @@ import org.apache.iotdb.commons.consensus.index.ProgressIndex; import org.apache.iotdb.commons.consensus.index.ProgressIndexType; import org.apache.iotdb.db.conf.IoTDBDescriptor; -import org.apache.iotdb.db.exception.LoadFileException; +import org.apache.iotdb.db.exception.load.LoadFileException; import org.apache.iotdb.db.exception.mpp.FragmentInstanceDispatchException; import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent; import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java index c112eb344905..16a84777c3b2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.queryengine.plan.scheduler.load; +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.common.rpc.thrift.TSStatus; @@ -37,8 +38,9 @@ import org.apache.iotdb.commons.service.metric.enums.Tag; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; -import org.apache.iotdb.db.exception.LoadFileException; -import org.apache.iotdb.db.exception.LoadReadOnlyException; +import org.apache.iotdb.db.exception.load.LoadFileException; +import org.apache.iotdb.db.exception.load.LoadReadOnlyException; +import org.apache.iotdb.db.exception.load.RegionReplicaSetChangedException; import org.apache.iotdb.db.exception.mpp.FragmentInstanceDispatchException; import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; @@ -89,6 +91,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.UUID; import java.util.concurrent.CancellationException; @@ -98,7 +101,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; -import java.util.stream.IntStream; /** * {@link LoadTsFileScheduler} is used for scheduling {@link LoadSingleTsFileNode} and {@link @@ -206,7 +208,8 @@ public void start() { long startTime = System.nanoTime(); final boolean isFirstPhaseSuccess; try { - isFirstPhaseSuccess = firstPhase(node); + isFirstPhaseSuccess = + firstPhaseWithRetry(node, CONFIG.getLoadTsFileRetryCountOnRegionChange()); } finally { LOAD_TSFILE_COST_METRICS_SET.recordPhaseTimeCost( LoadTsFileCostMetricsSet.FIRST_PHASE, System.nanoTime() - startTime); @@ -262,7 +265,30 @@ public void start() { } } - private boolean firstPhase(LoadSingleTsFileNode node) { + private boolean firstPhaseWithRetry(LoadSingleTsFileNode node, int retryCountOnRegionChange) { + retryCountOnRegionChange = Math.max(0, retryCountOnRegionChange); + while (true) { + try { + return firstPhase(node); + } catch (RegionReplicaSetChangedException e) { + if (retryCountOnRegionChange > 0) { + LOGGER.warn( + "Region replica set changed during loading TsFile {}, maybe due to region migration, will retry for {} times.", + node.getTsFileResource(), + retryCountOnRegionChange); + retryCountOnRegionChange--; + } else { + stateMachine.transitionToFailed(e); + LOGGER.warn( + "Region replica set changed during loading TsFile {} after retry.", + node.getTsFileResource()); + return false; + } + } + } + } + + private boolean firstPhase(LoadSingleTsFileNode node) throws RegionReplicaSetChangedException { final TsFileDataManager tsFileDataManager = new TsFileDataManager(this, node, block); try { new TsFileSplitter( @@ -272,6 +298,8 @@ private boolean firstPhase(LoadSingleTsFileNode node) { stateMachine.transitionToFailed(new TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode())); return false; } + } catch (RegionReplicaSetChangedException e) { + throw e; } catch (IllegalStateException e) { stateMachine.transitionToFailed(e); LOGGER.warn( @@ -531,7 +559,8 @@ private static class TsFileDataManager { private final LoadSingleTsFileNode singleTsFileNode; private long dataSize; - private final Map replicaSet2Piece; + private final Map> + regionId2ReplicaSetAndNode; private final List nonDirectionalChunkData; private final LoadTsFileDataCacheMemoryBlock block; @@ -542,12 +571,12 @@ public TsFileDataManager( this.scheduler = scheduler; this.singleTsFileNode = singleTsFileNode; this.dataSize = 0; - this.replicaSet2Piece = new HashMap<>(); + this.regionId2ReplicaSetAndNode = new HashMap<>(); this.nonDirectionalChunkData = new ArrayList<>(); this.block = block; } - private boolean addOrSendTsFileData(TsFileData tsFileData) { + private boolean addOrSendTsFileData(TsFileData tsFileData) throws LoadFileException { switch (tsFileData.getType()) { case CHUNK: return addOrSendChunkData((ChunkData) tsFileData); @@ -563,7 +592,7 @@ private boolean isMemoryEnough() { return dataSize <= SINGLE_SCHEDULER_MAX_MEMORY_SIZE && block.hasEnoughMemory(); } - private boolean addOrSendChunkData(ChunkData chunkData) { + private boolean addOrSendChunkData(ChunkData chunkData) throws LoadFileException { nonDirectionalChunkData.add(chunkData); dataSize += chunkData.getDataSize(); block.addMemoryUsage(chunkData.getDataSize()); @@ -572,30 +601,37 @@ private boolean addOrSendChunkData(ChunkData chunkData) { routeChunkData(); // start to dispatch from the biggest TsFilePieceNode - List sortedReplicaSets = - replicaSet2Piece.keySet().stream() + List sortedRegionIds = + regionId2ReplicaSetAndNode.keySet().stream() .sorted( - Comparator.comparingLong(o -> replicaSet2Piece.get(o).getDataSize()).reversed()) + Comparator.comparingLong( + o -> regionId2ReplicaSetAndNode.get(o).getRight().getDataSize()) + .reversed()) .collect(Collectors.toList()); - for (TRegionReplicaSet sortedReplicaSet : sortedReplicaSets) { - LoadTsFilePieceNode pieceNode = replicaSet2Piece.get(sortedReplicaSet); + for (TConsensusGroupId sortedRegionId : sortedRegionIds) { + final TRegionReplicaSet replicaSet = + regionId2ReplicaSetAndNode.get(sortedRegionId).getLeft(); + final LoadTsFilePieceNode pieceNode = + regionId2ReplicaSetAndNode.get(sortedRegionId).getRight(); if (pieceNode.getDataSize() == 0) { // total data size has been reduced to 0 break; } - if (!scheduler.dispatchOnePieceNode(pieceNode, sortedReplicaSet)) { + if (!scheduler.dispatchOnePieceNode(pieceNode, replicaSet)) { return false; } dataSize -= pieceNode.getDataSize(); block.reduceMemoryUsage(pieceNode.getDataSize()); - replicaSet2Piece.put( - sortedReplicaSet, - new LoadTsFilePieceNode( - singleTsFileNode.getPlanNodeId(), - singleTsFileNode - .getTsFileResource() - .getTsFile())); // can not just remove, because of deletion + regionId2ReplicaSetAndNode.put( + sortedRegionId, + new Pair<>( + replicaSet, + new LoadTsFilePieceNode( + singleTsFileNode.getPlanNodeId(), + singleTsFileNode + .getTsFileResource() + .getTsFile()))); // can not just remove, because of deletion if (isMemoryEnough()) { break; } @@ -605,7 +641,7 @@ private boolean addOrSendChunkData(ChunkData chunkData) { return true; } - private void routeChunkData() { + private void routeChunkData() throws LoadFileException { if (nonDirectionalChunkData.isEmpty()) { return; } @@ -616,37 +652,51 @@ private void routeChunkData() { .map(data -> new Pair<>(data.getDevice(), data.getTimePartitionSlot())) .collect(Collectors.toList()), scheduler.queryContext.getSession().getUserName()); - IntStream.range(0, nonDirectionalChunkData.size()) - .forEach( - i -> - replicaSet2Piece - .computeIfAbsent( - replicaSets.get(i), - o -> - new LoadTsFilePieceNode( - singleTsFileNode.getPlanNodeId(), - singleTsFileNode.getTsFileResource().getTsFile())) - .addTsFileData(nonDirectionalChunkData.get(i))); + for (int i = 0; i < replicaSets.size(); i++) { + final TRegionReplicaSet replicaSet = replicaSets.get(i); + final TConsensusGroupId regionId = replicaSet.getRegionId(); + if (regionId2ReplicaSetAndNode.containsKey(regionId) + && !Objects.equals(regionId2ReplicaSetAndNode.get(regionId).getLeft(), replicaSet)) { + // Detected region replica set changed (maybe due to region migration), throw an exception + throw new RegionReplicaSetChangedException( + regionId2ReplicaSetAndNode.get(regionId).getLeft(), replicaSet); + } + + regionId2ReplicaSetAndNode + .computeIfAbsent( + replicaSet.getRegionId(), + o -> + new Pair<>( + replicaSet, + new LoadTsFilePieceNode( + singleTsFileNode.getPlanNodeId(), + singleTsFileNode.getTsFileResource().getTsFile()))) + .getRight() + .addTsFileData(nonDirectionalChunkData.get(i)); + } nonDirectionalChunkData.clear(); } - private boolean addOrSendDeletionData(DeletionData deletionData) { + private boolean addOrSendDeletionData(DeletionData deletionData) throws LoadFileException { routeChunkData(); // ensure chunk data will be added before deletion - for (Map.Entry entry : replicaSet2Piece.entrySet()) { + for (Map.Entry> entry : + regionId2ReplicaSetAndNode.entrySet()) { dataSize += deletionData.getDataSize(); block.addMemoryUsage(deletionData.getDataSize()); - entry.getValue().addTsFileData(deletionData); + entry.getValue().getRight().addTsFileData(deletionData); } return true; } - private boolean sendAllTsFileData() { + private boolean sendAllTsFileData() throws LoadFileException { routeChunkData(); - for (Map.Entry entry : replicaSet2Piece.entrySet()) { - block.reduceMemoryUsage(entry.getValue().getDataSize()); - if (!scheduler.dispatchOnePieceNode(entry.getValue(), entry.getKey())) { + for (Map.Entry> entry : + regionId2ReplicaSetAndNode.entrySet()) { + block.reduceMemoryUsage(entry.getValue().getRight().getDataSize()); + if (!scheduler.dispatchOnePieceNode( + entry.getValue().getRight(), entry.getValue().getLeft())) { LOGGER.warn( "Dispatch piece node {} of TsFile {} error.", entry.getValue(), @@ -658,7 +708,7 @@ private boolean sendAllTsFileData() { } private void clear() { - replicaSet2Piece.clear(); + regionId2ReplicaSetAndNode.clear(); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java index 47120fc9781d..6496aaf256c1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java @@ -47,10 +47,10 @@ import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.DataRegionException; -import org.apache.iotdb.db.exception.LoadReadOnlyException; import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.exception.TsFileProcessorException; import org.apache.iotdb.db.exception.WriteProcessRejectException; +import org.apache.iotdb.db.exception.load.LoadReadOnlyException; import org.apache.iotdb.db.exception.runtime.StorageEngineFailureException; import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeTTLCache; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index 92c68cd22f89..ffa958f2637a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -42,10 +42,10 @@ import org.apache.iotdb.db.exception.BatchProcessException; import org.apache.iotdb.db.exception.DataRegionException; import org.apache.iotdb.db.exception.DiskSpaceInsufficientException; -import org.apache.iotdb.db.exception.LoadFileException; import org.apache.iotdb.db.exception.TsFileProcessorException; import org.apache.iotdb.db.exception.WriteProcessException; import org.apache.iotdb.db.exception.WriteProcessRejectException; +import org.apache.iotdb.db.exception.load.LoadFileException; import org.apache.iotdb.db.exception.query.OutOfTTLException; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.exception.quota.ExceedQuotaException; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java index ed681abe8ad9..5f8891574a7b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java @@ -29,7 +29,7 @@ import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; -import org.apache.iotdb.db.exception.PartitionViolationException; +import org.apache.iotdb.db.exception.load.PartitionViolationException; import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.assigner.PipeTimePartitionProgressIndexKeeper; import org.apache.iotdb.db.schemaengine.schemaregion.utils.ResourceByPathUtils; import org.apache.iotdb.db.storageengine.dataregion.DataRegion; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ArrayDeviceTimeIndex.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ArrayDeviceTimeIndex.java index 31fd9d385e62..cf1cf1dd1db7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ArrayDeviceTimeIndex.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ArrayDeviceTimeIndex.java @@ -23,7 +23,7 @@ import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.utils.CommonDateTimeUtils; import org.apache.iotdb.commons.utils.TimePartitionUtils; -import org.apache.iotdb.db.exception.PartitionViolationException; +import org.apache.iotdb.db.exception.load.PartitionViolationException; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.tsfile.file.metadata.IDeviceID; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/FileTimeIndex.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/FileTimeIndex.java index c9820d809f1e..50d2cfc9e0ee 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/FileTimeIndex.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/FileTimeIndex.java @@ -22,7 +22,7 @@ import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.utils.CommonDateTimeUtils; import org.apache.iotdb.commons.utils.TimePartitionUtils; -import org.apache.iotdb.db.exception.PartitionViolationException; +import org.apache.iotdb.db.exception.load.PartitionViolationException; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.tsfile.file.metadata.IDeviceID; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ITimeIndex.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ITimeIndex.java index a966d3036440..52035293a0cd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ITimeIndex.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ITimeIndex.java @@ -20,7 +20,7 @@ package org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex; import org.apache.iotdb.commons.path.PartialPath; -import org.apache.iotdb.db.exception.PartitionViolationException; +import org.apache.iotdb.db.exception.load.PartitionViolationException; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.tsfile.file.metadata.IDeviceID; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java index 5263e2513707..c807ce9f56b5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java @@ -33,7 +33,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.consensus.DataRegionConsensusImpl; import org.apache.iotdb.db.exception.DiskSpaceInsufficientException; -import org.apache.iotdb.db.exception.LoadFileException; +import org.apache.iotdb.db.exception.load.LoadFileException; import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode; import org.apache.iotdb.db.queryengine.plan.relational.metadata.TableSchema; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileDataCacheMemoryBlock.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileDataCacheMemoryBlock.java index ef038fd374f8..c581ae768f4b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileDataCacheMemoryBlock.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileDataCacheMemoryBlock.java @@ -19,7 +19,7 @@ package org.apache.iotdb.db.storageengine.load.memory; -import org.apache.iotdb.db.exception.LoadRuntimeOutOfMemoryException; +import org.apache.iotdb.db.exception.load.LoadRuntimeOutOfMemoryException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManager.java index b1978d2f6cfc..7a5fc392d622 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManager.java @@ -21,7 +21,7 @@ import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; -import org.apache.iotdb.db.exception.LoadRuntimeOutOfMemoryException; +import org.apache.iotdb.db.exception.load.LoadRuntimeOutOfMemoryException; import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner; import org.slf4j.Logger; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileSplitter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileSplitter.java index 5c86082ce8c9..23ba30fde005 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileSplitter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileSplitter.java @@ -21,6 +21,7 @@ import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; import org.apache.iotdb.commons.utils.TimePartitionUtils; +import org.apache.iotdb.db.exception.load.LoadFileException; import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry; import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile; @@ -57,13 +58,12 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.function.Function; public class TsFileSplitter { private static final Logger logger = LoggerFactory.getLogger(TsFileSplitter.class); private final File tsFile; - private final Function consumer; + private final TsFileDataConsumer consumer; private Map offset2ChunkMetadata = new HashMap<>(); private List deletions = new ArrayList<>(); private Map> pageIndex2ChunkData = new HashMap<>(); @@ -82,13 +82,14 @@ public class TsFileSplitter { private List> pageIndex2TimesList = null; private List isTimeChunkNeedDecodeList = new ArrayList<>(); - public TsFileSplitter(File tsFile, Function consumer) { + public TsFileSplitter(File tsFile, TsFileDataConsumer consumer) { this.tsFile = tsFile; this.consumer = consumer; } @SuppressWarnings({"squid:S3776", "squid:S6541"}) - public void splitTsFileByDataPartition() throws IOException, IllegalStateException { + public void splitTsFileByDataPartition() + throws IOException, LoadFileException, IllegalStateException { try (TsFileSequenceReader reader = new TsFileSequenceReader(tsFile.getAbsolutePath())) { getAllModification(deletions); @@ -143,7 +144,7 @@ public void splitTsFileByDataPartition() throws IOException, IllegalStateExcepti } private void processTimeChunkOrNonAlignedChunk(TsFileSequenceReader reader, byte marker) - throws IOException { + throws IOException, LoadFileException { long chunkOffset = reader.position(); timeChunkIndexOfCurrentValueColumn = pageIndex2TimesList.size(); consumeAllAlignedChunkData(chunkOffset, pageIndex2ChunkData); @@ -196,7 +197,7 @@ private void decodeAndWriteTimeChunkOrNonAlignedChunk( IChunkMetadata chunkMetadata, long chunkOffset, ChunkData chunkData) - throws IOException { + throws IOException, LoadFileException { String measurementId = header.getMeasurementID(); TTimePartitionSlot timePartitionSlot = chunkData.getTimePartitionSlot(); Decoder defaultTimeDecoder = @@ -285,7 +286,8 @@ private void decodeAndWriteTimeChunkOrNonAlignedChunk( } } - private void processValueChunk(TsFileSequenceReader reader, byte marker) throws IOException { + private void processValueChunk(TsFileSequenceReader reader, byte marker) + throws IOException, LoadFileException { long chunkOffset = reader.position(); IChunkMetadata chunkMetadata = offset2ChunkMetadata.get(chunkOffset - Byte.BYTES); ChunkHeader header = reader.readChunkHeader(marker); @@ -352,7 +354,7 @@ private void storeTimeChunkContext() { } private void switchToTimeChunkContextOfCurrentMeasurement( - TsFileSequenceReader reader, String measurement) throws IOException { + TsFileSequenceReader reader, String measurement) throws IOException, LoadFileException { int index = valueColumn2TimeChunkIndex.getOrDefault(measurement, 0); if (index != timeChunkIndexOfCurrentValueColumn) { consumeAllAlignedChunkData(reader.position(), pageIndex2ChunkData); @@ -415,12 +417,15 @@ private void getChunkMetadata( } } - private void handleModification(List deletions) { - deletions.forEach(o -> consumer.apply(new DeletionData(o))); + private void handleModification(List deletions) throws LoadFileException { + for (final ModEntry mod : deletions) { + consumer.apply(new DeletionData(mod)); + } } private void consumeAllAlignedChunkData( - long offset, Map> pageIndex2ChunkData) { + long offset, Map> pageIndex2ChunkData) + throws LoadFileException { if (pageIndex2ChunkData.isEmpty()) { return; } @@ -446,7 +451,8 @@ private void consumeAllAlignedChunkData( this.pageIndex2ChunkData = new HashMap<>(); } - private void consumeChunkData(String measurement, long offset, ChunkData chunkData) { + private void consumeChunkData(String measurement, long offset, ChunkData chunkData) + throws LoadFileException { if (Boolean.FALSE.equals(consumer.apply(chunkData))) { throw new IllegalStateException( String.format( @@ -549,4 +555,9 @@ private TsPrimitiveType[] decodeValuePage( return valuePageReader.nextValueBatch( times); // should be origin time, so recording satisfied length is necessary } + + @FunctionalInterface + public interface TsFileDataConsumer { + boolean apply(TsFileData tsFileData) throws LoadFileException; + } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/BatchedCompactionWithTsFileSplitterTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/BatchedCompactionWithTsFileSplitterTest.java index 9daee881371b..edfad9ff2519 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/BatchedCompactionWithTsFileSplitterTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/BatchedCompactionWithTsFileSplitterTest.java @@ -24,6 +24,7 @@ import org.apache.iotdb.commons.exception.MetadataException; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.StorageEngineException; +import org.apache.iotdb.db.exception.load.LoadFileException; import org.apache.iotdb.db.storageengine.dataregion.compaction.constant.CompactionTaskType; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.ReadChunkCompactionPerformer; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.CompactionTaskSummary; @@ -87,7 +88,8 @@ public void testCompactionFlushChunk() StorageEngineException, InterruptedException, MetadataException, - PageException { + PageException, + LoadFileException { TsFileResource seqResource1 = generateSingleAlignedSeriesFile( "d0", @@ -121,7 +123,8 @@ public void testCompactionFlushChunkAndSplitByTimePartition() StorageEngineException, InterruptedException, MetadataException, - PageException { + PageException, + LoadFileException { TsFileResource seqResource1 = generateSingleAlignedSeriesFile( "d0", @@ -155,7 +158,8 @@ public void testCompactionFlushPage() StorageEngineException, InterruptedException, MetadataException, - PageException { + PageException, + LoadFileException { TsFileResource seqResource1 = generateSingleAlignedSeriesFile( "d0", @@ -193,7 +197,8 @@ public void testCompactionFlushPageAndSplitByTimePartition() StorageEngineException, InterruptedException, MetadataException, - PageException { + PageException, + LoadFileException { TsFileResource seqResource1 = generateSingleAlignedSeriesFile( "d0", @@ -259,7 +264,7 @@ private TsFileResource performCompaction() } private void consumeChunkDataAndValidate(TsFileResource resource) - throws IOException, IllegalPathException { + throws IOException, IllegalPathException, LoadFileException { Map writerMap = new HashMap<>(); TsFileSplitter splitter =