diff --git a/docs/en/docs/data-operate/update-delete/sequence-column-manual.md b/docs/en/docs/data-operate/update-delete/sequence-column-manual.md index 2abd369b2f9cc5a..33c3e0d202b7697 100644 --- a/docs/en/docs/data-operate/update-delete/sequence-column-manual.md +++ b/docs/en/docs/data-operate/update-delete/sequence-column-manual.md @@ -249,7 +249,7 @@ MySQL [test]> select * from test_table; At this point, you can replace the original data in the table. To sum up, the sequence column will be compared among all the batches, the largest value of the same key will be imported into Doris table. ## Note -1. To prevent misuse, in load tasks like StreamLoad/BrokerLoad, user must specify the sequence column; otherwise, user will receive the following error message: +1. To prevent misuse, in load tasks like StreamLoad/BrokerLoad and row updates with insert statements, user must explicitly specify the sequence column (unless the sequence column's default value is CURRENT_TIMESTAMP); otherwise, user will receive the following error message: ``` Table test_tbl has a sequence column, need to specify the sequence column ``` diff --git a/docs/zh-CN/docs/data-operate/update-delete/sequence-column-manual.md b/docs/zh-CN/docs/data-operate/update-delete/sequence-column-manual.md index 02a42c359435142..2e3e6dad3c67466 100644 --- a/docs/zh-CN/docs/data-operate/update-delete/sequence-column-manual.md +++ b/docs/zh-CN/docs/data-operate/update-delete/sequence-column-manual.md @@ -269,7 +269,7 @@ MySQL [test]> select * from test_table; 此时就可以替换表中原有的数据。综上,在导入过程中,会比较所有批次的sequence列值,选择值最大的记录导入Doris表中。 ## 注意 -1. 为防止误用,在StreamLoad/BrokerLoad等导入任务中,必须要指定sequence列,不然会收到以下报错信息: +1. 
为防止误用,在StreamLoad/BrokerLoad等导入任务以及行更新的insert语句中,用户必须显式指定sequence列(除非sequence列的默认值为CURRENT_TIMESTAMP),不然会收到以下报错信息: ``` Table test_tbl has sequence column, need to specify the sequence column ``` diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java index 7ede5e5fdc70d0f..0c165b9fb1a2692 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java @@ -18,6 +18,7 @@ package org.apache.doris.analysis; import org.apache.doris.alter.SchemaChangeHandler; +import org.apache.doris.analysis.ColumnDef.DefaultValue; import org.apache.doris.catalog.BrokerTable; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Database; @@ -447,10 +448,16 @@ private void analyzeTargetTable(Analyzer analyzer) throws AnalysisException { if (olapTable.hasSequenceCol() && olapTable.getSequenceMapCol() != null && targetColumnNames != null) { Optional foundCol = targetColumnNames.stream() .filter(c -> c.equalsIgnoreCase(olapTable.getSequenceMapCol())).findAny(); + Column seqCol = olapTable.getFullSchema().stream() + .filter(col -> col.getName().equals(olapTable.getSequenceMapCol())) + .findFirst().get(); if (!foundCol.isPresent() && !isPartialUpdate && !isFromDeleteOrUpdateStmt && !analyzer.getContext().getSessionVariable().isEnableUniqueKeyPartialUpdate()) { - throw new AnalysisException("Table " + olapTable.getName() - + " has sequence column, need to specify the sequence column"); + if (seqCol.getDefaultValue() == null + || !seqCol.getDefaultValue().equals(DefaultValue.CURRENT_TIMESTAMP)) { + throw new AnalysisException("Table " + olapTable.getName() + + " has sequence column, need to specify the sequence column"); + } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 21860ae3369ca68..5e4097bbfec4722 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -376,9 +376,8 @@ public PlanFragment visitPhysicalOlapTableSink(PhysicalOlapTableSink partialUpdateCols = new HashSet<>(); boolean isPartialUpdate = olapTableSink.isPartialUpdate(); - OlapTable olapTable = (OlapTable) olapTableSink.getTargetTable(); - if (isPartialUpdate) { + OlapTable olapTable = olapTableSink.getTargetTable(); if (!olapTable.getEnableUniqueKeyMergeOnWrite()) { throw new AnalysisException("Partial update is only allowed in" + "unique table with merge-on-write enabled."); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java index 1b3221cba3a2206..6aee5ddb9bd8c57 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java @@ -17,6 +17,7 @@ package org.apache.doris.nereids.rules.analysis; +import org.apache.doris.analysis.ColumnDef.DefaultValue; import org.apache.doris.analysis.SlotRef; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Database; @@ -43,7 +44,6 @@ import org.apache.doris.nereids.trees.expressions.literal.NullLiteral; import org.apache.doris.nereids.trees.expressions.visitor.DefaultExpressionRewriter; import org.apache.doris.nereids.trees.plans.Plan; -import org.apache.doris.nereids.trees.plans.commands.info.DefaultValue; import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.nereids.trees.plans.logical.LogicalProject; @@ -117,8 +117,8 
@@ public List buildRules() { Optional foundCol = sink.getColNames().stream() .filter(col -> col.equals(table.getSequenceMapCol())) .findFirst(); - if (!foundCol.isPresent() && seqCol.getDefaultValue() == null - || !seqCol.getDefaultValue().equals(DefaultValue.CURRENT_TIMESTAMP)) { + if (!foundCol.isPresent() && (seqCol.getDefaultValue() == null + || !seqCol.getDefaultValue().equals(DefaultValue.CURRENT_TIMESTAMP))) { throw new AnalysisException("Table " + table.getName() + " has sequence column, need to specify the sequence column"); } @@ -165,7 +165,7 @@ public List buildRules() { Optional seqCol = table.getFullSchema().stream() .filter(col -> col.getName().equals(table.getSequenceMapCol())) .findFirst(); - if (!seqCol.isPresent() && !sink.isPartialUpdate()) { + if (!seqCol.isPresent()) { throw new AnalysisException("sequence column is not contained in" + " target table " + table.getName()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java index 52bb119d7a73762..872739313ab2ee1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java @@ -18,6 +18,7 @@ package org.apache.doris.planner.external; import org.apache.doris.analysis.Analyzer; +import org.apache.doris.analysis.ColumnDef.DefaultValue; import org.apache.doris.analysis.ImportColumnDesc; import org.apache.doris.analysis.IntLiteral; import org.apache.doris.analysis.SlotRef; @@ -188,7 +189,8 @@ private void initColumns(FileLoadScanNode.ParamCreateContext context, Analyzer a // add columnExpr for sequence column TableIf targetTable = getTargetTable(); if (targetTable instanceof OlapTable && ((OlapTable) targetTable).hasSequenceCol()) { - String sequenceCol = ((OlapTable) targetTable).getSequenceMapCol(); + OlapTable olapTable = (OlapTable) targetTable; + String 
sequenceCol = olapTable.getSequenceMapCol(); if (sequenceCol != null) { String finalSequenceCol = sequenceCol; Optional foundCol = columnDescs.descs.stream() @@ -199,8 +201,14 @@ private void initColumns(FileLoadScanNode.ParamCreateContext context, Analyzer a columnDescs.descs.add(new ImportColumnDesc(Column.SEQUENCE_COL, new SlotRef(null, sequenceCol))); } else if (!fileGroupInfo.isPartialUpdate()) { - throw new UserException("Table " + targetTable.getName() - + " has sequence column, need to specify the sequence column"); + Column seqCol = olapTable.getFullSchema().stream() + .filter(col -> col.getName().equals(olapTable.getSequenceMapCol())) + .findFirst().get(); + if (seqCol.getDefaultValue() == null + || !seqCol.getDefaultValue().equals(DefaultValue.CURRENT_TIMESTAMP)) { + throw new UserException("Table " + olapTable.getName() + + " has sequence column, need to specify the sequence column"); + } } } else { sequenceCol = context.fileGroup.getSequenceCol(); diff --git a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_insert_seq_col.out b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_insert_seq_col.out index 7c0fa72acce152b..01b4341d54bbe5c 100644 --- a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_insert_seq_col.out +++ b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_insert_seq_col.out @@ -17,3 +17,7 @@ 2 doris2 2600 223 1 2023-07-20 0 4 2023-07-20 3 unknown 2500 \N 4321 2022-07-18 0 4 2022-07-18 +-- !sql -- +1 200 +2 400 + diff --git a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_seq_col.out b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_seq_col.out index c27f441cc6b46bc..a789f18216e7de1 100644 --- a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_seq_col.out +++ b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_seq_col.out @@ -17,3 
+17,7 @@ 2 doris2 2600 223 1 2023-07-20 0 4 2023-07-20 3 unknown 1500 \N 4321 2022-07-20 0 4 2022-07-20 +-- !sql -- +1 200 +2 400 + diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_insert_seq_col.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_insert_seq_col.groovy index 008cd41169a048a..0f80940ec3393c3 100644 --- a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_insert_seq_col.groovy +++ b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_insert_seq_col.groovy @@ -86,4 +86,29 @@ suite("test_partial_update_native_insert_seq_col_old_planner", "p0") { qt_partial_update_with_seq_hidden_columns """select * from ${tableName} order by id;""" sql """ DROP TABLE IF EXISTS ${tableName} """ + + + def tableName2 = "nereids_partial_update_native_insert_seq_col2" + sql """ DROP TABLE IF EXISTS ${tableName2} """ + sql """ + CREATE TABLE ${tableName2} ( + `id` int(11) NOT NULL COMMENT "用户 ID", + `score` int(11) NOT NULL COMMENT "用户得分", + `update_time` DATETIMEV2 NULL DEFAULT CURRENT_TIMESTAMP) + UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES( + "replication_num" = "1", + "enable_unique_key_merge_on_write" = "true", + "function_column.sequence_col" = "update_time" + )""" + + // enable_unique_key_partial_update is not enabled, so this is a row update + // the input data doesn't contain the sequence mapping column, but the sequence mapping + // column's default value is CURRENT_TIMESTAMP, so the load will succeed + sql "SET show_hidden_columns=false" + sql "set enable_unique_key_partial_update=false;" + sql "sync;" + sql "insert into ${tableName2}(id,score) values(2,400),(1,200);" + qt_sql """ select id,score from ${tableName2} order by id;""" + sql """ DROP TABLE IF EXISTS ${tableName2}; """ } diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_seq_col.groovy 
b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_seq_col.groovy index 06a066fc0a99c64..4a99f2a770021d0 100644 --- a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_seq_col.groovy +++ b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_seq_col.groovy @@ -124,4 +124,35 @@ suite("test_primary_key_partial_update_seq_col", "p0") { // drop drop sql """ DROP TABLE IF EXISTS ${tableName} """ + + + def tableName2 = "nereids_partial_update_native_insert_seq_col2" + sql """ DROP TABLE IF EXISTS ${tableName2} """ + sql """ + CREATE TABLE ${tableName2} ( + `id` int(11) NOT NULL COMMENT "用户 ID", + `score` int(11) NOT NULL COMMENT "用户得分", + `update_time` DATETIMEV2 NULL DEFAULT CURRENT_TIMESTAMP) + UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES( + "replication_num" = "1", + "enable_unique_key_merge_on_write" = "true", + "function_column.sequence_col" = "update_time" + )""" + + // the partial update header is not set, so this is a row-update stream load + // the input data doesn't contain the sequence mapping column, but the sequence mapping + // column's default value is CURRENT_TIMESTAMP, so the load will succeed + streamLoad { + table "${tableName2}" + + set 'column_separator', ',' + set 'format', 'csv' + set 'columns', 'id,score' + + file 'basic.csv' + time 10000 // limit inflight 10s + } + qt_sql """ select id,score from ${tableName2} order by id;""" + sql """ DROP TABLE IF EXISTS ${tableName2}; """ }