Skip to content

Commit

Permalink
updat
Browse files Browse the repository at this point in the history
  • Loading branch information
bobhan1 authored and dataroaring committed Oct 11, 2023
1 parent 73f632a commit 6f8bfc2
Show file tree
Hide file tree
Showing 7 changed files with 245 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -442,6 +443,17 @@ private void analyzeTargetTable(Analyzer analyzer) throws AnalysisException {
targetPartitionIds.add(part.getId());
}
}

if (olapTable.hasSequenceCol() && olapTable.getSequenceMapCol() != null && targetColumnNames != null) {
Optional<String> foundCol = targetColumnNames.stream()
.filter(c -> c.equalsIgnoreCase(olapTable.getSequenceMapCol())).findAny();
if (!foundCol.isPresent() && !isPartialUpdate
&& !analyzer.getContext().getSessionVariable().isEnableUniqueKeyPartialUpdate()) {
throw new AnalysisException("Table " + olapTable.getName()
+ " has sequence column, need to specify the sequence column");
}
}

if (isPartialUpdate && olapTable.hasSequenceCol() && olapTable.getSequenceMapCol() != null
&& partialUpdateCols.contains(olapTable.getSequenceMapCol())) {
partialUpdateCols.add(Column.SEQUENCE_COL);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -375,8 +376,19 @@ public PlanFragment visitPhysicalOlapTableSink(PhysicalOlapTableSink<? extends P

HashSet<String> partialUpdateCols = new HashSet<>();
boolean isPartialUpdate = olapTableSink.isPartialUpdate();
OlapTable olapTable = (OlapTable) olapTableSink.getTargetTable();

if (olapTable.hasSequenceCol() && olapTable.getSequenceMapCol() != null
&& !olapTableSink.getCols().isEmpty() && olapTableSink.isFromNativeInsertStmt()) {
Optional<Column> foundCol = olapTableSink.getCols().stream()
.filter(col -> col.getName().equalsIgnoreCase(olapTable.getSequenceMapCol())).findAny();
if (!foundCol.isPresent() && !isPartialUpdate) {
throw new AnalysisException("Table " + olapTable.getName()
+ " has sequence column, need to specify the sequence column");
}
}

if (isPartialUpdate) {
OlapTable olapTable = olapTableSink.getTargetTable();
if (!olapTable.getEnableUniqueKeyMergeOnWrite()) {
throw new AnalysisException("Partial update is only allowed in"
+ "unique table with merge-on-write enabled.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,11 +139,13 @@ public List<Rule> buildRules() {
Optional<Column> seqCol = table.getFullSchema().stream()
.filter(col -> col.getName().equals(table.getSequenceMapCol()))
.findFirst();
if (!seqCol.isPresent()) {
if (!seqCol.isPresent() && !sink.isPartialUpdate()) {
throw new AnalysisException("sequence column is not contained in"
+ " target table " + table.getName());
}
columnToOutput.put(column.getName(), columnToOutput.get(seqCol.get().getName()));
if (columnToOutput.get(seqCol.get().getName()) != null) {
columnToOutput.put(column.getName(), columnToOutput.get(seqCol.get().getName()));
}
} else if (sink.isPartialUpdate()) {
// If the current load is a partial update, the values of unmentioned
// columns will be filled in SegmentWriter. And the output of sink node
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select_default --
1 doris 1000 123 1 2023-01-01
2 doris2 2000 223 1 2023-01-01

-- !partial_update_without_seq --
1 doris 200 123 1 2023-01-01
2 doris2 400 223 1 2023-01-01

-- !partial_update_with_seq --
1 doris 200 123 1 2023-01-01
2 doris2 2600 223 1 2023-07-20
3 unknown 2500 \N 4321 2022-07-18

-- !partial_update_with_seq_hidden_columns --
1 doris 200 123 1 2023-01-01 0 3 2023-01-01
2 doris2 2600 223 1 2023-07-20 0 4 2023-07-20
3 unknown 2500 \N 4321 2022-07-18 0 4 2022-07-18

Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select_default --
1 doris 1000 123 1 2023-01-01
2 doris2 2000 223 1 2023-01-01

-- !partial_update_without_seq --
1 doris 200 123 1 2023-01-01
2 doris2 400 223 1 2023-01-01

-- !partial_update_with_seq --
1 doris 200 123 1 2023-01-01
2 doris2 2600 223 1 2023-07-20
3 unknown 2500 \N 4321 2022-07-18

-- !partial_update_with_seq_hidden_columns --
1 doris 200 123 1 2023-01-01 0 3 2023-01-01
2 doris2 2600 223 1 2023-07-20 0 4 2023-07-20
3 unknown 2500 \N 4321 2022-07-18 0 4 2022-07-18

Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("nereids_partial_update_native_insert_seq_col", "p0") {
sql "set enable_nereids_dml=true;"
sql "set experimental_enable_nereids_planner=true;"
sql "set enable_fallback_to_original_planner=false;"
sql "sync;"

def tableName = "nereids_partial_update_native_insert_seq_col"
sql """ DROP TABLE IF EXISTS ${tableName} """
sql """
CREATE TABLE ${tableName} (
`id` int(11) NOT NULL COMMENT "用户 ID",
`name` varchar(65533) DEFAULT "unknown" COMMENT "用户姓名",
`score` int(11) NOT NULL COMMENT "用户得分",
`test` int(11) NULL COMMENT "null test",
`dft` int(11) DEFAULT "4321",
`update_time` date NULL)
UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES(
"replication_num" = "1",
"enable_unique_key_merge_on_write" = "true",
"function_column.sequence_col" = "update_time"
)
"""
sql """ insert into ${tableName} values
(2, "doris2", 2000, 223, 1, '2023-01-01'),
(1, "doris", 1000, 123, 1, '2023-01-01');"""
sql "sync"

qt_select_default """ select * from ${tableName} order by id;"""

// don't set partial update header, it's a row update streamload
// the input data don't contains sequence mapping column, will load fail
test {
sql "insert into ${tableName}(id,score) values(2,400),(1,200);"
exception "Table nereids_partial_update_native_insert_seq_col has sequence column, need to specify the sequence column"
}

// set partial update header, should success
// we don't provide the sequence column in input data, so the updated rows
// should use there original sequence column values.
sql "set enable_unique_key_partial_update=true;"
sql "sync;"
sql "insert into ${tableName}(id,score) values(2,400),(1,200);"
sql "set enable_unique_key_partial_update=false;"
sql "sync;"

qt_partial_update_without_seq """ select * from ${tableName} order by id;"""

// provide the sequence column this time, should update according to the
// given sequence values
sql "set enable_unique_key_partial_update=true;"
sql "set enable_insert_strict = false;"
sql "sync;"
sql """insert into ${tableName}(id,score,update_time) values
(2,2500,"2023-07-19"),
(2,2600,"2023-07-20"),
(1,1300,"2022-07-19"),
(3,1500,"2022-07-20"),
(3,2500,"2022-07-18");"""
sql "set enable_unique_key_partial_update=false;"
sql "sync;"

qt_partial_update_with_seq """ select * from ${tableName} order by id;"""

sql "SET show_hidden_columns=true"
sql "sync"

qt_partial_update_with_seq_hidden_columns """select * from ${tableName} order by id;"""

sql """ DROP TABLE IF EXISTS ${tableName} """
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_partial_update_native_insert_seq_col_old_planner", "p0") {
sql "set enable_nereids_dml=false;"
sql "set experimental_enable_nereids_planner=false;"
sql "set enable_fallback_to_original_planner=true;"
sql "sync;"

def tableName = "test_partial_update_native_insert_seq_col_old_planner"
sql """ DROP TABLE IF EXISTS ${tableName} """
sql """
CREATE TABLE ${tableName} (
`id` int(11) NOT NULL COMMENT "用户 ID",
`name` varchar(65533) DEFAULT "unknown" COMMENT "用户姓名",
`score` int(11) NOT NULL COMMENT "用户得分",
`test` int(11) NULL COMMENT "null test",
`dft` int(11) DEFAULT "4321",
`update_time` date NULL)
UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES(
"replication_num" = "1",
"enable_unique_key_merge_on_write" = "true",
"function_column.sequence_col" = "update_time"
)
"""
sql """ insert into ${tableName} values
(2, "doris2", 2000, 223, 1, '2023-01-01'),
(1, "doris", 1000, 123, 1, '2023-01-01');"""
sql "sync"

qt_select_default """ select * from ${tableName} order by id;"""

// don't set partial update header, it's a row update streamload
// the input data don't contains sequence mapping column, will load fail
test {
sql "insert into ${tableName}(id,score) values(2,400),(1,200);"
exception "Table test_partial_update_native_insert_seq_col_old_planner has sequence column, need to specify the sequence column"
}

// set partial update header, should success
// we don't provide the sequence column in input data, so the updated rows
// should use there original sequence column values.
sql "set enable_unique_key_partial_update=true;"
sql "sync;"
sql "insert into ${tableName}(id,score) values(2,400),(1,200);"
sql "set enable_unique_key_partial_update=false;"
sql "sync;"

qt_partial_update_without_seq """ select * from ${tableName} order by id;"""

// provide the sequence column this time, should update according to the
// given sequence values
sql "set enable_unique_key_partial_update=true;"
sql "set enable_insert_strict = false;"
sql "sync;"
sql """insert into ${tableName}(id,score,update_time) values
(2,2500,"2023-07-19"),
(2,2600,"2023-07-20"),
(1,1300,"2022-07-19"),
(3,1500,"2022-07-20"),
(3,2500,"2022-07-18");"""
sql "set enable_unique_key_partial_update=false;"
sql "sync;"

qt_partial_update_with_seq """ select * from ${tableName} order by id;"""

sql "SET show_hidden_columns=true"
sql "sync"

qt_partial_update_with_seq_hidden_columns """select * from ${tableName} order by id;"""

sql """ DROP TABLE IF EXISTS ${tableName} """
}

0 comments on commit 6f8bfc2

Please sign in to comment.