Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
bobhan1 committed Oct 10, 2023
1 parent b9762ac commit e967fc3
Show file tree
Hide file tree
Showing 10 changed files with 30 additions and 27 deletions.
2 changes: 1 addition & 1 deletion be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -687,7 +687,7 @@ Status Compaction::modify_rowsets(const Merger::Statistics* stats) {
it.rowset_ids.insert(_output_rowset->rowset_id());
StorageEngine::instance()->txn_manager()->set_txn_related_delete_bitmap(
it.partition_id, it.transaction_id, _tablet->tablet_id(),
_tablet->tablet_uid(), true, it.delete_bitmap, it.rowset_ids);
_tablet->tablet_uid(), true, it.delete_bitmap, it.rowset_ids, nullptr);
}
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/data_dir.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,7 @@ Status DataDir::load() {
Status commit_txn_status = _txn_manager->commit_txn(
_meta, rowset_meta->partition_id(), rowset_meta->txn_id(),
rowset_meta->tablet_id(), rowset_meta->tablet_uid(), rowset_meta->load_id(),
rowset, true, nullptr);
rowset, true);
if (!commit_txn_status && !commit_txn_status.is<PUSH_TRANSACTION_ALREADY_EXIST>()) {
LOG(WARNING) << "failed to add committed rowset: " << rowset_meta->rowset_id()
<< " to tablet: " << rowset_meta->tablet_id()
Expand Down
3 changes: 1 addition & 2 deletions be/src/olap/push_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,7 @@ Status PushHandler::_do_streaming_ingestion(TabletSharedPtr tablet, const TPushR
del_preds.pop();
}
Status commit_status = StorageEngine::instance()->txn_manager()->commit_txn(
request.partition_id, tablet, request.transaction_id, load_id, rowset_to_add, false,
nullptr);
request.partition_id, tablet, request.transaction_id, load_id, rowset_to_add, false);
if (!commit_status.ok() && !commit_status.is<PUSH_TRANSACTION_ALREADY_EXIST>()) {
res = std::move(commit_status);
}
Expand Down
5 changes: 5 additions & 0 deletions be/src/olap/rowset/segment_v2/segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,11 @@ void SegmentWriter::_serialize_block_to_row_column(vectorized::Block& block) {
// 3. set columns to data convertor and then write all columns
Status SegmentWriter::append_block_with_partial_content(const vectorized::Block* block,
size_t row_pos, size_t num_rows) {
LOG(INFO) << fmt::format(
"[SegmentWriter::append_block_with_partial_content]num_rows: {}, block->columns(): {}, "
"_tablet_schema->num_key_columns(): {}, _tablet_schema->num_columns(): {}",
num_rows, block->columns(), _tablet_schema->num_key_columns(),
_tablet_schema->num_columns());
if (block->columns() <= _tablet_schema->num_key_columns() ||
block->columns() >= _tablet_schema->num_columns()) {
return Status::InternalError(
Expand Down
5 changes: 2 additions & 3 deletions be/src/olap/rowset_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -272,8 +272,7 @@ Status RowsetBuilder::commit_txn() {
std::lock_guard<std::mutex> l(_lock);
SCOPED_TIMER(_commit_txn_timer);
Status res = _storage_engine->txn_manager()->commit_txn(_req.partition_id, _tablet, _req.txn_id,
_req.load_id, _rowset, false,
_partial_update_info);
_req.load_id, _rowset, false);

if (!res && !res.is<PUSH_TRANSACTION_ALREADY_EXIST>()) {
LOG(WARNING) << "Failed to commit txn: " << _req.txn_id
Expand All @@ -283,7 +282,7 @@ Status RowsetBuilder::commit_txn() {
if (_tablet->enable_unique_key_merge_on_write()) {
_storage_engine->txn_manager()->set_txn_related_delete_bitmap(
_req.partition_id, _req.txn_id, _tablet->tablet_id(), _tablet->tablet_uid(), true,
_delete_bitmap, _rowset_ids);
_delete_bitmap, _rowset_ids, _partial_update_info);
}

_is_committed = true;
Expand Down
4 changes: 4 additions & 0 deletions be/src/olap/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3023,6 +3023,10 @@ Status Tablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset,
if (pos > 0) {
auto partial_update_info = rowset_writer->get_partial_update_info();
DCHECK(partial_update_info);
LOG(INFO) << fmt::format(
"[Tablet::calc_segment_delete_bitmap]before generate_new_block_for_partial_update, "
"pos: {}",
pos);
RETURN_IF_ERROR(generate_new_block_for_partial_update(
rowset_schema, partial_update_info->missing_cids, partial_update_info->update_cids,
read_plan_ori, read_plan_update, rsid_to_rowset, &block));
Expand Down
21 changes: 9 additions & 12 deletions be/src/olap/txn_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,11 +151,9 @@ Status TxnManager::prepare_txn(TPartitionId partition_id, TTransactionId transac

Status TxnManager::commit_txn(TPartitionId partition_id, const TabletSharedPtr& tablet,
TTransactionId transaction_id, const PUniqueId& load_id,
const RowsetSharedPtr& rowset_ptr, bool is_recovery,
std::shared_ptr<PartialUpdateInfo> partial_update_info) {
const RowsetSharedPtr& rowset_ptr, bool is_recovery) {
return commit_txn(tablet->data_dir()->get_meta(), partition_id, transaction_id,
tablet->tablet_id(), tablet->tablet_uid(), load_id, rowset_ptr, is_recovery,
partial_update_info);
tablet->tablet_id(), tablet->tablet_uid(), load_id, rowset_ptr, is_recovery);
}

Status TxnManager::publish_txn(TPartitionId partition_id, const TabletSharedPtr& tablet,
Expand All @@ -177,11 +175,11 @@ Status TxnManager::delete_txn(TPartitionId partition_id, const TabletSharedPtr&
tablet->tablet_id(), tablet->tablet_uid());
}

void TxnManager::set_txn_related_delete_bitmap(TPartitionId partition_id,
TTransactionId transaction_id, TTabletId tablet_id,
TabletUid tablet_uid, bool unique_key_merge_on_write,
DeleteBitmapPtr delete_bitmap,
const RowsetIdUnorderedSet& rowset_ids) {
void TxnManager::set_txn_related_delete_bitmap(
TPartitionId partition_id, TTransactionId transaction_id, TTabletId tablet_id,
TabletUid tablet_uid, bool unique_key_merge_on_write, DeleteBitmapPtr delete_bitmap,
const RowsetIdUnorderedSet& rowset_ids,
std::shared_ptr<PartialUpdateInfo> partial_update_info) {
pair<int64_t, int64_t> key(partition_id, transaction_id);
TabletInfo tablet_info(tablet_id, tablet_uid);

Expand All @@ -207,14 +205,14 @@ void TxnManager::set_txn_related_delete_bitmap(TPartitionId partition_id,
load_info.unique_key_merge_on_write = unique_key_merge_on_write;
load_info.delete_bitmap = delete_bitmap;
load_info.rowset_ids = rowset_ids;
load_info.partial_update_info = partial_update_info;
}
}

Status TxnManager::commit_txn(OlapMeta* meta, TPartitionId partition_id,
TTransactionId transaction_id, TTabletId tablet_id,
TabletUid tablet_uid, const PUniqueId& load_id,
const RowsetSharedPtr& rowset_ptr, bool is_recovery,
std::shared_ptr<PartialUpdateInfo> partial_update_info) {
const RowsetSharedPtr& rowset_ptr, bool is_recovery) {
if (partition_id < 1 || transaction_id < 1 || tablet_id < 1) {
LOG(FATAL) << "invalid commit req "
<< " partition_id=" << partition_id << " transaction_id=" << transaction_id
Expand Down Expand Up @@ -296,7 +294,6 @@ Status TxnManager::commit_txn(OlapMeta* meta, TPartitionId partition_id,
if (tablet != nullptr && tablet->enable_unique_key_merge_on_write()) {
load_info.unique_key_merge_on_write = true;
load_info.delete_bitmap.reset(new DeleteBitmap(tablet->tablet_id()));
load_info.partial_update_info = partial_update_info;
}
}
txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
Expand Down
9 changes: 4 additions & 5 deletions be/src/olap/txn_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,7 @@ class TxnManager {

Status commit_txn(TPartitionId partition_id, const TabletSharedPtr& tablet,
TTransactionId transaction_id, const PUniqueId& load_id,
const RowsetSharedPtr& rowset_ptr, bool is_recovery,
std::shared_ptr<PartialUpdateInfo> partial_update_info);
const RowsetSharedPtr& rowset_ptr, bool is_recovery);

Status publish_txn(TPartitionId partition_id, const TabletSharedPtr& tablet,
TTransactionId transaction_id, const Version& version,
Expand All @@ -137,8 +136,7 @@ class TxnManager {

Status commit_txn(OlapMeta* meta, TPartitionId partition_id, TTransactionId transaction_id,
TTabletId tablet_id, TabletUid tablet_uid, const PUniqueId& load_id,
const RowsetSharedPtr& rowset_ptr, bool is_recovery,
std::shared_ptr<PartialUpdateInfo> partial_update_info);
const RowsetSharedPtr& rowset_ptr, bool is_recovery);

// remove a txn from txn manager
// not persist rowset meta because
Expand Down Expand Up @@ -188,7 +186,8 @@ class TxnManager {
TTabletId tablet_id, TabletUid tablet_uid,
bool unique_key_merge_on_write,
DeleteBitmapPtr delete_bitmap,
const RowsetIdUnorderedSet& rowset_ids);
const RowsetIdUnorderedSet& rowset_ids,
std::shared_ptr<PartialUpdateInfo> partial_update_info);
void get_all_commit_tablet_txn_info_by_tablet(
const TabletSharedPtr& tablet, CommitTabletTxnInfoVec* commit_tablet_txn_info_vec);

Expand Down
4 changes: 2 additions & 2 deletions be/src/service/backend_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -702,7 +702,7 @@ void BackendService::ingest_binlog(TIngestBinlogResult& result,
Status commit_txn_status = StorageEngine::instance()->txn_manager()->commit_txn(
local_tablet->data_dir()->get_meta(), rowset_meta->partition_id(),
rowset_meta->txn_id(), rowset_meta->tablet_id(), local_tablet->tablet_uid(),
rowset_meta->load_id(), rowset, false, nullptr);
rowset_meta->load_id(), rowset, false);
if (!commit_txn_status && !commit_txn_status.is<ErrorCode::PUSH_TRANSACTION_ALREADY_EXIST>()) {
auto err_msg = fmt::format(
"failed to commit txn for remote tablet. rowset_id: {}, remote_tablet_id={}, "
Expand All @@ -717,7 +717,7 @@ void BackendService::ingest_binlog(TIngestBinlogResult& result,
if (local_tablet->enable_unique_key_merge_on_write()) {
StorageEngine::instance()->txn_manager()->set_txn_related_delete_bitmap(
partition_id, txn_id, local_tablet_id, local_tablet->tablet_uid(), true,
delete_bitmap, pre_rowset_ids);
delete_bitmap, pre_rowset_ids, nullptr);
}

tstatus.__set_status_code(TStatusCode::OK);
Expand Down
2 changes: 1 addition & 1 deletion be/src/service/internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1475,7 +1475,7 @@ void PInternalServiceImpl::request_slave_tablet_pull_rowset(
Status commit_txn_status = StorageEngine::instance()->txn_manager()->commit_txn(
tablet->data_dir()->get_meta(), rowset_meta->partition_id(), rowset_meta->txn_id(),
rowset_meta->tablet_id(), tablet->tablet_uid(), rowset_meta->load_id(), rowset,
false, nullptr);
false);
if (!commit_txn_status && !commit_txn_status.is<PUSH_TRANSACTION_ALREADY_EXIST>()) {
LOG(WARNING) << "failed to add committed rowset for slave replica. rowset_id="
<< rowset_meta->rowset_id() << ", tablet_id=" << rowset_meta->tablet_id()
Expand Down

0 comments on commit e967fc3

Please sign in to comment.