[fix](auto-partition) Fix auto partition load failure in multi replica (apache#36344)

## Proposed changes
One sender may call incremental_open on a `tablets_channel` many times but close it only once, so don't count num_senders more than once for a single sender instance.
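
For illustration, here is a minimal standalone sketch of the counting rule this patch enforces; the `SenderCounter` class and the `main` driver are hypothetical and not code from this commit. The idea: a sender that opens a channel incrementally several times is still counted only once, so the number of expected closes matches the number of distinct senders.

```cpp
// Hypothetical, self-contained sketch of the dedup rule: repeated incremental
// opens from the same sender id must not inflate the remaining-sender count.
#include <cstdint>
#include <iostream>
#include <set>

class SenderCounter {
public:
    // Called on every incremental open; a given sender id is counted once.
    void on_incremental_open(int32_t sender_id) {
        if (_seen_senders.insert(sender_id).second) {
            ++_num_remaining_senders;
        }
    }

    // Called once per sender at close time; returns true when all senders closed.
    bool on_close(int32_t sender_id) {
        if (_seen_senders.erase(sender_id) > 0) {
            --_num_remaining_senders;
        }
        return _num_remaining_senders == 0;
    }

    int num_remaining_senders() const { return _num_remaining_senders; }

private:
    std::set<int32_t> _seen_senders;
    int _num_remaining_senders = 0;
};

int main() {
    SenderCounter counter;
    // Sender 0 opens three times (e.g. one open per newly created auto partition),
    // sender 1 opens once.
    counter.on_incremental_open(0);
    counter.on_incremental_open(0);
    counter.on_incremental_open(0);
    counter.on_incremental_open(1);
    std::cout << "remaining senders: " << counter.num_remaining_senders() << "\n"; // 2, not 4
    counter.on_close(0);
    std::cout << "all closed: " << std::boolalpha << counter.on_close(1) << "\n"; // true
    return 0;
}
```

The actual change in `BaseTabletsChannel::incremental_open` below uses the new `sender_id` field on `PTabletWriterOpenRequest` to deduplicate, and falls back to the old per-open counting when the field is absent.
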
zclllyybb authored and iszhangpch committed Jun 21, 2024
1 parent 3e093b9 commit e2dc339
Showing 6 changed files with 45 additions and 17 deletions.
9 changes: 7 additions & 2 deletions be/src/runtime/load_channel.cpp
@@ -105,6 +105,10 @@ Status LoadChannel::open(const PTabletWriterOpenRequest& params) {
if (it != _tablets_channels.end()) {
channel = it->second;
} else {
// record the txn id; it is only used for logging
if (_txn_id == 0) [[unlikely]] {
_txn_id = params.txn_id();
}
// create a new tablets channel
TabletsChannelKey key(params.id(), index_id);
BaseStorageEngine& engine = ExecEnv::GetInstance()->storage_engine();
@@ -201,7 +205,8 @@ Status LoadChannel::_handle_eos(BaseTabletsChannel* channel,
// for init node, we close waiting(hang on) all close request and let them return together.
if (request.has_hang_wait() && request.hang_wait()) {
DCHECK(!channel->is_incremental_channel());
VLOG_TRACE << "reciever close waiting!" << request.sender_id();
VLOG_DEBUG << fmt::format("txn {}: reciever index {} close waiting by sender {}", _txn_id,
request.index_id(), request.sender_id());
int count = 0;
while (!channel->is_finished()) {
bthread_usleep(1000);
@@ -223,7 +228,7 @@ Status LoadChannel::_handle_eos(BaseTabletsChannel* channel,
std::make_pair(channel->total_received_rows(), channel->num_rows_filtered())));
_tablets_channels.erase(index_id);
}
VLOG_NOTICE << "load " << _load_id.to_string() << " closed tablets_channel " << index_id;
LOG(INFO) << "txn " << _txn_id << " closed tablets_channel " << index_id;
_finished_channel_ids.emplace(index_id);
}
return Status::OK();
2 changes: 2 additions & 0 deletions be/src/runtime/load_channel.h
@@ -18,6 +18,7 @@
#pragma once

#include <atomic>
#include <cstdint>
#include <memory>
#include <mutex>
#include <ostream>
@@ -84,6 +85,7 @@ class LoadChannel {

private:
UniqueId _load_id;
int64_t _txn_id = 0;

SpinLock _profile_serialize_lock;
std::unique_ptr<RuntimeProfile> _profile;
28 changes: 20 additions & 8 deletions be/src/runtime/tablets_channel.cpp
@@ -21,10 +21,9 @@
#include <fmt/format.h>
#include <gen_cpp/internal_service.pb.h>
#include <gen_cpp/types.pb.h>
#include <time.h>

#include "cloud/cloud_delta_writer.h"
#include "cloud/config.h"
#include <ctime>

#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/status.h"
// IWYU pragma: no_include <bits/chrono.h>
@@ -134,8 +133,8 @@ Status BaseTabletsChannel::open(const PTabletWriterOpenRequest& request) {
if (_state == kOpened || _state == kFinished) {
return Status::OK();
}
LOG(INFO) << "open tablets channel: " << _key << ", tablets num: " << request.tablets().size()
<< ", timeout(s): " << request.load_channel_timeout_s();
LOG(INFO) << fmt::format("open tablets channel of index {}, tablets num: {} timeout(s): {}",
_index_id, request.tablets().size(), request.load_channel_timeout_s());
_txn_id = request.txn_id();
_index_id = request.index_id();
_schema = std::make_shared<OlapTableSchemaParam>();
@@ -162,6 +161,9 @@ Status BaseTabletsChannel::open(const PTabletWriterOpenRequest& request) {
} else {
_num_remaining_senders = max_sender;
}
LOG(INFO) << fmt::format(
"txn {}: TabletsChannel of index {} init senders {} with incremental {}", _txn_id,
_index_id, _num_remaining_senders, _open_by_incremental ? "on" : "off");
// just use max_sender no matter incremental or not, because we don't know how many senders will open.
_next_seqs.resize(max_sender, 0);
_closed_senders.Reset(max_sender);
@@ -183,8 +185,16 @@ Status BaseTabletsChannel::incremental_open(const PTabletWriterOpenRequest& para

std::lock_guard<std::mutex> l(_lock);

// One sender may call incremental_open many times but close only once, so don't count it more than once.
if (_open_by_incremental) {
_num_remaining_senders++;
if (params.has_sender_id() && !_received_senders.contains(params.sender_id())) {
_received_senders.insert(params.sender_id());
_num_remaining_senders++;
} else if (!params.has_sender_id()) { // for compatibility with old senders that don't set sender_id
_num_remaining_senders++;
}
VLOG_DEBUG << fmt::format("txn {}: TabletsChannel of index {} incremented senders to {}", _txn_id,
_index_id, _num_remaining_senders);
}

std::vector<SlotDescriptor*>* index_slots = nullptr;
@@ -268,8 +278,10 @@ Status TabletsChannel::close(LoadChannel* parent, const PTabletWriterAddBlockReq
_num_remaining_senders--;
*finished = (_num_remaining_senders == 0);

LOG(INFO) << "close tablets channel: " << _key << ", sender id: " << sender_id
<< ", backend id: " << backend_id << " remaining sender: " << _num_remaining_senders;
LOG(INFO) << fmt::format(
"txn {}: close tablets channel of index {}, sender id: {}, backend {}, remaining "
"senders: {}",
_txn_id, _index_id, sender_id, backend_id, _num_remaining_senders);

if (!*finished) {
return Status::OK();
1 change: 1 addition & 0 deletions be/src/runtime/tablets_channel.h
@@ -165,6 +165,7 @@ class BaseTabletsChannel {
bool _open_by_incremental = false;

// next sequence we expect
std::set<int32_t> _received_senders; // distinct sender ids that already opened this channel, to avoid double counting
int _num_remaining_senders = 0;
std::vector<int64_t> _next_seqs;
Bitmap _closed_senders;
21 changes: 14 additions & 7 deletions be/src/vec/sink/writer/vtablet_writer.cpp
@@ -45,7 +45,6 @@
#include <vector>

#include "cloud/config.h"
#include "olap/inverted_index_parser.h"
#include "util/runtime_profile.h"
#include "vec/data_types/data_type.h"
#include "vec/exprs/vexpr_fwd.h"
@@ -81,10 +80,7 @@
#include "util/uid_util.h"
#include "vec/columns/column.h"
#include "vec/columns/column_const.h"
#include "vec/columns/column_vector.h"
#include "vec/columns/columns_number.h"
#include "vec/core/block.h"
#include "vec/core/types.h"
#include "vec/data_types/data_type_nullable.h"
#include "vec/exprs/vexpr.h"
#include "vec/sink/vtablet_block_convertor.h"
@@ -399,6 +395,7 @@ void VNodeChannel::_open_internal(bool is_incremental) {
request->set_allocated_id(&_parent->_load_id);
request->set_index_id(_index_channel->_index_id);
request->set_txn_id(_parent->_txn_id);
request->set_sender_id(_parent->_sender_id);
request->set_allocated_schema(_parent->_schema->to_protobuf());
if (_parent->_t_sink.olap_table_sink.__isset.storage_vault_id) {
request->set_storage_vault_id(_parent->_t_sink.olap_table_sink.storage_vault_id);
@@ -437,6 +434,8 @@ void VNodeChannel::_open_internal(bool is_incremental) {
if (config::tablet_writer_ignore_eovercrowded) {
open_callback->cntl_->ignore_eovercrowded();
}
VLOG_DEBUG << fmt::format("txn {}: open NodeChannel to {}, incremental: {}, senders: {}",
_parent->_txn_id, _node_id, is_incremental, _parent->_num_senders);
// the real transmission here. the corresponding BE's load mgr will open load channel for it.
_stub->tablet_writer_open(open_closure->cntl_.get(), open_closure->request_.get(),
open_closure->response_.get(), open_closure.get());
@@ -1406,12 +1405,14 @@ void VTabletWriter::_do_try_close(RuntimeState* state, const Status& exec_status
if (!status.ok()) {
break;
}
VLOG_TRACE << _sender_id << " first stage close start";
VLOG_TRACE << _sender_id << " first stage close start " << _txn_id;
index_channel->for_init_node_channel(
[&index_channel, &status](const std::shared_ptr<VNodeChannel>& ch) {
[&index_channel, &status, this](const std::shared_ptr<VNodeChannel>& ch) {
if (!status.ok() || ch->is_closed()) {
return;
}
VLOG_DEBUG << index_channel->_parent->_sender_id << "'s " << ch->host()
<< " mark close1 for init nodes " << _txn_id;
ch->mark_close(true);
if (ch->is_cancelled()) {
status = cancel_channel_and_check_intolerable_failure(
@@ -1427,6 +1428,8 @@ void VTabletWriter::_do_try_close(RuntimeState* state, const Status& exec_status
return;
}
auto s = ch->close_wait(_state);
VLOG_DEBUG << index_channel->_parent->_sender_id << "'s " << ch->host()
<< " close1 wait finished!";
if (!s.ok()) {
status = cancel_channel_and_check_intolerable_failure(
status, s.to_string(), index_channel, ch);
@@ -1435,19 +1438,23 @@ void VTabletWriter::_do_try_close(RuntimeState* state, const Status& exec_status
if (!status.ok()) {
break;
}
VLOG_DEBUG << _sender_id << " first stage finished. closeing inc nodes " << _txn_id;
index_channel->for_inc_node_channel(
[&index_channel, &status](const std::shared_ptr<VNodeChannel>& ch) {
[&index_channel, &status, this](const std::shared_ptr<VNodeChannel>& ch) {
if (!status.ok() || ch->is_closed()) {
return;
}
// only first try close, all node channels will mark_close()
VLOG_DEBUG << index_channel->_parent->_sender_id << "'s " << ch->host()
<< " mark close2 for inc nodes " << _txn_id;
ch->mark_close();
if (ch->is_cancelled()) {
status = cancel_channel_and_check_intolerable_failure(
status, ch->get_cancel_msg(), index_channel, ch);
}
});
} else { // not has_incremental_node_channel
VLOG_TRACE << _sender_id << " has no incremental channels " << _txn_id;
index_channel->for_each_node_channel(
[&index_channel, &status](const std::shared_ptr<VNodeChannel>& ch) {
if (!status.ok() || ch->is_closed()) {
1 change: 1 addition & 0 deletions gensrc/proto/internal_service.proto
@@ -104,6 +104,7 @@ message PTabletWriterOpenRequest {
optional int64 txn_expiration = 16; // Absolute time
optional bool write_file_cache = 17;
optional string storage_vault_id = 18;
optional int32 sender_id = 19;
};

message PTabletWriterOpenResult {
