Skip to content

Commit

Permalink
add fix
Browse files Browse the repository at this point in the history
  • Loading branch information
bobhan1 committed Jan 3, 2025
1 parent d2de7e0 commit 22a7a46
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 36 deletions.
4 changes: 2 additions & 2 deletions be/src/cloud/cloud_rowset_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ Status CloudRowsetBuilder::init() {
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();

// build tablet schema in request level
_build_current_tablet_schema(_req.index_id, _req.table_schema_param.get(),
*_tablet->tablet_schema());
RETURN_IF_ERROR(_build_current_tablet_schema(_req.index_id, _req.table_schema_param.get(),
*_tablet->tablet_schema()));

RowsetWriterContext context;
context.txn_id = _req.txn_id;
Expand Down
24 changes: 13 additions & 11 deletions be/src/olap/delta_writer_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ Status DeltaWriterV2::init() {
if (_streams.size() == 0 || _streams[0]->tablet_schema(_req.index_id) == nullptr) {
return Status::InternalError("failed to find tablet schema for {}", _req.index_id);
}
_build_current_tablet_schema(_req.index_id, _req.table_schema_param.get(),
*_streams[0]->tablet_schema(_req.index_id));
RETURN_IF_ERROR(_build_current_tablet_schema(_req.index_id, _req.table_schema_param.get(),
*_streams[0]->tablet_schema(_req.index_id)));
RowsetWriterContext context;
context.txn_id = _req.txn_id;
context.load_id = _req.load_id;
Expand Down Expand Up @@ -210,9 +210,9 @@ Status DeltaWriterV2::cancel_with_status(const Status& st) {
return Status::OK();
}

void DeltaWriterV2::_build_current_tablet_schema(int64_t index_id,
const OlapTableSchemaParam* table_schema_param,
const TabletSchema& ori_tablet_schema) {
Status DeltaWriterV2::_build_current_tablet_schema(int64_t index_id,
const OlapTableSchemaParam* table_schema_param,
const TabletSchema& ori_tablet_schema) {
_tablet_schema->copy_from(ori_tablet_schema);
// find the right index id
int i = 0;
Expand All @@ -236,12 +236,14 @@ void DeltaWriterV2::_build_current_tablet_schema(int64_t index_id,
}
// set partial update columns info
_partial_update_info = std::make_shared<PartialUpdateInfo>();
_partial_update_info->init(*_tablet_schema, table_schema_param->unique_key_update_mode(),
table_schema_param->partial_update_input_columns(),
table_schema_param->is_strict_mode(),
table_schema_param->timestamp_ms(),
table_schema_param->nano_seconds(), table_schema_param->timezone(),
table_schema_param->auto_increment_coulumn());
RETURN_IF_ERROR(_partial_update_info->init(
_req.tablet_id, _req.txn_id, *_tablet_schema,
table_schema_param->unique_key_update_mode(),
table_schema_param->partial_update_input_columns(),
table_schema_param->is_strict_mode(), table_schema_param->timestamp_ms(),
table_schema_param->nano_seconds(), table_schema_param->timezone(),
table_schema_param->auto_increment_coulumn()));
return Status::OK();
}

} // namespace doris
6 changes: 3 additions & 3 deletions be/src/olap/delta_writer_v2.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,9 @@ class DeltaWriterV2 {
Status cancel_with_status(const Status& st);

private:
void _build_current_tablet_schema(int64_t index_id,
const OlapTableSchemaParam* table_schema_param,
const TabletSchema& ori_tablet_schema);
Status _build_current_tablet_schema(int64_t index_id,
const OlapTableSchemaParam* table_schema_param,
const TabletSchema& ori_tablet_schema);

void _update_profile(RuntimeProfile* profile);

Expand Down
40 changes: 34 additions & 6 deletions be/src/olap/partial_update_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <gen_cpp/olap_file.pb.h>

#include "common/consts.h"
#include "common/logging.h"
#include "olap/base_tablet.h"
#include "olap/olap_common.h"
#include "olap/rowset/rowset.h"
Expand All @@ -32,12 +33,13 @@

namespace doris {

void PartialUpdateInfo::init(const TabletSchema& tablet_schema,
UniqueKeyUpdateModePB unique_key_update_mode,
const std::set<string>& partial_update_cols, bool is_strict_mode,
int64_t timestamp_ms, int32_t nano_seconds,
const std::string& timezone, const std::string& auto_increment_column,
int32_t sequence_map_col_uid, int64_t cur_max_version) {
Status PartialUpdateInfo::init(int64_t tablet_id, int64_t txn_id, const TabletSchema& tablet_schema,
UniqueKeyUpdateModePB unique_key_update_mode,
const std::set<string>& partial_update_cols, bool is_strict_mode,
int64_t timestamp_ms, int32_t nano_seconds,
const std::string& timezone,
const std::string& auto_increment_column,
int32_t sequence_map_col_uid, int64_t cur_max_version) {
partial_update_mode = unique_key_update_mode;
partial_update_input_columns = partial_update_cols;
max_version_in_flush_phase = cur_max_version;
Expand All @@ -48,6 +50,31 @@ void PartialUpdateInfo::init(const TabletSchema& tablet_schema,
missing_cids.clear();
update_cids.clear();

// partial_update_cols should include all key columns
for (std::size_t i {0}; i < tablet_schema.num_key_columns(); i++) {
const auto key_col = tablet_schema.column(i);
if (!partial_update_cols.contains(key_col.name())) {
auto msg = fmt::format(
"Unable to do partial update on shadow index's tablet, tablet_id={}, "
"txn_id={}. Missing key column {}.",
tablet_id, txn_id, key_col.name());
LOG_WARNING(msg);
return Status::Aborted<false>(msg);
}
}

// every including columns should be in tablet_schema
for (const auto& col : partial_update_cols) {
if (-1 == tablet_schema.field_index(col)) {
auto msg = fmt::format(
"Unable to do partial update on shadow index's tablet, tablet_id={}, "
"txn_id={}. Can't find column {} in tablet's schema.",
tablet_id, txn_id, col);
LOG_WARNING(msg);
return Status::Aborted<false>(msg);
}
}

for (auto i = 0; i < tablet_schema.num_columns(); ++i) {
if (partial_update_mode == UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS) {
auto tablet_column = tablet_schema.column(i);
Expand Down Expand Up @@ -75,6 +102,7 @@ void PartialUpdateInfo::init(const TabletSchema& tablet_schema,
is_fixed_partial_update() &&
partial_update_input_columns.contains(auto_increment_column);
_generate_default_values_for_missing_cids(tablet_schema);
return Status::OK();
}

void PartialUpdateInfo::to_pb(PartialUpdateInfoPB* partial_update_info_pb) const {
Expand Down
11 changes: 6 additions & 5 deletions be/src/olap/partial_update_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,12 @@ struct RowsetId;
class BitmapValue;

struct PartialUpdateInfo {
void init(const TabletSchema& tablet_schema, UniqueKeyUpdateModePB unique_key_update_mode,
const std::set<std::string>& partial_update_cols, bool is_strict_mode,
int64_t timestamp_ms, int32_t nano_seconds, const std::string& timezone,
const std::string& auto_increment_column, int32_t sequence_map_col_uid = -1,
int64_t cur_max_version = -1);
Status init(int64_t tablet_id, int64_t txn_id, const TabletSchema& tablet_schema,
UniqueKeyUpdateModePB unique_key_update_mode,
const std::set<std::string>& partial_update_cols, bool is_strict_mode,
int64_t timestamp_ms, int32_t nano_seconds, const std::string& timezone,
const std::string& auto_increment_column, int32_t sequence_map_col_uid = -1,
int64_t cur_max_version = -1);
void to_pb(PartialUpdateInfoPB* partial_update_info) const;
void from_pb(PartialUpdateInfoPB* partial_update_info);
Status handle_non_strict_mode_not_found_error(const TabletSchema& tablet_schema,
Expand Down
18 changes: 10 additions & 8 deletions be/src/olap/rowset_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,8 +221,8 @@ Status RowsetBuilder::init() {
};
})
// build tablet schema in request level
_build_current_tablet_schema(_req.index_id, _req.table_schema_param.get(),
*_tablet->tablet_schema());
RETURN_IF_ERROR(_build_current_tablet_schema(_req.index_id, _req.table_schema_param.get(),
*_tablet->tablet_schema()));
RowsetWriterContext context;
context.txn_id = _req.txn_id;
context.load_id = _req.load_id;
Expand Down Expand Up @@ -396,9 +396,9 @@ Status BaseRowsetBuilder::cancel() {
return Status::OK();
}

void BaseRowsetBuilder::_build_current_tablet_schema(int64_t index_id,
const OlapTableSchemaParam* table_schema_param,
const TabletSchema& ori_tablet_schema) {
Status BaseRowsetBuilder::_build_current_tablet_schema(
int64_t index_id, const OlapTableSchemaParam* table_schema_param,
const TabletSchema& ori_tablet_schema) {
// find the right index id
int i = 0;
auto indexes = table_schema_param->indexes();
Expand Down Expand Up @@ -438,13 +438,15 @@ void BaseRowsetBuilder::_build_current_tablet_schema(int64_t index_id,
}
// set partial update columns info
_partial_update_info = std::make_shared<PartialUpdateInfo>();
_partial_update_info->init(
*_tablet_schema, table_schema_param->unique_key_update_mode(),
RETURN_IF_ERROR(_partial_update_info->init(
tablet()->tablet_id(), _req.txn_id, *_tablet_schema,
table_schema_param->unique_key_update_mode(),
table_schema_param->partial_update_input_columns(),
table_schema_param->is_strict_mode(), table_schema_param->timestamp_ms(),
table_schema_param->nano_seconds(), table_schema_param->timezone(),
table_schema_param->auto_increment_coulumn(),
table_schema_param->sequence_map_col_uid(), _max_version_in_flush_phase);
table_schema_param->sequence_map_col_uid(), _max_version_in_flush_phase));
return Status::OK();
}

} // namespace doris
2 changes: 1 addition & 1 deletion be/src/olap/rowset_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class BaseRowsetBuilder {
Status init_mow_context(std::shared_ptr<MowContext>& mow_context);

protected:
void _build_current_tablet_schema(int64_t index_id,
Status _build_current_tablet_schema(int64_t index_id,
const OlapTableSchemaParam* table_schema_param,
const TabletSchema& ori_tablet_schema);

Expand Down

0 comments on commit 22a7a46

Please sign in to comment.