Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
bobhan1 committed Oct 8, 2023
1 parent d517d7a commit 2251739
Show file tree
Hide file tree
Showing 11 changed files with 83 additions and 21 deletions.
6 changes: 3 additions & 3 deletions be/src/olap/delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,9 @@ Status DeltaWriter::init() {
return Status::OK();
}
RETURN_IF_ERROR(_rowset_builder.init());
RETURN_IF_ERROR(
_memtable_writer->init(_rowset_builder.rowset_writer(), _rowset_builder.tablet_schema(),
_rowset_builder.tablet()->enable_unique_key_merge_on_write()));
RETURN_IF_ERROR(_memtable_writer->init(_rowset_builder.rowset_writer(),
_rowset_builder.tablet_schema(),
_rowset_builder.get_partial_udpate_info()));
ExecEnv::GetInstance()->memtable_memory_limiter()->register_writer(_memtable_writer);
_is_init = true;
return Status::OK();
Expand Down
11 changes: 8 additions & 3 deletions be/src/olap/delta_writer_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
#include "olap/schema_change.h"
#include "olap/storage_engine.h"
#include "olap/tablet_manager.h"
#include "olap/tablet_schema.h"
#include "runtime/exec_env.h"
#include "service/backend_options.h"
#include "util/brpc_client_cache.h"
Expand Down Expand Up @@ -124,7 +125,7 @@ Status DeltaWriterV2::init() {

_rowset_writer = std::make_shared<BetaRowsetWriterV2>(_streams);
RETURN_IF_ERROR(_rowset_writer->init(context));
RETURN_IF_ERROR(_memtable_writer->init(_rowset_writer, _tablet_schema,
RETURN_IF_ERROR(_memtable_writer->init(_rowset_writer, _tablet_schema, _partial_update_info,
_streams[0]->enable_unique_mow(_req.index_id)));
ExecEnv::GetInstance()->memtable_memory_limiter()->register_writer(_memtable_writer);
_is_init = true;
Expand Down Expand Up @@ -225,8 +226,12 @@ void DeltaWriterV2::_build_current_tablet_schema(int64_t index_id,

_tablet_schema->set_table_id(table_schema_param->table_id());
// set partial update columns info
_tablet_schema->set_partial_update_info(table_schema_param->is_partial_update(),
table_schema_param->partial_update_input_columns());
// _tablet_schema->set_partial_update_info(table_schema_param->is_partial_update(),
// table_schema_param->partial_update_input_columns());
_partial_update_info = std::make_shared<PartialUpdateInfo>();
_partial_update_info->init(*_tablet_schema, table_schema_param->is_partial_update(),
table_schema_param->partial_update_input_columns(),
table_schema_param->is_strict_mode());
}

} // namespace doris
2 changes: 2 additions & 0 deletions be/src/olap/delta_writer_v2.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ class DeltaWriterV2 {
MonotonicStopWatch _lock_watch;

std::vector<std::shared_ptr<LoadStreamStub>> _streams;

std::shared_ptr<PartialUpdateInfo> _partial_update_info;
};

} // namespace doris
13 changes: 7 additions & 6 deletions be/src/olap/memtable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,12 @@ using namespace ErrorCode;

MemTable::MemTable(int64_t tablet_id, const TabletSchema* tablet_schema,
const std::vector<SlotDescriptor*>* slot_descs, TupleDescriptor* tuple_desc,
bool enable_unique_key_mow,
bool enable_unique_key_mow, const PartialUpdateInfo& partial_update_info,
const std::shared_ptr<MemTracker>& insert_mem_tracker,
const std::shared_ptr<MemTracker>& flush_mem_tracker)
: _tablet_id(tablet_id),
_enable_unique_key_mow(enable_unique_key_mow),
_is_partial_update(partial_update_info.is_partial_update),
_keys_type(tablet_schema->keys_type()),
_tablet_schema(tablet_schema),
_insert_mem_tracker(insert_mem_tracker),
Expand All @@ -77,8 +78,8 @@ MemTable::MemTable(int64_t tablet_id, const TabletSchema* tablet_schema,
// TODO: Support ZOrderComparator in the future
_init_columns_offset_by_slot_descs(slot_descs, tuple_desc);
_num_columns = _tablet_schema->num_columns();
if (_tablet_schema->is_partial_update()) {
_num_columns = _tablet_schema->partial_input_column_size();
if (_is_partial_update) {
_num_columns = partial_update_info.partial_update_input_columns.size();
}
}
void MemTable::_init_columns_offset_by_slot_descs(const std::vector<SlotDescriptor*>* slot_descs,
Expand Down Expand Up @@ -178,7 +179,7 @@ void MemTable::insert(const vectorized::Block* input_block, const std::vector<in
_init_agg_functions(&target_block);
}
if (_tablet_schema->has_sequence_col()) {
if (_tablet_schema->is_partial_update()) {
if (_is_partial_update) {
// for unique key partial update, sequence column index in block
// may be different with the index in `_tablet_schema`
for (size_t i = 0; i < cloneBlock.columns(); i++) {
Expand Down Expand Up @@ -417,7 +418,7 @@ void MemTable::shrink_memtable_by_agg() {

bool MemTable::need_flush() const {
auto max_size = config::write_buffer_size;
if (_tablet_schema->is_partial_update()) {
if (_is_partial_update) {
auto update_columns_size = _tablet_schema->partial_input_column_size();
max_size = max_size * update_columns_size / _tablet_schema->num_columns();
max_size = max_size > 1048576 ? max_size : 1048576;
Expand All @@ -428,7 +429,7 @@ bool MemTable::need_flush() const {
bool MemTable::need_agg() const {
if (_keys_type == KeysType::AGG_KEYS) {
auto max_size = config::write_buffer_size_for_agg;
if (_tablet_schema->is_partial_update()) {
if (_is_partial_update) {
auto update_columns_size = _tablet_schema->partial_input_column_size();
max_size = max_size * update_columns_size / _tablet_schema->num_columns();
max_size = max_size > 1048576 ? max_size : 1048576;
Expand Down
6 changes: 5 additions & 1 deletion be/src/olap/memtable.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "common/status.h"
#include "gutil/integral_types.h"
#include "olap/olap_common.h"
#include "olap/tablet_schema.h"
#include "runtime/memory/mem_tracker.h"
#include "vec/aggregate_functions/aggregate_function.h"
#include "vec/common/arena.h"
Expand All @@ -42,6 +43,7 @@ class Schema;
class SlotDescriptor;
class TabletSchema;
class TupleDescriptor;
struct PartialUpdateInfo;
enum KeysType : int;

// row pos in _input_mutable_block
Expand Down Expand Up @@ -167,7 +169,8 @@ class MemTable {
public:
MemTable(int64_t tablet_id, const TabletSchema* tablet_schema,
const std::vector<SlotDescriptor*>* slot_descs, TupleDescriptor* tuple_desc,
bool enable_unique_key_mow, const std::shared_ptr<MemTracker>& insert_mem_tracker,
bool enable_unique_key_mow, const PartialUpdateInfo& partial_update_info,
const std::shared_ptr<MemTracker>& insert_mem_tracker,
const std::shared_ptr<MemTracker>& flush_mem_tracker);
~MemTable();

Expand Down Expand Up @@ -202,6 +205,7 @@ class MemTable {
private:
int64_t _tablet_id;
bool _enable_unique_key_mow = false;
bool _is_partial_update = false;
const KeysType _keys_type;
const TabletSchema* _tablet_schema;

Expand Down
8 changes: 6 additions & 2 deletions be/src/olap/memtable_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include "olap/rowset/rowset_writer.h"
#include "olap/schema_change.h"
#include "olap/storage_engine.h"
#include "olap/tablet_schema.h"
#include "runtime/exec_env.h"
#include "runtime/memory/mem_tracker.h"
#include "service/backend_options.h"
Expand All @@ -63,10 +64,13 @@ MemTableWriter::~MemTableWriter() {
}

Status MemTableWriter::init(std::shared_ptr<RowsetWriter> rowset_writer,
TabletSchemaSPtr tablet_schema, bool unique_key_mow) {
TabletSchemaSPtr tablet_schema,
std::shared_ptr<PartialUpdateInfo> partial_update_info,
bool unique_key_mow) {
_rowset_writer = rowset_writer;
_tablet_schema = tablet_schema;
_unique_key_mow = unique_key_mow;
_partial_update_info = partial_update_info;

_reset_mem_table();

Expand Down Expand Up @@ -195,7 +199,7 @@ void MemTableWriter::_reset_mem_table() {
_mem_table_flush_trackers.push_back(mem_table_flush_tracker);
}
_mem_table.reset(new MemTable(_req.tablet_id, _tablet_schema.get(), _req.slots, _req.tuple_desc,
_unique_key_mow, mem_table_insert_tracker,
_unique_key_mow, *_partial_update_info, mem_table_insert_tracker,
mem_table_flush_tracker));

_segment_num++;
Expand Down
3 changes: 3 additions & 0 deletions be/src/olap/memtable_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ class MemTableWriter {
~MemTableWriter();

Status init(std::shared_ptr<RowsetWriter> rowset_writer, TabletSchemaSPtr tablet_schema,
std::shared_ptr<PartialUpdateInfo> partial_update_info,
bool unique_key_mow = false);

Status write(const vectorized::Block* block, const std::vector<int>& row_idxs,
Expand Down Expand Up @@ -141,6 +142,8 @@ class MemTableWriter {
int64_t _segment_num = 0;

MonotonicStopWatch _lock_watch;

std::shared_ptr<PartialUpdateInfo> _partial_update_info;
};

} // namespace doris
2 changes: 2 additions & 0 deletions be/src/olap/rowset/rowset_writer_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ struct RowsetWriterContext {

// segcompaction for this RowsetWriter, disable it for some transient writers
bool enable_segcompaction = true;

bool is_partial_update = false;
};

} // namespace doris
16 changes: 10 additions & 6 deletions be/src/olap/rowset_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
#include "olap/storage_engine.h"
#include "olap/tablet_manager.h"
#include "olap/tablet_meta.h"
#include "olap/tablet_schema.h"
#include "olap/txn_manager.h"
#include "util/brpc_client_cache.h"
#include "util/mem_info.h"
Expand Down Expand Up @@ -226,7 +227,7 @@ Status RowsetBuilder::submit_calc_delete_bitmap_task() {
// For partial update, we need to fill in the entire row of data, during the calculation
// of the delete bitmap. This operation is resource-intensive, and we need to minimize
// the number of times it occurs. Therefore, we skip this operation here.
if (_rowset->tablet_schema()->is_partial_update()) {
if (_partial_update_info->is_partial_update) {
return Status::OK();
}

Expand All @@ -238,8 +239,7 @@ Status RowsetBuilder::submit_calc_delete_bitmap_task() {
}

Status RowsetBuilder::wait_calc_delete_bitmap() {
if (!_tablet->enable_unique_key_merge_on_write() ||
_rowset->tablet_schema()->is_partial_update()) {
if (!_tablet->enable_unique_key_merge_on_write() || _partial_update_info->is_partial_update) {
return Status::OK();
}
std::lock_guard<std::mutex> l(_lock);
Expand Down Expand Up @@ -324,9 +324,13 @@ void RowsetBuilder::_build_current_tablet_schema(int64_t index_id,

_tablet_schema->set_table_id(table_schema_param->table_id());
// set partial update columns info
_tablet_schema->set_partial_update_info(table_schema_param->is_partial_update(),
table_schema_param->partial_update_input_columns());
_tablet_schema->set_is_strict_mode(table_schema_param->is_strict_mode());
// _tablet_schema->set_partial_update_info(table_schema_param->is_partial_update(),
// table_schema_param->partial_update_input_columns());
// _tablet_schema->set_is_strict_mode(table_schema_param->is_strict_mode());
_partial_update_info = std::make_shared<PartialUpdateInfo>();
_partial_update_info->init(*_tablet_schema, table_schema_param->is_partial_update(),
table_schema_param->partial_update_input_columns(),
table_schema_param->is_strict_mode());
}

} // namespace doris
6 changes: 6 additions & 0 deletions be/src/olap/rowset_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ class RowsetBuilder {
// For UT
DeleteBitmapPtr get_delete_bitmap() { return _delete_bitmap; }

std::shared_ptr<PartialUpdateInfo> get_partial_udpate_info() const {
return _partial_update_info;
}

private:
void _garbage_collection();

Expand Down Expand Up @@ -113,6 +117,8 @@ class RowsetBuilder {
// current rowset_ids, used to do diff in publish_version
RowsetIdUnorderedSet _rowset_ids;

std::shared_ptr<PartialUpdateInfo> _partial_update_info;

RuntimeProfile* _profile = nullptr;
RuntimeProfile::Counter* _build_rowset_timer = nullptr;
RuntimeProfile::Counter* _submit_delete_bitmap_timer = nullptr;
Expand Down
31 changes: 31 additions & 0 deletions be/src/olap/tablet_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -418,4 +418,35 @@ bool operator!=(const TabletSchema& a, const TabletSchema& b);

using TabletSchemaSPtr = std::shared_ptr<TabletSchema>;

struct PartialUpdateInfo {
void init(const TabletSchema& tablet_schema, bool is_partial_update,
const std::set<string>& partial_update_input_columns, bool is_strict_mode) {
this->is_partial_update = is_partial_update;
this->partial_update_input_columns = partial_update_input_columns;
missing_cids.clear();
update_cids.clear();
for (auto i = 0; i < tablet_schema.num_columns(); ++i) {
auto tablet_column = tablet_schema.column(i);
if (partial_update_input_columns.count(tablet_column.name()) == 0) {
missing_cids.emplace_back(i);
if (!tablet_column.has_default_value() && !tablet_column.is_nullable()) {
can_insert_new_rows_in_partial_update = false;
}
} else {
update_cids.emplace_back(i);
}
}
this->is_strict_mode = is_strict_mode;
}

bool is_partial_update {false};
std::set<std::string> partial_update_input_columns;
std::vector<uint32_t> missing_cids;
std::vector<uint32_t> update_cids;
// if key not exist in old rowset, use default value or null value for the unmentioned cols
// to generate a new row, only available in non-strict mode
bool can_insert_new_rows_in_partial_update {true};
bool is_strict_mode {false};
};

} // namespace doris

0 comments on commit 2251739

Please sign in to comment.