Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
bobhan1 committed Oct 9, 2023
1 parent ffaa145 commit 9b3f5b2
Show file tree
Hide file tree
Showing 26 changed files with 136 additions and 64 deletions.
2 changes: 1 addition & 1 deletion be/src/olap/data_dir.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,7 @@ Status DataDir::load() {
Status commit_txn_status = _txn_manager->commit_txn(
_meta, rowset_meta->partition_id(), rowset_meta->txn_id(),
rowset_meta->tablet_id(), rowset_meta->tablet_uid(), rowset_meta->load_id(),
rowset, true);
rowset, true, false);
if (!commit_txn_status && !commit_txn_status.is<PUSH_TRANSACTION_ALREADY_EXIST>()) {
LOG(WARNING) << "failed to add committed rowset: " << rowset_meta->rowset_id()
<< " to tablet: " << rowset_meta->tablet_id()
Expand Down
6 changes: 3 additions & 3 deletions be/src/olap/delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,9 @@ Status DeltaWriter::init() {
return Status::OK();
}
RETURN_IF_ERROR(_rowset_builder.init());
RETURN_IF_ERROR(
_memtable_writer->init(_rowset_builder.rowset_writer(), _rowset_builder.tablet_schema(),
_rowset_builder.tablet()->enable_unique_key_merge_on_write()));
RETURN_IF_ERROR(_memtable_writer->init(_rowset_builder.rowset_writer(),
_rowset_builder.tablet_schema(),
_rowset_builder.get_partial_update_info()));
ExecEnv::GetInstance()->memtable_memory_limiter()->register_writer(_memtable_writer);
_is_init = true;
return Status::OK();
Expand Down
11 changes: 8 additions & 3 deletions be/src/olap/delta_writer_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
#include "olap/schema_change.h"
#include "olap/storage_engine.h"
#include "olap/tablet_manager.h"
#include "olap/tablet_schema.h"
#include "runtime/exec_env.h"
#include "service/backend_options.h"
#include "util/brpc_client_cache.h"
Expand Down Expand Up @@ -124,7 +125,7 @@ Status DeltaWriterV2::init() {

_rowset_writer = std::make_shared<BetaRowsetWriterV2>(_streams);
RETURN_IF_ERROR(_rowset_writer->init(context));
RETURN_IF_ERROR(_memtable_writer->init(_rowset_writer, _tablet_schema,
RETURN_IF_ERROR(_memtable_writer->init(_rowset_writer, _tablet_schema, _partial_update_info,
_streams[0]->enable_unique_mow(_req.index_id)));
ExecEnv::GetInstance()->memtable_memory_limiter()->register_writer(_memtable_writer);
_is_init = true;
Expand Down Expand Up @@ -225,8 +226,12 @@ void DeltaWriterV2::_build_current_tablet_schema(int64_t index_id,

_tablet_schema->set_table_id(table_schema_param->table_id());
// set partial update columns info
_tablet_schema->set_partial_update_info(table_schema_param->is_partial_update(),
table_schema_param->partial_update_input_columns());
// _tablet_schema->set_partial_update_info(table_schema_param->is_partial_update(),
// table_schema_param->partial_update_input_columns());
_partial_update_info = std::make_shared<PartialUpdateInfo>();
_partial_update_info->init(*_tablet_schema, table_schema_param->is_partial_update(),
table_schema_param->partial_update_input_columns(),
table_schema_param->is_strict_mode());
}

} // namespace doris
2 changes: 2 additions & 0 deletions be/src/olap/delta_writer_v2.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ class DeltaWriterV2 {
MonotonicStopWatch _lock_watch;

std::vector<std::shared_ptr<LoadStreamStub>> _streams;

std::shared_ptr<PartialUpdateInfo> _partial_update_info {PartialUpdateInfo {}};
};

} // namespace doris
13 changes: 7 additions & 6 deletions be/src/olap/memtable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,12 @@ using namespace ErrorCode;

MemTable::MemTable(int64_t tablet_id, const TabletSchema* tablet_schema,
const std::vector<SlotDescriptor*>* slot_descs, TupleDescriptor* tuple_desc,
bool enable_unique_key_mow,
bool enable_unique_key_mow, const PartialUpdateInfo& partial_update_info,
const std::shared_ptr<MemTracker>& insert_mem_tracker,
const std::shared_ptr<MemTracker>& flush_mem_tracker)
: _tablet_id(tablet_id),
_enable_unique_key_mow(enable_unique_key_mow),
_is_partial_update(partial_update_info.is_partial_update),
_keys_type(tablet_schema->keys_type()),
_tablet_schema(tablet_schema),
_insert_mem_tracker(insert_mem_tracker),
Expand All @@ -77,8 +78,8 @@ MemTable::MemTable(int64_t tablet_id, const TabletSchema* tablet_schema,
// TODO: Support ZOrderComparator in the future
_init_columns_offset_by_slot_descs(slot_descs, tuple_desc);
_num_columns = _tablet_schema->num_columns();
if (_tablet_schema->is_partial_update()) {
_num_columns = _tablet_schema->partial_input_column_size();
if (_is_partial_update) {
_num_columns = partial_update_info.partial_update_input_columns.size();
}
}
void MemTable::_init_columns_offset_by_slot_descs(const std::vector<SlotDescriptor*>* slot_descs,
Expand Down Expand Up @@ -178,7 +179,7 @@ void MemTable::insert(const vectorized::Block* input_block, const std::vector<in
_init_agg_functions(&target_block);
}
if (_tablet_schema->has_sequence_col()) {
if (_tablet_schema->is_partial_update()) {
if (_is_partial_update) {
// for unique key partial update, sequence column index in block
// may be different with the index in `_tablet_schema`
for (size_t i = 0; i < cloneBlock.columns(); i++) {
Expand Down Expand Up @@ -417,7 +418,7 @@ void MemTable::shrink_memtable_by_agg() {

bool MemTable::need_flush() const {
auto max_size = config::write_buffer_size;
if (_tablet_schema->is_partial_update()) {
if (_is_partial_update) {
auto update_columns_size = _tablet_schema->partial_input_column_size();
max_size = max_size * update_columns_size / _tablet_schema->num_columns();
max_size = max_size > 1048576 ? max_size : 1048576;
Expand All @@ -428,7 +429,7 @@ bool MemTable::need_flush() const {
bool MemTable::need_agg() const {
if (_keys_type == KeysType::AGG_KEYS) {
auto max_size = config::write_buffer_size_for_agg;
if (_tablet_schema->is_partial_update()) {
if (_is_partial_update) {
auto update_columns_size = _tablet_schema->partial_input_column_size();
max_size = max_size * update_columns_size / _tablet_schema->num_columns();
max_size = max_size > 1048576 ? max_size : 1048576;
Expand Down
6 changes: 5 additions & 1 deletion be/src/olap/memtable.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "common/status.h"
#include "gutil/integral_types.h"
#include "olap/olap_common.h"
#include "olap/tablet_schema.h"
#include "runtime/memory/mem_tracker.h"
#include "vec/aggregate_functions/aggregate_function.h"
#include "vec/common/arena.h"
Expand All @@ -42,6 +43,7 @@ class Schema;
class SlotDescriptor;
class TabletSchema;
class TupleDescriptor;
struct PartialUpdateInfo;
enum KeysType : int;

// row pos in _input_mutable_block
Expand Down Expand Up @@ -167,7 +169,8 @@ class MemTable {
public:
MemTable(int64_t tablet_id, const TabletSchema* tablet_schema,
const std::vector<SlotDescriptor*>* slot_descs, TupleDescriptor* tuple_desc,
bool enable_unique_key_mow, const std::shared_ptr<MemTracker>& insert_mem_tracker,
bool enable_unique_key_mow, const PartialUpdateInfo& partial_update_info,
const std::shared_ptr<MemTracker>& insert_mem_tracker,
const std::shared_ptr<MemTracker>& flush_mem_tracker);
~MemTable();

Expand Down Expand Up @@ -202,6 +205,7 @@ class MemTable {
private:
int64_t _tablet_id;
bool _enable_unique_key_mow = false;
bool _is_partial_update = false;
const KeysType _keys_type;
const TabletSchema* _tablet_schema;

Expand Down
8 changes: 6 additions & 2 deletions be/src/olap/memtable_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include "olap/rowset/rowset_writer.h"
#include "olap/schema_change.h"
#include "olap/storage_engine.h"
#include "olap/tablet_schema.h"
#include "runtime/exec_env.h"
#include "runtime/memory/mem_tracker.h"
#include "service/backend_options.h"
Expand All @@ -63,10 +64,13 @@ MemTableWriter::~MemTableWriter() {
}

Status MemTableWriter::init(std::shared_ptr<RowsetWriter> rowset_writer,
TabletSchemaSPtr tablet_schema, bool unique_key_mow) {
TabletSchemaSPtr tablet_schema,
std::shared_ptr<PartialUpdateInfo> partial_update_info,
bool unique_key_mow) {
_rowset_writer = rowset_writer;
_tablet_schema = tablet_schema;
_unique_key_mow = unique_key_mow;
_partial_update_info = partial_update_info;

_reset_mem_table();

Expand Down Expand Up @@ -195,7 +199,7 @@ void MemTableWriter::_reset_mem_table() {
_mem_table_flush_trackers.push_back(mem_table_flush_tracker);
}
_mem_table.reset(new MemTable(_req.tablet_id, _tablet_schema.get(), _req.slots, _req.tuple_desc,
_unique_key_mow, mem_table_insert_tracker,
_unique_key_mow, *_partial_update_info, mem_table_insert_tracker,
mem_table_flush_tracker));

_segment_num++;
Expand Down
3 changes: 3 additions & 0 deletions be/src/olap/memtable_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ class MemTableWriter {
~MemTableWriter();

Status init(std::shared_ptr<RowsetWriter> rowset_writer, TabletSchemaSPtr tablet_schema,
std::shared_ptr<PartialUpdateInfo> partial_update_info,
bool unique_key_mow = false);

Status write(const vectorized::Block* block, const std::vector<int>& row_idxs,
Expand Down Expand Up @@ -141,6 +142,8 @@ class MemTableWriter {
int64_t _segment_num = 0;

MonotonicStopWatch _lock_watch;

std::shared_ptr<PartialUpdateInfo> _partial_update_info {PartialUpdateInfo {}};
};

} // namespace doris
1 change: 1 addition & 0 deletions be/src/olap/olap_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,7 @@ struct HashOfRowsetId {
using RowsetIdUnorderedSet = std::unordered_set<RowsetId, HashOfRowsetId>;

class DeleteBitmap;
struct PartialUpdateInfo;
// merge on write context
struct MowContext {
MowContext(int64_t version, int64_t txnid, const RowsetIdUnorderedSet& ids,
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/push_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,8 @@ Status PushHandler::_do_streaming_ingestion(TabletSharedPtr tablet, const TPushR
del_preds.pop();
}
Status commit_status = StorageEngine::instance()->txn_manager()->commit_txn(
request.partition_id, tablet, request.transaction_id, load_id, rowset_to_add, false);
request.partition_id, tablet, request.transaction_id, load_id, rowset_to_add, false,
false);
if (!commit_status.ok() && !commit_status.is<PUSH_TRANSACTION_ALREADY_EXIST>()) {
res = std::move(commit_status);
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/beta_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ Status BetaRowsetWriter::add_block(const vectorized::Block* block) {
Status BetaRowsetWriter::_generate_delete_bitmap(int32_t segment_id) {
SCOPED_RAW_TIMER(&_delete_bitmap_ns);
if (!_context.tablet->enable_unique_key_merge_on_write() ||
_context.tablet_schema->is_partial_update()) {
_context.mow_context->partial_update_info->is_partial_update) {
return Status::OK();
}
auto rowset = _build_tmp();
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/rowset/beta_rowset_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ class BetaRowsetWriter : public RowsetWriter {

int64_t segment_writer_ns() override { return _segment_writer_ns; }

bool is_partial_update() override { return _context.is_partial_update; }

private:
Status _create_file_writer(std::string path, io::FileWriterPtr& file_writer);
Status _check_segment_number_limit();
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/rowset/beta_rowset_writer_v2.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ class BetaRowsetWriterV2 : public RowsetWriter {

int64_t segment_writer_ns() override { return _segment_writer_ns; }

bool is_partial_update() override { return _context.partial_update_info->is_partial_update; }

private:
RowsetWriterContext _context;

Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/rowset/rowset_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,8 @@ class RowsetWriter {

virtual int64_t segment_writer_ns() { return 0; }

virtual bool is_partial_update() = 0;

private:
DISALLOW_COPY_AND_ASSIGN(RowsetWriter);
};
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/rowset/rowset_writer_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ struct RowsetWriterContext {

// segcompaction for this RowsetWriter, disable it for some transient writers
bool enable_segcompaction = true;

PartialUpdateInfo partial_update_info;
};

} // namespace doris
17 changes: 9 additions & 8 deletions be/src/olap/rowset/segment_v2/segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -334,8 +334,8 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
DCHECK(_tablet_schema->keys_type() == UNIQUE_KEYS && _opts.enable_unique_key_merge_on_write);

// find missing column cids
std::vector<uint32_t> missing_cids = _tablet_schema->get_missing_cids();
std::vector<uint32_t> including_cids = _tablet_schema->get_update_cids();
std::vector<uint32_t> missing_cids = _mow_context->partial_update_info->missing_cids;
std::vector<uint32_t> including_cids = _mow_context->partial_update_info->update_cids;

// create full block and fill with input columns
auto full_block = _tablet_schema->create_block();
Expand Down Expand Up @@ -421,15 +421,15 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
auto st = _tablet->lookup_row_key(key, have_input_seq_column, specified_rowsets, &loc,
_mow_context->max_version, segment_caches, &rowset);
if (st.is<KEY_NOT_FOUND>()) {
if (_tablet_schema->is_strict_mode()) {
if (_mow_context->partial_update_info->is_strict_mode) {
++num_rows_filtered;
// delete the invalid newly inserted row
_mow_context->delete_bitmap->add({_opts.rowset_ctx->rowset_id, _segment_id,
DeleteBitmap::TEMP_VERSION_COMMON},
segment_pos);
}

if (!_tablet_schema->can_insert_new_rows_in_partial_update()) {
if (!_mow_context->partial_update_info->can_insert_new_rows_in_partial_update) {
return Status::InternalError(
"the unmentioned columns should have default value or be nullable for "
"newly inserted rows in non-strict mode partial update");
Expand Down Expand Up @@ -492,7 +492,7 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
}

// convert missing columns and send to column writer
auto cids_missing = _tablet_schema->get_missing_cids();
auto cids_missing = _mow_context->partial_update_info->missing_cids;
_olap_data_convertor->set_source_content_with_specifid_columns(&full_block, row_pos, num_rows,
cids_missing);
for (auto cid : cids_missing) {
Expand Down Expand Up @@ -545,8 +545,8 @@ Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_f
bool has_default_or_nullable,
const size_t& segment_start_pos) {
// create old value columns
auto old_value_block = _tablet_schema->create_missing_columns_block();
std::vector<uint32_t> cids_missing = _tablet_schema->get_missing_cids();
std::vector<uint32_t> cids_missing = _mow_context->partial_update_info->missing_cids;
auto old_value_block = _tablet_schema->create_block_by_cids(cids_missing);
CHECK(cids_missing.size() == old_value_block.columns());
auto mutable_old_columns = old_value_block.mutate_columns();
bool has_row_column = _tablet_schema->store_row_column();
Expand Down Expand Up @@ -652,7 +652,8 @@ Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_f

Status SegmentWriter::append_block(const vectorized::Block* block, size_t row_pos,
size_t num_rows) {
if (_tablet_schema->is_partial_update() && _opts.write_type == DataWriteType::TYPE_DIRECT) {
if (_mow_context->partial_update_info->is_partial_update &&
_opts.write_type == DataWriteType::TYPE_DIRECT) {
RETURN_IF_ERROR(append_block_with_partial_content(block, row_pos, num_rows));
return Status::OK();
}
Expand Down
23 changes: 14 additions & 9 deletions be/src/olap/rowset_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
#include "olap/storage_engine.h"
#include "olap/tablet_manager.h"
#include "olap/tablet_meta.h"
#include "olap/tablet_schema.h"
#include "olap/txn_manager.h"
#include "util/brpc_client_cache.h"
#include "util/mem_info.h"
Expand Down Expand Up @@ -131,7 +132,7 @@ Status RowsetBuilder::init() {
}
_delete_bitmap = std::make_shared<DeleteBitmap>(_tablet->tablet_id());
mow_context = std::make_shared<MowContext>(cur_max_version, _req.txn_id, _rowset_ids,
_delete_bitmap);
_delete_bitmap, _partial_update_info);
}

// check tablet version number
Expand Down Expand Up @@ -226,7 +227,7 @@ Status RowsetBuilder::submit_calc_delete_bitmap_task() {
// For partial update, we need to fill in the entire row of data, during the calculation
// of the delete bitmap. This operation is resource-intensive, and we need to minimize
// the number of times it occurs. Therefore, we skip this operation here.
if (_rowset->tablet_schema()->is_partial_update()) {
if (_partial_update_info->is_partial_update) {
return Status::OK();
}

Expand All @@ -238,8 +239,7 @@ Status RowsetBuilder::submit_calc_delete_bitmap_task() {
}

Status RowsetBuilder::wait_calc_delete_bitmap() {
if (!_tablet->enable_unique_key_merge_on_write() ||
_rowset->tablet_schema()->is_partial_update()) {
if (!_tablet->enable_unique_key_merge_on_write() || _partial_update_info->is_partial_update) {
return Status::OK();
}
std::lock_guard<std::mutex> l(_lock);
Expand Down Expand Up @@ -270,8 +270,9 @@ Status RowsetBuilder::commit_txn() {

std::lock_guard<std::mutex> l(_lock);
SCOPED_TIMER(_commit_txn_timer);
Status res = _storage_engine->txn_manager()->commit_txn(_req.partition_id, _tablet, _req.txn_id,
_req.load_id, _rowset, false);
Status res = _storage_engine->txn_manager()->commit_txn(
_req.partition_id, _tablet, _req.txn_id, _req.load_id, _rowset, false,
_partial_update_info->is_partial_update);

if (!res && !res.is<PUSH_TRANSACTION_ALREADY_EXIST>()) {
LOG(WARNING) << "Failed to commit txn: " << _req.txn_id
Expand Down Expand Up @@ -324,9 +325,13 @@ void RowsetBuilder::_build_current_tablet_schema(int64_t index_id,

_tablet_schema->set_table_id(table_schema_param->table_id());
// set partial update columns info
_tablet_schema->set_partial_update_info(table_schema_param->is_partial_update(),
table_schema_param->partial_update_input_columns());
_tablet_schema->set_is_strict_mode(table_schema_param->is_strict_mode());
// _tablet_schema->set_partial_update_info(table_schema_param->is_partial_update(),
// table_schema_param->partial_update_input_columns());
// _tablet_schema->set_is_strict_mode(table_schema_param->is_strict_mode());
_partial_update_info = std::make_shared<PartialUpdateInfo>();
_partial_update_info->init(*_tablet_schema, table_schema_param->is_partial_update(),
table_schema_param->partial_update_input_columns(),
table_schema_param->is_strict_mode());
}

} // namespace doris
Loading

0 comments on commit 9b3f5b2

Please sign in to comment.