[fix](move-memtable) tolerate non-open streams in close wait (#44680)
Related PR: #44344

`VTabletWriterV2::_select_streams()` already checks that there are enough downstream BEs to meet the replication requirement. `VTabletWriterV2::close()` should therefore tolerate those non-open streams during close wait.
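
Below is a minimal, self-contained sketch of that behavior. The class and member names (`FakeStreamStub`, the loop in `main`) are illustrative stand-ins, not the actual Doris `LoadStreamStub`/`VTabletWriterV2` code: the writer's close path calls `close_wait()` on every stream it created, and a stream whose open was skipped or failed returns OK immediately instead of waiting for a remote close that will never come.

```cpp
// Illustrative sketch only -- simplified stand-ins for LoadStreamStub and the
// writer's close loop; not the actual Doris classes or signatures.
#include <atomic>
#include <cstdint>
#include <iostream>
#include <memory>
#include <vector>

struct Status {
    bool ok = true;
    static Status OK() { return {}; }
};

class FakeStreamStub {
public:
    explicit FakeStreamStub(bool opened) : _is_open(opened) {}

    Status close_wait(int64_t /*timeout_ms*/) {
        if (!_is_open.load()) {
            // Non-open stream (e.g. a skipped backend): nothing to wait for.
            return Status::OK();
        }
        // Real code would block here until the remote load stream closes.
        return Status::OK();
    }

private:
    std::atomic<bool> _is_open;
};

int main() {
    // Two of three replicas opened; the third backend was skipped but the
    // replication requirement is still met, so close wait must not fail.
    std::vector<std::shared_ptr<FakeStreamStub>> streams = {
            std::make_shared<FakeStreamStub>(true),
            std::make_shared<FakeStreamStub>(true),
            std::make_shared<FakeStreamStub>(false),
    };
    for (const auto& stream : streams) {
        Status st = stream->close_wait(/*timeout_ms=*/60000);
        std::cout << (st.ok ? "closed" : "error") << std::endl;
    }
    return 0;
}
```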

The debug point `VTabletWriterV2._open_streams.skip_two_backends` is added alongside the existing `VTabletWriterV2._open_streams.skip_one_backend` to exercise this behavior.
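
For readers unfamiliar with Doris debug points: `DBUG_EXECUTE_IF(name, code)` runs `code` only when the named point is enabled (the regression test below enables it via `enableDebugPointForAllBEs`). The following is a rough stand-in for that mechanism, not the real macro, showing how `skip_two_backends` leaves only one of three backends with open streams:

```cpp
// Rough stand-in for the debug point mechanism -- not the real Doris macro,
// just the shape of how a named point can make _open_streams skip backends.
#include <iostream>
#include <set>
#include <string>

static std::set<std::string> g_enabled_points;  // assumed in-memory registry

#define DBUG_EXECUTE_IF(name, code)         \
    if (g_enabled_points.count(name) > 0) { \
        code                                \
    }

int main() {
    g_enabled_points.insert("VTabletWriterV2._open_streams.skip_two_backends");

    int fault_injection_skip_be = 0;
    for (int dst_id = 0; dst_id < 3; ++dst_id) {
        DBUG_EXECUTE_IF("VTabletWriterV2._open_streams.skip_two_backends", {
            if (fault_injection_skip_be < 2) {
                fault_injection_skip_be++;
                continue;  // pretend streams to this backend were never opened
            }
        });
        std::cout << "opened streams to backend " << dst_id << std::endl;
    }
    // Only backend 2 is opened, matching the "not enough streams 1/3" case.
    return 0;
}
```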
kaijchen authored and Your Name committed Dec 7, 2024
1 parent 2543af8 commit c2b3c2a
Showing 3 changed files with 23 additions and 18 deletions.
16 changes: 6 additions & 10 deletions be/src/vec/sink/load_stream_stub.cpp
@@ -219,11 +219,7 @@ Status LoadStreamStub::append_data(int64_t partition_id, int64_t index_id, int64
         add_failed_tablet(tablet_id, _status);
         return _status;
     }
-    DBUG_EXECUTE_IF("LoadStreamStub.only_send_segment_0", {
-        if (segment_id != 0) {
-            return Status::OK();
-        }
-    });
+    DBUG_EXECUTE_IF("LoadStreamStub.skip_send_segment", { return Status::OK(); });
     PStreamHeader header;
     header.set_src_id(_src_id);
     *header.mutable_load_id() = _load_id;
Expand All @@ -245,11 +241,7 @@ Status LoadStreamStub::add_segment(int64_t partition_id, int64_t index_id, int64
add_failed_tablet(tablet_id, _status);
return _status;
}
DBUG_EXECUTE_IF("LoadStreamStub.only_send_segment_0", {
if (segment_id != 0) {
return Status::OK();
}
});
DBUG_EXECUTE_IF("LoadStreamStub.skip_send_segment", { return Status::OK(); });
PStreamHeader header;
header.set_src_id(_src_id);
*header.mutable_load_id() = _load_id;
@@ -339,6 +331,10 @@ Status LoadStreamStub::wait_for_schema(int64_t partition_id, int64_t index_id, i
 
 Status LoadStreamStub::close_wait(RuntimeState* state, int64_t timeout_ms) {
     DBUG_EXECUTE_IF("LoadStreamStub::close_wait.long_wait", DBUG_BLOCK);
+    if (!_is_open.load()) {
+        // we don't need to close wait on non-open streams
+        return Status::OK();
+    }
     if (!_is_closing.load()) {
         return _status;
     }
12 changes: 9 additions & 3 deletions be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -268,14 +268,20 @@ Status VTabletWriterV2::open(RuntimeState* state, RuntimeProfile* profile) {
 }
 
 Status VTabletWriterV2::_open_streams() {
-    bool fault_injection_skip_be = true;
+    int fault_injection_skip_be = 0;
     bool any_backend = false;
    bool any_success = false;
     for (auto& [dst_id, _] : _tablets_for_node) {
         auto streams = _load_stream_map->get_or_create(dst_id);
         DBUG_EXECUTE_IF("VTabletWriterV2._open_streams.skip_one_backend", {
-            if (fault_injection_skip_be) {
-                fault_injection_skip_be = false;
+            if (fault_injection_skip_be < 1) {
+                fault_injection_skip_be++;
                 continue;
             }
         });
+        DBUG_EXECUTE_IF("VTabletWriterV2._open_streams.skip_two_backends", {
+            if (fault_injection_skip_be < 2) {
+                fault_injection_skip_be++;
+                continue;
+            }
+        });
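
For context on why `skip_two_backends` is expected to fail the load while `skip_one_backend` is not (the regression test below expects "not enough streams 1/3"), here is a hedged sketch of the replica-quorum idea behind the check from the related PR #44344. The actual check and its exact error text live in `VTabletWriterV2`; the majority rule shown here is an assumption for illustration only:

```cpp
// Illustrative only: a majority-style check on how many replica streams
// actually opened. The real check is implemented in VTabletWriterV2 (#44344).
#include <iostream>
#include <string>

std::string check_open_streams(int opened, int total_replicas) {
    int quorum = total_replicas / 2 + 1;  // assumed majority requirement
    if (opened < quorum) {
        return "not enough streams " + std::to_string(opened) + "/" +
               std::to_string(total_replicas);
    }
    return "ok";
}

int main() {
    std::cout << check_open_streams(2, 3) << std::endl;  // skip_one_backend  -> ok
    std::cout << check_open_streams(1, 3) << std::endl;  // skip_two_backends -> not enough streams 1/3
    return 0;
}
```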
@@ -75,30 +75,33 @@ suite("test_multi_replica_fault_injection", "nonConcurrent") {
             file "baseall.txt"
         }
 
-        def load_with_injection = { injection, error_msg->
+        def load_with_injection = { injection, error_msg, success=false->
            try {
                 sql "truncate table test"
                 GetDebugPoint().enableDebugPointForAllBEs(injection)
                 sql "insert into test select * from baseall where k1 <= 3"
+                assertTrue(success, String.format("Expected Exception '%s', actual success", error_msg))
             } catch(Exception e) {
                 logger.info(e.getMessage())
-                assertTrue(e.getMessage().contains(error_msg))
+                assertTrue(e.getMessage().contains(error_msg), e.toString())
             } finally {
                 GetDebugPoint().disableDebugPointForAllBEs(injection)
             }
         }
 
         // StreamSinkFileWriter appendv write segment failed one replica
         // success
-        load_with_injection("StreamSinkFileWriter.appendv.write_segment_failed_one_replica", "sucess")
+        load_with_injection("StreamSinkFileWriter.appendv.write_segment_failed_one_replica", "sucess", true)
         // StreamSinkFileWriter appendv write segment failed two replica
         load_with_injection("StreamSinkFileWriter.appendv.write_segment_failed_two_replica", "add segment failed")
         // StreamSinkFileWriter appendv write segment failed all replica
         load_with_injection("StreamSinkFileWriter.appendv.write_segment_failed_all_replica", "failed to send segment data to any replicas")
         // test segment num check when LoadStreamStub missed tail segments
-        load_with_injection("LoadStreamStub.only_send_segment_0", "segment num mismatch")
+        load_with_injection("LoadStreamStub.skip_send_segment", "segment num mismatch")
         // test one backend open failure
-        load_with_injection("VTabletWriterV2._open_streams.skip_one_backend", "success")
+        load_with_injection("VTabletWriterV2._open_streams.skip_one_backend", "success", true)
+        // test two backend open failure
+        load_with_injection("VTabletWriterV2._open_streams.skip_two_backends", "not enough streams 1/3")
         sql """ set enable_memtable_on_sink_node=false """
     }
 }
