Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[test](load) injection cases should check Exception is thrown #44713

Merged
merged 3 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions be/src/olap/rowset_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,13 @@ Status BaseRowsetBuilder::init_mow_context(std::shared_ptr<MowContext>& mow_cont
}

Status RowsetBuilder::check_tablet_version_count() {
if (!_tablet->exceed_version_limit(config::max_tablet_version_num - 100) ||
GlobalMemoryArbitrator::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE)) {
bool injection = false;
DBUG_EXECUTE_IF("RowsetBuilder.check_tablet_version_count.too_many_version",
{ injection = true; });
if (injection) {
// do not return if injection
} else if (!_tablet->exceed_version_limit(config::max_tablet_version_num - 100) ||
GlobalMemoryArbitrator::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE)) {
return Status::OK();
}
//trigger compaction
Expand Down
1 change: 0 additions & 1 deletion regression-test/pipeline/p0/conf/regression-conf.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ excludeSuites = "000_the_start_sentinel_do_not_touch," + // keep this line as th
"test_refresh_mtmv," +
"test_spark_load," +
"test_broker_load_func," +
"test_stream_stub_fault_injection," +
"test_index_compaction_failure_injection," +
"test_full_compaction_run_status," +
"test_topn_fault_injection," +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,67 +71,32 @@ suite("test_load_stream_back_pressure_fault_injection", "nonConcurrent") {

try {
GetDebugPoint().enableDebugPointForAllBEs("TabletStream.append_data.long_wait")
def thread1 = new Thread({
try {
def res = sql "insert into test select * from baseall where k1 <= 3"
logger.info(res.toString())
} catch(Exception e) {
logger.info(e.getMessage())
assertTrue(e.getMessage().contains("Communications link failure"))
// the kill thread only means to end the test faster when the code does not behave as expected
def kill_thread = new Thread({
sleep(5000)
def processList = sql "show processlist"
logger.info(processList.toString())
processList.each { item ->
logger.info(item[1].toString())
logger.info(item[11].toString())
if (item[11].toString() == "insert into test select * from baseall where k1 <= 3".toString()){
def res = sql "kill ${item[1]}"
logger.info(res.toString())
}
}
})
thread1.start()

sleep(1000)

def processList = sql "show processlist"
logger.info(processList.toString())
processList.each { item ->
logger.info(item[1].toString())
logger.info(item[11].toString())
if (item[11].toString() == "insert into test select * from baseall where k1 <= 3".toString()){
def res = sql "kill ${item[1]}"
logger.info(res.toString())
}
}
kill_thread.start()
def res = sql "insert into test select * from baseall where k1 <= 3"
logger.info(res.toString())
assertTrue(false, "Expected exception to be thrown")
} catch(Exception e) {
logger.info(e.getMessage())
assertTrue(e.getMessage().contains("wait flush token back pressure time is more than load_stream_max_wait_flush_token_time"))
} finally {
GetDebugPoint().disableDebugPointForAllBEs("TabletStream.append_data.long_wait")
}

try {
GetDebugPoint().enableDebugPointForAllBEs("TabletStream.add_segment.long_wait")
def thread1 = new Thread({
try {
def res = sql "insert into test select * from baseall where k1 <= 3"
logger.info(res.toString())
} catch(Exception e) {
logger.info(e.getMessage())
assertTrue(e.getMessage().contains("Communications link failure"))
}
})
thread1.start()

sleep(1000)

def processList = sql "show processlist"
logger.info(processList.toString())
processList.each { item ->
logger.info(item[1].toString())
logger.info(item[11].toString())
if (item[11].toString() == "insert into test select * from baseall where k1 <= 3".toString()){
def res = sql "kill ${item[1]}"
logger.info(res.toString())
}
}
} catch(Exception e) {
logger.info(e.getMessage())
} finally {
GetDebugPoint().disableDebugPointForAllBEs("TabletStream.add_segment.long_wait")
}

sql """ DROP TABLE IF EXISTS `baseall` """
sql """ DROP TABLE IF EXISTS `test` """
sql """ set enable_memtable_on_sink_node=false """
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,11 @@ suite("load_stream_fault_injection", "nonConcurrent") {
}
}

def load_with_injection = { injection, expect_errmsg ->
def load_with_injection = { injection, expect_errmsg, success=false->
try {
GetDebugPoint().enableDebugPointForAllBEs(injection)
sql "insert into test select * from baseall where k1 <= 3"
assertTrue(success, String.format("Expected Exception '%s', actual success", expect_errmsg))
} catch(Exception e) {
// assertTrue(e.getMessage().contains("Process has no memory available")) // the msg should contain the root cause
logger.info(e.getMessage())
Expand All @@ -125,11 +126,12 @@ suite("load_stream_fault_injection", "nonConcurrent") {
}
}

def load_with_injection2 = { injection1, injection2, error_msg->
def load_with_injection2 = { injection1, injection2, error_msg, success=false->
try {
GetDebugPoint().enableDebugPointForAllBEs(injection1)
GetDebugPoint().enableDebugPointForAllBEs(injection2)
sql "insert into test select * from baseall where k1 <= 3"
assertTrue(success, String.format("expected Exception '%s', actual success", expect_errmsg))
} catch(Exception e) {
logger.info(e.getMessage())
assertTrue(e.getMessage().contains(error_msg))
Expand All @@ -155,8 +157,6 @@ suite("load_stream_fault_injection", "nonConcurrent") {
load_with_injection("LoadStreamWriter.close.inverted_writers_size_not_match", "")
load_with_injection("LoadStreamWriter.close.file_not_closed", "")
load_with_injection("LoadStreamWriter.close.inverted_file_not_closed", "")
// LoadStreamWriter close_writer meet bytes_appended and real file size not match error
load_with_injection("FileWriter.close_writer.zero_bytes_appended", "")
// LoadStreamWriter close_writer/add_segment meet not inited error
load_with_injection("TabletStream.init.uninited_writer", "")
// LoadStreamWriter init failure
Expand All @@ -168,8 +168,6 @@ suite("load_stream_fault_injection", "nonConcurrent") {
// LoadStreamWriter add_segment meet null file writer error
load_with_injection("LoadStreamWriter._calc_file_size.null_file_writer", "")
load_with_injection("LoadStreamWriter._calc_file_size.file_not_closed", "")
// LoadStreamWriter add_segment meet bytes_appended and real file size not match error
load_with_injection("FileWriter.add_segment.zero_bytes_appended", "")
// LoadStream init failed coz LoadStreamWriter init failed
load_with_injection("RowsetBuilder.check_tablet_version_count.too_many_version", "")
// LoadStream add_segment meet unknown segid in request header
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ suite("test_load_stream_stub_close_wait_fault_injection", "nonConcurrent") {
GetDebugPoint().enableDebugPointForAllBEs("LoadStreamStub::close_wait.long_wait")
def res = sql "insert into test select * from baseall where k1 <= 3"
logger.info(res.toString())
assertTrue(false, "Expected Exception to be thrown")
} catch(Exception e) {
logger.info(e.getMessage())
assertTrue(e.getMessage().contains("cancel"))
Expand All @@ -85,4 +86,4 @@ suite("test_load_stream_stub_close_wait_fault_injection", "nonConcurrent") {
sql """ DROP TABLE IF EXISTS `baseall` """
sql """ DROP TABLE IF EXISTS `test` """
sql """ set enable_memtable_on_sink_node=false """
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,11 @@ suite("test_stream_stub_fault_injection", "nonConcurrent") {
file "baseall.txt"
}

def load_with_injection = { injection, error_msg->
def load_with_injection = { injection, error_msg, success=false->
try {
GetDebugPoint().enableDebugPointForAllBEs(injection)
sql "insert into test select * from baseall where k1 <= 3"
assertTrue(success, String.format("Expected Exception '%s', actual success", expect_errmsg))
} catch(Exception e) {
logger.info(e.getMessage())
assertTrue(e.getMessage().contains(error_msg))
Expand All @@ -87,8 +88,6 @@ suite("test_stream_stub_fault_injection", "nonConcurrent") {
load_with_injection("StreamSinkFileWriter.finalize.finalize_failed", "failed to send segment eos to any replicas")
// LoadStreams stream wait failed
load_with_injection("LoadStreamStub._send_with_retry.stream_write_failed", "StreamWrite failed, err=32")
// LoadStreams keeping stream when release
load_with_injection("LoadStreams.release.keeping_streams", "")

sql """ DROP TABLE IF EXISTS `baseall` """
sql """ DROP TABLE IF EXISTS `test` """
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ suite("test_memtable_flush_fault", "nonConcurrent") {
GetDebugPoint().enableDebugPointForAllBEs("FlushToken.submit_flush_error")
sql insert_sql
sql "sync"
assertTrue(false, "Expected Exception dbug_be_memtable_submit_flush_error")
} catch (Exception e){
logger.info(e.getMessage())
assertTrue(e.getMessage().contains("dbug_be_memtable_submit_flush_error"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,8 @@ suite("test_rowset_writer_fault", "nonConcurrent") {
assertEquals("fail", json.Status.toLowerCase())
}
}
} catch(Exception e) {
logger.info(e.getMessage())
assertTrue(e.getMessage().contains(error_msg))
} finally {
GetDebugPoint().disableDebugPointForAllBEs(injection)
}
sql """ DROP TABLE IF EXISTS `baseall` """
}
}
Loading