From 3c7687a7e4893069d22470f0628da27bbbf65471 Mon Sep 17 00:00:00 2001 From: packy92 <110370499+packy92@users.noreply.github.com> Date: Tue, 4 Jun 2024 17:40:53 +0800 Subject: [PATCH 1/2] [UT] fix unstable ut (#46617) Signed-off-by: packy92 (cherry picked from commit 92604ff78c4cdc9876086c9246a865a45b0a8e9c) # Conflicts: # test/lib/sr_sql_lib.py --- test/lib/sr_sql_lib.py | 425 ++++++++++++++++++ .../R/test_clear_stats | 8 +- .../T/test_clear_stats | 5 +- 3 files changed, 428 insertions(+), 10 deletions(-) diff --git a/test/lib/sr_sql_lib.py b/test/lib/sr_sql_lib.py index 2bbecfea7dce2..aedb0963fd9c9 100644 --- a/test/lib/sr_sql_lib.py +++ b/test/lib/sr_sql_lib.py @@ -994,3 +994,428 @@ def check_es_table_metadata_ready(self, table_name): time.sleep(10) count += 1 tools.assert_true(False, "check es table metadata 600s timeout") +<<<<<<< HEAD +======= + + def _stream_load(self, label, database_name, table_name, filepath, headers=None, meta_sync=True): + """ """ + url = ( + "http://" + + self.mysql_host + + ":" + + self.http_port + + "/api/" + + database_name + + "/" + + table_name + + "/_stream_load" + ) + params = [ + "curl", + "--location-trusted", + "-u", + "%s:%s" % (self.mysql_user, self.mysql_password), + "-T", + filepath, + "-XPUT", + "-H", + "label:%s" % label, + ] + + if headers: + for k, v in headers.items(): + params.append("-H") + params.append("%s:%s" % (k, v)) + + params.append(url) + stream_load_sql = " ".join(param for param in params) + log.info(stream_load_sql) + + cmd_res = subprocess.run( + params, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + encoding="utf-8", + timeout=120, + ) + + log.info(cmd_res) + if cmd_res.returncode != 0: + return {"Status": "CommandFail", "Message": cmd_res.stderr} + + res = json.loads(cmd_res.stdout) + + if meta_sync: + self.meta_sync() + + if res["Status"] == "Publish Timeout": + cmd = "curl -s --location-trusted -u %s:%s http://%s:%s/api/%s/get_load_state?label=%s" % ( + self.mysql_user, + self.mysql_password, + self.mysql_host, + self.http_port, + database_name, + label, + ) + print(cmd) + cmd = cmd.split(" ") + for i in range(60): + time.sleep(3) + res = subprocess.run( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + encoding="utf-8", + timeout=60, + ) + + if res.returncode != 0: + return {"Status": "CommandFail", "Message": res.stderr} + + res = json.loads(res.stdout) + if res["state"] == "VISIBLE": + break + else: + log.error(res) + res["Status"] = "Failed" + return res + + def prepare_data(self, data_name, db): + """ load data """ + tools.assert_in(data_name, ["ssb", "tpch", "tpcds"], "Unsupported data!") + + # create tables + create_table_sqls = self.get_sql_from_file("create.sql", dir_path=os.path.join(common_sql_path, data_name)) + res = self.execute_sql(create_table_sqls, True) + tools.assert_true(res["status"], "create %s table error, %s" % (data_name, res["msg"])) + # load data + data_files = self.get_common_data_files(data_name) + for data in data_files: + if ".gitkeep" in data: + continue + label = "%s_load_label_%s" % (data_name, uuid.uuid1().hex) + file_name = data.split("/")[-1] + table_name = file_name.split(".")[0] + log.info("Load %s..." % table_name) + headers = {"column_separator": "|"} + + # tpcds prepare + if data_name == "tpcds": + switch = { + "web_returns": "wr_returned_date_sk,wr_returned_time_sk,wr_item_sk,\ + wr_refunded_customer_sk,wr_refunded_cdemo_sk,wr_refunded_hdemo_sk,\ + wr_refunded_addr_sk,wr_returning_customer_sk,wr_returning_cdemo_sk,\ + wr_returning_hdemo_sk,wr_returning_addr_sk,wr_web_page_sk,wr_reason_sk,\ + wr_order_number,wr_return_quantity,wr_return_amt,wr_return_tax,\ + wr_return_amt_inc_tax,wr_fee,wr_return_ship_cost,wr_refunded_cash,\ + wr_reversed_charge,wr_account_credit,wr_net_loss", + "web_sales": "ws_sold_date_sk,ws_sold_time_sk,ws_ship_date_sk,ws_item_sk,\ + ws_bill_customer_sk,ws_bill_cdemo_sk,ws_bill_hdemo_sk,ws_bill_addr_sk,\ + ws_ship_customer_sk,ws_ship_cdemo_sk,ws_ship_hdemo_sk,ws_ship_addr_sk,\ + ws_web_page_sk,ws_web_site_sk,ws_ship_mode_sk,ws_warehouse_sk,ws_promo_sk,\ + ws_order_number,ws_quantity,ws_wholesale_cost,ws_list_price,ws_sales_price,\ + ws_ext_discount_amt,ws_ext_sales_price,ws_ext_wholesale_cost,ws_ext_list_price,\ + ws_ext_tax,ws_coupon_amt,ws_ext_ship_cost,ws_net_paid,ws_net_paid_inc_tax,\ + ws_net_paid_inc_ship,ws_net_paid_inc_ship_tax,ws_net_profit", + "catalog_returns": "cr_returned_date_sk,cr_returned_time_sk,cr_item_sk,cr_refunded_customer_sk,\ + cr_refunded_cdemo_sk,cr_refunded_hdemo_sk,cr_refunded_addr_sk,\ + cr_returning_customer_sk,cr_returning_cdemo_sk,cr_returning_hdemo_sk,\ + cr_returning_addr_sk,cr_call_center_sk,cr_catalog_page_sk,cr_ship_mode_sk,\ + cr_warehouse_sk,cr_reason_sk,cr_order_number,cr_return_quantity,cr_return_amount,\ + cr_return_tax,cr_return_amt_inc_tax,cr_fee,cr_return_ship_cost,cr_refunded_cash,\ + cr_reversed_charge,cr_store_credit,cr_net_loss", + "catalog_sales": "cs_sold_date_sk,cs_sold_time_sk,cs_ship_date_sk,cs_bill_customer_sk,cs_bill_cdemo_sk,\ + cs_bill_hdemo_sk,cs_bill_addr_sk,cs_ship_customer_sk,cs_ship_cdemo_sk,cs_ship_hdemo_sk,\ + cs_ship_addr_sk,cs_call_center_sk,cs_catalog_page_sk,cs_ship_mode_sk,cs_warehouse_sk,\ + cs_item_sk,cs_promo_sk,cs_order_number,cs_quantity,cs_wholesale_cost,cs_list_price,\ + cs_sales_price,cs_ext_discount_amt,cs_ext_sales_price,cs_ext_wholesale_cost,\ + cs_ext_list_price,cs_ext_tax,cs_coupon_amt,cs_ext_ship_cost,cs_net_paid,\ + cs_net_paid_inc_tax,cs_net_paid_inc_ship,cs_net_paid_inc_ship_tax,cs_net_profit", + "store_returns": "sr_returned_date_sk,sr_return_time_sk,sr_item_sk,sr_customer_sk,sr_cdemo_sk,\ + sr_hdemo_sk,sr_addr_sk,sr_store_sk,sr_reason_sk,sr_ticket_number,sr_return_quantity,\ + sr_return_amt,sr_return_tax,sr_return_amt_inc_tax,sr_fee,sr_return_ship_cost,\ + sr_refunded_cash,sr_reversed_charge,sr_store_credit,sr_net_loss", + "store_sales": "ss_sold_date_sk,ss_sold_time_sk,ss_item_sk,ss_customer_sk,ss_cdemo_sk,ss_hdemo_sk,\ + ss_addr_sk,ss_store_sk,ss_promo_sk,ss_ticket_number,ss_quantity,ss_wholesale_cost,\ + ss_list_price,ss_sales_price,ss_ext_discount_amt,ss_ext_sales_price,ss_ext_wholesale_cost,\ + ss_ext_list_price,ss_ext_tax,ss_coupon_amt,ss_net_paid,ss_net_paid_inc_tax,ss_net_profit", + } + if table_name in switch.keys(): + headers["columns"] = switch[table_name] + + res = self._stream_load(label, db, table_name, data, headers) + tools.assert_equal(res["Status"], "Success", "Prepare %s data error: %s" % (data_name, res["Message"])) + + def execute_cmd(self, exec_url): + cmd_template = "curl -XPOST -u {user}:{passwd} '" + exec_url + "'" + cmd = cmd_template.format(user=self.mysql_user, passwd=self.mysql_password) + res = subprocess.run( + cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding="utf-8", timeout=1800 + ) + return str(res) + + def post_http_request(self, exec_url) -> str: + """Sends a POST request. + + Returns: + the response content. + """ + res = requests.post(exec_url, auth=HTTPBasicAuth(self.mysql_user, self.mysql_password)) + tools.assert_equal(200, res.status_code, f"failed to post http request [res={res}] [url={exec_url}]") + return res.content.decode("utf-8") + + def manual_compact(self, database_name, table_name): + sql = "show tablet from " + database_name + "." + table_name + res = self.execute_sql(sql, "dml") + tools.assert_true(res["status"], res["msg"]) + url = res["result"][0][20] + + pos = url.find("api") + exec_url = url[0:pos] + "api/update_config?min_cumulative_compaction_num_singleton_deltas=0" + res = self.execute_cmd(exec_url) + print(res) + + exec_url = url[0:pos] + "api/update_config?base_compaction_interval_seconds_since_last_operation=0" + res = self.execute_cmd(exec_url) + print(res) + + exec_url = url[0:pos] + "api/update_config?cumulative_compaction_skip_window_seconds=1" + res = self.execute_cmd(exec_url) + print(res) + + exec_url = url.replace("compaction/show", "compact") + "&compaction_type=cumulative" + res = self.execute_cmd(exec_url) + print(res) + + exec_url = url.replace("compaction/show", "compact") + "&compaction_type=base" + res = self.execute_cmd(exec_url) + print(res) + + def wait_analyze_finish(self, database_name, table_name, sql): + timeout = 300 + analyze_sql = "show analyze status where `Database` = 'default_catalog.%s'" % database_name + res = self.execute_sql(analyze_sql, "dml") + while timeout > 0: + res = self.execute_sql(analyze_sql, "dml") + if len(res["result"]) > 0: + for table in res["result"]: + if table[2] == table_name and table[4] == "FULL" and table[6] == "SUCCESS": + break + break + else: + time.sleep(1) + timeout -= 1 + else: + tools.assert_true(False, "analyze timeout") + + finished = False + counter = 0 + while True: + res = self.execute_sql(sql, "dml") + tools.assert_true(res["status"], res["msg"]) + if str(res["result"]).find("Decode") > 0: + finished = True + break + time.sleep(5) + if counter > 10: + break + counter = counter + 1 + + tools.assert_true(finished, "analyze timeout") + + def wait_compaction_finish(self, table_name: str, expected_num_segments: int): + timeout = 300 + scan_table_sql = f"SELECT /*+SET_VAR(enable_profile=true,enable_async_profile=false)*/ COUNT(1) FROM {table_name}" + fetch_segments_sql = r""" + with profile as ( + select unnest as line from (values(1))t(v) join unnest(split(get_query_profile(last_query_id()), "\n")) + ) + select regexp_extract(line, ".*- SegmentsReadCount: (?:.*\\()?(\\d+)\\)?", 1) as value + from profile + where line like "%- SegmentsReadCount%" + """ + + while timeout > 0: + res = self.execute_sql(scan_table_sql) + tools.assert_true(res["status"], f'Fail to execute scan_table_sql, error=[{res["msg"]}]') + + res = self.execute_sql(fetch_segments_sql) + tools.assert_true(res["status"], f'Fail to execute fetch_segments_sql, error=[{res["msg"]}]') + + if res["result"] == str(expected_num_segments): + break + + time.sleep(1) + timeout -= 1 + else: + tools.assert_true(False, "wait compaction timeout") + + def _get_backend_http_endpoints(self) -> List[Dict]: + """Get the http host and port of all the backends. + + Returns: + a dict list, each of which contains the key "host" and "host" of a backend. + """ + res = self.execute_sql("show backends;", ori=True) + tools.assert_true(res["status"], res["msg"]) + + backends = [] + for row in res["result"]: + backends.append({ + "host": row[1], + "port": row[4], + }) + + return backends + + def update_be_config(self, key, value): + """Update the config to all the backends. + """ + backends = self._get_backend_http_endpoints() + for backend in backends: + exec_url = f"http://{backend['host']}:{backend['port']}/api/update_config?{key}={value}" + print(f"post {exec_url}") + res = self.post_http_request(exec_url) + + res_json = json.loads(res) + tools.assert_dict_contains_subset({"status": "OK"}, res_json, + f"failed to update be config [response={res}] [url={exec_url}]") + + def assert_table_cardinality(self, sql, rows): + """ + assert table with an expected row counts + """ + res = self.execute_sql(sql, True) + expect = r"cardinality=" + rows + match = re.search(expect, str(res["result"])) + print(expect) + tools.assert_true(match, "expected cardinality: " + rows + ". but found: " + str(res["result"])) + + def wait_refresh_dictionary_finish(self, name, check_status): + """ + wait dictionary refresh job finish and return status + """ + status = "" + while True: + res = self.execute_sql( + "SHOW DICTIONARY %s" % name, + True, + ) + + status = res["result"][0][6] + if status != ("REFRESHING") and status != ("COMMITTING"): + break + time.sleep(0.5) + tools.assert_equal(check_status, status, "wait refresh dictionary finish error") + + def set_first_tablet_bad_and_recover(self, table_name): + """ + set table first tablet as bad replica and recover until success + """ + res = self.execute_sql( + "SHOW TABLET FROM %s" % table_name, + True, + ) + + tablet_id = res["result"][0][0] + backend_id = res["result"][0][2] + + res = self.execute_sql( + "ADMIN SET REPLICA STATUS PROPERTIES('tablet_id' = '%s', 'backend_id' = '%s', 'status' = 'bad')" % (tablet_id, backend_id), + True, + ) + + time.sleep(20) + + while True: + res = self.execute_sql( + "SHOW TABLET FROM %s" % table_name, + True, + ) + + if len(res["result"]) != 2: + time.sleep(0.5) + else: + break + def assert_explain_contains(self, query, *expects): + """ + assert explain result contains expect string + """ + sql = "explain %s" % (query) + res = self.execute_sql(sql, True) + for expect in expects: + tools.assert_true(str(res["result"]).find(expect) > 0, "assert expect %s is not found in plan" % (expect)) + + def assert_explain_not_contains(self, query, *expects): + """ + assert explain result contains expect string + """ + sql = "explain %s" % (query) + res = self.execute_sql(sql, True) + for expect in expects: + tools.assert_true(str(res["result"]).find(expect) == -1, "assert expect %s is found in plan" % (expect)) + + def assert_explain_costs_contains(self, query, *expects): + """ + assert explain costs result contains expect string + """ + sql = "explain costs %s" % (query) + res = self.execute_sql(sql, True) + for expect in expects: + tools.assert_true(str(res["result"]).find(expect) > 0, "assert expect %s is not found in plan" % (expect)) + + def assert_trace_values_contains(self, query, *expects): + """ + assert trace values result contains expect string + """ + sql = "trace values %s" % (query) + res = self.execute_sql(sql, True) + for expect in expects: + tools.assert_true(str(res["result"]).find(expect) > 0, "assert expect %s is not found in plan, error msg is %s" % (expect, str(res["result"]))) + + def assert_prepare_execute(self, db, query, params=()): + conn = mysql.connector.connect( + host=self.mysql_host, + user=self.mysql_user, + password="", + port=self.mysql_port, + database=db + ) + cursor = conn.cursor(prepared=True) + + try: + if params: + cursor.execute(query, ['2']) + else: + cursor.execute(query) + cursor.fetchall() + except mysql.connector.Error as e: + tools.assert_true(1 == 0, e) + + finally: + cursor.close() + conn.close() + + def assert_trace_times_contains(self, query, *expects): + """ + assert trace times result contains expect string + """ + sql = "trace times %s" % (query) + res = self.execute_sql(sql, True) + for expect in expects: + tools.assert_true(str(res["result"]).find(expect) > 0, "assert expect %s is not found in plan, error msg is %s" % (expect, str(res["result"]))) + + def assert_clear_stale_stats(self, query, expect_num): + timeout = 300 + num = 0; + while timeout > 0: + res = self.execute_sql(query) + num = res["result"] + if int(num) < expect_num: + break; + time.sleep(10) + timeout -= 10 + else: + tools.assert_true(False, "clear stale column stats timeout. The number of stale column stats is %s" % num) + +>>>>>>> 92604ff78c ([UT] fix unstable ut (#46617)) diff --git a/test/sql/test_refresh_statistics/R/test_clear_stats b/test/sql/test_refresh_statistics/R/test_clear_stats index 84c164d542c89..0119927488015 100644 --- a/test/sql/test_refresh_statistics/R/test_clear_stats +++ b/test/sql/test_refresh_statistics/R/test_clear_stats @@ -65,13 +65,9 @@ analyze table test_clear_stats.tbl WITH SYNC MODE; -- result: test_clear_stats.tbl analyze status OK -- !result -select sleep(80); +function: assert_clear_stale_stats("select count(*) from _statistics_.column_statistics where table_name = 'test_clear_stats.tbl'", 3) -- result: -1 --- !result -select count(*) <= 3 from _statistics_.column_statistics where table_name = 'test_clear_stats.tbl'; --- result: -1 +None -- !result ADMIN SET FRONTEND CONFIG ("clear_stale_stats_interval_sec" = "43200"); -- result: diff --git a/test/sql/test_refresh_statistics/T/test_clear_stats b/test/sql/test_refresh_statistics/T/test_clear_stats index bc2a4a77c728c..47e127cff3e4d 100644 --- a/test/sql/test_refresh_statistics/T/test_clear_stats +++ b/test/sql/test_refresh_statistics/T/test_clear_stats @@ -33,11 +33,8 @@ analyze table test_clear_stats.tbl WITH SYNC MODE; truncate table tbl; insert into tbl values (1, 1), (2, 2), (3, 3), (4, 4); analyze table test_clear_stats.tbl WITH SYNC MODE; -select sleep(80); -select count(*) <= 3 from _statistics_.column_statistics where table_name = 'test_clear_stats.tbl'; +function: assert_clear_stale_stats("select count(*) from _statistics_.column_statistics where table_name = 'test_clear_stats.tbl'", 3) ADMIN SET FRONTEND CONFIG ("clear_stale_stats_interval_sec" = "43200"); ADMIN SET FRONTEND CONFIG ("statistic_manager_sleep_time_sec" = "60"); - - From fa06a84281f92bb282796111831480eda4337200 Mon Sep 17 00:00:00 2001 From: packy92 Date: Wed, 5 Jun 2024 10:27:08 +0800 Subject: [PATCH 2/2] resolve conflict Signed-off-by: packy92 --- test/lib/sr_sql_lib.py | 414 +---------------------------------------- 1 file changed, 1 insertion(+), 413 deletions(-) diff --git a/test/lib/sr_sql_lib.py b/test/lib/sr_sql_lib.py index aedb0963fd9c9..72e10b2df4ed8 100644 --- a/test/lib/sr_sql_lib.py +++ b/test/lib/sr_sql_lib.py @@ -994,416 +994,6 @@ def check_es_table_metadata_ready(self, table_name): time.sleep(10) count += 1 tools.assert_true(False, "check es table metadata 600s timeout") -<<<<<<< HEAD -======= - - def _stream_load(self, label, database_name, table_name, filepath, headers=None, meta_sync=True): - """ """ - url = ( - "http://" - + self.mysql_host - + ":" - + self.http_port - + "/api/" - + database_name - + "/" - + table_name - + "/_stream_load" - ) - params = [ - "curl", - "--location-trusted", - "-u", - "%s:%s" % (self.mysql_user, self.mysql_password), - "-T", - filepath, - "-XPUT", - "-H", - "label:%s" % label, - ] - - if headers: - for k, v in headers.items(): - params.append("-H") - params.append("%s:%s" % (k, v)) - - params.append(url) - stream_load_sql = " ".join(param for param in params) - log.info(stream_load_sql) - - cmd_res = subprocess.run( - params, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - encoding="utf-8", - timeout=120, - ) - - log.info(cmd_res) - if cmd_res.returncode != 0: - return {"Status": "CommandFail", "Message": cmd_res.stderr} - - res = json.loads(cmd_res.stdout) - - if meta_sync: - self.meta_sync() - - if res["Status"] == "Publish Timeout": - cmd = "curl -s --location-trusted -u %s:%s http://%s:%s/api/%s/get_load_state?label=%s" % ( - self.mysql_user, - self.mysql_password, - self.mysql_host, - self.http_port, - database_name, - label, - ) - print(cmd) - cmd = cmd.split(" ") - for i in range(60): - time.sleep(3) - res = subprocess.run( - cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - encoding="utf-8", - timeout=60, - ) - - if res.returncode != 0: - return {"Status": "CommandFail", "Message": res.stderr} - - res = json.loads(res.stdout) - if res["state"] == "VISIBLE": - break - else: - log.error(res) - res["Status"] = "Failed" - return res - - def prepare_data(self, data_name, db): - """ load data """ - tools.assert_in(data_name, ["ssb", "tpch", "tpcds"], "Unsupported data!") - - # create tables - create_table_sqls = self.get_sql_from_file("create.sql", dir_path=os.path.join(common_sql_path, data_name)) - res = self.execute_sql(create_table_sqls, True) - tools.assert_true(res["status"], "create %s table error, %s" % (data_name, res["msg"])) - # load data - data_files = self.get_common_data_files(data_name) - for data in data_files: - if ".gitkeep" in data: - continue - label = "%s_load_label_%s" % (data_name, uuid.uuid1().hex) - file_name = data.split("/")[-1] - table_name = file_name.split(".")[0] - log.info("Load %s..." % table_name) - headers = {"column_separator": "|"} - - # tpcds prepare - if data_name == "tpcds": - switch = { - "web_returns": "wr_returned_date_sk,wr_returned_time_sk,wr_item_sk,\ - wr_refunded_customer_sk,wr_refunded_cdemo_sk,wr_refunded_hdemo_sk,\ - wr_refunded_addr_sk,wr_returning_customer_sk,wr_returning_cdemo_sk,\ - wr_returning_hdemo_sk,wr_returning_addr_sk,wr_web_page_sk,wr_reason_sk,\ - wr_order_number,wr_return_quantity,wr_return_amt,wr_return_tax,\ - wr_return_amt_inc_tax,wr_fee,wr_return_ship_cost,wr_refunded_cash,\ - wr_reversed_charge,wr_account_credit,wr_net_loss", - "web_sales": "ws_sold_date_sk,ws_sold_time_sk,ws_ship_date_sk,ws_item_sk,\ - ws_bill_customer_sk,ws_bill_cdemo_sk,ws_bill_hdemo_sk,ws_bill_addr_sk,\ - ws_ship_customer_sk,ws_ship_cdemo_sk,ws_ship_hdemo_sk,ws_ship_addr_sk,\ - ws_web_page_sk,ws_web_site_sk,ws_ship_mode_sk,ws_warehouse_sk,ws_promo_sk,\ - ws_order_number,ws_quantity,ws_wholesale_cost,ws_list_price,ws_sales_price,\ - ws_ext_discount_amt,ws_ext_sales_price,ws_ext_wholesale_cost,ws_ext_list_price,\ - ws_ext_tax,ws_coupon_amt,ws_ext_ship_cost,ws_net_paid,ws_net_paid_inc_tax,\ - ws_net_paid_inc_ship,ws_net_paid_inc_ship_tax,ws_net_profit", - "catalog_returns": "cr_returned_date_sk,cr_returned_time_sk,cr_item_sk,cr_refunded_customer_sk,\ - cr_refunded_cdemo_sk,cr_refunded_hdemo_sk,cr_refunded_addr_sk,\ - cr_returning_customer_sk,cr_returning_cdemo_sk,cr_returning_hdemo_sk,\ - cr_returning_addr_sk,cr_call_center_sk,cr_catalog_page_sk,cr_ship_mode_sk,\ - cr_warehouse_sk,cr_reason_sk,cr_order_number,cr_return_quantity,cr_return_amount,\ - cr_return_tax,cr_return_amt_inc_tax,cr_fee,cr_return_ship_cost,cr_refunded_cash,\ - cr_reversed_charge,cr_store_credit,cr_net_loss", - "catalog_sales": "cs_sold_date_sk,cs_sold_time_sk,cs_ship_date_sk,cs_bill_customer_sk,cs_bill_cdemo_sk,\ - cs_bill_hdemo_sk,cs_bill_addr_sk,cs_ship_customer_sk,cs_ship_cdemo_sk,cs_ship_hdemo_sk,\ - cs_ship_addr_sk,cs_call_center_sk,cs_catalog_page_sk,cs_ship_mode_sk,cs_warehouse_sk,\ - cs_item_sk,cs_promo_sk,cs_order_number,cs_quantity,cs_wholesale_cost,cs_list_price,\ - cs_sales_price,cs_ext_discount_amt,cs_ext_sales_price,cs_ext_wholesale_cost,\ - cs_ext_list_price,cs_ext_tax,cs_coupon_amt,cs_ext_ship_cost,cs_net_paid,\ - cs_net_paid_inc_tax,cs_net_paid_inc_ship,cs_net_paid_inc_ship_tax,cs_net_profit", - "store_returns": "sr_returned_date_sk,sr_return_time_sk,sr_item_sk,sr_customer_sk,sr_cdemo_sk,\ - sr_hdemo_sk,sr_addr_sk,sr_store_sk,sr_reason_sk,sr_ticket_number,sr_return_quantity,\ - sr_return_amt,sr_return_tax,sr_return_amt_inc_tax,sr_fee,sr_return_ship_cost,\ - sr_refunded_cash,sr_reversed_charge,sr_store_credit,sr_net_loss", - "store_sales": "ss_sold_date_sk,ss_sold_time_sk,ss_item_sk,ss_customer_sk,ss_cdemo_sk,ss_hdemo_sk,\ - ss_addr_sk,ss_store_sk,ss_promo_sk,ss_ticket_number,ss_quantity,ss_wholesale_cost,\ - ss_list_price,ss_sales_price,ss_ext_discount_amt,ss_ext_sales_price,ss_ext_wholesale_cost,\ - ss_ext_list_price,ss_ext_tax,ss_coupon_amt,ss_net_paid,ss_net_paid_inc_tax,ss_net_profit", - } - if table_name in switch.keys(): - headers["columns"] = switch[table_name] - - res = self._stream_load(label, db, table_name, data, headers) - tools.assert_equal(res["Status"], "Success", "Prepare %s data error: %s" % (data_name, res["Message"])) - - def execute_cmd(self, exec_url): - cmd_template = "curl -XPOST -u {user}:{passwd} '" + exec_url + "'" - cmd = cmd_template.format(user=self.mysql_user, passwd=self.mysql_password) - res = subprocess.run( - cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding="utf-8", timeout=1800 - ) - return str(res) - - def post_http_request(self, exec_url) -> str: - """Sends a POST request. - - Returns: - the response content. - """ - res = requests.post(exec_url, auth=HTTPBasicAuth(self.mysql_user, self.mysql_password)) - tools.assert_equal(200, res.status_code, f"failed to post http request [res={res}] [url={exec_url}]") - return res.content.decode("utf-8") - - def manual_compact(self, database_name, table_name): - sql = "show tablet from " + database_name + "." + table_name - res = self.execute_sql(sql, "dml") - tools.assert_true(res["status"], res["msg"]) - url = res["result"][0][20] - - pos = url.find("api") - exec_url = url[0:pos] + "api/update_config?min_cumulative_compaction_num_singleton_deltas=0" - res = self.execute_cmd(exec_url) - print(res) - - exec_url = url[0:pos] + "api/update_config?base_compaction_interval_seconds_since_last_operation=0" - res = self.execute_cmd(exec_url) - print(res) - - exec_url = url[0:pos] + "api/update_config?cumulative_compaction_skip_window_seconds=1" - res = self.execute_cmd(exec_url) - print(res) - - exec_url = url.replace("compaction/show", "compact") + "&compaction_type=cumulative" - res = self.execute_cmd(exec_url) - print(res) - - exec_url = url.replace("compaction/show", "compact") + "&compaction_type=base" - res = self.execute_cmd(exec_url) - print(res) - - def wait_analyze_finish(self, database_name, table_name, sql): - timeout = 300 - analyze_sql = "show analyze status where `Database` = 'default_catalog.%s'" % database_name - res = self.execute_sql(analyze_sql, "dml") - while timeout > 0: - res = self.execute_sql(analyze_sql, "dml") - if len(res["result"]) > 0: - for table in res["result"]: - if table[2] == table_name and table[4] == "FULL" and table[6] == "SUCCESS": - break - break - else: - time.sleep(1) - timeout -= 1 - else: - tools.assert_true(False, "analyze timeout") - - finished = False - counter = 0 - while True: - res = self.execute_sql(sql, "dml") - tools.assert_true(res["status"], res["msg"]) - if str(res["result"]).find("Decode") > 0: - finished = True - break - time.sleep(5) - if counter > 10: - break - counter = counter + 1 - - tools.assert_true(finished, "analyze timeout") - - def wait_compaction_finish(self, table_name: str, expected_num_segments: int): - timeout = 300 - scan_table_sql = f"SELECT /*+SET_VAR(enable_profile=true,enable_async_profile=false)*/ COUNT(1) FROM {table_name}" - fetch_segments_sql = r""" - with profile as ( - select unnest as line from (values(1))t(v) join unnest(split(get_query_profile(last_query_id()), "\n")) - ) - select regexp_extract(line, ".*- SegmentsReadCount: (?:.*\\()?(\\d+)\\)?", 1) as value - from profile - where line like "%- SegmentsReadCount%" - """ - - while timeout > 0: - res = self.execute_sql(scan_table_sql) - tools.assert_true(res["status"], f'Fail to execute scan_table_sql, error=[{res["msg"]}]') - - res = self.execute_sql(fetch_segments_sql) - tools.assert_true(res["status"], f'Fail to execute fetch_segments_sql, error=[{res["msg"]}]') - - if res["result"] == str(expected_num_segments): - break - - time.sleep(1) - timeout -= 1 - else: - tools.assert_true(False, "wait compaction timeout") - - def _get_backend_http_endpoints(self) -> List[Dict]: - """Get the http host and port of all the backends. - - Returns: - a dict list, each of which contains the key "host" and "host" of a backend. - """ - res = self.execute_sql("show backends;", ori=True) - tools.assert_true(res["status"], res["msg"]) - - backends = [] - for row in res["result"]: - backends.append({ - "host": row[1], - "port": row[4], - }) - - return backends - - def update_be_config(self, key, value): - """Update the config to all the backends. - """ - backends = self._get_backend_http_endpoints() - for backend in backends: - exec_url = f"http://{backend['host']}:{backend['port']}/api/update_config?{key}={value}" - print(f"post {exec_url}") - res = self.post_http_request(exec_url) - - res_json = json.loads(res) - tools.assert_dict_contains_subset({"status": "OK"}, res_json, - f"failed to update be config [response={res}] [url={exec_url}]") - - def assert_table_cardinality(self, sql, rows): - """ - assert table with an expected row counts - """ - res = self.execute_sql(sql, True) - expect = r"cardinality=" + rows - match = re.search(expect, str(res["result"])) - print(expect) - tools.assert_true(match, "expected cardinality: " + rows + ". but found: " + str(res["result"])) - - def wait_refresh_dictionary_finish(self, name, check_status): - """ - wait dictionary refresh job finish and return status - """ - status = "" - while True: - res = self.execute_sql( - "SHOW DICTIONARY %s" % name, - True, - ) - - status = res["result"][0][6] - if status != ("REFRESHING") and status != ("COMMITTING"): - break - time.sleep(0.5) - tools.assert_equal(check_status, status, "wait refresh dictionary finish error") - - def set_first_tablet_bad_and_recover(self, table_name): - """ - set table first tablet as bad replica and recover until success - """ - res = self.execute_sql( - "SHOW TABLET FROM %s" % table_name, - True, - ) - - tablet_id = res["result"][0][0] - backend_id = res["result"][0][2] - - res = self.execute_sql( - "ADMIN SET REPLICA STATUS PROPERTIES('tablet_id' = '%s', 'backend_id' = '%s', 'status' = 'bad')" % (tablet_id, backend_id), - True, - ) - - time.sleep(20) - - while True: - res = self.execute_sql( - "SHOW TABLET FROM %s" % table_name, - True, - ) - - if len(res["result"]) != 2: - time.sleep(0.5) - else: - break - def assert_explain_contains(self, query, *expects): - """ - assert explain result contains expect string - """ - sql = "explain %s" % (query) - res = self.execute_sql(sql, True) - for expect in expects: - tools.assert_true(str(res["result"]).find(expect) > 0, "assert expect %s is not found in plan" % (expect)) - - def assert_explain_not_contains(self, query, *expects): - """ - assert explain result contains expect string - """ - sql = "explain %s" % (query) - res = self.execute_sql(sql, True) - for expect in expects: - tools.assert_true(str(res["result"]).find(expect) == -1, "assert expect %s is found in plan" % (expect)) - - def assert_explain_costs_contains(self, query, *expects): - """ - assert explain costs result contains expect string - """ - sql = "explain costs %s" % (query) - res = self.execute_sql(sql, True) - for expect in expects: - tools.assert_true(str(res["result"]).find(expect) > 0, "assert expect %s is not found in plan" % (expect)) - - def assert_trace_values_contains(self, query, *expects): - """ - assert trace values result contains expect string - """ - sql = "trace values %s" % (query) - res = self.execute_sql(sql, True) - for expect in expects: - tools.assert_true(str(res["result"]).find(expect) > 0, "assert expect %s is not found in plan, error msg is %s" % (expect, str(res["result"]))) - - def assert_prepare_execute(self, db, query, params=()): - conn = mysql.connector.connect( - host=self.mysql_host, - user=self.mysql_user, - password="", - port=self.mysql_port, - database=db - ) - cursor = conn.cursor(prepared=True) - - try: - if params: - cursor.execute(query, ['2']) - else: - cursor.execute(query) - cursor.fetchall() - except mysql.connector.Error as e: - tools.assert_true(1 == 0, e) - - finally: - cursor.close() - conn.close() - - def assert_trace_times_contains(self, query, *expects): - """ - assert trace times result contains expect string - """ - sql = "trace times %s" % (query) - res = self.execute_sql(sql, True) - for expect in expects: - tools.assert_true(str(res["result"]).find(expect) > 0, "assert expect %s is not found in plan, error msg is %s" % (expect, str(res["result"]))) def assert_clear_stale_stats(self, query, expect_num): timeout = 300 @@ -1416,6 +1006,4 @@ def assert_clear_stale_stats(self, query, expect_num): time.sleep(10) timeout -= 10 else: - tools.assert_true(False, "clear stale column stats timeout. The number of stale column stats is %s" % num) - ->>>>>>> 92604ff78c ([UT] fix unstable ut (#46617)) + tools.assert_true(False, "clear stale column stats timeout. The number of stale column stats is %s" % num) \ No newline at end of file