Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
fix case

fix some case under cluster key dir

fix case

tmp

update

remove strict mode partial update doc case

print duplidate key in status error msg in ERROR mode
  • Loading branch information
bobhan1 committed Jan 16, 2025
1 parent 93f8da1 commit 89b3d62
Show file tree
Hide file tree
Showing 61 changed files with 935 additions and 292 deletions.
1 change: 1 addition & 0 deletions be/src/common/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ namespace ErrorCode {
E(KEY_NOT_FOUND, -7000, false); \
E(KEY_ALREADY_EXISTS, -7001, false); \
E(ENTRY_NOT_FOUND, -7002, false); \
E(NEW_ROWS_IN_PARTIAL_UPDATE, -7003, false); \
E(INVALID_TABLET_STATE, -7211, false); \
E(ROWSETS_EXPIRED, -7311, false); \
E(CGROUP_ERROR, -7411, false); \
Expand Down
27 changes: 27 additions & 0 deletions be/src/exec/tablet_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,11 @@ Status OlapTableSchemaParam::init(const POlapTableSchemaParam& pschema) {
}
_auto_increment_column_unique_id = pschema.auto_increment_column_unique_id();
}
if (_unique_key_update_mode != UniqueKeyUpdateModePB::UPSERT) {
if (pschema.has_partial_update_new_key_policy()) {
_partial_update_new_row_policy = pschema.partial_update_new_key_policy();
}
}
_timestamp_ms = pschema.timestamp_ms();
if (pschema.has_nano_seconds()) {
_nano_seconds = pschema.nano_seconds();
Expand Down Expand Up @@ -270,6 +275,27 @@ Status OlapTableSchemaParam::init(const TOlapTableSchemaParam& tschema) {
_auto_increment_column_unique_id = tschema.auto_increment_column_unique_id;
}

if (_unique_key_update_mode != UniqueKeyUpdateModePB::UPSERT) {
if (tschema.__isset.partial_update_new_key_policy) {
switch (tschema.partial_update_new_key_policy) {
case doris::TPartialUpdateNewRowPolicy::APPEND: {
_partial_update_new_row_policy = PartialUpdateNewRowPolicyPB::APPEND;
break;
}
case doris::TPartialUpdateNewRowPolicy::ERROR: {
_partial_update_new_row_policy = PartialUpdateNewRowPolicyPB::ERROR;
break;
}
default: {
return Status::InvalidArgument(
"Unknown partial_update_new_key_policy: {}, should be one of "
"'APPEND' or 'ERROR'",
tschema.partial_update_new_key_policy);
}
}
}
}

for (const auto& tcolumn : tschema.partial_update_input_columns) {
_partial_update_input_columns.insert(tcolumn);
}
Expand Down Expand Up @@ -358,6 +384,7 @@ void OlapTableSchemaParam::to_protobuf(POlapTableSchemaParam* pschema) const {
// for backward compatibility
pschema->set_partial_update(true);
}
pschema->set_partial_update_new_key_policy(_partial_update_new_row_policy);
pschema->set_is_strict_mode(_is_strict_mode);
pschema->set_auto_increment_column(_auto_increment_column);
pschema->set_auto_increment_column_unique_id(_auto_increment_column_unique_id);
Expand Down
5 changes: 5 additions & 0 deletions be/src/exec/tablet_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ class OlapTableSchemaParam {
std::set<std::string> partial_update_input_columns() const {
return _partial_update_input_columns;
}
PartialUpdateNewRowPolicyPB partial_update_new_key_policy() const {
return _partial_update_new_row_policy;
}
std::string auto_increment_coulumn() const { return _auto_increment_column; }
int32_t auto_increment_column_unique_id() const { return _auto_increment_column_unique_id; }
void set_timestamp_ms(int64_t timestamp_ms) { _timestamp_ms = timestamp_ms; }
Expand All @@ -128,6 +131,8 @@ class OlapTableSchemaParam {
std::vector<OlapTableIndexSchema*> _indexes;
mutable ObjectPool _obj_pool;
UniqueKeyUpdateModePB _unique_key_update_mode {UniqueKeyUpdateModePB::UPSERT};
PartialUpdateNewRowPolicyPB _partial_update_new_row_policy {
PartialUpdateNewRowPolicyPB::APPEND};
std::set<std::string> _partial_update_input_columns;
bool _is_strict_mode = false;
std::string _auto_increment_column;
Expand Down
28 changes: 28 additions & 0 deletions be/src/http/action/stream_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -642,6 +642,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
}
}

bool is_partial_update {false};
if (!http_req->header(HTTP_UNIQUE_KEY_UPDATE_MODE).empty()) {
static const StringCaseMap<TUniqueKeyUpdateMode::type> unique_key_update_mode_map = {
{"UPSERT", TUniqueKeyUpdateMode::UPSERT},
Expand Down Expand Up @@ -693,21 +694,48 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
}
}
request.__set_unique_key_update_mode(unique_key_update_mode);
if (unique_key_update_mode != TUniqueKeyUpdateMode::UPSERT) {
is_partial_update = true;
}
} else {
return Status::InvalidArgument(
"Invalid unique_key_partial_mode {}, must be one of 'UPSERT', "
"'UPDATE_FIXED_COLUMNS' or 'UPDATE_FLEXIBLE_COLUMNS'",
unique_key_update_mode_str);
}
}

if (http_req->header(HTTP_UNIQUE_KEY_UPDATE_MODE).empty() &&
!http_req->header(HTTP_PARTIAL_COLUMNS).empty()) {
// only consider `partial_columns` parameter when `unique_key_update_mode` is not set
if (iequal(http_req->header(HTTP_PARTIAL_COLUMNS), "true")) {
request.__set_unique_key_update_mode(TUniqueKeyUpdateMode::UPDATE_FIXED_COLUMNS);
// for backward compatibility
request.__set_partial_update(true);
is_partial_update = true;
}
}

if (!http_req->header(HTTP_PARTIAL_UPDATE_NEW_ROW_POLICY).empty()) {
if (!is_partial_update) {
return Status::InvalidArgument(
"partial_update_new_key_policy can only be set when the load is (flexible) "
"partial update.");
}
static const std::map<std::string, TPartialUpdateNewRowPolicy::type> policy_map {
{"APPEND", TPartialUpdateNewRowPolicy::APPEND},
{"ERROR", TPartialUpdateNewRowPolicy::ERROR}};

auto policy_name = http_req->header(HTTP_PARTIAL_UPDATE_NEW_ROW_POLICY);
std::transform(policy_name.begin(), policy_name.end(), policy_name.begin(),
[](unsigned char c) { return std::toupper(c); });
auto it = policy_map.find(policy_name);
if (it == policy_map.end()) {
return Status::InvalidArgument(
"Invalid partial_update_new_key_policy {}, must be one of {'APPEND', 'ERROR'}",
policy_name);
}
request.__set_partial_update_new_key_policy(it->second);
}

if (!http_req->header(HTTP_MEMTABLE_ON_SINKNODE).empty()) {
Expand Down
1 change: 1 addition & 0 deletions be/src/http/http_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ static const std::string HTTP_COMMENT = "comment";
static const std::string HTTP_ENABLE_PROFILE = "enable_profile";
static const std::string HTTP_PARTIAL_COLUMNS = "partial_columns";
static const std::string HTTP_UNIQUE_KEY_UPDATE_MODE = "unique_key_update_mode";
static const std::string HTTP_PARTIAL_UPDATE_NEW_ROW_POLICY = "partial_update_new_key_policy";
static const std::string HTTP_SQL = "sql";
static const std::string HTTP_TWO_PHASE_COMMIT = "two_phase_commit";
static const std::string HTTP_TXN_ID_KEY = "txn_id";
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/delta_writer_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ Status DeltaWriterV2::_build_current_tablet_schema(int64_t index_id,
RETURN_IF_ERROR(_partial_update_info->init(
_req.tablet_id, _req.txn_id, *_tablet_schema,
table_schema_param->unique_key_update_mode(),
table_schema_param->partial_update_new_key_policy(),
table_schema_param->partial_update_input_columns(),
table_schema_param->is_strict_mode(), table_schema_param->timestamp_ms(),
table_schema_param->nano_seconds(), table_schema_param->timezone(),
Expand Down
101 changes: 53 additions & 48 deletions be/src/olap/partial_update_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,14 @@ namespace doris {

Status PartialUpdateInfo::init(int64_t tablet_id, int64_t txn_id, const TabletSchema& tablet_schema,
UniqueKeyUpdateModePB unique_key_update_mode,
PartialUpdateNewRowPolicyPB policy,
const std::set<string>& partial_update_cols, bool is_strict_mode,
int64_t timestamp_ms, int32_t nano_seconds,
const std::string& timezone,
const std::string& auto_increment_column,
int32_t sequence_map_col_uid, int64_t cur_max_version) {
partial_update_mode = unique_key_update_mode;
partial_update_new_key_policy = policy;
partial_update_input_columns = partial_update_cols;
max_version_in_flush_phase = cur_max_version;
sequence_map_col_unqiue_id = sequence_map_col_uid;
Expand Down Expand Up @@ -97,6 +99,7 @@ Status PartialUpdateInfo::init(int64_t tablet_id, int64_t txn_id, const TabletSc

void PartialUpdateInfo::to_pb(PartialUpdateInfoPB* partial_update_info_pb) const {
partial_update_info_pb->set_partial_update_mode(partial_update_mode);
partial_update_info_pb->set_partial_update_new_key_policy(partial_update_new_key_policy);
partial_update_info_pb->set_max_version_in_flush_phase(max_version_in_flush_phase);
for (const auto& col : partial_update_input_columns) {
partial_update_info_pb->add_partial_update_input_columns(col);
Expand Down Expand Up @@ -133,6 +136,9 @@ void PartialUpdateInfo::from_pb(PartialUpdateInfoPB* partial_update_info_pb) {
} else {
partial_update_mode = partial_update_info_pb->partial_update_mode();
}
if (partial_update_info_pb->has_partial_update_new_key_policy()) {
partial_update_new_key_policy = partial_update_info_pb->partial_update_new_key_policy();
}
max_version_in_flush_phase = partial_update_info_pb->has_max_version_in_flush_phase()
? partial_update_info_pb->max_version_in_flush_phase()
: -1;
Expand Down Expand Up @@ -186,56 +192,55 @@ std::string PartialUpdateInfo::summary() const {
max_version_in_flush_phase);
}

Status PartialUpdateInfo::handle_not_found_error_for_fixed_partial_update(
const TabletSchema& tablet_schema) const {
if (!can_insert_new_rows_in_partial_update) {
std::string error_column;
for (auto cid : missing_cids) {
const TabletColumn& col = tablet_schema.column(cid);
if (!col.has_default_value() && !col.is_nullable() &&
!(tablet_schema.auto_increment_column() == col.name())) {
error_column = col.name();
break;
Status PartialUpdateInfo::handle_new_key(const TabletSchema& tablet_schema,
const std::function<std::string()>& line,
BitmapValue* skip_bitmap) {
switch (partial_update_new_key_policy) {
case doris::PartialUpdateNewRowPolicyPB::APPEND: {
if (partial_update_mode == UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS) {
if (!can_insert_new_rows_in_partial_update) {
std::string error_column;
for (auto cid : missing_cids) {
const TabletColumn& col = tablet_schema.column(cid);
if (!col.has_default_value() && !col.is_nullable() &&
!(tablet_schema.auto_increment_column() == col.name())) {
error_column = col.name();
break;
}
}
return Status::Error<ErrorCode::INVALID_SCHEMA, false>(
"the unmentioned column `{}` should have default value or be nullable "
"for newly inserted rows in non-strict mode partial update",
error_column);
}
} else if (partial_update_mode == UniqueKeyUpdateModePB::UPDATE_FLEXIBLE_COLUMNS) {
DCHECK(skip_bitmap != nullptr);
bool can_insert_new_row {true};
std::string error_column;
for (auto cid : missing_cids) {
const TabletColumn& col = tablet_schema.column(cid);
if (skip_bitmap->contains(col.unique_id()) && !col.has_default_value() &&
!col.is_nullable() && col.is_auto_increment()) {
error_column = col.name();
can_insert_new_row = false;
break;
}
}
if (!can_insert_new_row) {
return Status::Error<ErrorCode::INVALID_SCHEMA, false>(
"the unmentioned column `{}` should have default value or be "
"nullable for newly inserted rows in non-strict mode flexible partial "
"update",
error_column);
}
}
return Status::Error<ErrorCode::INVALID_SCHEMA, false>(
"the unmentioned column `{}` should have default value or be nullable "
"for newly inserted rows in non-strict mode partial update",
error_column);
}
return Status::OK();
}

Status PartialUpdateInfo::handle_not_found_error_for_flexible_partial_update(
const TabletSchema& tablet_schema, BitmapValue* skip_bitmap) const {
DCHECK(skip_bitmap != nullptr);
bool can_insert_new_rows_in_partial_update = true;
std::string error_column;
for (auto cid : missing_cids) {
const TabletColumn& col = tablet_schema.column(cid);
if (skip_bitmap->contains(col.unique_id()) && !col.has_default_value() &&
!col.is_nullable() && col.is_auto_increment()) {
error_column = col.name();
can_insert_new_rows_in_partial_update = false;
break;
}
}
if (!can_insert_new_rows_in_partial_update) {
return Status::Error<ErrorCode::INVALID_SCHEMA, false>(
"the unmentioned column `{}` should have default value or be "
"nullable for newly inserted rows in non-strict mode flexible partial update",
error_column);
}
return Status::OK();
}

Status PartialUpdateInfo::handle_non_strict_mode_not_found_error(const TabletSchema& tablet_schema,
BitmapValue* skip_bitmap) const {
if (partial_update_mode == UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS) {
RETURN_IF_ERROR(handle_not_found_error_for_fixed_partial_update(tablet_schema));
} else if (partial_update_mode == UniqueKeyUpdateModePB::UPDATE_FLEXIBLE_COLUMNS) {
RETURN_IF_ERROR(
handle_not_found_error_for_flexible_partial_update(tablet_schema, skip_bitmap));
} break;
case doris::PartialUpdateNewRowPolicyPB::ERROR: {
return Status::Error<ErrorCode::NEW_ROWS_IN_PARTIAL_UPDATE, false>(
"Can't append new rows in partial update when partial_update_new_key_policy is "
"ERROR. Row with key=[{}] is not in table.",
line());
} break;
}
return Status::OK();
}
Expand Down
13 changes: 6 additions & 7 deletions be/src/olap/partial_update_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <gen_cpp/olap_file.pb.h>

#include <cstdint>
#include <functional>
#include <map>
#include <set>
#include <string>
Expand All @@ -43,19 +44,16 @@ class BitmapValue;

struct PartialUpdateInfo {
Status init(int64_t tablet_id, int64_t txn_id, const TabletSchema& tablet_schema,
UniqueKeyUpdateModePB unique_key_update_mode,
UniqueKeyUpdateModePB unique_key_update_mode, PartialUpdateNewRowPolicyPB policy,
const std::set<std::string>& partial_update_cols, bool is_strict_mode,
int64_t timestamp_ms, int32_t nano_seconds, const std::string& timezone,
const std::string& auto_increment_column, int32_t sequence_map_col_uid = -1,
int64_t cur_max_version = -1);
void to_pb(PartialUpdateInfoPB* partial_update_info) const;
void from_pb(PartialUpdateInfoPB* partial_update_info);
Status handle_non_strict_mode_not_found_error(const TabletSchema& tablet_schema,
BitmapValue* skip_bitmap = nullptr) const;

Status handle_not_found_error_for_fixed_partial_update(const TabletSchema& tablet_schema) const;
Status handle_not_found_error_for_flexible_partial_update(const TabletSchema& tablet_schema,
BitmapValue* skip_bitmap) const;
Status handle_new_key(const TabletSchema& tablet_schema,
const std::function<std::string()>& line,
BitmapValue* skip_bitmap = nullptr);
std::string summary() const;

std::string partial_update_mode_str() const {
Expand Down Expand Up @@ -84,6 +82,7 @@ struct PartialUpdateInfo {

public:
UniqueKeyUpdateModePB partial_update_mode {UniqueKeyUpdateModePB::UPSERT};
PartialUpdateNewRowPolicyPB partial_update_new_key_policy {PartialUpdateNewRowPolicyPB::APPEND};
int64_t max_version_in_flush_phase {-1};
std::set<std::string> partial_update_input_columns;
std::vector<uint32_t> missing_cids;
Expand Down
14 changes: 5 additions & 9 deletions be/src/olap/rowset/segment_v2/segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -498,13 +498,7 @@ Status SegmentWriter::probe_key_for_mow(
specified_rowsets, &loc, _mow_context->max_version,
segment_caches, &rowset);
if (st.is<KEY_NOT_FOUND>()) {
if (_opts.rowset_ctx->partial_update_info->is_strict_mode) {
++stats.num_rows_filtered;
// delete the invalid newly inserted row
_mow_context->delete_bitmap->add(
{_opts.rowset_ctx->rowset_id, _segment_id, DeleteBitmap::TEMP_VERSION_COMMON},
segment_pos);
} else if (!have_delete_sign) {
if (!have_delete_sign) {
RETURN_IF_ERROR(not_found_cb());
}
++stats.num_rows_new_added;
Expand Down Expand Up @@ -677,8 +671,10 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
(delete_sign_column_data != nullptr && delete_sign_column_data[block_pos] != 0);

auto not_found_cb = [&]() {
return _opts.rowset_ctx->partial_update_info->handle_non_strict_mode_not_found_error(
*_tablet_schema);
return _opts.rowset_ctx->partial_update_info->handle_new_key(
*_tablet_schema, [&]() -> std::string {
return block->dump_one_line(block_pos, _num_sort_key_columns);
});
};
auto update_read_plan = [&](const RowLocation& loc) {
read_plan.prepare_to_read(loc, segment_pos);
Expand Down
Loading

0 comments on commit 89b3d62

Please sign in to comment.