From 78536fc97c2cdf16b9ec84dad8444bee4895c3af Mon Sep 17 00:00:00 2001 From: Kaijie Chen Date: Wed, 27 Nov 2024 18:29:59 +0800 Subject: [PATCH 1/3] [fix](move-memtable) tolerate non-open streams in close wait --- be/src/vec/sink/load_stream_stub.cpp | 16 ++++++++-------- be/src/vec/sink/writer/vtablet_writer_v2.cpp | 12 +++++++++--- .../test_multi_replica_fault_injection.groovy | 13 ++++++++----- 3 files changed, 25 insertions(+), 16 deletions(-) diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp index 979daf6a85e682..7bfe9f1cead055 100644 --- a/be/src/vec/sink/load_stream_stub.cpp +++ b/be/src/vec/sink/load_stream_stub.cpp @@ -221,10 +221,8 @@ Status LoadStreamStub::append_data(int64_t partition_id, int64_t index_id, int64 add_failed_tablet(tablet_id, _status); return _status; } - DBUG_EXECUTE_IF("LoadStreamStub.only_send_segment_0", { - if (segment_id != 0) { - return Status::OK(); - } + DBUG_EXECUTE_IF("LoadStreamStub.skip_send_segment", { + return Status::OK(); }); PStreamHeader header; header.set_src_id(_src_id); @@ -248,10 +246,8 @@ Status LoadStreamStub::add_segment(int64_t partition_id, int64_t index_id, int64 add_failed_tablet(tablet_id, _status); return _status; } - DBUG_EXECUTE_IF("LoadStreamStub.only_send_segment_0", { - if (segment_id != 0) { - return Status::OK(); - } + DBUG_EXECUTE_IF("LoadStreamStub.skip_send_segment", { + return Status::OK(); }); PStreamHeader header; header.set_src_id(_src_id); @@ -343,6 +339,10 @@ Status LoadStreamStub::wait_for_schema(int64_t partition_id, int64_t index_id, i Status LoadStreamStub::close_wait(RuntimeState* state, int64_t timeout_ms) { DBUG_EXECUTE_IF("LoadStreamStub::close_wait.long_wait", DBUG_BLOCK); if (!_is_closing.load()) { + if (!_is_open.load()) { + // we don't need to close wait on non-open streams + return Status::OK(); + } return _status; } if (_is_closed.load()) { diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp index 3dc58be3bcde88..cd196a8f2b3df2 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp +++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp @@ -269,14 +269,20 @@ Status VTabletWriterV2::open(RuntimeState* state, RuntimeProfile* profile) { } Status VTabletWriterV2::_open_streams() { - bool fault_injection_skip_be = true; + int fault_injection_skip_be = 0; bool any_backend = false; bool any_success = false; for (auto& [dst_id, _] : _tablets_for_node) { auto streams = _load_stream_map->get_or_create(dst_id); DBUG_EXECUTE_IF("VTabletWriterV2._open_streams.skip_one_backend", { - if (fault_injection_skip_be) { - fault_injection_skip_be = false; + if (fault_injection_skip_be < 1) { + fault_injection_skip_be++; + continue; + } + }); + DBUG_EXECUTE_IF("VTabletWriterV2._open_streams.skip_two_backends", { + if (fault_injection_skip_be < 2) { + fault_injection_skip_be++; continue; } }); diff --git a/regression-test/suites/fault_injection_p0/test_multi_replica_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_multi_replica_fault_injection.groovy index 2f6afd5ca6925b..d09983d52d0dc3 100644 --- a/regression-test/suites/fault_injection_p0/test_multi_replica_fault_injection.groovy +++ b/regression-test/suites/fault_injection_p0/test_multi_replica_fault_injection.groovy @@ -75,14 +75,15 @@ suite("test_multi_replica_fault_injection", "nonConcurrent") { file "baseall.txt" } - def load_with_injection = { injection, error_msg-> + def load_with_injection = { injection, error_msg, success=false-> try { sql "truncate table test" GetDebugPoint().enableDebugPointForAllBEs(injection) sql "insert into test select * from baseall where k1 <= 3" + assertTrue(success, String.format("Expected Exception '%s', actual success", error_msg)) } catch(Exception e) { logger.info(e.getMessage()) - assertTrue(e.getMessage().contains(error_msg)) + assertTrue(e.getMessage().contains(error_msg), e.toString()) } finally { GetDebugPoint().disableDebugPointForAllBEs(injection) } @@ -90,15 +91,17 @@ suite("test_multi_replica_fault_injection", "nonConcurrent") { // StreamSinkFileWriter appendv write segment failed one replica // success - load_with_injection("StreamSinkFileWriter.appendv.write_segment_failed_one_replica", "sucess") + load_with_injection("StreamSinkFileWriter.appendv.write_segment_failed_one_replica", "sucess", true) // StreamSinkFileWriter appendv write segment failed two replica load_with_injection("StreamSinkFileWriter.appendv.write_segment_failed_two_replica", "add segment failed") // StreamSinkFileWriter appendv write segment failed all replica load_with_injection("StreamSinkFileWriter.appendv.write_segment_failed_all_replica", "failed to send segment data to any replicas") // test segment num check when LoadStreamStub missed tail segments - load_with_injection("LoadStreamStub.only_send_segment_0", "segment num mismatch") + load_with_injection("LoadStreamStub.skip_send_segment", "segment num mismatch") // test one backend open failure - load_with_injection("VTabletWriterV2._open_streams.skip_one_backend", "success") + load_with_injection("VTabletWriterV2._open_streams.skip_one_backend", "success", true) + // test two backend open failure + load_with_injection("VTabletWriterV2._open_streams.skip_two_backends", "not enough streams 1/3") sql """ set enable_memtable_on_sink_node=false """ } } From 2e9c95a776e9d88a8a159ad36ae1f161ff5a1f1a Mon Sep 17 00:00:00 2001 From: Kaijie Chen Date: Wed, 27 Nov 2024 18:39:43 +0800 Subject: [PATCH 2/3] style --- be/src/vec/sink/load_stream_stub.cpp | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp index 7bfe9f1cead055..248a7f997735e9 100644 --- a/be/src/vec/sink/load_stream_stub.cpp +++ b/be/src/vec/sink/load_stream_stub.cpp @@ -221,9 +221,7 @@ Status LoadStreamStub::append_data(int64_t partition_id, int64_t index_id, int64 add_failed_tablet(tablet_id, _status); return _status; } - DBUG_EXECUTE_IF("LoadStreamStub.skip_send_segment", { - return Status::OK(); - }); + DBUG_EXECUTE_IF("LoadStreamStub.skip_send_segment", { return Status::OK(); }); PStreamHeader header; header.set_src_id(_src_id); *header.mutable_load_id() = _load_id; @@ -246,9 +244,7 @@ Status LoadStreamStub::add_segment(int64_t partition_id, int64_t index_id, int64 add_failed_tablet(tablet_id, _status); return _status; } - DBUG_EXECUTE_IF("LoadStreamStub.skip_send_segment", { - return Status::OK(); - }); + DBUG_EXECUTE_IF("LoadStreamStub.skip_send_segment", { return Status::OK(); }); PStreamHeader header; header.set_src_id(_src_id); *header.mutable_load_id() = _load_id; From 8e58c4b9b236ab0a719fea02d4e0b65863275cf2 Mon Sep 17 00:00:00 2001 From: Kaijie Chen Date: Wed, 27 Nov 2024 18:56:31 +0800 Subject: [PATCH 3/3] reorder checks --- be/src/vec/sink/load_stream_stub.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp index 248a7f997735e9..02767b83d00b84 100644 --- a/be/src/vec/sink/load_stream_stub.cpp +++ b/be/src/vec/sink/load_stream_stub.cpp @@ -334,11 +334,11 @@ Status LoadStreamStub::wait_for_schema(int64_t partition_id, int64_t index_id, i Status LoadStreamStub::close_wait(RuntimeState* state, int64_t timeout_ms) { DBUG_EXECUTE_IF("LoadStreamStub::close_wait.long_wait", DBUG_BLOCK); + if (!_is_open.load()) { + // we don't need to close wait on non-open streams + return Status::OK(); + } if (!_is_closing.load()) { - if (!_is_open.load()) { - // we don't need to close wait on non-open streams - return Status::OK(); - } return _status; } if (_is_closed.load()) {