Skip to content

Commit

Permalink
[test](load) injection cases should check Exception is thrown (#44713) (
Browse files Browse the repository at this point in the history
#45321)

backport #44713
  • Loading branch information
kaijchen authored Dec 17, 2024
1 parent b6a1803 commit 8dc8456
Show file tree
Hide file tree
Showing 8 changed files with 35 additions and 70 deletions.
9 changes: 7 additions & 2 deletions be/src/olap/rowset_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,13 @@ Status RowsetBuilder::init_mow_context(std::shared_ptr<MowContext>& mow_context)
}

Status RowsetBuilder::check_tablet_version_count() {
if (!_tablet->exceed_version_limit(config::max_tablet_version_num - 100) ||
GlobalMemoryArbitrator::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE)) {
bool injection = false;
DBUG_EXECUTE_IF("RowsetBuilder.check_tablet_version_count.too_many_version",
{ injection = true; });
if (injection) {
// do not return if injection
} else if (!_tablet->exceed_version_limit(config::max_tablet_version_num - 100) ||
GlobalMemoryArbitrator::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE)) {
return Status::OK();
}
//trigger compaction
Expand Down
1 change: 0 additions & 1 deletion regression-test/pipeline/p0/conf/regression-conf.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ excludeSuites = "000_the_start_sentinel_do_not_touch," + // keep this line as th
"test_refresh_mtmv," +
"test_spark_load," +
"test_broker_load_func," +
"test_stream_stub_fault_injection," +
"zzz_the_end_sentinel_do_not_touch" // keep this line as the last line

// this directories will not be executed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,67 +71,32 @@ suite("test_load_stream_back_pressure_fault_injection", "nonConcurrent") {

try {
GetDebugPoint().enableDebugPointForAllBEs("TabletStream.append_data.long_wait")
def thread1 = new Thread({
try {
def res = sql "insert into test select * from baseall where k1 <= 3"
logger.info(res.toString())
} catch(Exception e) {
logger.info(e.getMessage())
assertTrue(e.getMessage().contains("Communications link failure"))
// the kill thread only means to end the test faster when the code does not behave as expected
def kill_thread = new Thread({
sleep(5000)
def processList = sql "show processlist"
logger.info(processList.toString())
processList.each { item ->
logger.info(item[1].toString())
logger.info(item[11].toString())
if (item[11].toString() == "insert into test select * from baseall where k1 <= 3".toString()){
def res = sql "kill ${item[1]}"
logger.info(res.toString())
}
}
})
thread1.start()

sleep(1000)

def processList = sql "show processlist"
logger.info(processList.toString())
processList.each { item ->
logger.info(item[1].toString())
logger.info(item[11].toString())
if (item[11].toString() == "insert into test select * from baseall where k1 <= 3".toString()){
def res = sql "kill ${item[1]}"
logger.info(res.toString())
}
}
kill_thread.start()
def res = sql "insert into test select * from baseall where k1 <= 3"
logger.info(res.toString())
assertTrue(false, "Expected exception to be thrown")
} catch(Exception e) {
logger.info(e.getMessage())
assertTrue(e.getMessage().contains("wait flush token back pressure time is more than load_stream_max_wait_flush_token_time"))
} finally {
GetDebugPoint().disableDebugPointForAllBEs("TabletStream.append_data.long_wait")
}

try {
GetDebugPoint().enableDebugPointForAllBEs("TabletStream.add_segment.long_wait")
def thread1 = new Thread({
try {
def res = sql "insert into test select * from baseall where k1 <= 3"
logger.info(res.toString())
} catch(Exception e) {
logger.info(e.getMessage())
assertTrue(e.getMessage().contains("Communications link failure"))
}
})
thread1.start()

sleep(1000)

def processList = sql "show processlist"
logger.info(processList.toString())
processList.each { item ->
logger.info(item[1].toString())
logger.info(item[11].toString())
if (item[11].toString() == "insert into test select * from baseall where k1 <= 3".toString()){
def res = sql "kill ${item[1]}"
logger.info(res.toString())
}
}
} catch(Exception e) {
logger.info(e.getMessage())
} finally {
GetDebugPoint().disableDebugPointForAllBEs("TabletStream.add_segment.long_wait")
}

sql """ DROP TABLE IF EXISTS `baseall` """
sql """ DROP TABLE IF EXISTS `test` """
sql """ set enable_memtable_on_sink_node=false """
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,11 @@ suite("load_stream_fault_injection", "nonConcurrent") {
}
}

def load_with_injection = { injection, expect_errmsg ->
def load_with_injection = { injection, expect_errmsg, success=false->
try {
GetDebugPoint().enableDebugPointForAllBEs(injection)
sql "insert into test select * from baseall where k1 <= 3"
assertTrue(success, String.format("Expected Exception '%s', actual success", expect_errmsg))
} catch(Exception e) {
// assertTrue(e.getMessage().contains("Process has no memory available")) // the msg should contain the root cause
logger.info(e.getMessage())
Expand All @@ -125,11 +126,12 @@ suite("load_stream_fault_injection", "nonConcurrent") {
}
}

def load_with_injection2 = { injection1, injection2, error_msg->
def load_with_injection2 = { injection1, injection2, error_msg, success=false->
try {
GetDebugPoint().enableDebugPointForAllBEs(injection1)
GetDebugPoint().enableDebugPointForAllBEs(injection2)
sql "insert into test select * from baseall where k1 <= 3"
assertTrue(success, String.format("expected Exception '%s', actual success", expect_errmsg))
} catch(Exception e) {
logger.info(e.getMessage())
assertTrue(e.getMessage().contains(error_msg))
Expand All @@ -149,16 +151,12 @@ suite("load_stream_fault_injection", "nonConcurrent") {
load_with_injection("LoadStreamWriter.close_segment.null_file_writer", "")
// LoadStreamWriter close_segment meet file writer failed to close error
load_with_injection("LocalFileWriter.close.failed", "")
// LoadStreamWriter close_segment meet bytes_appended and real file size not match error
load_with_injection("FileWriter.close_segment.zero_bytes_appended", "")
// LoadStreamWriter close_writer/add_segment meet not inited error
load_with_injection("TabletStream.init.uninited_writer", "")
// LoadStreamWriter add_segment meet not bad segid error
load_with_injection("LoadStreamWriter.add_segment.bad_segid", "")
// LoadStreamWriter add_segment meet null file writer error
load_with_injection("LoadStreamWriter.add_segment.null_file_writer", "")
// LoadStreamWriter add_segment meet bytes_appended and real file size not match error
load_with_injection("FileWriter.add_segment.zero_bytes_appended", "")
// LoadStream init failed coz LoadStreamWriter init failed
load_with_injection("RowsetBuilder.check_tablet_version_count.too_many_version", "")
// LoadStream add_segment meet unknown segid in request header
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ suite("test_load_stream_stub_close_wait_fault_injection", "nonConcurrent") {
GetDebugPoint().enableDebugPointForAllBEs("LoadStreamStub::close_wait.long_wait")
def res = sql "insert into test select * from baseall where k1 <= 3"
logger.info(res.toString())
assertTrue(false, "Expected Exception to be thrown")
} catch(Exception e) {
logger.info(e.getMessage())
assertTrue(e.getMessage().contains("cancel"))
Expand All @@ -85,4 +86,4 @@ suite("test_load_stream_stub_close_wait_fault_injection", "nonConcurrent") {
sql """ DROP TABLE IF EXISTS `baseall` """
sql """ DROP TABLE IF EXISTS `test` """
sql """ set enable_memtable_on_sink_node=false """
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,11 @@ suite("test_stream_stub_fault_injection", "nonConcurrent") {
file "baseall.txt"
}

def load_with_injection = { injection, error_msg->
def load_with_injection = { injection, error_msg, success=false->
try {
GetDebugPoint().enableDebugPointForAllBEs(injection)
sql "insert into test select * from baseall where k1 <= 3"
assertTrue(success, String.format("Expected Exception '%s', actual success", expect_errmsg))
} catch(Exception e) {
logger.info(e.getMessage())
assertTrue(e.getMessage().contains(error_msg))
Expand All @@ -87,8 +88,6 @@ suite("test_stream_stub_fault_injection", "nonConcurrent") {
load_with_injection("StreamSinkFileWriter.finalize.finalize_failed", "failed to send segment eos to any replicas")
// LoadStreams stream wait failed
load_with_injection("LoadStreamStub._send_with_retry.stream_write_failed", "StreamWrite failed, err=32")
// LoadStreams keeping stream when release
load_with_injection("LoadStreams.release.keeping_streams", "")

sql """ DROP TABLE IF EXISTS `baseall` """
sql """ DROP TABLE IF EXISTS `test` """
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ suite("test_memtable_flush_fault", "nonConcurrent") {
GetDebugPoint().enableDebugPointForAllBEs("FlushToken.submit_flush_error")
sql insert_sql
sql "sync"
assertTrue(false, "Expected Exception dbug_be_memtable_submit_flush_error")
} catch (Exception e){
logger.info(e.getMessage())
assertTrue(e.getMessage().contains("dbug_be_memtable_submit_flush_error"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,8 @@ suite("test_rowset_writer_fault", "nonConcurrent") {
assertEquals("fail", json.Status.toLowerCase())
}
}
} catch(Exception e) {
logger.info(e.getMessage())
assertTrue(e.getMessage().contains(error_msg))
} finally {
GetDebugPoint().disableDebugPointForAllBEs(injection)
}
sql """ DROP TABLE IF EXISTS `baseall` """
}
}

0 comments on commit 8dc8456

Please sign in to comment.