Skip to content

Commit

Permalink
[fix](move-memtable) do not retry open streams (apache#41550)
Browse files Browse the repository at this point in the history
## Proposed changes

Currently, a second sink may retry opening a LoadStreamStub if a previous
sink failed to open the stream.
In some cases the load stream could receive both open requests, causing the
actual open stream count to exceed the expected total stream count,
which subsequently leads to a use-after-free.

This PR disables the retry of `LoadStreamStub::open()`.
  • Loading branch information
kaijchen committed Oct 16, 2024
1 parent 9b1f290 commit 4380465
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 35 deletions.
60 changes: 29 additions & 31 deletions be/src/vec/sink/load_stream_stub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ LoadStreamStub::LoadStreamStub(PUniqueId load_id, int64_t src_id,
_is_incremental(incremental) {};

LoadStreamStub::~LoadStreamStub() {
if (_is_init.load() && !_is_closed.load()) {
if (_is_open.load() && !_is_closed.load()) {
auto ret = brpc::StreamClose(_stream_id);
LOG(INFO) << *this << " is deconstructed, close " << (ret == 0 ? "success" : "failed");
}
Expand All @@ -149,8 +149,9 @@ Status LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
int64_t idle_timeout_ms, bool enable_profile) {
std::unique_lock<bthread::Mutex> lock(_open_mutex);
if (_is_init.load()) {
return _init_st;
return _status;
}
_is_init.store(true);
_dst_id = node_info.id;
brpc::StreamOptions opt;
opt.max_buf_size = config::load_stream_max_buf_size;
Expand All @@ -160,8 +161,8 @@ Status LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
brpc::Controller cntl;
if (int ret = brpc::StreamCreate(&_stream_id, cntl, &opt)) {
delete opt.handler;
_init_st = Status::Error<true>(ret, "Failed to create stream");
return _init_st;
_status = Status::Error<true>(ret, "Failed to create stream");
return _status;
}
cntl.set_timeout_ms(config::open_load_stream_timeout_ms);
POpenLoadStreamRequest request;
Expand All @@ -174,8 +175,8 @@ Status LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
} else if (total_streams > 0) {
request.set_total_streams(total_streams);
} else {
_init_st = Status::InternalError("total_streams should be greator than 0");
return _init_st;
_status = Status::InternalError("total_streams should be greator than 0");
return _status;
}
request.set_idle_timeout_ms(idle_timeout_ms);
schema.to_protobuf(request.mutable_schema());
Expand All @@ -199,23 +200,23 @@ Status LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
}
if (cntl.Failed()) {
brpc::StreamClose(_stream_id);
_init_st = Status::InternalError("Failed to connect to backend {}: {}", _dst_id,
cntl.ErrorText());
return _init_st;
_status = Status::InternalError("Failed to connect to backend {}: {}", _dst_id,
cntl.ErrorText());
return _status;
}
LOG(INFO) << "open load stream to host=" << node_info.host << ", port=" << node_info.brpc_port
<< ", " << *this;
_is_init.store(true);
_is_open.store(true);
return Status::OK();
}

// APPEND_DATA
Status LoadStreamStub::append_data(int64_t partition_id, int64_t index_id, int64_t tablet_id,
int64_t segment_id, uint64_t offset, std::span<const Slice> data,
bool segment_eos, FileType file_type) {
if (!_is_init.load()) {
add_failed_tablet(tablet_id, _init_st);
return _init_st;
if (!_is_open.load()) {
add_failed_tablet(tablet_id, _status);
return _status;
}
DBUG_EXECUTE_IF("LoadStreamStub.only_send_segment_0", {
if (segment_id != 0) {
Expand All @@ -240,9 +241,9 @@ Status LoadStreamStub::append_data(int64_t partition_id, int64_t index_id, int64
Status LoadStreamStub::add_segment(int64_t partition_id, int64_t index_id, int64_t tablet_id,
int64_t segment_id, const SegmentStatistics& segment_stat,
TabletSchemaSPtr flush_schema) {
if (!_is_init.load()) {
add_failed_tablet(tablet_id, _init_st);
return _init_st;
if (!_is_open.load()) {
add_failed_tablet(tablet_id, _status);
return _status;
}
DBUG_EXECUTE_IF("LoadStreamStub.only_send_segment_0", {
if (segment_id != 0) {
Expand All @@ -266,8 +267,8 @@ Status LoadStreamStub::add_segment(int64_t partition_id, int64_t index_id, int64

// CLOSE_LOAD
Status LoadStreamStub::close_load(const std::vector<PTabletID>& tablets_to_commit) {
if (!_is_init.load()) {
return _init_st;
if (!_is_open.load()) {
return _status;
}
PStreamHeader header;
*header.mutable_load_id() = _load_id;
Expand All @@ -276,19 +277,19 @@ Status LoadStreamStub::close_load(const std::vector<PTabletID>& tablets_to_commi
for (const auto& tablet : tablets_to_commit) {
*header.add_tablets() = tablet;
}
_close_st = _encode_and_send(header);
if (!_close_st.ok()) {
LOG(WARNING) << "stream " << _stream_id << " close failed: " << _close_st;
return _close_st;
_status = _encode_and_send(header);
if (!_status.ok()) {
LOG(WARNING) << "stream " << _stream_id << " close failed: " << _status;
return _status;
}
_is_closing.store(true);
return Status::OK();
}

// GET_SCHEMA
Status LoadStreamStub::get_schema(const std::vector<PTabletID>& tablets) {
if (!_is_init.load()) {
return _init_st;
if (!_is_open.load()) {
return _status;
}
PStreamHeader header;
*header.mutable_load_id() = _load_id;
Expand All @@ -310,8 +311,8 @@ Status LoadStreamStub::get_schema(const std::vector<PTabletID>& tablets) {

Status LoadStreamStub::wait_for_schema(int64_t partition_id, int64_t index_id, int64_t tablet_id,
int64_t timeout_ms) {
if (!_is_init.load()) {
return _init_st;
if (!_is_open.load()) {
return _status;
}
if (_tablet_schema_for_index->contains(index_id)) {
return Status::OK();
Expand All @@ -338,11 +339,8 @@ Status LoadStreamStub::wait_for_schema(int64_t partition_id, int64_t index_id, i

Status LoadStreamStub::close_wait(RuntimeState* state, int64_t timeout_ms) {
DBUG_EXECUTE_IF("LoadStreamStub::close_wait.long_wait", DBUG_BLOCK);
if (!_is_init.load()) {
return _init_st;
}
if (!_is_closing.load()) {
return _close_st;
return _status;
}
if (_is_closed.load()) {
return _check_cancel();
Expand Down Expand Up @@ -371,7 +369,7 @@ Status LoadStreamStub::close_wait(RuntimeState* state, int64_t timeout_ms) {

void LoadStreamStub::cancel(Status reason) {
LOG(WARNING) << *this << " is cancelled because of " << reason;
if (_is_init.load()) {
if (_is_open.load()) {
brpc::StreamClose(_stream_id);
}
{
Expand Down
6 changes: 3 additions & 3 deletions be/src/vec/sink/load_stream_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ class LoadStreamStub : public std::enable_shared_from_this<LoadStreamStub> {

int64_t dst_id() const { return _dst_id; }

bool is_inited() const { return _is_init.load(); }
bool is_open() const { return _is_open.load(); }

bool is_incremental() const { return _is_incremental; }

Expand Down Expand Up @@ -230,6 +230,7 @@ class LoadStreamStub : public std::enable_shared_from_this<LoadStreamStub> {

protected:
std::atomic<bool> _is_init;
std::atomic<bool> _is_open;
std::atomic<bool> _is_closing;
std::atomic<bool> _is_closed;
std::atomic<bool> _is_cancelled;
Expand All @@ -239,8 +240,7 @@ class LoadStreamStub : public std::enable_shared_from_this<LoadStreamStub> {
brpc::StreamId _stream_id;
int64_t _src_id = -1; // source backend_id
int64_t _dst_id = -1; // destination backend_id
Status _init_st = Status::InternalError<false>("Stream is not open");
Status _close_st;
Status _status = Status::InternalError<false>("Stream is not open");
Status _cancel_st;

bthread::Mutex _open_mutex;
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/sink/writer/vtablet_writer_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ Status VTabletWriterV2::_select_streams(int64_t tablet_id, int64_t partition_id,
VLOG_DEBUG << fmt::format("_select_streams P{} I{} T{}", partition_id, index_id, tablet_id);
_tablets_for_node[node_id].emplace(tablet_id, tablet);
auto stream = _load_stream_map->at(node_id)->at(_stream_index);
for (int i = 1; i < _stream_per_node && !stream->is_inited(); i++) {
for (int i = 1; i < _stream_per_node && !stream->is_open(); i++) {
stream = _load_stream_map->at(node_id)->at((_stream_index + i) % _stream_per_node);
}
streams.emplace_back(std::move(stream));
Expand Down

0 comments on commit 4380465

Please sign in to comment.