Skip to content

Commit

Permalink
update case and fix
Browse files Browse the repository at this point in the history
  • Loading branch information
bobhan1 committed Oct 16, 2023
1 parent 2490b3e commit bbf535d
Show file tree
Hide file tree
Showing 6 changed files with 22 additions and 11 deletions.
2 changes: 2 additions & 0 deletions be/src/olap/rowset/beta_rowset_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ class BetaRowsetWriter : public RowsetWriter {

int64_t segment_writer_ns() override { return _segment_writer_ns; }

bool is_transient_rowset_writer() override { return _context.is_transient_rowset_writer; }

private:
Status _create_file_writer(std::string path, io::FileWriterPtr& file_writer);
Status _check_segment_number_limit();
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/rowset/rowset_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,8 @@ class RowsetWriter {

virtual int64_t segment_writer_ns() { return 0; }

virtual bool is_transient_rowset_writer() { return false; }

private:
DISALLOW_COPY_AND_ASSIGN(RowsetWriter);
};
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/rowset/rowset_writer_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ struct RowsetWriterContext {

// segcompaction for this RowsetWriter, disable it for some transient writers
bool enable_segcompaction = true;

bool is_transient_rowset_writer {false};
};

} // namespace doris
8 changes: 7 additions & 1 deletion be/src/olap/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2015,6 +2015,7 @@ Status Tablet::create_transient_rowset_writer(RowsetSharedPtr rowset_ptr,
context.newest_write_timestamp = UnixSeconds();
context.tablet_id = table_id();
context.enable_segcompaction = false;
context.is_transient_rowset_writer = true;
// ATTN: context.tablet is a shared_ptr, can't simply set it's value to `this`. We should
// get the shared_ptr from tablet_manager.
context.tablet = StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id());
Expand Down Expand Up @@ -2919,6 +2920,7 @@ Status Tablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset,
Version dummy_version(end_version + 1, end_version + 1);
auto rowset_schema = rowset->tablet_schema();
bool is_partial_update = rowset_schema->is_partial_update();
bool is_transient_rowset_writer = rowset_writer->is_transient_rowset_writer();
// use for partial update
PartialUpdateReadPlan read_plan_ori;
PartialUpdateReadPlan read_plan_update;
Expand Down Expand Up @@ -2988,7 +2990,7 @@ Status Tablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset,
}

// sequence id smaller than the previous one, so delete current row
if (st.is<KEY_ALREADY_EXISTS>()) {
if (st.is<KEY_ALREADY_EXISTS>() && !is_transient_rowset_writer) {
delete_bitmap->add({rowset_id, seg->id(), DeleteBitmap::TEMP_VERSION_COMMON},
row_id);
continue;
Expand Down Expand Up @@ -3039,6 +3041,10 @@ Status Tablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset,
}

if (pos > 0) {
LOG(INFO) << fmt::format(
"[before generate_new_block_for_partial_update] pos; {}, missing_cids: {}, "
"update_cids: {}",
pos, rowset_schema->get_missing_cids(), rowset_schema->get_update_cids());
RETURN_IF_ERROR(generate_new_block_for_partial_update(
rowset_schema, read_plan_ori, read_plan_update, rsid_to_rowset, &block));
sort_block(block, ordered_block);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
5 "cccccccccccc" 5555 599 50 0 5 50

-- !sql --
1 "ddddddddddd" 1000 123 10 0 5 10
2 "eeeeee" 2000 223 20 0 5 20
3 "aaaaa" 3000 323 30 0 5 30
4 "bbbbbbbb" 4000 423 40 0 5 40
5 "cccccccccccc" 5000 523 50 0 5 50
1 "ddddddddddd" 1111 199 10 0 5 10
2 "eeeeee" 2222 299 20 0 5 20
3 "aaaaa" 3333 399 30 0 5 30
4 "bbbbbbbb" 4444 499 40 0 5 40
5 "cccccccccccc" 5555 599 50 0 5 50

Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@

suite("test_primary_key_partial_update_parallel", "p0") {

// case 1: concurrent partial update
def tableName = "test_primary_key_partial_update"

// create table
sql """ DROP TABLE IF EXISTS ${tableName} """
sql """
CREATE TABLE ${tableName} (
Expand Down Expand Up @@ -92,8 +91,8 @@ suite("test_primary_key_partial_update_parallel", "p0") {
sql """ DROP TABLE IF EXISTS ${tableName}; """


// case 2: concurrent partial update with row store column
tableName = "test_primary_key_row_store_partial_update"
// create table
sql """ DROP TABLE IF EXISTS ${tableName} """
sql """
CREATE TABLE ${tableName} (
Expand Down Expand Up @@ -166,6 +165,7 @@ suite("test_primary_key_partial_update_parallel", "p0") {
sql """ DROP TABLE IF EXISTS ${tableName}; """


// case 3: concurrent partial update with sequence column
tableName = "test_primary_key_seq_partial_update"

// create table
Expand Down Expand Up @@ -250,9 +250,8 @@ suite("test_primary_key_partial_update_parallel", "p0") {
sql """ DROP TABLE IF EXISTS ${tableName}; """


// case 4: concurrent partial update with row store column and sequence column
tableName = "test_primary_key_row_store_seq_partial_update"

// create table
sql """ DROP TABLE IF EXISTS ${tableName} """
sql """
CREATE TABLE ${tableName} (
Expand Down

0 comments on commit bbf535d

Please sign in to comment.