diff --git a/be/src/olap/rowset_builder.cpp b/be/src/olap/rowset_builder.cpp index 6159c1c11a9ca6..1929ffbb78ecdf 100644 --- a/be/src/olap/rowset_builder.cpp +++ b/be/src/olap/rowset_builder.cpp @@ -141,8 +141,13 @@ Status RowsetBuilder::init_mow_context(std::shared_ptr& mow_context) } Status RowsetBuilder::check_tablet_version_count() { - if (!_tablet->exceed_version_limit(config::max_tablet_version_num - 100) || - GlobalMemoryArbitrator::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE)) { + bool injection = false; + DBUG_EXECUTE_IF("RowsetBuilder.check_tablet_version_count.too_many_version", + { injection = true; }); + if (injection) { + // do not return if injection + } else if (!_tablet->exceed_version_limit(config::max_tablet_version_num - 100) || + GlobalMemoryArbitrator::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE)) { return Status::OK(); } //trigger compaction diff --git a/regression-test/pipeline/p0/conf/regression-conf.groovy b/regression-test/pipeline/p0/conf/regression-conf.groovy index 741cec08ce5bfe..080e8f8c7ae1c1 100644 --- a/regression-test/pipeline/p0/conf/regression-conf.groovy +++ b/regression-test/pipeline/p0/conf/regression-conf.groovy @@ -69,7 +69,6 @@ excludeSuites = "000_the_start_sentinel_do_not_touch," + // keep this line as th "test_refresh_mtmv," + "test_spark_load," + "test_broker_load_func," + - "test_stream_stub_fault_injection," + "zzz_the_end_sentinel_do_not_touch" // keep this line as the last line // this directories will not be executed diff --git a/regression-test/suites/fault_injection_p0/test_load_stream_back_pressure_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_load_stream_back_pressure_fault_injection.groovy index fc2abe7fb0ad6f..bccaa8aa62f84d 100644 --- a/regression-test/suites/fault_injection_p0/test_load_stream_back_pressure_fault_injection.groovy +++ b/regression-test/suites/fault_injection_p0/test_load_stream_back_pressure_fault_injection.groovy @@ -71,67 +71,32 @@ suite("test_load_stream_back_pressure_fault_injection", "nonConcurrent") { try { GetDebugPoint().enableDebugPointForAllBEs("TabletStream.append_data.long_wait") - def thread1 = new Thread({ - try { - def res = sql "insert into test select * from baseall where k1 <= 3" - logger.info(res.toString()) - } catch(Exception e) { - logger.info(e.getMessage()) - assertTrue(e.getMessage().contains("Communications link failure")) + // the kill thread only means to end the test faster when the code does not behave as expected + def kill_thread = new Thread({ + sleep(5000) + def processList = sql "show processlist" + logger.info(processList.toString()) + processList.each { item -> + logger.info(item[1].toString()) + logger.info(item[11].toString()) + if (item[11].toString() == "insert into test select * from baseall where k1 <= 3".toString()){ + def res = sql "kill ${item[1]}" + logger.info(res.toString()) + } } }) - thread1.start() - - sleep(1000) - - def processList = sql "show processlist" - logger.info(processList.toString()) - processList.each { item -> - logger.info(item[1].toString()) - logger.info(item[11].toString()) - if (item[11].toString() == "insert into test select * from baseall where k1 <= 3".toString()){ - def res = sql "kill ${item[1]}" - logger.info(res.toString()) - } - } + kill_thread.start() + def res = sql "insert into test select * from baseall where k1 <= 3" + logger.info(res.toString()) + assertTrue(false, "Expected exception to be thrown") } catch(Exception e) { logger.info(e.getMessage()) + assertTrue(e.getMessage().contains("wait flush token back pressure time is more than load_stream_max_wait_flush_token_time")) } finally { GetDebugPoint().disableDebugPointForAllBEs("TabletStream.append_data.long_wait") } - try { - GetDebugPoint().enableDebugPointForAllBEs("TabletStream.add_segment.long_wait") - def thread1 = new Thread({ - try { - def res = sql "insert into test select * from baseall where k1 <= 3" - logger.info(res.toString()) - } catch(Exception e) { - logger.info(e.getMessage()) - assertTrue(e.getMessage().contains("Communications link failure")) - } - }) - thread1.start() - - sleep(1000) - - def processList = sql "show processlist" - logger.info(processList.toString()) - processList.each { item -> - logger.info(item[1].toString()) - logger.info(item[11].toString()) - if (item[11].toString() == "insert into test select * from baseall where k1 <= 3".toString()){ - def res = sql "kill ${item[1]}" - logger.info(res.toString()) - } - } - } catch(Exception e) { - logger.info(e.getMessage()) - } finally { - GetDebugPoint().disableDebugPointForAllBEs("TabletStream.add_segment.long_wait") - } - sql """ DROP TABLE IF EXISTS `baseall` """ sql """ DROP TABLE IF EXISTS `test` """ sql """ set enable_memtable_on_sink_node=false """ -} \ No newline at end of file +} diff --git a/regression-test/suites/fault_injection_p0/test_load_stream_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_load_stream_fault_injection.groovy index 03a61006fedcca..f74ce75639faad 100644 --- a/regression-test/suites/fault_injection_p0/test_load_stream_fault_injection.groovy +++ b/regression-test/suites/fault_injection_p0/test_load_stream_fault_injection.groovy @@ -113,10 +113,11 @@ suite("load_stream_fault_injection", "nonConcurrent") { } } - def load_with_injection = { injection, expect_errmsg -> + def load_with_injection = { injection, expect_errmsg, success=false-> try { GetDebugPoint().enableDebugPointForAllBEs(injection) sql "insert into test select * from baseall where k1 <= 3" + assertTrue(success, String.format("Expected Exception '%s', actual success", expect_errmsg)) } catch(Exception e) { // assertTrue(e.getMessage().contains("Process has no memory available")) // the msg should contain the root cause logger.info(e.getMessage()) @@ -125,11 +126,12 @@ suite("load_stream_fault_injection", "nonConcurrent") { } } - def load_with_injection2 = { injection1, injection2, error_msg-> + def load_with_injection2 = { injection1, injection2, error_msg, success=false-> try { GetDebugPoint().enableDebugPointForAllBEs(injection1) GetDebugPoint().enableDebugPointForAllBEs(injection2) sql "insert into test select * from baseall where k1 <= 3" + assertTrue(success, String.format("expected Exception '%s', actual success", expect_errmsg)) } catch(Exception e) { logger.info(e.getMessage()) assertTrue(e.getMessage().contains(error_msg)) @@ -149,16 +151,12 @@ suite("load_stream_fault_injection", "nonConcurrent") { load_with_injection("LoadStreamWriter.close_segment.null_file_writer", "") // LoadStreamWriter close_segment meet file writer failed to close error load_with_injection("LocalFileWriter.close.failed", "") - // LoadStreamWriter close_segment meet bytes_appended and real file size not match error - load_with_injection("FileWriter.close_segment.zero_bytes_appended", "") // LoadStreamWriter close_writer/add_segment meet not inited error load_with_injection("TabletStream.init.uninited_writer", "") // LoadStreamWriter add_segment meet not bad segid error load_with_injection("LoadStreamWriter.add_segment.bad_segid", "") // LoadStreamWriter add_segment meet null file writer error load_with_injection("LoadStreamWriter.add_segment.null_file_writer", "") - // LoadStreamWriter add_segment meet bytes_appended and real file size not match error - load_with_injection("FileWriter.add_segment.zero_bytes_appended", "") // LoadStream init failed coz LoadStreamWriter init failed load_with_injection("RowsetBuilder.check_tablet_version_count.too_many_version", "") // LoadStream add_segment meet unknown segid in request header diff --git a/regression-test/suites/fault_injection_p0/test_load_stream_stub_close_wait_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_load_stream_stub_close_wait_fault_injection.groovy index 4a87f1daf6b2d9..58b6ba4a075e33 100644 --- a/regression-test/suites/fault_injection_p0/test_load_stream_stub_close_wait_fault_injection.groovy +++ b/regression-test/suites/fault_injection_p0/test_load_stream_stub_close_wait_fault_injection.groovy @@ -74,6 +74,7 @@ suite("test_load_stream_stub_close_wait_fault_injection", "nonConcurrent") { GetDebugPoint().enableDebugPointForAllBEs("LoadStreamStub::close_wait.long_wait") def res = sql "insert into test select * from baseall where k1 <= 3" logger.info(res.toString()) + assertTrue(false, "Expected Exception to be thrown") } catch(Exception e) { logger.info(e.getMessage()) assertTrue(e.getMessage().contains("cancel")) @@ -85,4 +86,4 @@ suite("test_load_stream_stub_close_wait_fault_injection", "nonConcurrent") { sql """ DROP TABLE IF EXISTS `baseall` """ sql """ DROP TABLE IF EXISTS `test` """ sql """ set enable_memtable_on_sink_node=false """ -} \ No newline at end of file +} diff --git a/regression-test/suites/fault_injection_p0/test_load_stream_stub_failure_injection.groovy b/regression-test/suites/fault_injection_p0/test_load_stream_stub_failure_injection.groovy index 5b1f9fba05b4bc..48c32883302e40 100644 --- a/regression-test/suites/fault_injection_p0/test_load_stream_stub_failure_injection.groovy +++ b/regression-test/suites/fault_injection_p0/test_load_stream_stub_failure_injection.groovy @@ -69,10 +69,11 @@ suite("test_stream_stub_fault_injection", "nonConcurrent") { file "baseall.txt" } - def load_with_injection = { injection, error_msg-> + def load_with_injection = { injection, error_msg, success=false-> try { GetDebugPoint().enableDebugPointForAllBEs(injection) sql "insert into test select * from baseall where k1 <= 3" + assertTrue(success, String.format("Expected Exception '%s', actual success", expect_errmsg)) } catch(Exception e) { logger.info(e.getMessage()) assertTrue(e.getMessage().contains(error_msg)) @@ -87,8 +88,6 @@ suite("test_stream_stub_fault_injection", "nonConcurrent") { load_with_injection("StreamSinkFileWriter.finalize.finalize_failed", "failed to send segment eos to any replicas") // LoadStreams stream wait failed load_with_injection("LoadStreamStub._send_with_retry.stream_write_failed", "StreamWrite failed, err=32") - // LoadStreams keeping stream when release - load_with_injection("LoadStreams.release.keeping_streams", "") sql """ DROP TABLE IF EXISTS `baseall` """ sql """ DROP TABLE IF EXISTS `test` """ diff --git a/regression-test/suites/fault_injection_p0/test_memtable_flush_fault.groovy b/regression-test/suites/fault_injection_p0/test_memtable_flush_fault.groovy index 5b954f1171ce7c..0ba2f0863e2e6b 100644 --- a/regression-test/suites/fault_injection_p0/test_memtable_flush_fault.groovy +++ b/regression-test/suites/fault_injection_p0/test_memtable_flush_fault.groovy @@ -55,6 +55,7 @@ suite("test_memtable_flush_fault", "nonConcurrent") { GetDebugPoint().enableDebugPointForAllBEs("FlushToken.submit_flush_error") sql insert_sql sql "sync" + assertTrue(false, "Expected Exception dbug_be_memtable_submit_flush_error") } catch (Exception e){ logger.info(e.getMessage()) assertTrue(e.getMessage().contains("dbug_be_memtable_submit_flush_error")) diff --git a/regression-test/suites/fault_injection_p0/test_rowset_writer_fault.groovy b/regression-test/suites/fault_injection_p0/test_rowset_writer_fault.groovy index 005e4b6bc974d8..0e5113f2a6ed14 100644 --- a/regression-test/suites/fault_injection_p0/test_rowset_writer_fault.groovy +++ b/regression-test/suites/fault_injection_p0/test_rowset_writer_fault.groovy @@ -56,11 +56,8 @@ suite("test_rowset_writer_fault", "nonConcurrent") { assertEquals("fail", json.Status.toLowerCase()) } } - } catch(Exception e) { - logger.info(e.getMessage()) - assertTrue(e.getMessage().contains(error_msg)) } finally { GetDebugPoint().disableDebugPointForAllBEs(injection) } sql """ DROP TABLE IF EXISTS `baseall` """ -} \ No newline at end of file +}