From dd4d2be1723f83dc362849e88bfb02aff1b35613 Mon Sep 17 00:00:00 2001 From: Itami Sho <42286868+MiniSho@users.noreply.github.com> Date: Tue, 31 Dec 2024 09:22:49 +0800 Subject: [PATCH] Load: Support auto data type conversion when data type mismatch detected during analysis stage (#14529) Co-authored-by: Steve Yurong Su --- .../apache/iotdb/db/it/IoTDBLoadTsFileIT.java | 73 ++++++++- .../org/apache/iotdb/rpc/TSStatusCode.java | 3 + .../db/exception/VerifyMetadataException.java | 12 +- .../VerifyMetadataTypeMismatchException.java | 29 ++++ .../protocol/legacy/loader/TsFileLoader.java | 1 + .../thrift/IoTDBDataNodeReceiver.java | 1 + .../plan/analyze/load/LoadTsFileAnalyzer.java | 76 +++++++--- .../load/LoadTsFileTableSchemaCache.java | 3 +- .../load/LoadTsFileToTreeModelAnalyzer.java | 7 +- .../TreeSchemaAutoCreatorAndVerifier.java | 32 ++-- .../plan/relational/sql/ast/LoadTsFile.java | 16 +- .../statement/crud/LoadTsFileStatement.java | 70 ++++++--- .../load/active/ActiveLoadTsFileLoader.java | 1 + .../load/config/LoadTsFileConfigurator.java | 29 +++- .../LoadConvertedInsertTabletStatement.java | 52 +++++++ ...InsertTabletStatementExceptionVisitor.java | 51 +++++++ ...dInsertTabletStatementTSStatusVisitor.java | 65 ++++++++ ...tementDataTypeConvertExecutionVisitor.java | 143 ++++++++++++++++++ ...tementDataTypeConvertExecutionVisitor.java | 130 ++++++++++++++++ .../LoadTsFileDataTypeConverter.java | 107 +++++++++++++ 20 files changed, 832 insertions(+), 69 deletions(-) create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/VerifyMetadataTypeMismatchException.java create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadConvertedInsertTabletStatement.java create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadConvertedInsertTabletStatementExceptionVisitor.java create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadConvertedInsertTabletStatementTSStatusVisitor.java create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTableStatementDataTypeConvertExecutionVisitor.java create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java index 0b9d310c7677..cf4d22c6aca4 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java @@ -31,6 +31,7 @@ import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.file.metadata.enums.TSEncoding; import org.apache.tsfile.read.common.Path; +import org.apache.tsfile.utils.Pair; import org.apache.tsfile.write.schema.IMeasurementSchema; import org.apache.tsfile.write.schema.MeasurementSchema; import org.junit.After; @@ -48,6 +49,7 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -56,6 +58,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import static org.apache.iotdb.db.it.utils.TestUtils.assertNonQueryTestFail; import static org.apache.iotdb.db.it.utils.TestUtils.createUser; @@ -328,7 +331,7 @@ public void testLoadWithExtendTemplate() throws Exception { } @Test - public void testLoadWithAutoRegister() throws Exception { + public void testLoadWithAutoCreate() throws Exception { final long writtenPoint1; // device 0, device 1, sg 0 try (final TsFileGenerator generator = @@ -898,6 +901,74 @@ public void testLoadLocally() throws Exception { } } + @Test + public void testLoadWithConvertOnTypeMismatch() throws Exception { + + List> measurementSchemas = + generateMeasurementSchemasForDataTypeConvertion(); + + final File file = new File(tmpDir, "1-0-0-0.tsfile"); + + long writtenPoint = 0; + List schemaList1 = + measurementSchemas.stream().map(pair -> pair.left).collect(Collectors.toList()); + List schemaList2 = + measurementSchemas.stream().map(pair -> pair.right).collect(Collectors.toList()); + + try (final TsFileGenerator generator = new TsFileGenerator(file)) { + generator.registerTimeseries(SchemaConfig.DEVICE_0, schemaList2); + + generator.generateData(SchemaConfig.DEVICE_0, 100, PARTITION_INTERVAL / 10_000, false); + + writtenPoint = generator.getTotalNumber(); + } + + try (final Connection connection = EnvFactory.getEnv().getConnection(); + final Statement statement = connection.createStatement()) { + + for (MeasurementSchema schema : schemaList1) { + statement.execute(convert2SQL(SchemaConfig.DEVICE_0, schema)); + } + + statement.execute(String.format("load \"%s\" ", file.getAbsolutePath())); + + try (final ResultSet resultSet = + statement.executeQuery("select count(*) from root.** group by level=1,2")) { + if (resultSet.next()) { + final long sgCount = resultSet.getLong("count(root.sg.test_0.*.*)"); + Assert.assertEquals(writtenPoint, sgCount); + } else { + Assert.fail("This ResultSet is empty."); + } + } + } + } + + private List> + generateMeasurementSchemasForDataTypeConvertion() { + TSDataType[] dataTypes = { + TSDataType.STRING, + TSDataType.TEXT, + TSDataType.BLOB, + TSDataType.TIMESTAMP, + TSDataType.BOOLEAN, + TSDataType.DATE, + TSDataType.DOUBLE, + TSDataType.FLOAT, + TSDataType.INT32, + TSDataType.INT64 + }; + List> pairs = new ArrayList<>(); + + for (TSDataType type : dataTypes) { + for (TSDataType dataType : dataTypes) { + String id = String.format("%s2%s", type.name(), dataType.name()); + pairs.add(new Pair<>(new MeasurementSchema(id, type), new MeasurementSchema(id, dataType))); + } + } + return pairs; + } + private static class SchemaConfig { private static final String STORAGE_GROUP_0 = "root.sg.test_0"; private static final String STORAGE_GROUP_1 = "root.sg.test_1"; diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java index 84145d01d795..b1863c3d491e 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java @@ -197,6 +197,9 @@ public enum TSStatusCode { PIPE_ERROR(1107), PIPESERVER_ERROR(1108), VERIFY_METADATA_ERROR(1109), + LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION(1110), + LOAD_IDEMPOTENT_CONFLICT_EXCEPTION(1111), + LOAD_USER_CONFLICT_EXCEPTION(1112), // UDF UDF_LOAD_CLASS_ERROR(1200), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/VerifyMetadataException.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/VerifyMetadataException.java index 1603c9872f4c..78c6d8d4d5cc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/VerifyMetadataException.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/VerifyMetadataException.java @@ -23,16 +23,12 @@ import org.apache.iotdb.rpc.TSStatusCode; public class VerifyMetadataException extends IoTDBException { - public VerifyMetadataException( - String path, String compareInfo, String tsFileInfo, String tsFilePath, String IoTDBInfo) { - super( - String.format( - "%s %s mismatch, %s in tsfile %s, but %s in IoTDB.", - path, compareInfo, tsFileInfo, tsFilePath, IoTDBInfo), - TSStatusCode.VERIFY_METADATA_ERROR.getStatusCode()); - } public VerifyMetadataException(String message) { super(message, TSStatusCode.VERIFY_METADATA_ERROR.getStatusCode()); } + + public VerifyMetadataException(String message, int errorCode) { + super(message, errorCode); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/VerifyMetadataTypeMismatchException.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/VerifyMetadataTypeMismatchException.java new file mode 100644 index 000000000000..209fb83bcdbe --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/VerifyMetadataTypeMismatchException.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.exception; + +import org.apache.iotdb.rpc.TSStatusCode; + +public class VerifyMetadataTypeMismatchException extends VerifyMetadataException { + + public VerifyMetadataTypeMismatchException(String message) { + super(message, TSStatusCode.VERIFY_METADATA_ERROR.getStatusCode()); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/loader/TsFileLoader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/loader/TsFileLoader.java index c78ca0544b4c..e181ec1d5926 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/loader/TsFileLoader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/loader/TsFileLoader.java @@ -56,6 +56,7 @@ public void load() { try { LoadTsFileStatement statement = new LoadTsFileStatement(tsFile.getAbsolutePath()); statement.setDeleteAfterLoad(true); + statement.setConvertOnTypeMismatch(true); statement.setDatabaseLevel(parseSgLevel()); statement.setVerifySchema(true); statement.setAutoCreateDatabase(false); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java index 8785231480fa..a87e229a03a2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java @@ -569,6 +569,7 @@ private TSStatus loadTsFileSync(final String dataBaseName, final String fileAbso throws FileNotFoundException { final LoadTsFileStatement statement = new LoadTsFileStatement(fileAbsolutePath); statement.setDeleteAfterLoad(true); + statement.setConvertOnTypeMismatch(true); statement.setVerifySchema(true); statement.setAutoCreateDatabase(false); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java index 9cf39b42325e..71e985be1138 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java @@ -19,9 +19,11 @@ package org.apache.iotdb.db.queryengine.plan.analyze.load; +import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.auth.AuthException; import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.db.exception.VerifyMetadataException; +import org.apache.iotdb.db.exception.VerifyMetadataTypeMismatchException; import org.apache.iotdb.db.exception.load.LoadReadOnlyException; import org.apache.iotdb.db.exception.sql.SemanticException; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; @@ -33,6 +35,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LoadTsFile; import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; +import org.apache.iotdb.db.storageengine.load.converter.LoadTsFileDataTypeConverter; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; @@ -54,8 +57,7 @@ public abstract class LoadTsFileAnalyzer implements AutoCloseable { private static final Logger LOGGER = LoggerFactory.getLogger(LoadTsFileAnalyzer.class); // These are only used when constructed from tree model SQL - private final LoadTsFileStatement loadTsFileStatement; - + private final LoadTsFileStatement loadTsFileTreeStatement; // These are only used when constructed from table model SQL private final LoadTsFile loadTsFileTableStatement; @@ -67,6 +69,8 @@ public abstract class LoadTsFileAnalyzer implements AutoCloseable { protected final boolean isDeleteAfterLoad; + protected final boolean isConvertOnTypeMismatch; + protected final boolean isAutoCreateDatabase; protected final int databaseLevel; @@ -78,15 +82,19 @@ public abstract class LoadTsFileAnalyzer implements AutoCloseable { final IPartitionFetcher partitionFetcher = ClusterPartitionFetcher.getInstance(); final ISchemaFetcher schemaFetcher = ClusterSchemaFetcher.getInstance(); + protected final LoadTsFileDataTypeConverter loadTsFileDataTypeConverter; + LoadTsFileAnalyzer(LoadTsFileStatement loadTsFileStatement, MPPQueryContext context) { - this.loadTsFileStatement = loadTsFileStatement; + this.loadTsFileTreeStatement = loadTsFileStatement; this.tsFiles = loadTsFileStatement.getTsFiles(); this.statementString = loadTsFileStatement.toString(); this.isVerifySchema = loadTsFileStatement.isVerifySchema(); this.isDeleteAfterLoad = loadTsFileStatement.isDeleteAfterLoad(); + this.isConvertOnTypeMismatch = loadTsFileStatement.isConvertOnTypeMismatch(); this.isAutoCreateDatabase = loadTsFileStatement.isAutoCreateDatabase(); this.databaseLevel = loadTsFileStatement.getDatabaseLevel(); this.database = loadTsFileStatement.getDatabase(); + this.loadTsFileDataTypeConverter = new LoadTsFileDataTypeConverter(); this.loadTsFileTableStatement = null; this.isTableModelStatement = false; @@ -99,11 +107,13 @@ public abstract class LoadTsFileAnalyzer implements AutoCloseable { this.statementString = loadTsFileTableStatement.toString(); this.isVerifySchema = true; this.isDeleteAfterLoad = loadTsFileTableStatement.isDeleteAfterLoad(); + this.isConvertOnTypeMismatch = loadTsFileTableStatement.isConvertOnTypeMismatch(); this.isAutoCreateDatabase = loadTsFileTableStatement.isAutoCreateDatabase(); this.databaseLevel = loadTsFileTableStatement.getDatabaseLevel(); this.database = loadTsFileTableStatement.getDatabase(); + this.loadTsFileDataTypeConverter = new LoadTsFileDataTypeConverter(); - this.loadTsFileStatement = null; + this.loadTsFileTreeStatement = null; this.isTableModelStatement = true; this.context = context; } @@ -137,6 +147,11 @@ protected boolean doAnalyzeFileByFile(IAnalysis analysis) { } catch (AuthException e) { setFailAnalysisForAuthException(analysis, e); return false; + } catch (VerifyMetadataTypeMismatchException e) { + executeDataTypeConversionOnTypeMismatch(analysis, e); + // just return false to STOP the analysis process, + // the real result on the conversion will be set in the analysis. + return false; } catch (BufferUnderflowException e) { LOGGER.warn( "The file {} is not a valid tsfile. Please check the input file.", tsFile.getPath(), e); @@ -161,6 +176,39 @@ protected boolean doAnalyzeFileByFile(IAnalysis analysis) { protected abstract void analyzeSingleTsFile(final File tsFile) throws IOException, AuthException, VerifyMetadataException; + protected void executeDataTypeConversionOnTypeMismatch( + final IAnalysis analysis, final VerifyMetadataTypeMismatchException e) { + final TSStatus status = + isConvertOnTypeMismatch + ? (isTableModelStatement + ? loadTsFileDataTypeConverter.convertForTableModel(loadTsFileTableStatement) + : loadTsFileDataTypeConverter.convertForTreeModel(loadTsFileTreeStatement)) + : null; + + if (status == null) { + analysis.setFailStatus( + new TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()).setMessage(e.getMessage())); + } else if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() + && status.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode() + && status.getCode() != TSStatusCode.LOAD_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode()) { + analysis.setFailStatus(status); + } + analysis.setFinishQueryAfterAnalyze(true); + setRealStatement(analysis); + } + + protected void setRealStatement(IAnalysis analysis) { + if (isTableModelStatement) { + // Do nothing by now. + } else { + analysis.setRealStatement(loadTsFileTreeStatement); + } + } + + protected String getStatementString() { + return statementString; + } + protected TsFileResource constructTsFileResource( final TsFileSequenceReader reader, final File tsFile) throws IOException { final TsFileResource tsFileResource = new TsFileResource(tsFile); @@ -174,23 +222,11 @@ protected TsFileResource constructTsFileResource( return tsFileResource; } - protected String getStatementString() { - return statementString; - } - - protected void setRealStatement(IAnalysis analysis) { - if (isTableModelStatement) { - // Do nothing by now. - } else { - analysis.setRealStatement(loadTsFileStatement); - } - } - protected void addTsFileResource(TsFileResource tsFileResource) { if (isTableModelStatement) { loadTsFileTableStatement.addTsFileResource(tsFileResource); } else { - loadTsFileStatement.addTsFileResource(tsFileResource); + loadTsFileTreeStatement.addTsFileResource(tsFileResource); } } @@ -198,7 +234,7 @@ protected void addWritePointCount(long writePointCount) { if (isTableModelStatement) { loadTsFileTableStatement.addWritePointCount(writePointCount); } else { - loadTsFileStatement.addWritePointCount(writePointCount); + loadTsFileTreeStatement.addWritePointCount(writePointCount); } } @@ -206,6 +242,10 @@ protected boolean isVerifySchema() { return isVerifySchema; } + protected boolean isConvertOnTypeMismatch() { + return isConvertOnTypeMismatch; + } + protected boolean isAutoCreateDatabase() { return isAutoCreateDatabase; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java index dac5f81e66bc..a3edfabadb8a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java @@ -24,6 +24,7 @@ import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.VerifyMetadataException; +import org.apache.iotdb.db.exception.VerifyMetadataTypeMismatchException; import org.apache.iotdb.db.exception.load.LoadRuntimeOutOfMemoryException; import org.apache.iotdb.db.exception.sql.SemanticException; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; @@ -290,7 +291,7 @@ private void verifyTableDataTypeAndGenerateIdColumnMapper( final ColumnSchema realColumn = realSchema.getColumn(fileColumn.getName(), fileColumn.getColumnCategory()); if (!fileColumn.getType().equals(realColumn.getType())) { - throw new VerifyMetadataException( + throw new VerifyMetadataTypeMismatchException( String.format( "Data type mismatch for column %s in table %s, type in TsFile: %s, type in IoTDB: %s", realColumn.getName(), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileToTreeModelAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileToTreeModelAnalyzer.java index fe91d00f8e1a..a34483af9d83 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileToTreeModelAnalyzer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileToTreeModelAnalyzer.java @@ -21,6 +21,7 @@ import org.apache.iotdb.commons.auth.AuthException; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.exception.VerifyMetadataTypeMismatchException; import org.apache.iotdb.db.exception.load.LoadEmptyFileException; import org.apache.iotdb.db.exception.sql.SemanticException; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; @@ -85,6 +86,9 @@ public IAnalysis analyzeFileByFile(IAnalysis analysis) { } catch (AuthException e) { setFailAnalysisForAuthException(analysis, e); return analysis; + } catch (VerifyMetadataTypeMismatchException e) { + executeDataTypeConversionOnTypeMismatch(analysis, e); + return analysis; } catch (Exception e) { final String exceptionMessage = String.format( @@ -105,7 +109,8 @@ public IAnalysis analyzeFileByFile(IAnalysis analysis) { } @Override - protected void analyzeSingleTsFile(final File tsFile) throws IOException, AuthException { + protected void analyzeSingleTsFile(final File tsFile) + throws IOException, AuthException, VerifyMetadataTypeMismatchException { try (final TsFileSequenceReader reader = new TsFileSequenceReader(tsFile.getAbsolutePath())) { // can be reused when constructing tsfile resource final TsFileSequenceReaderTimeseriesMetadataIterator timeseriesMetadataIterator = diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/TreeSchemaAutoCreatorAndVerifier.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/TreeSchemaAutoCreatorAndVerifier.java index 67379c724efe..ac389b2a6dbf 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/TreeSchemaAutoCreatorAndVerifier.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/TreeSchemaAutoCreatorAndVerifier.java @@ -35,6 +35,7 @@ import org.apache.iotdb.db.auth.AuthorityChecker; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.VerifyMetadataException; +import org.apache.iotdb.db.exception.VerifyMetadataTypeMismatchException; import org.apache.iotdb.db.exception.load.LoadFileException; import org.apache.iotdb.db.exception.load.LoadRuntimeOutOfMemoryException; import org.apache.iotdb.db.exception.sql.SemanticException; @@ -80,6 +81,7 @@ import java.util.stream.Collectors; public class TreeSchemaAutoCreatorAndVerifier { + private static final Logger LOGGER = LoggerFactory.getLogger(TreeSchemaAutoCreatorAndVerifier.class); @@ -103,7 +105,7 @@ public void setCurrentModificationsAndTimeIndex( public void autoCreateAndVerify( TsFileSequenceReader reader, Map> device2TimeseriesMetadataList) - throws IOException, AuthException { + throws IOException, AuthException, VerifyMetadataTypeMismatchException { for (final Map.Entry> entry : device2TimeseriesMetadataList.entrySet()) { final IDeviceID device = entry.getKey(); @@ -204,13 +206,14 @@ public void flushAndClearDeviceIsAlignedCacheIfNecessary() throws SemanticExcept schemaCache.clearDeviceIsAlignedCacheIfNecessary(); } - public void flush() throws AuthException { + public void flush() throws AuthException, VerifyMetadataTypeMismatchException { doAutoCreateAndVerify(); schemaCache.clearTimeSeries(); } - private void doAutoCreateAndVerify() throws SemanticException, AuthException { + private void doAutoCreateAndVerify() + throws SemanticException, AuthException, VerifyMetadataTypeMismatchException { if (schemaCache.getDevice2TimeSeries().isEmpty()) { return; } @@ -233,15 +236,26 @@ private void doAutoCreateAndVerify() throws SemanticException, AuthException { } } catch (AuthException e) { throw e; + } catch (VerifyMetadataTypeMismatchException e) { + if (loadTsFileAnalyzer.isConvertOnTypeMismatch()) { + // throw exception to convert data type in the upper layer (LoadTsFileAnalyzer) + throw e; + } else { + handleException(e, loadTsFileAnalyzer.getStatementString()); + } } catch (Exception e) { - LOGGER.warn("Auto create or verify schema error.", e); - throw new SemanticException( - String.format( - "Auto create or verify schema error when executing statement %s. Detail: %s.", - loadTsFileAnalyzer.getStatementString(), e.getMessage())); + handleException(e, loadTsFileAnalyzer.getStatementString()); } } + private void handleException(Exception e, String statementString) throws SemanticException { + LOGGER.warn("Auto create or verify schema error.", e); + throw new SemanticException( + String.format( + "Auto create or verify schema error when executing statement %s. Detail: %s.", + statementString, e.getMessage())); + } + private void makeSureNoDuplicatedMeasurementsInDevices() throws VerifyMetadataException { for (final Map.Entry> entry : schemaCache.getDevice2TimeSeries().entrySet()) { @@ -445,7 +459,7 @@ private void verifySchema(ISchemaTree schemaTree) // check datatype if (!tsFileSchema.getType().equals(iotdbSchema.getType())) { - throw new VerifyMetadataException( + throw new VerifyMetadataTypeMismatchException( String.format( "Measurement %s%s%s datatype not match, TsFile: %s, IoTDB: %s", device, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java index 71ca9359e6f6..e61d93538522 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java @@ -41,8 +41,9 @@ public class LoadTsFile extends Statement { private final File file; private int databaseLevel; // For loading to tree-model only private String database; // For loading to table-model only - private boolean deleteAfterLoad; - private boolean autoCreateDatabase; + private boolean deleteAfterLoad = false; + private boolean convertOnTypeMismatch = true; + private boolean autoCreateDatabase = true; private String model = LoadTsFileConfigurator.MODEL_TABLE_VALUE; private final Map loadAttributes; @@ -58,6 +59,7 @@ public LoadTsFile(NodeLocation location, String filePath, Map lo this.file = new File(filePath); this.databaseLevel = IoTDBDescriptor.getInstance().getConfig().getDefaultStorageGroupLevel(); this.deleteAfterLoad = false; + this.convertOnTypeMismatch = true; this.autoCreateDatabase = IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled(); this.resources = new ArrayList<>(); this.writePointCountList = new ArrayList<>(); @@ -85,12 +87,16 @@ public void setAutoCreateDatabase(boolean autoCreateDatabase) { this.autoCreateDatabase = autoCreateDatabase; } + public boolean isAutoCreateDatabase() { + return autoCreateDatabase; + } + public boolean isDeleteAfterLoad() { return deleteAfterLoad; } - public boolean isAutoCreateDatabase() { - return autoCreateDatabase; + public boolean isConvertOnTypeMismatch() { + return convertOnTypeMismatch; } public int getDatabaseLevel() { @@ -133,6 +139,8 @@ private void initAttributes() { this.databaseLevel = LoadTsFileConfigurator.parseOrGetDefaultDatabaseLevel(loadAttributes); this.database = LoadTsFileConfigurator.parseDatabaseName(loadAttributes); this.deleteAfterLoad = LoadTsFileConfigurator.parseOrGetDefaultOnSuccess(loadAttributes); + this.convertOnTypeMismatch = + LoadTsFileConfigurator.parseOrGetDefaultConvertOnTypeMismatch(loadAttributes); this.model = LoadTsFileConfigurator.parseOrGetDefaultModel( loadAttributes, LoadTsFileConfigurator.MODEL_TABLE_VALUE); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java index 9e1484f638f2..520700d38483 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java @@ -43,19 +43,23 @@ import java.util.List; import java.util.Map; +import static org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.CONVERT_ON_TYPE_MISMATCH_KEY; import static org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.DATABASE_LEVEL_KEY; import static org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.DATABASE_NAME_KEY; import static org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.MODEL_KEY; +import static org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.ON_SUCCESS_DELETE_VALUE; import static org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.ON_SUCCESS_KEY; +import static org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.ON_SUCCESS_NONE_VALUE; public class LoadTsFileStatement extends Statement { private final File file; private int databaseLevel; // For loading to tree-model only private String database; // For loading to table-model only - private boolean verifySchema; - private boolean deleteAfterLoad; - private boolean autoCreateDatabase; + private boolean verifySchema = true; + private boolean deleteAfterLoad = false; + private boolean convertOnTypeMismatch = true; + private boolean autoCreateDatabase = true; private String model = LoadTsFileConfigurator.MODEL_TREE_VALUE; private Map loadAttributes; @@ -69,6 +73,7 @@ public LoadTsFileStatement(String filePath) throws FileNotFoundException { this.databaseLevel = IoTDBDescriptor.getInstance().getConfig().getDefaultStorageGroupLevel(); this.verifySchema = true; this.deleteAfterLoad = false; + this.convertOnTypeMismatch = true; this.autoCreateDatabase = IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled(); this.resources = new ArrayList<>(); this.writePointCountList = new ArrayList<>(); @@ -99,6 +104,7 @@ protected LoadTsFileStatement() { this.databaseLevel = IoTDBDescriptor.getInstance().getConfig().getDefaultStorageGroupLevel(); this.verifySchema = true; this.deleteAfterLoad = false; + this.convertOnTypeMismatch = true; this.autoCreateDatabase = IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled(); this.tsFiles = new ArrayList<>(); this.resources = new ArrayList<>(); @@ -136,48 +142,56 @@ private static void sortTsFiles(List files) { }); } - public void setDeleteAfterLoad(boolean deleteAfterLoad) { - this.deleteAfterLoad = deleteAfterLoad; - } - public void setDatabaseLevel(int databaseLevel) { this.databaseLevel = databaseLevel; } + public int getDatabaseLevel() { + return databaseLevel; + } + public void setDatabase(String database) { this.database = database; } - public void setModel(String model) { - this.model = model; + public String getDatabase() { + return database; } public void setVerifySchema(boolean verifySchema) { this.verifySchema = verifySchema; } - public void setAutoCreateDatabase(boolean autoCreateDatabase) { - this.autoCreateDatabase = autoCreateDatabase; - } - public boolean isVerifySchema() { return verifySchema; } + public void setDeleteAfterLoad(boolean deleteAfterLoad) { + this.deleteAfterLoad = deleteAfterLoad; + } + public boolean isDeleteAfterLoad() { return deleteAfterLoad; } - public boolean isAutoCreateDatabase() { - return autoCreateDatabase; + public void setConvertOnTypeMismatch(boolean convertOnTypeMismatch) { + this.convertOnTypeMismatch = convertOnTypeMismatch; } - public int getDatabaseLevel() { - return databaseLevel; + public boolean isConvertOnTypeMismatch() { + return convertOnTypeMismatch; } - public String getDatabase() { - return database; + public void setAutoCreateDatabase(boolean autoCreateDatabase) { + this.autoCreateDatabase = autoCreateDatabase; + } + + public boolean isAutoCreateDatabase() { + return autoCreateDatabase; + } + + public void setModel(String model) { + this.model = model; } public String getModel() { @@ -213,6 +227,8 @@ private void initAttributes() { this.databaseLevel = LoadTsFileConfigurator.parseOrGetDefaultDatabaseLevel(loadAttributes); this.database = LoadTsFileConfigurator.parseDatabaseName(loadAttributes); this.deleteAfterLoad = LoadTsFileConfigurator.parseOrGetDefaultOnSuccess(loadAttributes); + this.convertOnTypeMismatch = + LoadTsFileConfigurator.parseOrGetDefaultConvertOnTypeMismatch(loadAttributes); this.model = LoadTsFileConfigurator.parseOrGetDefaultModel( loadAttributes, LoadTsFileConfigurator.MODEL_TREE_VALUE); @@ -234,14 +250,18 @@ public TSStatus checkPermissionBeforeProcess(String userName) { public org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement toRelationalStatement( MPPQueryContext context) { loadAttributes = new HashMap<>(); + loadAttributes.put(DATABASE_LEVEL_KEY, String.valueOf(databaseLevel)); if (database != null) { loadAttributes.put(DATABASE_NAME_KEY, database); } - loadAttributes.put(ON_SUCCESS_KEY, String.valueOf(deleteAfterLoad)); + loadAttributes.put( + ON_SUCCESS_KEY, deleteAfterLoad ? ON_SUCCESS_DELETE_VALUE : ON_SUCCESS_NONE_VALUE); + loadAttributes.put(CONVERT_ON_TYPE_MISMATCH_KEY, String.valueOf(convertOnTypeMismatch)); if (model != null) { loadAttributes.put(MODEL_KEY, model); } + return new LoadTsFile(null, file.getAbsolutePath(), loadAttributes); } @@ -255,13 +275,15 @@ public String toString() { return "LoadTsFileStatement{" + "file=" + file - + ", deleteAfterLoad=" + + ", delete-after-load=" + deleteAfterLoad - + ", databaseLevel=" + + ", database-level=" + databaseLevel - + ", verifySchema=" + + ", verify-schema=" + verifySchema - + ", tsFiles Size=" + + ", convert-on-type-mismatch=" + + convertOnTypeMismatch + + ", tsFiles size=" + tsFiles.size() + '}'; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java index 605218ae0a6e..188ae30436d8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java @@ -192,6 +192,7 @@ private Optional> tryGetNextPendingFile() { private TSStatus loadTsFile(final Pair filePair) throws FileNotFoundException { final LoadTsFileStatement statement = new LoadTsFileStatement(filePair.getLeft()); statement.setDeleteAfterLoad(true); + statement.setConvertOnTypeMismatch(true); statement.setVerifySchema(true); statement.setAutoCreateDatabase(false); return executeStatement(filePair.getRight() ? new PipeEnrichedStatement(statement) : statement); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java index 929d106f2443..70d7401c560d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java @@ -49,6 +49,9 @@ public static void validateParameters(final String key, final String value) { break; case DATABASE_NAME_KEY: break; + case CONVERT_ON_TYPE_MISMATCH_KEY: + validateConvertOnTypeMismatchParam(value); + break; default: throw new SemanticException("Invalid parameter '" + key + "' for LOAD TSFILE command."); } @@ -90,8 +93,8 @@ public static int parseOrGetDefaultDatabaseLevel(final Map loadA } public static final String ON_SUCCESS_KEY = "on-success"; - private static final String ON_SUCCESS_DELETE_VALUE = "delete"; - private static final String ON_SUCCESS_NONE_VALUE = "none"; + public static final String ON_SUCCESS_DELETE_VALUE = "delete"; + public static final String ON_SUCCESS_NONE_VALUE = "none"; private static final Set ON_SUCCESS_VALUE_SET = Collections.unmodifiableSet( new HashSet<>(Arrays.asList(ON_SUCCESS_DELETE_VALUE, ON_SUCCESS_NONE_VALUE))); @@ -107,7 +110,27 @@ public static void validateOnSuccessParam(final String onSuccess) { public static boolean parseOrGetDefaultOnSuccess(final Map loadAttributes) { final String value = loadAttributes.get(ON_SUCCESS_KEY); - return StringUtils.isEmpty(value) || ON_SUCCESS_DELETE_VALUE.equals(value); + return !StringUtils.isEmpty(value) && ON_SUCCESS_DELETE_VALUE.equalsIgnoreCase(value); + } + + public static final String CONVERT_ON_TYPE_MISMATCH_KEY = "convert-on-type-mismatch"; + private static final boolean CONVERT_ON_TYPE_MISMATCH_DEFAULT_VALUE = true; + + public static void validateConvertOnTypeMismatchParam(final String convertOnTypeMismatch) { + if (!"true".equalsIgnoreCase(convertOnTypeMismatch) + && !"false".equalsIgnoreCase(convertOnTypeMismatch)) { + throw new SemanticException( + String.format( + "Given %s value '%s' is not supported, please input a valid boolean value.", + CONVERT_ON_TYPE_MISMATCH_KEY, convertOnTypeMismatch)); + } + } + + public static boolean parseOrGetDefaultConvertOnTypeMismatch( + final Map loadAttributes) { + return Boolean.parseBoolean( + loadAttributes.getOrDefault( + CONVERT_ON_TYPE_MISMATCH_KEY, String.valueOf(CONVERT_ON_TYPE_MISMATCH_DEFAULT_VALUE))); } public static final String MODEL_KEY = "model"; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadConvertedInsertTabletStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadConvertedInsertTabletStatement.java new file mode 100644 index 000000000000..1a8144424028 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadConvertedInsertTabletStatement.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.load.converter; + +import org.apache.iotdb.db.pipe.receiver.transform.converter.ArrayConverter; +import org.apache.iotdb.db.pipe.receiver.transform.statement.PipeConvertedInsertTabletStatement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement; + +import org.apache.tsfile.enums.TSDataType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LoadConvertedInsertTabletStatement extends PipeConvertedInsertTabletStatement { + + private static final Logger LOGGER = + LoggerFactory.getLogger(LoadConvertedInsertTabletStatement.class); + + public LoadConvertedInsertTabletStatement(final InsertTabletStatement insertTabletStatement) { + super(insertTabletStatement); + } + + @Override + protected boolean checkAndCastDataType(int columnIndex, TSDataType dataType) { + LOGGER.info( + "Load: Inserting tablet to {}.{}. Casting type from {} to {}.", + devicePath, + measurements[columnIndex], + dataTypes[columnIndex], + dataType); + columns[columnIndex] = + ArrayConverter.convert(dataTypes[columnIndex], dataType, columns[columnIndex]); + dataTypes[columnIndex] = dataType; + return true; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadConvertedInsertTabletStatementExceptionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadConvertedInsertTabletStatementExceptionVisitor.java new file mode 100644 index 000000000000..f292ee55930a --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadConvertedInsertTabletStatementExceptionVisitor.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.load.converter; + +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.db.exception.load.LoadRuntimeOutOfMemoryException; +import org.apache.iotdb.db.exception.sql.SemanticException; +import org.apache.iotdb.db.queryengine.plan.statement.StatementNode; +import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor; +import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement; +import org.apache.iotdb.rpc.TSStatusCode; + +public class LoadConvertedInsertTabletStatementExceptionVisitor + extends StatementVisitor { + + @Override + public TSStatus visitNode(final StatementNode node, final Exception context) { + return new TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()) + .setMessage(context.getMessage()); + } + + @Override + public TSStatus visitLoadFile( + final LoadTsFileStatement loadTsFileStatement, final Exception context) { + if (context instanceof LoadRuntimeOutOfMemoryException) { + return new TSStatus(TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode()) + .setMessage(context.getMessage()); + } else if (context instanceof SemanticException) { + return new TSStatus(TSStatusCode.LOAD_USER_CONFLICT_EXCEPTION.getStatusCode()) + .setMessage(context.getMessage()); + } + return visitStatement(loadTsFileStatement, context); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadConvertedInsertTabletStatementTSStatusVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadConvertedInsertTabletStatementTSStatusVisitor.java new file mode 100644 index 000000000000..6e9601f8d95c --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadConvertedInsertTabletStatementTSStatusVisitor.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.load.converter; + +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.db.conf.IoTDBConfig; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException; +import org.apache.iotdb.db.queryengine.plan.statement.StatementNode; +import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement; +import org.apache.iotdb.rpc.TSStatusCode; + +public class LoadConvertedInsertTabletStatementTSStatusVisitor + extends StatementVisitor { + + private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); + + @Override + public TSStatus visitNode(final StatementNode node, final TSStatus context) { + return context; + } + + @Override + public TSStatus visitInsertTablet( + final InsertTabletStatement insertTabletStatement, final TSStatus context) { + return visitInsertBase(insertTabletStatement, context); + } + + private TSStatus visitInsertBase( + final InsertBaseStatement insertBaseStatement, final TSStatus context) { + if (context.getCode() == TSStatusCode.SYSTEM_READ_ONLY.getStatusCode() + || context.getCode() == TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode()) { + return new TSStatus(TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode()) + .setMessage(context.getMessage()); + } else if (context.getCode() == TSStatusCode.OUT_OF_TTL.getStatusCode()) { + return new TSStatus(TSStatusCode.LOAD_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode()) + .setMessage(context.getMessage()); + } else if (context.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode() + && (context.getMessage().contains(DataTypeMismatchException.REGISTERED_TYPE_STRING) + && config.isEnablePartialInsert())) { + return new TSStatus(TSStatusCode.LOAD_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode()) + .setMessage(context.getMessage()); + } + return visitStatement(insertBaseStatement, context); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTableStatementDataTypeConvertExecutionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTableStatementDataTypeConvertExecutionVisitor.java new file mode 100644 index 000000000000..233f60124ac2 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTableStatementDataTypeConvertExecutionVisitor.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.load.converter; + +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern; +import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; +import org.apache.iotdb.db.pipe.event.common.tsfile.parser.table.TsFileInsertionEventTableParser; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AstVisitor; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LoadTsFile; +import org.apache.iotdb.db.queryengine.plan.statement.Statement; +import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; +import org.apache.iotdb.rpc.TSStatusCode; + +import org.apache.commons.io.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.util.Objects; +import java.util.Optional; + +public class LoadTableStatementDataTypeConvertExecutionVisitor + extends AstVisitor, String> { + + private static final Logger LOGGER = + LoggerFactory.getLogger(LoadTableStatementDataTypeConvertExecutionVisitor.class); + + @FunctionalInterface + public interface StatementExecutor { + // databaseName can NOT be null + TSStatus execute(final Statement statement, final String databaseName); + } + + private final StatementExecutor statementExecutor; + + public LoadTableStatementDataTypeConvertExecutionVisitor(StatementExecutor statementExecutor) { + this.statementExecutor = statementExecutor; + } + + @Override + public Optional visitLoadTsFile( + final LoadTsFile loadTsFileStatement, final String databaseName) { + if (Objects.isNull(databaseName)) { + LOGGER.warn( + "Database name is unexpectedly null for LoadTsFileStatement: {}. Skip data type conversion.", + loadTsFileStatement); + return Optional.empty(); + } + + LOGGER.info("Start data type conversion for LoadTsFileStatement: {}.", loadTsFileStatement); + + for (final File file : loadTsFileStatement.getTsFiles()) { + try (final TsFileInsertionEventTableParser parser = + new TsFileInsertionEventTableParser( + file, + new TablePattern(true, null, null), + Long.MIN_VALUE, + Long.MAX_VALUE, + null, + null)) { + for (final TabletInsertionEvent tabletInsertionEvent : parser.toTabletInsertionEvents()) { + if (!(tabletInsertionEvent instanceof PipeRawTabletInsertionEvent)) { + continue; + } + final PipeRawTabletInsertionEvent rawTabletInsertionEvent = + (PipeRawTabletInsertionEvent) tabletInsertionEvent; + + final LoadConvertedInsertTabletStatement statement = + new LoadConvertedInsertTabletStatement( + PipeTransferTabletRawReq.toTPipeTransferRawReq( + rawTabletInsertionEvent.convertToTablet(), + rawTabletInsertionEvent.isAligned()) + .constructStatement()); + + TSStatus result; + try { + result = + statement.accept( + LoadTsFileDataTypeConverter.STATEMENT_STATUS_VISITOR, + statementExecutor.execute(statement, databaseName)); + + // Retry max 5 times if the write process is rejected + for (int i = 0; + i < 5 + && result.getCode() + == TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode(); + i++) { + Thread.sleep(100L * (i + 1)); + result = + statement.accept( + LoadTsFileDataTypeConverter.STATEMENT_STATUS_VISITOR, + statementExecutor.execute(statement, databaseName)); + } + } catch (final Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + result = statement.accept(LoadTsFileDataTypeConverter.STATEMENT_EXCEPTION_VISITOR, e); + } + + if (!(result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode() + || result.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode() + || result.getCode() + == TSStatusCode.LOAD_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())) { + return Optional.empty(); + } + } + } catch (final Exception e) { + LOGGER.warn( + "Failed to convert data type for LoadTsFileStatement: {}.", loadTsFileStatement, e); + return Optional.empty(); + } + } + + if (loadTsFileStatement.isDeleteAfterLoad()) { + loadTsFileStatement.getTsFiles().forEach(FileUtils::deleteQuietly); + } + + LOGGER.info( + "Data type conversion for LoadTsFileStatement {} is successful.", loadTsFileStatement); + + return Optional.of(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java new file mode 100644 index 000000000000..5793b96502a1 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.load.converter; + +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern; +import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq; +import org.apache.iotdb.db.pipe.event.common.tsfile.parser.scan.TsFileInsertionEventScanParser; +import org.apache.iotdb.db.queryengine.plan.statement.Statement; +import org.apache.iotdb.db.queryengine.plan.statement.StatementNode; +import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor; +import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement; +import org.apache.iotdb.rpc.TSStatusCode; + +import org.apache.commons.io.FileUtils; +import org.apache.tsfile.utils.Pair; +import org.apache.tsfile.write.record.Tablet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.util.Optional; + +public class LoadTreeStatementDataTypeConvertExecutionVisitor + extends StatementVisitor, Void> { + + private final StatementExecutor statementExecutor; + + private static final Logger LOGGER = + LoggerFactory.getLogger(LoadTreeStatementDataTypeConvertExecutionVisitor.class); + + @FunctionalInterface + public interface StatementExecutor { + TSStatus execute(final Statement statement); + } + + public LoadTreeStatementDataTypeConvertExecutionVisitor( + final StatementExecutor statementExecutor) { + this.statementExecutor = statementExecutor; + } + + @Override + public Optional visitNode(final StatementNode statementNode, final Void v) { + return Optional.empty(); + } + + @Override + public Optional visitLoadFile( + final LoadTsFileStatement loadTsFileStatement, final Void v) { + + LOGGER.info("Start data type conversion for LoadTsFileStatement: {}", loadTsFileStatement); + + for (final File file : loadTsFileStatement.getTsFiles()) { + try (final TsFileInsertionEventScanParser parser = + new TsFileInsertionEventScanParser( + file, new IoTDBTreePattern(null), Long.MIN_VALUE, Long.MAX_VALUE, null, null)) { + for (final Pair tabletWithIsAligned : parser.toTabletWithIsAligneds()) { + final LoadConvertedInsertTabletStatement statement = + new LoadConvertedInsertTabletStatement( + PipeTransferTabletRawReq.toTPipeTransferRawReq( + tabletWithIsAligned.getLeft(), tabletWithIsAligned.getRight()) + .constructStatement()); + + TSStatus result; + try { + result = + statement.accept( + LoadTsFileDataTypeConverter.STATEMENT_STATUS_VISITOR, + statementExecutor.execute(statement)); + + // Retry max 5 times if the write process is rejected + for (int i = 0; + i < 5 + && result.getCode() + == TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode(); + i++) { + Thread.sleep(100L * (i + 1)); + result = + statement.accept( + LoadTsFileDataTypeConverter.STATEMENT_STATUS_VISITOR, + statementExecutor.execute(statement)); + } + } catch (final Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + result = statement.accept(LoadTsFileDataTypeConverter.STATEMENT_EXCEPTION_VISITOR, e); + } + + if (!(result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode() + || result.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode() + || result.getCode() + == TSStatusCode.LOAD_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())) { + return Optional.empty(); + } + } + } catch (final Exception e) { + LOGGER.warn( + "Failed to convert data type for LoadTsFileStatement: {}.", loadTsFileStatement, e); + return Optional.empty(); + } + } + + if (loadTsFileStatement.isDeleteAfterLoad()) { + loadTsFileStatement.getTsFiles().forEach(FileUtils::deleteQuietly); + } + + LOGGER.info( + "Data type conversion for LoadTsFileStatement {} is successful.", loadTsFileStatement); + + return Optional.of(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java new file mode 100644 index 000000000000..7ad7a5af5447 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.load.converter; + +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.protocol.session.SessionManager; +import org.apache.iotdb.db.queryengine.plan.Coordinator; +import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher; +import org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher; +import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LoadTsFile; +import org.apache.iotdb.db.queryengine.plan.relational.sql.parser.SqlParser; +import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement; +import org.apache.iotdb.rpc.TSStatusCode; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LoadTsFileDataTypeConverter { + + private static final Logger LOGGER = LoggerFactory.getLogger(LoadTsFileDataTypeConverter.class); + + private static final SessionManager SESSION_MANAGER = SessionManager.getInstance(); + + private final SqlParser relationalSqlParser = new SqlParser(); + private final LoadTableStatementDataTypeConvertExecutionVisitor + tableStatementDataTypeConvertExecutionVisitor = + new LoadTableStatementDataTypeConvertExecutionVisitor( + ((statement, databaseName) -> + Coordinator.getInstance() + .executeForTableModel( + statement, + relationalSqlParser, + SESSION_MANAGER.getCurrSession(), + SESSION_MANAGER.requestQueryId(), + SESSION_MANAGER.getSessionInfoOfPipeReceiver( + SESSION_MANAGER.getCurrSession(), databaseName), + "", + LocalExecutionPlanner.getInstance().metadata, + IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold()) + .status)); + private final LoadTreeStatementDataTypeConvertExecutionVisitor + treeStatementDataTypeConvertExecutionVisitor = + new LoadTreeStatementDataTypeConvertExecutionVisitor( + statement -> + Coordinator.getInstance() + .executeForTreeModel( + statement, + SESSION_MANAGER.requestQueryId(), + SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()), + "", + ClusterPartitionFetcher.getInstance(), + ClusterSchemaFetcher.getInstance(), + IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold(), + false) + .status); + + public static final LoadConvertedInsertTabletStatementTSStatusVisitor STATEMENT_STATUS_VISITOR = + new LoadConvertedInsertTabletStatementTSStatusVisitor(); + public static final LoadConvertedInsertTabletStatementExceptionVisitor + STATEMENT_EXCEPTION_VISITOR = new LoadConvertedInsertTabletStatementExceptionVisitor(); + + public TSStatus convertForTableModel(LoadTsFile loadTsFileTableStatement) { + try { + return loadTsFileTableStatement + .accept( + tableStatementDataTypeConvertExecutionVisitor, loadTsFileTableStatement.getDatabase()) + .orElse(null); + } catch (Exception e) { + LOGGER.warn( + "Failed to convert data types for table model statement {}.", + loadTsFileTableStatement, + e); + return new TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()).setMessage(e.getMessage()); + } + } + + public TSStatus convertForTreeModel(LoadTsFileStatement loadTsFileTreeStatement) { + try { + return loadTsFileTreeStatement + .accept(treeStatementDataTypeConvertExecutionVisitor, null) + .orElse(null); + } catch (Exception e) { + LOGGER.warn( + "Failed to convert data types for tree model statement {}.", loadTsFileTreeStatement, e); + return new TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()).setMessage(e.getMessage()); + } + } +}