Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
bobhan1 committed Sep 22, 2023
1 parent 320fc14 commit 9fbe9b9
Show file tree
Hide file tree
Showing 57 changed files with 1,430 additions and 281 deletions.
8 changes: 5 additions & 3 deletions be/src/exec/tablet_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ Status OlapTableSchemaParam::init(const POlapTableSchemaParam& pschema) {
_version = pschema.version();
_is_partial_update = pschema.partial_update();
_is_strict_mode = pschema.is_strict_mode();
_is_unique_key_replace_if_not_null = pschema.is_unique_key_replace_if_not_null();

for (auto& col : pschema.partial_update_input_columns()) {
_partial_update_input_columns.insert(col);
Expand Down Expand Up @@ -176,9 +177,9 @@ Status OlapTableSchemaParam::init(const TOlapTableSchemaParam& tschema) {
_table_id = tschema.table_id;
_version = tschema.version;
_is_partial_update = tschema.is_partial_update;
if (tschema.__isset.is_strict_mode) {
_is_strict_mode = tschema.is_strict_mode;
}
_is_strict_mode = tschema.__isset.is_strict_mode && tschema.is_strict_mode;
_is_unique_key_replace_if_not_null = tschema.__isset.is_unique_key_replace_if_not_null &&
tschema.is_unique_key_replace_if_not_null;

for (auto& tcolumn : tschema.partial_update_input_columns) {
_partial_update_input_columns.insert(tcolumn);
Expand Down Expand Up @@ -246,6 +247,7 @@ void OlapTableSchemaParam::to_protobuf(POlapTableSchemaParam* pschema) const {
pschema->set_version(_version);
pschema->set_partial_update(_is_partial_update);
pschema->set_is_strict_mode(_is_strict_mode);
pschema->set_is_unique_key_replace_if_not_null(_is_unique_key_replace_if_not_null);
for (auto col : _partial_update_input_columns) {
*pschema->add_partial_update_input_columns() = col;
}
Expand Down
2 changes: 2 additions & 0 deletions be/src/exec/tablet_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ class OlapTableSchemaParam {
return _partial_update_input_columns;
}
bool is_strict_mode() const { return _is_strict_mode; }
bool is_unique_key_replace_if_not_null() const { return _is_unique_key_replace_if_not_null; }
std::string debug_string() const;

private:
Expand All @@ -104,6 +105,7 @@ class OlapTableSchemaParam {
bool _is_partial_update = false;
std::set<std::string> _partial_update_input_columns;
bool _is_strict_mode = false;
bool _is_unique_key_replace_if_not_null = false;
};

using OlapTableIndexTablets = TOlapTableIndexTablets;
Expand Down
15 changes: 9 additions & 6 deletions be/src/olap/calc_delete_bitmap_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,12 @@
namespace doris {
using namespace ErrorCode;

Status CalcDeleteBitmapToken::submit(TabletSharedPtr tablet, RowsetSharedPtr cur_rowset,
const segment_v2::SegmentSharedPtr& cur_segment,
const std::vector<RowsetSharedPtr>& target_rowsets,
int64_t end_version, RowsetWriter* rowset_writer) {
Status CalcDeleteBitmapToken::submit(
TabletSharedPtr tablet, RowsetSharedPtr cur_rowset,
const segment_v2::SegmentSharedPtr& cur_segment,
const std::vector<RowsetSharedPtr>& target_rowsets,
std::shared_ptr<std::map<uint32_t, std::vector<uint32_t>>> indicator_maps,
int64_t end_version, RowsetWriter* rowset_writer) {
{
std::shared_lock rlock(_lock);
RETURN_IF_ERROR(_status);
Expand All @@ -45,8 +47,9 @@ Status CalcDeleteBitmapToken::submit(TabletSharedPtr tablet, RowsetSharedPtr cur
_delete_bitmaps.push_back(bitmap);
}
return _thread_token->submit_func([=, this]() {
auto st = tablet->calc_segment_delete_bitmap(cur_rowset, cur_segment, target_rowsets,
bitmap, end_version, rowset_writer);
auto st =
tablet->calc_segment_delete_bitmap(cur_rowset, cur_segment, target_rowsets, bitmap,
indicator_maps, end_version, rowset_writer);
if (!st.ok()) {
LOG(WARNING) << "failed to calc segment delete bitmap, tablet_id: "
<< tablet->tablet_id() << " rowset: " << cur_rowset->rowset_id()
Expand Down
5 changes: 3 additions & 2 deletions be/src/olap/calc_delete_bitmap_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,9 @@ class CalcDeleteBitmapToken {

Status submit(TabletSharedPtr tablet, RowsetSharedPtr cur_rowset,
const segment_v2::SegmentSharedPtr& cur_segment,
const std::vector<RowsetSharedPtr>& target_rowsets, int64_t end_version,
RowsetWriter* rowset_writer);
const std::vector<RowsetSharedPtr>& target_rowsets,
std::shared_ptr<std::map<uint32_t, std::vector<uint32_t>>> indicator_maps,
int64_t end_version, RowsetWriter* rowset_writer);

// wait all tasks in token to be completed.
Status wait();
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/full_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ Status FullCompaction::_full_compaction_calc_delete_bitmap(const RowsetSharedPtr

OlapStopWatch watch;
RETURN_IF_ERROR(_tablet->calc_delete_bitmap(published_rowset, segments, specified_rowsets,
delete_bitmap, cur_version, nullptr,
delete_bitmap, nullptr, cur_version, nullptr,
rowset_writer));
size_t total_rows = std::accumulate(
segments.begin(), segments.end(), 0,
Expand Down
18 changes: 15 additions & 3 deletions be/src/olap/olap_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include <typeinfo>
#include <unordered_map>
#include <unordered_set>
#include <variant>

#include "io/io_common.h"
#include "olap/olap_define.h"
Expand Down Expand Up @@ -471,10 +472,21 @@ struct MowContext {
// used in mow partial update
struct RidAndPos {
uint32_t rid;
// pos in block
size_t pos;
uint32_t pos; // pos in block
};

using PartialUpdateReadPlan = std::map<RowsetId, std::map<uint32_t, std::vector<RidAndPos>>>;
struct ReadRowsInfo {
RidAndPos id_and_pos;
std::vector<uint32_t> cids; // cids for partial update columns
};
struct ReadColumnsInfo {
std::vector<RidAndPos> missing_column_rows;
std::map<uint32_t, std::vector<RidAndPos>> partial_update_rows;
};

using RowStoreReadPlan = std::map<RowsetId, std::map<uint32_t, std::vector<ReadRowsInfo>>>;
using ColumnStoreReadPlan = std::map<RowsetId, std::map<uint32_t, ReadColumnsInfo>>;
using PartialUpdateReadPlan = std::variant<ColumnStoreReadPlan, RowStoreReadPlan>;
using IndicatorMaps = std::map<uint32_t, std::vector<uint32_t>>;

} // namespace doris
4 changes: 2 additions & 2 deletions be/src/olap/rowset/beta_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,8 @@ Status BetaRowsetWriter::_generate_delete_bitmap(int32_t segment_id) {
}
OlapStopWatch watch;
RETURN_IF_ERROR(_context.tablet->calc_delete_bitmap(
rowset, segments, specified_rowsets, _context.mow_context->delete_bitmap,
_context.mow_context->max_version, nullptr));
rowset, segments, specified_rowsets, _context.mow_context->delete_bitmap, nullptr,
_context.mow_context->max_version, nullptr, nullptr));
size_t total_rows = std::accumulate(
segments.begin(), segments.end(), 0,
[](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum += s->num_rows(); });
Expand Down
5 changes: 5 additions & 0 deletions be/src/olap/rowset/beta_rowset_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ class BetaRowsetWriter : public RowsetWriter {

int64_t num_rows_filtered() const override { return _segment_creator.num_rows_filtered(); }

// currently, a rowset cantains at most one segment, so we just return the segment's indicator maps
std::shared_ptr<IndicatorMaps> get_indicator_maps() const override {
return _segment_creator.get_indicator_maps();
}

RowsetId rowset_id() override { return _context.rowset_id; }

RowsetTypePB type() const override { return RowsetTypePB::BETA_ROWSET; }
Expand Down
5 changes: 5 additions & 0 deletions be/src/olap/rowset/beta_rowset_writer_v2.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,11 @@ class BetaRowsetWriterV2 : public RowsetWriter {
return Status::OK();
}

// currently, a rowset cantains at most one segment, so we just return the segment's indicator maps
std::shared_ptr<IndicatorMaps> get_indicator_maps() const override {
return _segment_creator.get_indicator_maps();
}

Status add_segment(uint32_t segment_id, SegmentStatistics& segstat) override;

int32_t allocate_segment_id() override { return _next_segment_id.fetch_add(1); };
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/rowset/rowset_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ class RowsetWriter {

virtual int64_t num_rows_filtered() const = 0;

virtual std::shared_ptr<IndicatorMaps> get_indicator_maps() const = 0;

virtual RowsetId rowset_id() = 0;

virtual RowsetTypePB type() const = 0;
Expand Down
8 changes: 8 additions & 0 deletions be/src/olap/rowset/segment_creator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,14 @@ Status SegmentFlusher::_flush_segment_writer(std::unique_ptr<segment_v2::Segment
segstat.index_size = index_size + writer->get_inverted_index_file_size();
segstat.key_bounds = key_bounds;

if (!_indicator_maps) {
_indicator_maps.reset(new IndicatorMaps);
}
auto indicator_maps = writer->get_indicator_maps();
if (indicator_maps) {
_indicator_maps->merge(*indicator_maps);
}

writer.reset();

RETURN_IF_ERROR(_context.segment_collector->add(segment_id, segstat));
Expand Down
8 changes: 8 additions & 0 deletions be/src/olap/rowset/segment_creator.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ class SegmentFlusher {

int64_t num_rows_filtered() const { return _num_rows_filtered; }

std::shared_ptr<IndicatorMaps> get_indicator_maps() const { return _indicator_maps; }

Status close();

public:
Expand Down Expand Up @@ -142,6 +144,8 @@ class SegmentFlusher {
// written rows by add_block/add_row
std::atomic<int64_t> _num_rows_written = 0;
std::atomic<int64_t> _num_rows_filtered = 0;

std::shared_ptr<IndicatorMaps> _indicator_maps = nullptr;
};

class SegmentCreator {
Expand All @@ -166,6 +170,10 @@ class SegmentCreator {

int64_t num_rows_filtered() const { return _segment_flusher.num_rows_filtered(); }

std::shared_ptr<IndicatorMaps> get_indicator_maps() const {
return _segment_flusher.get_indicator_maps();
}

// Flush a block into a single segment, with pre-allocated segment_id.
// Return the file size flushed to disk in "flush_size"
// This method is thread-safe.
Expand Down
Loading

0 comments on commit 9fbe9b9

Please sign in to comment.