Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
bobhan1 committed Oct 18, 2023
1 parent f24c297 commit 8f233c7
Show file tree
Hide file tree
Showing 10 changed files with 91 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ MySQL [test]> select * from test_table;
At this point, you can replace the original data in the table. To sum up, the sequence column will be compared among all the batches, the largest value of the same key will be imported into Doris table.

## Note
1. To prevent misuse, in load tasks like StreamLoad/BrokerLoad, user must specify the sequence column; otherwise, user will receive the following error message:
1. To prevent misuse, in load tasks like StreamLoad/BrokerLoad and row updates with insert statements, user must explicitly specify the sequence column (unless the sequence column's default value is CURRENT_TIMESTAMP); otherwise, user will receive the following error message:
```
Table test_tbl has a sequence column, need to specify the sequence column
```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ MySQL [test]> select * from test_table;
此时就可以替换表中原有的数据。综上,在导入过程中,会比较所有批次的sequence列值,选择值最大的记录导入Doris表中。

## 注意
1. 为防止误用,在StreamLoad/BrokerLoad等导入任务中,必须要指定sequence列,不然会收到以下报错信息:
1. 为防止误用,在StreamLoad/BrokerLoad等导入任务以及行更新insert语句中,用户必须显示指定sequence列(除非sequence列的默认值为CURRENT_TIMESTAMP),不然会收到以下报错信息:
```
Table test_tbl has sequence column, need to specify the sequence column
```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.doris.analysis;

import org.apache.doris.alter.SchemaChangeHandler;
import org.apache.doris.analysis.ColumnDef.DefaultValue;
import org.apache.doris.catalog.BrokerTable;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
Expand Down Expand Up @@ -447,10 +448,16 @@ private void analyzeTargetTable(Analyzer analyzer) throws AnalysisException {
if (olapTable.hasSequenceCol() && olapTable.getSequenceMapCol() != null && targetColumnNames != null) {
Optional<String> foundCol = targetColumnNames.stream()
.filter(c -> c.equalsIgnoreCase(olapTable.getSequenceMapCol())).findAny();
Column seqCol = olapTable.getFullSchema().stream()
.filter(col -> col.getName().equals(olapTable.getSequenceMapCol()))
.findFirst().get();
if (!foundCol.isPresent() && !isPartialUpdate && !isFromDeleteOrUpdateStmt
&& !analyzer.getContext().getSessionVariable().isEnableUniqueKeyPartialUpdate()) {
throw new AnalysisException("Table " + olapTable.getName()
+ " has sequence column, need to specify the sequence column");
if (seqCol.getDefaultValue() == null
|| !seqCol.getDefaultValue().equals(DefaultValue.CURRENT_TIMESTAMP)) {
throw new AnalysisException("Table " + olapTable.getName()
+ " has sequence column, need to specify the sequence column");
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -376,9 +376,8 @@ public PlanFragment visitPhysicalOlapTableSink(PhysicalOlapTableSink<? extends P

HashSet<String> partialUpdateCols = new HashSet<>();
boolean isPartialUpdate = olapTableSink.isPartialUpdate();
OlapTable olapTable = (OlapTable) olapTableSink.getTargetTable();

if (isPartialUpdate) {
OlapTable olapTable = olapTableSink.getTargetTable();
if (!olapTable.getEnableUniqueKeyMergeOnWrite()) {
throw new AnalysisException("Partial update is only allowed in"
+ "unique table with merge-on-write enabled.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.doris.nereids.rules.analysis;

import org.apache.doris.analysis.ColumnDef.DefaultValue;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
Expand All @@ -43,7 +44,6 @@
import org.apache.doris.nereids.trees.expressions.literal.NullLiteral;
import org.apache.doris.nereids.trees.expressions.visitor.DefaultExpressionRewriter;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.commands.info.DefaultValue;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
Expand Down Expand Up @@ -117,8 +117,8 @@ public List<Rule> buildRules() {
Optional<String> foundCol = sink.getColNames().stream()
.filter(col -> col.equals(table.getSequenceMapCol()))
.findFirst();
if (!foundCol.isPresent() && seqCol.getDefaultValue() == null
|| !seqCol.getDefaultValue().equals(DefaultValue.CURRENT_TIMESTAMP)) {
if (!foundCol.isPresent() && (seqCol.getDefaultValue() == null
|| !seqCol.getDefaultValue().equals(DefaultValue.CURRENT_TIMESTAMP))) {
throw new AnalysisException("Table " + table.getName()
+ " has sequence column, need to specify the sequence column");
}
Expand Down Expand Up @@ -165,7 +165,7 @@ public List<Rule> buildRules() {
Optional<Column> seqCol = table.getFullSchema().stream()
.filter(col -> col.getName().equals(table.getSequenceMapCol()))
.findFirst();
if (!seqCol.isPresent() && !sink.isPartialUpdate()) {
if (!seqCol.isPresent()) {
throw new AnalysisException("sequence column is not contained in"
+ " target table " + table.getName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.doris.planner.external;

import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.ColumnDef.DefaultValue;
import org.apache.doris.analysis.ImportColumnDesc;
import org.apache.doris.analysis.IntLiteral;
import org.apache.doris.analysis.SlotRef;
Expand Down Expand Up @@ -188,7 +189,8 @@ private void initColumns(FileLoadScanNode.ParamCreateContext context, Analyzer a
// add columnExpr for sequence column
TableIf targetTable = getTargetTable();
if (targetTable instanceof OlapTable && ((OlapTable) targetTable).hasSequenceCol()) {
String sequenceCol = ((OlapTable) targetTable).getSequenceMapCol();
OlapTable olapTable = (OlapTable) targetTable;
String sequenceCol = olapTable.getSequenceMapCol();
if (sequenceCol != null) {
String finalSequenceCol = sequenceCol;
Optional<ImportColumnDesc> foundCol = columnDescs.descs.stream()
Expand All @@ -199,8 +201,14 @@ private void initColumns(FileLoadScanNode.ParamCreateContext context, Analyzer a
columnDescs.descs.add(new ImportColumnDesc(Column.SEQUENCE_COL,
new SlotRef(null, sequenceCol)));
} else if (!fileGroupInfo.isPartialUpdate()) {
throw new UserException("Table " + targetTable.getName()
+ " has sequence column, need to specify the sequence column");
Column seqCol = olapTable.getFullSchema().stream()
.filter(col -> col.getName().equals(olapTable.getSequenceMapCol()))
.findFirst().get();
if (seqCol.getDefaultValue() == null
|| !seqCol.getDefaultValue().equals(DefaultValue.CURRENT_TIMESTAMP)) {
throw new UserException("Table " + olapTable.getName()
+ " has sequence column, need to specify the sequence column");
}
}
} else {
sequenceCol = context.fileGroup.getSequenceCol();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,7 @@
2 doris2 2600 223 1 2023-07-20 0 4 2023-07-20
3 unknown 2500 \N 4321 2022-07-18 0 4 2022-07-18

-- !sql --
1 200
2 400

Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,7 @@
2 doris2 2600 223 1 2023-07-20 0 4 2023-07-20
3 unknown 1500 \N 4321 2022-07-20 0 4 2022-07-20

-- !sql --
1 200
2 400

Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,29 @@ suite("test_partial_update_native_insert_seq_col_old_planner", "p0") {
qt_partial_update_with_seq_hidden_columns """select * from ${tableName} order by id;"""

sql """ DROP TABLE IF EXISTS ${tableName} """


def tableName2 = "nereids_partial_update_native_insert_seq_col2"
sql """ DROP TABLE IF EXISTS ${tableName2} """
sql """
CREATE TABLE ${tableName2} (
`id` int(11) NOT NULL COMMENT "用户 ID",
`score` int(11) NOT NULL COMMENT "用户得分",
`update_time` DATETIMEV2 NULL DEFAULT CURRENT_TIMESTAMP)
UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES(
"replication_num" = "1",
"enable_unique_key_merge_on_write" = "true",
"function_column.sequence_col" = "update_time"
)"""

// don't set enable_unique_key_partial_update, it's a row update
// the input data don't contains sequence mapping column but the sequence mapping
// column's default value is CURRENT_TIMESTAMP, will load successfully
sql "SET show_hidden_columns=false"
sql "set enable_unique_key_partial_update=false;"
sql "sync;"
sql "insert into ${tableName2}(id,score) values(2,400),(1,200);"
qt_sql """ select id,score from ${tableName2} order by id;"""
sql """ DROP TABLE IF EXISTS ${tableName2}; """
}
Original file line number Diff line number Diff line change
Expand Up @@ -124,4 +124,35 @@ suite("test_primary_key_partial_update_seq_col", "p0") {

// drop drop
sql """ DROP TABLE IF EXISTS ${tableName} """


def tableName2 = "nereids_partial_update_native_insert_seq_col2"
sql """ DROP TABLE IF EXISTS ${tableName2} """
sql """
CREATE TABLE ${tableName2} (
`id` int(11) NOT NULL COMMENT "用户 ID",
`score` int(11) NOT NULL COMMENT "用户得分",
`update_time` DATETIMEV2 NULL DEFAULT CURRENT_TIMESTAMP)
UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES(
"replication_num" = "1",
"enable_unique_key_merge_on_write" = "true",
"function_column.sequence_col" = "update_time"
)"""

// don't set partial update header, it's a row update streamload
// the input data don't contains sequence mapping column but the sequence mapping
// column's default value is CURRENT_TIMESTAMP, will load successfully
streamLoad {
table "${tableName2}"

set 'column_separator', ','
set 'format', 'csv'
set 'columns', 'id,score'

file 'basic.csv'
time 10000 // limit inflight 10s
}
qt_sql """ select id,score from ${tableName2} order by id;"""
sql """ DROP TABLE IF EXISTS ${tableName2}; """
}

0 comments on commit 8f233c7

Please sign in to comment.