Skip to content

Commit

Permalink
[branch-2.1](auto-partition) Fix auto partition load failure in multi…
Browse files Browse the repository at this point in the history
… replica (#36586)

this pr
1. picked #35630, which was reverted #36098 before.
2. picked #36344 from master

these two pr fixed existing bug about auto partition load.

---------

Co-authored-by: Kaijie Chen <[email protected]>
  • Loading branch information
zclllyybb and kaijchen authored Jun 20, 2024
1 parent 6df1a9a commit bd47d5a
Show file tree
Hide file tree
Showing 28 changed files with 492 additions and 148 deletions.
17 changes: 10 additions & 7 deletions be/src/exec/tablet_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -388,18 +388,21 @@ Status VOlapTablePartitionParam::init() {
// for both auto/non-auto partition table.
_is_in_partition = _part_type == TPartitionType::type::LIST_PARTITIONED;

// initial partitions
// initial partitions. if meet dummy partitions only for open BE nodes, not generate key of them for finding
for (const auto& t_part : _t_param.partitions) {
VOlapTablePartition* part = nullptr;
RETURN_IF_ERROR(generate_partition_from(t_part, part));
_partitions.emplace_back(part);
if (_is_in_partition) {
for (auto& in_key : part->in_keys) {
_partitions_map->emplace(std::tuple {in_key.first, in_key.second, false}, part);

if (!_t_param.partitions_is_fake) {
if (_is_in_partition) {
for (auto& in_key : part->in_keys) {
_partitions_map->emplace(std::tuple {in_key.first, in_key.second, false}, part);
}
} else {
_partitions_map->emplace(
std::tuple {part->end_key.first, part->end_key.second, false}, part);
}
} else {
_partitions_map->emplace(std::tuple {part->end_key.first, part->end_key.second, false},
part);
}
}

Expand Down
28 changes: 26 additions & 2 deletions be/src/runtime/load_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ namespace doris {
bvar::Adder<int64_t> g_loadchannel_cnt("loadchannel_cnt");

LoadChannel::LoadChannel(const UniqueId& load_id, int64_t timeout_s, bool is_high_priority,
const std::string& sender_ip, int64_t backend_id, bool enable_profile)
std::string sender_ip, int64_t backend_id, bool enable_profile)
: _load_id(load_id),
_timeout_s(timeout_s),
_is_high_priority(is_high_priority),
_sender_ip(sender_ip),
_sender_ip(std::move(sender_ip)),
_backend_id(backend_id),
_enable_profile(enable_profile) {
std::shared_ptr<QueryContext> query_context =
Expand Down Expand Up @@ -96,6 +96,10 @@ Status LoadChannel::open(const PTabletWriterOpenRequest& params) {
if (it != _tablets_channels.end()) {
channel = it->second;
} else {
// just for VLOG
if (_txn_id == 0) [[unlikely]] {
_txn_id = params.txn_id();
}
// create a new tablets channel
TabletsChannelKey key(params.id(), index_id);
// TODO(plat1ko): CloudTabletsChannel
Expand Down Expand Up @@ -161,6 +165,7 @@ Status LoadChannel::add_batch(const PTabletWriterAddBlockRequest& request,
}

// 3. handle eos
// if channel is incremental, maybe hang on close until all close request arrived.
if (request.has_eos() && request.eos()) {
st = _handle_eos(channel.get(), request, response);
_report_profile(response);
Expand All @@ -182,6 +187,24 @@ Status LoadChannel::_handle_eos(BaseTabletsChannel* channel,
auto index_id = request.index_id();

RETURN_IF_ERROR(channel->close(this, request, response, &finished));

// for init node, we close waiting(hang on) all close request and let them return together.
if (request.has_hang_wait() && request.hang_wait()) {
DCHECK(!channel->is_incremental_channel());
VLOG_DEBUG << fmt::format("txn {}: reciever index {} close waiting by sender {}", _txn_id,
request.index_id(), request.sender_id());
int count = 0;
while (!channel->is_finished()) {
bthread_usleep(1000);
count++;
}
// now maybe finished or cancelled.
VLOG_TRACE << "reciever close wait finished!" << request.sender_id();
if (count >= 1000 * _timeout_s) { // maybe config::streaming_load_rpc_max_alive_time_sec
return Status::InternalError("Tablets channel didn't wait all close");
}
}

if (finished) {
std::lock_guard<std::mutex> l(_lock);
{
Expand All @@ -191,6 +214,7 @@ Status LoadChannel::_handle_eos(BaseTabletsChannel* channel,
std::make_pair(channel->total_received_rows(), channel->num_rows_filtered())));
_tablets_channels.erase(index_id);
}
LOG(INFO) << "txn " << _txn_id << " closed tablets_channel " << index_id;
_finished_channel_ids.emplace(index_id);
}
return Status::OK();
Expand Down
11 changes: 3 additions & 8 deletions be/src/runtime/load_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,20 @@

#pragma once

#include <algorithm>
#include <atomic>
#include <functional>
#include <map>
#include <cstdint>
#include <memory>
#include <mutex>
#include <ostream>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>

#include "common/status.h"
#include "olap/memtable_memory_limiter.h"
#include "runtime/exec_env.h"
#include "runtime/thread_context.h"
#include "util/runtime_profile.h"
#include "util/spinlock.h"
#include "util/thrift_util.h"
#include "util/uid_util.h"

namespace doris {
Expand All @@ -52,7 +46,7 @@ class BaseTabletsChannel;
class LoadChannel {
public:
LoadChannel(const UniqueId& load_id, int64_t timeout_s, bool is_high_priority,
const std::string& sender_ip, int64_t backend_id, bool enable_profile);
std::string sender_ip, int64_t backend_id, bool enable_profile);
~LoadChannel();

// open a new load channel if not exist
Expand Down Expand Up @@ -91,6 +85,7 @@ class LoadChannel {

private:
UniqueId _load_id;
int64_t _txn_id = 0;

SpinLock _profile_serialize_lock;
std::unique_ptr<RuntimeProfile> _profile;
Expand Down
8 changes: 0 additions & 8 deletions be/src/runtime/load_channel_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,25 +24,17 @@
// IWYU pragma: no_include <bits/chrono.h>
#include <chrono> // IWYU pragma: keep
#include <ctime>
#include <functional>
#include <map>
#include <memory>
#include <ostream>
#include <queue>
#include <string>
#include <tuple>
#include <vector>

#include "common/config.h"
#include "common/logging.h"
#include "runtime/exec_env.h"
#include "runtime/load_channel.h"
#include "runtime/memory/mem_tracker.h"
#include "util/doris_metrics.h"
#include "util/mem_info.h"
#include "util/metrics.h"
#include "util/perf_counters.h"
#include "util/pretty_printer.h"
#include "util/thread.h"

namespace doris {
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/load_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ LoadStream::~LoadStream() {
Status LoadStream::init(const POpenLoadStreamRequest* request) {
_txn_id = request->txn_id();
_total_streams = request->total_streams();
DCHECK(_total_streams > 0) << "total streams should be greator than 0";
_is_incremental = (_total_streams == 0);

_schema = std::make_shared<OlapTableSchemaParam>();
RETURN_IF_ERROR(_schema->init(request->schema()));
Expand Down
4 changes: 4 additions & 0 deletions be/src/runtime/load_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ class LoadStream : public brpc::StreamInputHandler {
void add_source(int64_t src_id) {
std::lock_guard lock_guard(_lock);
_open_streams[src_id]++;
if (_is_incremental) {
_total_streams++;
}
}

Status close(int64_t src_id, const std::vector<PTabletID>& tablets_to_commit,
Expand Down Expand Up @@ -167,6 +170,7 @@ class LoadStream : public brpc::StreamInputHandler {
RuntimeProfile::Counter* _close_wait_timer = nullptr;
LoadStreamMgr* _load_stream_mgr = nullptr;
QueryThreadContext _query_thread_context;
bool _is_incremental = false;
};

using LoadStreamPtr = std::unique_ptr<LoadStream>;
Expand Down
55 changes: 48 additions & 7 deletions be/src/runtime/tablets_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
#include <fmt/format.h>
#include <gen_cpp/internal_service.pb.h>
#include <gen_cpp/types.pb.h>
#include <time.h>

#include <ctime>

#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/status.h"
Expand Down Expand Up @@ -132,17 +133,40 @@ Status BaseTabletsChannel::open(const PTabletWriterOpenRequest& request) {
if (_state == kOpened || _state == kFinished) {
return Status::OK();
}
LOG(INFO) << "open tablets channel: " << _key << ", tablets num: " << request.tablets().size()
<< ", timeout(s): " << request.load_channel_timeout_s();
LOG(INFO) << fmt::format("open tablets channel of index {}, tablets num: {} timeout(s): {}",
_index_id, request.tablets().size(), request.load_channel_timeout_s());
_txn_id = request.txn_id();
_index_id = request.index_id();
_schema = std::make_shared<OlapTableSchemaParam>();
RETURN_IF_ERROR(_schema->init(request.schema()));
_tuple_desc = _schema->tuple_desc();

_num_remaining_senders = request.num_senders();
_next_seqs.resize(_num_remaining_senders, 0);
_closed_senders.Reset(_num_remaining_senders);
int max_sender = request.num_senders();
/*
* a tablets channel in reciever is related to a bulk of VNodeChannel of sender. each instance one or none.
* there are two possibilities:
* 1. there's partitions originally broadcasted by FE. so all sender(instance) know it at start. and open() will be
* called directly, not by incremental_open(). and after _state changes to kOpened. _open_by_incremental will never
* be true. in this case, _num_remaining_senders will keep same with senders number. when all sender sent close rpc,
* the tablets channel will close. and if for auto partition table, these channel's closing will hang on reciever and
* return together to avoid close-then-incremental-open problem.
* 2. this tablets channel is opened by incremental_open of sender's sink node. so only this sender will know this partition
* (this TabletsChannel) at that time. and we are not sure how many sender will know in the end. it depends on data
* distribution. in this situation open() is called by incremental_open() at first time. so _open_by_incremental is true.
* then _num_remaining_senders will not be set here. but inc every time when incremental_open() called. so it's dynamic
* and also need same number of senders' close to close. but will not hang.
*/
if (_open_by_incremental) {
DCHECK(_num_remaining_senders == 0) << _num_remaining_senders;
} else {
_num_remaining_senders = max_sender;
}
LOG(INFO) << fmt::format(
"txn {}: TabletsChannel of index {} init senders {} with incremental {}", _txn_id,
_index_id, _num_remaining_senders, _open_by_incremental ? "on" : "off");
// just use max_sender no matter incremental or not cuz we dont know how many senders will open.
_next_seqs.resize(max_sender, 0);
_closed_senders.Reset(max_sender);

RETURN_IF_ERROR(_open_all_writers(request));

Expand All @@ -152,10 +176,27 @@ Status BaseTabletsChannel::open(const PTabletWriterOpenRequest& request) {

Status BaseTabletsChannel::incremental_open(const PTabletWriterOpenRequest& params) {
SCOPED_TIMER(_incremental_open_timer);
if (_state == kInitialized) { // haven't opened

// current node first opened by incremental open
if (_state == kInitialized) {
_open_by_incremental = true;
RETURN_IF_ERROR(open(params));
}

std::lock_guard<std::mutex> l(_lock);

// one sender may incremental_open many times. but only close one time. so dont count duplicately.
if (_open_by_incremental) {
if (params.has_sender_id() && !_recieved_senders.contains(params.sender_id())) {
_recieved_senders.insert(params.sender_id());
_num_remaining_senders++;
} else if (!params.has_sender_id()) { // for compatible
_num_remaining_senders++;
}
VLOG_DEBUG << fmt::format("txn {}: TabletsChannel {} inc senders to {}", _txn_id, _index_id,
_num_remaining_senders);
}

std::vector<SlotDescriptor*>* index_slots = nullptr;
int32_t schema_hash = 0;
for (const auto& index : _schema->indexes()) {
Expand Down
10 changes: 7 additions & 3 deletions be/src/runtime/tablets_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@

#include <atomic>
#include <cstdint>
#include <functional>
#include <map>
#include <mutex>
#include <ostream>
#include <shared_mutex>
Expand Down Expand Up @@ -113,6 +111,11 @@ class BaseTabletsChannel {

size_t num_rows_filtered() const { return _num_rows_filtered; }

// means this tablets in this BE is incremental opened partitions.
bool is_incremental_channel() const { return _open_by_incremental; }

bool is_finished() const { return _state == kFinished; }

protected:
Status _get_current_seq(int64_t& cur_seq, const PTabletWriterAddBlockRequest& request);

Expand Down Expand Up @@ -151,10 +154,11 @@ class BaseTabletsChannel {
int64_t _txn_id = -1;
int64_t _index_id = -1;
std::shared_ptr<OlapTableSchemaParam> _schema;

TupleDescriptor* _tuple_desc = nullptr;
bool _open_by_incremental = false;

// next sequence we expect
std::set<int32_t> _recieved_senders;
int _num_remaining_senders = 0;
std::vector<int64_t> _next_seqs;
Bitmap _closed_senders;
Expand Down
11 changes: 7 additions & 4 deletions be/src/vec/sink/load_stream_map_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ LoadStreamMap::LoadStreamMap(UniqueId load_id, int64_t src_id, int num_streams,
DCHECK(num_use > 0) << "use num should be greater than 0";
}

std::shared_ptr<Streams> LoadStreamMap::get_or_create(int64_t dst_id) {
std::shared_ptr<Streams> LoadStreamMap::get_or_create(int64_t dst_id, bool incremental) {
std::lock_guard<std::mutex> lock(_mutex);
std::shared_ptr<Streams> streams = _streams_for_node[dst_id];
if (streams != nullptr) {
Expand All @@ -44,7 +44,7 @@ std::shared_ptr<Streams> LoadStreamMap::get_or_create(int64_t dst_id) {
streams = std::make_shared<Streams>();
for (int i = 0; i < _num_streams; i++) {
streams->emplace_back(new LoadStreamStub(_load_id, _src_id, _tablet_schema_for_index,
_enable_unique_mow_for_index));
_enable_unique_mow_for_index, incremental));
}
_streams_for_node[dst_id] = streams;
return streams;
Expand Down Expand Up @@ -101,10 +101,13 @@ bool LoadStreamMap::release() {
return false;
}

Status LoadStreamMap::close_load() {
return for_each_st([this](int64_t dst_id, const Streams& streams) -> Status {
Status LoadStreamMap::close_load(bool incremental) {
return for_each_st([this, incremental](int64_t dst_id, const Streams& streams) -> Status {
const auto& tablets = _tablets_to_commit[dst_id];
for (auto& stream : streams) {
if (stream->is_incremental() != incremental) {
continue;
}
RETURN_IF_ERROR(stream->close_load(tablets));
}
return Status::OK();
Expand Down
4 changes: 2 additions & 2 deletions be/src/vec/sink/load_stream_map_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class LoadStreamMap {
LoadStreamMap(UniqueId load_id, int64_t src_id, int num_streams, int num_use,
LoadStreamMapPool* pool);

std::shared_ptr<Streams> get_or_create(int64_t dst_id);
std::shared_ptr<Streams> get_or_create(int64_t dst_id, bool incremental = false);

std::shared_ptr<Streams> at(int64_t dst_id);

Expand All @@ -95,7 +95,7 @@ class LoadStreamMap {

// send CLOSE_LOAD to all streams, return ERROR if any.
// only call this method after release() returns true.
Status close_load();
Status close_load(bool incremental);

private:
const UniqueId _load_id;
Expand Down
13 changes: 10 additions & 3 deletions be/src/vec/sink/load_stream_stub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,11 +127,12 @@ inline std::ostream& operator<<(std::ostream& ostr, const LoadStreamReplyHandler

LoadStreamStub::LoadStreamStub(PUniqueId load_id, int64_t src_id,
std::shared_ptr<IndexToTabletSchema> schema_map,
std::shared_ptr<IndexToEnableMoW> mow_map)
std::shared_ptr<IndexToEnableMoW> mow_map, bool incremental)
: _load_id(load_id),
_src_id(src_id),
_tablet_schema_for_index(schema_map),
_enable_unique_mow_for_index(mow_map) {};
_enable_unique_mow_for_index(mow_map),
_is_incremental(incremental) {};

LoadStreamStub::~LoadStreamStub() {
if (_is_init.load() && !_is_closed.load()) {
Expand Down Expand Up @@ -168,7 +169,13 @@ Status LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
request.set_src_id(_src_id);
request.set_txn_id(txn_id);
request.set_enable_profile(enable_profile);
request.set_total_streams(total_streams);
if (_is_incremental) {
request.set_total_streams(0);
} else if (total_streams > 0) {
request.set_total_streams(total_streams);
} else {
return Status::InternalError("total_streams should be greator than 0");
}
request.set_idle_timeout_ms(idle_timeout_ms);
schema.to_protobuf(request.mutable_schema());
for (auto& tablet : tablets_for_schema) {
Expand Down
Loading

0 comments on commit bd47d5a

Please sign in to comment.