Skip to content

Commit

Permalink
tmp
Browse files Browse the repository at this point in the history
  • Loading branch information
bobhan1 committed Oct 8, 2023
1 parent 2839278 commit d9e2404
Show file tree
Hide file tree
Showing 9 changed files with 44 additions and 31 deletions.
6 changes: 3 additions & 3 deletions be/src/olap/delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,9 @@ Status DeltaWriter::init() {
return Status::OK();
}
RETURN_IF_ERROR(_rowset_builder.init());
// Pass the tablet's merge-on-write flag explicitly. MemTableWriter::init() declares
// `unique_key_mow` as a trailing parameter that defaults to false, so leaving it out
// would make every MemTable created by the writer see enable_unique_key_mow == false,
// regardless of the tablet's actual setting.
RETURN_IF_ERROR(_memtable_writer->init(
        _rowset_builder.rowset_writer(), _rowset_builder.tablet_schema(),
        _rowset_builder.get_partial_udpate_info(),
        _rowset_builder.tablet()->enable_unique_key_merge_on_write()));
ExecEnv::GetInstance()->memtable_memory_limiter()->register_writer(_memtable_writer);
_is_init = true;
return Status::OK();
Expand Down
13 changes: 7 additions & 6 deletions be/src/olap/memtable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,12 @@ using namespace ErrorCode;

MemTable::MemTable(int64_t tablet_id, const TabletSchema* tablet_schema,
const std::vector<SlotDescriptor*>* slot_descs, TupleDescriptor* tuple_desc,
bool enable_unique_key_mow,
bool enable_unique_key_mow, const PartialUpdateInfo& partial_update_info,
const std::shared_ptr<MemTracker>& insert_mem_tracker,
const std::shared_ptr<MemTracker>& flush_mem_tracker)
: _tablet_id(tablet_id),
_enable_unique_key_mow(enable_unique_key_mow),
_is_partial_update(partial_update_info.is_partial_update),
_keys_type(tablet_schema->keys_type()),
_tablet_schema(tablet_schema),
_insert_mem_tracker(insert_mem_tracker),
Expand All @@ -77,8 +78,8 @@ MemTable::MemTable(int64_t tablet_id, const TabletSchema* tablet_schema,
// TODO: Support ZOrderComparator in the future
_init_columns_offset_by_slot_descs(slot_descs, tuple_desc);
_num_columns = _tablet_schema->num_columns();
if (_tablet_schema->is_partial_update()) {
_num_columns = _tablet_schema->partial_input_column_size();
if (_is_partial_update) {
_num_columns = partial_update_info.partial_update_input_columns.size();
}
}
void MemTable::_init_columns_offset_by_slot_descs(const std::vector<SlotDescriptor*>* slot_descs,
Expand Down Expand Up @@ -178,7 +179,7 @@ void MemTable::insert(const vectorized::Block* input_block, const std::vector<in
_init_agg_functions(&target_block);
}
if (_tablet_schema->has_sequence_col()) {
if (_tablet_schema->is_partial_update()) {
if (_is_partial_update) {
// for unique key partial update, sequence column index in block
// may be different with the index in `_tablet_schema`
for (size_t i = 0; i < cloneBlock.columns(); i++) {
Expand Down Expand Up @@ -417,7 +418,7 @@ void MemTable::shrink_memtable_by_agg() {

bool MemTable::need_flush() const {
auto max_size = config::write_buffer_size;
if (_tablet_schema->is_partial_update()) {
if (_is_partial_update) {
auto update_columns_size = _tablet_schema->partial_input_column_size();
max_size = max_size * update_columns_size / _tablet_schema->num_columns();
max_size = max_size > 1048576 ? max_size : 1048576;
Expand All @@ -428,7 +429,7 @@ bool MemTable::need_flush() const {
bool MemTable::need_agg() const {
if (_keys_type == KeysType::AGG_KEYS) {
auto max_size = config::write_buffer_size_for_agg;
if (_tablet_schema->is_partial_update()) {
if (_is_partial_update) {
auto update_columns_size = _tablet_schema->partial_input_column_size();
max_size = max_size * update_columns_size / _tablet_schema->num_columns();
max_size = max_size > 1048576 ? max_size : 1048576;
Expand Down
6 changes: 5 additions & 1 deletion be/src/olap/memtable.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "common/status.h"
#include "gutil/integral_types.h"
#include "olap/olap_common.h"
#include "olap/tablet_schema.h"
#include "runtime/memory/mem_tracker.h"
#include "vec/aggregate_functions/aggregate_function.h"
#include "vec/common/arena.h"
Expand All @@ -42,6 +43,7 @@ class Schema;
class SlotDescriptor;
class TabletSchema;
class TupleDescriptor;
struct PartialUpdateInfo;
enum KeysType : int;

// row pos in _input_mutable_block
Expand Down Expand Up @@ -167,7 +169,8 @@ class MemTable {
public:
MemTable(int64_t tablet_id, const TabletSchema* tablet_schema,
const std::vector<SlotDescriptor*>* slot_descs, TupleDescriptor* tuple_desc,
bool enable_unique_key_mow, const std::shared_ptr<MemTracker>& insert_mem_tracker,
bool enable_unique_key_mow, const PartialUpdateInfo& partial_update_info,
const std::shared_ptr<MemTracker>& insert_mem_tracker,
const std::shared_ptr<MemTracker>& flush_mem_tracker);
~MemTable();

Expand Down Expand Up @@ -202,6 +205,7 @@ class MemTable {
private:
int64_t _tablet_id;
bool _enable_unique_key_mow = false;
bool _is_partial_update = false;
const KeysType _keys_type;
const TabletSchema* _tablet_schema;

Expand Down
7 changes: 5 additions & 2 deletions be/src/olap/memtable_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include "olap/rowset/rowset_writer.h"
#include "olap/schema_change.h"
#include "olap/storage_engine.h"
#include "olap/tablet_schema.h"
#include "runtime/exec_env.h"
#include "runtime/memory/mem_tracker.h"
#include "service/backend_options.h"
Expand All @@ -63,10 +64,12 @@ MemTableWriter::~MemTableWriter() {
}

Status MemTableWriter::init(std::shared_ptr<RowsetWriter> rowset_writer,
TabletSchemaSPtr tablet_schema, bool unique_key_mow) {
TabletSchemaSPtr tablet_schema,
const PartialUpdateInfo& partial_update_info, bool unique_key_mow) {
_rowset_writer = rowset_writer;
_tablet_schema = tablet_schema;
_unique_key_mow = unique_key_mow;
_partial_update_info = partial_update_info;

_reset_mem_table();

Expand Down Expand Up @@ -195,7 +198,7 @@ void MemTableWriter::_reset_mem_table() {
_mem_table_flush_trackers.push_back(mem_table_flush_tracker);
}
_mem_table.reset(new MemTable(_req.tablet_id, _tablet_schema.get(), _req.slots, _req.tuple_desc,
_unique_key_mow, mem_table_insert_tracker,
_unique_key_mow, _partial_update_info, mem_table_insert_tracker,
mem_table_flush_tracker));

_segment_num++;
Expand Down
4 changes: 3 additions & 1 deletion be/src/olap/memtable_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class MemTableWriter {
~MemTableWriter();

Status init(std::shared_ptr<RowsetWriter> rowset_writer, TabletSchemaSPtr tablet_schema,
bool unique_key_mow = false);
const PartialUpdateInfo& partial_update_info, bool unique_key_mow = false);

Status write(const vectorized::Block* block, const std::vector<int>& row_idxs,
bool is_append = false);
Expand Down Expand Up @@ -141,6 +141,8 @@ class MemTableWriter {
int64_t _segment_num = 0;

MonotonicStopWatch _lock_watch;

PartialUpdateInfo _partial_update_info;
};

} // namespace doris
2 changes: 2 additions & 0 deletions be/src/olap/rowset/rowset_writer_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ struct RowsetWriterContext {

// segcompaction for this RowsetWriter, disable it for some transient writers
bool enable_segcompaction = true;

bool is_partial_update = false;
};

} // namespace doris
5 changes: 2 additions & 3 deletions be/src/olap/rowset_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ Status RowsetBuilder::submit_calc_delete_bitmap_task() {
// For partial update, we need to fill in the entire row of data, during the calculation
// of the delete bitmap. This operation is resource-intensive, and we need to minimize
// the number of times it occurs. Therefore, we skip this operation here.
if (_rowset->tablet_schema()->is_partial_update()) {
if (_partial_update_info.is_partial_update) {
return Status::OK();
}

Expand All @@ -238,8 +238,7 @@ Status RowsetBuilder::submit_calc_delete_bitmap_task() {
}

Status RowsetBuilder::wait_calc_delete_bitmap() {
if (!_tablet->enable_unique_key_merge_on_write() ||
_rowset->tablet_schema()->is_partial_update()) {
if (!_tablet->enable_unique_key_merge_on_write() || _partial_update_info.is_partial_update) {
return Status::OK();
}
std::lock_guard<std::mutex> l(_lock);
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/rowset_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ class RowsetBuilder {
// For UT
DeleteBitmapPtr get_delete_bitmap() { return _delete_bitmap; }

// Read-only access to the partial update info held by this builder. Returned by const
// reference so callers that only inspect or forward it do not copy the contained
// set and vectors; the reference is valid only while this builder is alive.
const PartialUpdateInfo& get_partial_update_info() const { return _partial_update_info; }

// Misspelled name kept so existing callers keep compiling; prefer
// get_partial_update_info() in new code.
const PartialUpdateInfo& get_partial_udpate_info() const { return get_partial_update_info(); }

private:
void _garbage_collection();

Expand Down
30 changes: 15 additions & 15 deletions be/src/olap/tablet_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -421,32 +421,32 @@ using TabletSchemaSPtr = std::shared_ptr<TabletSchema>;
// Per-load description of a partial-column update: which columns of the tablet schema
// the load supplies (`update_cids`) and which it leaves out (`missing_cids`).
struct PartialUpdateInfo {
    // (Re)computes every field from `tablet_schema` and the load options.
    //
    // tablet_schema:                schema whose columns are classified by ordinal.
    // is_partial_update:            stored as-is in `is_partial_update`.
    // partial_update_input_columns: names of the columns supplied by the load.
    // is_strict_mode:               stored as-is in `is_strict_mode`.
    //
    // Safe to call more than once: all derived state is reset first, so the result
    // never depends on a previous call.
    void init(const TabletSchema& tablet_schema, bool is_partial_update,
              const std::set<std::string>& partial_update_input_columns, bool is_strict_mode) {
        // The parameters share their names with the members, hence the explicit `this->`.
        this->is_partial_update = is_partial_update;
        this->partial_update_input_columns = partial_update_input_columns;
        this->is_strict_mode = is_strict_mode;

        missing_cids.clear();
        update_cids.clear();
        // Reset together with the cid lists; otherwise a `false` computed by an earlier
        // init() would survive a re-initialization.
        can_insert_new_rows_in_partial_update = true;

        const auto num_columns = static_cast<uint32_t>(tablet_schema.num_columns());
        for (uint32_t cid = 0; cid < num_columns; ++cid) {
            // Bind by const reference: the column is only inspected, never modified.
            const auto& tablet_column = tablet_schema.column(cid);
            if (this->partial_update_input_columns.count(tablet_column.name()) != 0) {
                update_cids.emplace_back(cid);
                continue;
            }
            missing_cids.emplace_back(cid);
            // A column that is neither supplied, defaulted nor nullable cannot be
            // filled in when a brand-new row has to be generated.
            if (!tablet_column.has_default_value() && !tablet_column.is_nullable()) {
                can_insert_new_rows_in_partial_update = false;
            }
        }
    }

    bool is_partial_update {false};
    // Names of the columns supplied by the load.
    std::set<std::string> partial_update_input_columns;
    // Ordinals of schema columns NOT present in `partial_update_input_columns`.
    std::vector<uint32_t> missing_cids;
    // Ordinals of schema columns present in `partial_update_input_columns`.
    std::vector<uint32_t> update_cids;
    // if key not exist in old rowset, use default value or null value for the unmentioned cols
    // to generate a new row, only available in non-strict mode
    bool can_insert_new_rows_in_partial_update {true};
    bool is_strict_mode {false};
};

} // namespace doris

0 comments on commit d9e2404

Please sign in to comment.