diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 12ef819d6fb987..597e576041a0a2 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -2930,6 +2930,15 @@ Status Tablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset,
     Version dummy_version(end_version + 1, end_version + 1);
     auto rowset_schema = rowset->tablet_schema();
     bool is_partial_update = rowset_writer && rowset_writer->is_partial_update();
+    bool have_input_seq_column = false;
+    if (is_partial_update && rowset_schema->has_sequence_col()) {
+        std::vector<uint32_t> including_cids =
+                rowset_writer->get_partial_update_info()->update_cids;
+        have_input_seq_column =
+                rowset_schema->has_sequence_col() &&
+                (std::find(including_cids.cbegin(), including_cids.cend(),
+                           rowset_schema->sequence_col_idx()) != including_cids.cend());
+    }
     // use for partial update
     PartialUpdateReadPlan read_plan_ori;
     PartialUpdateReadPlan read_plan_update;
@@ -2998,12 +3007,24 @@ Status Tablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset,
                 continue;
             }
 
-            // sequence id smaller than the previous one, so delete current row
-            if (st.is<KEY_ALREADY_EXISTS>()) {
+            if (st.is<KEY_ALREADY_EXISTS>() && (!is_partial_update || have_input_seq_column)) {
+                // `st.is<KEY_ALREADY_EXISTS>()` means that there exists a row with the same key and a larger
+                // sequence column value.
+                // - If the current load is not a partial update, we just delete the current row.
+                // - Otherwise, it means that we are doing the alignment process in the publish phase due to
+                //   conflicts during concurrent partial updates, and another load that introduces a row with
+                //   the same keys and a larger sequence column value was published successfully after the
+                //   commit phase of the current load.
+                //     - If the columns we update include the sequence column, we should delete the current row
+                //       because the partial update on the current row has been `overwritten` by the previous one
+                //       with the larger sequence column value.
+                //     - Otherwise, we should combine the values of the missing columns in the previous row and
+                //       the values of the including columns in the current row into a new row.
                 delete_bitmap->add({rowset_id, seg->id(), DeleteBitmap::TEMP_VERSION_COMMON},
                                    row_id);
                 continue;
-            } else if (is_partial_update && rowset_writer != nullptr) {
+            }
+            if (is_partial_update && rowset_writer != nullptr) {
                 // In publish version, record rows to be deleted for concurrent update
                 // For example, if version 5 and 6 update a row, but version 6 only see
                 // version 4 when write, and when publish version, version 5's value will
diff --git a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_parallel.out b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_parallel.out
index bcd7e86c53ce31..f4a51133a8176d 100644
--- a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_parallel.out
+++ b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_parallel.out
@@ -6,3 +6,24 @@
 4	"bbbbbbbb"	4444	499	40
 5	"cccccccccccc"	5555	599	50
 
+-- !sql --
+1	"ddddddddddd"	1111	199	10
+2	"eeeeee"	2222	299	20
+3	"aaaaa"	3333	399	30
+4	"bbbbbbbb"	4444	499	40
+5	"cccccccccccc"	5555	599	50
+
+-- !sql --
+1	"ddddddddddd"	1111	199	10	0	5	10
+2	"eeeeee"	2222	299	20	0	5	20
+3	"aaaaa"	3333	399	30	0	5	30
+4	"bbbbbbbb"	4444	499	40	0	5	40
+5	"cccccccccccc"	5555	599	50	0	5	50
+
+-- !sql --
+1	"ddddddddddd"	1111	199	10	0	5	10
+2	"eeeeee"	2222	299	20	0	5	20
+3	"aaaaa"	3333	399	30	0	5	30
+4	"bbbbbbbb"	4444	499	40	0	5	40
+5	"cccccccccccc"	5555	599	50	0	5	50
+
diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_parallel.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_parallel.groovy
index 19522e8064e122..ba0c1766aa161e 100644
--- a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_parallel.groovy
+++ b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_parallel.groovy
@@ -17,7 +17,156 @@ suite("test_primary_key_partial_update_parallel", "p0") {
 
+    // case 1: concurrent partial update
     def tableName = "test_primary_key_partial_update"
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """
+            CREATE TABLE ${tableName} (
+                `id` int(11) NOT NULL COMMENT "用户 ID",
+                `name` varchar(65533) NOT NULL COMMENT "用户姓名",
+                `score` int(11) NOT NULL COMMENT "用户得分",
+                `test` int(11) NULL COMMENT "null test",
+                `dft` int(11) DEFAULT "4321")
+            UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1
+            PROPERTIES("replication_num" = "1", "enable_unique_key_merge_on_write" = "true")
+    """
+
+    sql """insert into ${tableName} values
+        (2, "doris2", 2000, 223, 2),
+        (1, "doris", 1000, 123, 1),
+        (5, "doris5", 5000, 523, 5),
+        (4, "doris4", 4000, 423, 4),
+        (3, "doris3", 3000, 323, 3);"""
+
+    t1 = Thread.startDaemon {
+        streamLoad {
+            table "${tableName}"
+
+            set 'column_separator', ','
+            set 'format', 'csv'
+            set 'partial_columns', 'true'
+            set 'columns', 'id,name'
+
+            file 'partial_update_parallel1.csv'
+            time 10000 // limit inflight 10s
+        }
+    }
+
+    t2 = Thread.startDaemon {
+        streamLoad {
+            table "${tableName}"
+
+            set 'column_separator', ','
+            set 'format', 'csv'
+            set 'partial_columns', 'true'
+            set 'columns', 'id,score,test'
+
+            file 'partial_update_parallel2.csv'
+            time 10000 // limit inflight 10s
+        }
+    }
+
+    t3 = Thread.startDaemon {
+        streamLoad {
+            table "${tableName}"
+
+            set 'column_separator', ','
+            set 'format', 'csv'
+            set 'partial_columns', 'true'
+            set 'columns', 'id,dft'
+
+            file 'partial_update_parallel3.csv'
+            time 10000 // limit inflight 10s
+        }
+    }
+
+    t1.join()
+    t2.join()
+    t3.join()
+
+    sql "sync"
+
+    qt_sql """ select * from ${tableName} order by id;"""
+
+    sql """ DROP TABLE IF EXISTS ${tableName}; """
+
+
+    // case 2: concurrent partial update with row store column
+    tableName = "test_primary_key_row_store_partial_update"
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """
+            CREATE TABLE ${tableName} (
+                `id` int(11) NOT NULL COMMENT "用户 ID",
+                `name` varchar(65533) NOT NULL COMMENT "用户姓名",
+                `score` int(11) NOT NULL COMMENT "用户得分",
+                `test` int(11) NULL COMMENT "null test",
+                `dft` int(11) DEFAULT "4321")
+            UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1
+            PROPERTIES("replication_num" = "1", "enable_unique_key_merge_on_write" = "true", "store_row_column" = "true")
+    """
+
+    sql """insert into ${tableName} values
+        (2, "doris2", 2000, 223, 2),
+        (1, "doris", 1000, 123, 1),
+        (5, "doris5", 5000, 523, 5),
+        (4, "doris4", 4000, 423, 4),
+        (3, "doris3", 3000, 323, 3);"""
+
+    t1 = Thread.startDaemon {
+        streamLoad {
+            table "${tableName}"
+
+            set 'column_separator', ','
+            set 'format', 'csv'
+            set 'partial_columns', 'true'
+            set 'columns', 'id,name'
+
+            file 'partial_update_parallel1.csv'
+            time 10000 // limit inflight 10s
+        }
+    }
+
+    t2 = Thread.startDaemon {
+        streamLoad {
+            table "${tableName}"
+
+            set 'column_separator', ','
+            set 'format', 'csv'
+            set 'partial_columns', 'true'
+            set 'columns', 'id,score,test'
+
+            file 'partial_update_parallel2.csv'
+            time 10000 // limit inflight 10s
+        }
+    }
+
+    t3 = Thread.startDaemon {
+        streamLoad {
+            table "${tableName}"
+
+            set 'column_separator', ','
+            set 'format', 'csv'
+            set 'partial_columns', 'true'
+            set 'columns', 'id,dft'
+
+            file 'partial_update_parallel3.csv'
+            time 10000 // limit inflight 10s
+        }
+    }
+
+    t1.join()
+    t2.join()
+    t3.join()
+
+    sql "sync"
+
+    qt_sql """ select * from ${tableName} order by id;"""
+
+    sql """ DROP TABLE IF EXISTS ${tableName}; """
+
+
+    // case 3: concurrent partial update with sequence column
+    tableName = "test_primary_key_seq_partial_update"
 
     // create table
     sql """ DROP TABLE IF EXISTS ${tableName} """
@@ -29,14 +178,21 @@ suite("test_primary_key_partial_update_parallel", "p0") {
                 `test` int(11) NULL COMMENT "null test",
                 `dft` int(11) DEFAULT "4321")
             UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1
-            PROPERTIES("replication_num" = "1", "enable_unique_key_merge_on_write" = "true")
+            PROPERTIES(
+                "replication_num" = "1",
+                "enable_unique_key_merge_on_write" = "true",
+                "function_column.sequence_col" = "dft")
     """
 
     sql """insert into ${tableName} values
+        (2, "deprecated", 99999, 999, 1),
         (2, "doris2", 2000, 223, 2),
         (1, "doris", 1000, 123, 1),
+        (3, "deprecated", 99999, 999, 2),
         (5, "doris5", 5000, 523, 5),
         (4, "doris4", 4000, 423, 4),
+        (4, "deprecated", 99999, 999, 3),
+        (4, "deprecated", 99999, 999, 1),
         (3, "doris3", 3000, 323, 3);"""
 
     t1 = Thread.startDaemon {
@@ -85,9 +241,94 @@ suite("test_primary_key_partial_update_parallel", "p0") {
     t2.join()
     t3.join()
 
+    sql "set show_hidden_columns=true;"
     sql "sync"
 
     qt_sql """ select * from ${tableName} order by id;"""
+    sql "set show_hidden_columns=false;"
+    sql "sync"
+    sql """ DROP TABLE IF EXISTS ${tableName}; """
+
+
+    // case 4: concurrent partial update with row store column and sequence column
+    tableName = "test_primary_key_row_store_seq_partial_update"
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """
+            CREATE TABLE ${tableName} (
+                `id` int(11) NOT NULL COMMENT "用户 ID",
+                `name` varchar(65533) NOT NULL COMMENT "用户姓名",
+                `score` int(11) NOT NULL COMMENT "用户得分",
+                `test` int(11) NULL COMMENT "null test",
+                `dft` int(11) DEFAULT "4321")
+            UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1
+            PROPERTIES(
+                "replication_num" = "1",
+                "enable_unique_key_merge_on_write" = "true",
+                "function_column.sequence_col" = "dft",
+                "store_row_column" = "true")
+    """
+
+    sql """insert into ${tableName} values
+        (2, "deprecated", 99999, 999, 1),
+        (2, "doris2", 2000, 223, 2),
+        (1, "doris", 1000, 123, 1),
+        (3, "deprecated", 99999, 999, 2),
+        (5, "doris5", 5000, 523, 5),
+        (4, "doris4", 4000, 423, 4),
+        (4, "deprecated", 99999, 999, 3),
+        (4, "deprecated", 99999, 999, 1),
+        (3, "doris3", 3000, 323, 3);"""
+
+    t1 = Thread.startDaemon {
+        streamLoad {
+            table "${tableName}"
+
+            set 'column_separator', ','
+            set 'format', 'csv'
+            set 'partial_columns', 'true'
+            set 'columns', 'id,name'
+
+            file 'partial_update_parallel1.csv'
+            time 10000 // limit inflight 10s
+        }
+    }
+
+    t2 = Thread.startDaemon {
+        streamLoad {
+            table "${tableName}"
+
+            set 'column_separator', ','
+            set 'format', 'csv'
+            set 'partial_columns', 'true'
+            set 'columns', 'id,score,test'
+
+            file 'partial_update_parallel2.csv'
+            time 10000 // limit inflight 10s
+        }
+    }
+
+    t3 = Thread.startDaemon {
+        streamLoad {
+            table "${tableName}"
+
+            set 'column_separator', ','
+            set 'format', 'csv'
+            set 'partial_columns', 'true'
+            set 'columns', 'id,dft'
+
+            file 'partial_update_parallel3.csv'
+            time 10000 // limit inflight 10s
+        }
+    }
+
+    t1.join()
+    t2.join()
+    t3.join()
+
+    sql "set show_hidden_columns=true;"
+    sql "sync"
+
+    qt_sql """ select id,name,score,test,dft,__DORIS_DELETE_SIGN__,__DORIS_VERSION_COL__,__DORIS_SEQUENCE_COL__ from ${tableName} order by id;"""
 
     sql """ DROP TABLE IF EXISTS ${tableName}; """
 }
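
As a rough illustration of the decision that the new comment in `Tablet::calc_segment_delete_bitmap` describes, the following is a minimal, self-contained C++ sketch; it is not Doris code, and the `resolve_conflict` helper and its parameters are invented for illustration. It only mirrors the logic: when a row with the same key and a larger sequence value already exists, the current row is deleted unless it comes from a partial update whose included columns do not contain the sequence column, in which case it must instead be merged with the previously published row.

```cpp
// Sketch of the publish-phase conflict decision for merge-on-write tables
// (hypothetical helper, not the Doris implementation).
#include <algorithm>
#include <cstdint>
#include <iostream>
#include <vector>

enum class Action { DeleteCurrentRow, MergeWithPreviousRow };

// `update_cids`: column ids written by the (hypothetical) partial update.
// `seq_col_idx`: position of the sequence column in the schema.
Action resolve_conflict(bool is_partial_update, bool has_sequence_col,
                        const std::vector<uint32_t>& update_cids, uint32_t seq_col_idx) {
    bool have_input_seq_column =
            has_sequence_col &&
            std::find(update_cids.cbegin(), update_cids.cend(), seq_col_idx) !=
                    update_cids.cend();
    // Not a partial update, or the partial update rewrites the sequence column:
    // the already-published row with the larger sequence value wins, so the
    // current row is simply marked deleted.
    if (!is_partial_update || have_input_seq_column) {
        return Action::DeleteCurrentRow;
    }
    // Otherwise keep the current row but fill its missing columns from the
    // previously published row (the read-plan/merge path in the patch).
    return Action::MergeWithPreviousRow;
}

int main() {
    // A partial update writing columns {0, 1} of a table whose sequence column is column 4.
    std::vector<uint32_t> cids{0, 1};
    Action a = resolve_conflict(/*is_partial_update=*/true, /*has_sequence_col=*/true,
                                cids, /*seq_col_idx=*/4);
    std::cout << (a == Action::DeleteCurrentRow ? "delete" : "merge") << std::endl;
    return 0;
}
```

With these inputs the sketch prints `merge`, corresponding to the branch that builds the partial-update read plans (`read_plan_ori`/`read_plan_update`) rather than adding the row to the delete bitmap.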