From 5e655d488c67cf36457a48dec4534eb9f2033d61 Mon Sep 17 00:00:00 2001
From: deardeng
Date: Tue, 10 Dec 2024 16:33:37 +0800
Subject: [PATCH] =?UTF-8?q?[fix](docker=20case)=20Fix=20`test=5Fsql=5Fmode?=
 =?UTF-8?q?=5Fnode=5Fmgr`=20and=20add=20cloud=20multi=20f=E2=80=A6=20(#441?=
 =?UTF-8?q?24)?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

…ollower fuzzy test

### What problem does this PR solve?

1. Fix slow docker cluster startup: inside a nested (double) loop, a single `break` could not break out of the outer loop.
2. Fix the `test_sql_mode_node_mgr` case, which failed to meet its testing objectives because the container exits after the follower is dropped.
3. Add fuzzy add/drop follower/observer cases on the cloud.
4. Fix the docker compose error `java.lang.Exception: doris compose output is empty, err:`.
5. Fix slow `SHOW FRONTENDS` queries when FE nodes change: `show frontends` uses the `isJoin` function, which calls `getHostName()`, and `getHostName()` can be very slow (see https://bugs.openjdk.org/browse/JDK-8143378#:~:text=getHostName()%3B%20takes%20about%205,millisecond%20on%20JDK%20update%2051).
6. Fix the following NullPointerException:
```
2024-11-28 15:50:05,272 WARN (cloud cluster check|302) [CloudClusterChecker.checkCloudBackends():518] diff cluster has exception, Cannot invoke "org.apache.doris.system.Backend.isDecommissioned()" because "be" is null
java.lang.NullPointerException: Cannot invoke "org.apache.doris.system.Backend.isDecommissioned()" because "be" is null
        at org.apache.doris.cloud.catalog.CloudClusterChecker.updateStatus(CloudClusterChecker.java:175) ~[doris-fe.jar:1.2-SNAPSHOT]
        at org.apache.doris.cloud.catalog.CloudClusterChecker.checkDiffNode(CloudClusterChecker.java:263) ~[doris-fe.jar:1.2-SNAPSHOT]
        at org.apache.doris.cloud.catalog.CloudClusterChecker.checkCloudBackends(CloudClusterChecker.java:513) ~[doris-fe.jar:1.2-SNAPSHOT]
        at org.apache.doris.cloud.catalog.CloudClusterChecker.runAfterCatalogReady(CloudClusterChecker.java:320) ~[doris-fe.jar:1.2-SNAPSHOT]
        at org.apache.doris.common.util.MasterDaemon.runOneCycle(MasterDaemon.java:58) ~[doris-fe.jar:1.2-SNAPSHOT]
        at org.apache.doris.common.util.Daemon.run(Daemon.java:119) ~[doris-fe.jar:1.2-SNAPSHOT]
```
---
 docker/runtime/doris-compose/cluster.py       |  12 +-
 docker/runtime/doris-compose/command.py       |  32 +++--
 docker/runtime/doris-compose/database.py      |  58 +++++---
 docker/runtime/doris-compose/doris-compose.py |  25 ++--
 docker/runtime/doris-compose/requirements.txt |   3 +-
 .../runtime/doris-compose/resource/init_fe.sh |   7 -
 docker/runtime/doris-compose/utils.py         |  55 ++++----
 .../cloud/catalog/CloudClusterChecker.java    |   4 +
 .../apache/doris/cloud/catalog/CloudEnv.java  |   3 +-
 .../doris/common/proc/FrontendsProcNode.java  |  25 +++-
 .../regression/suite/SuiteCluster.groovy      |  21 +--
 .../node_mgr/test_sql_mode_node_mgr.groovy    | 127 ++++++++----------
 12 files changed, 206 insertions(+), 166 deletions(-)

diff --git a/docker/runtime/doris-compose/cluster.py b/docker/runtime/doris-compose/cluster.py
index ba834167bd1c63..f4522181d4b18e 100644
--- a/docker/runtime/doris-compose/cluster.py
+++ b/docker/runtime/doris-compose/cluster.py
@@ -23,6 +23,7 @@
 import os
 import os.path
 import utils
+import time
 
 DOCKER_DORIS_PATH = "/opt/apache-doris"
 LOCAL_DORIS_PATH = os.getenv("LOCAL_DORIS_PATH", "/tmp/doris")
@@ -139,11 +140,15 @@ def gen_subnet_prefix16():
 
 def get_master_fe_endpoint(cluster_name):
     master_fe_ip_file = get_cluster_path(cluster_name) + "/status/master_fe_ip"
-    if os.path.exists(master_fe_ip_file):
-        with open(master_fe_ip_file, "r") as f:
-            return "{}:{}".format(f.read().strip(), FE_QUERY_PORT)
+    max_retries = 10
+    for attempt in range(max_retries):
+        if os.path.exists(master_fe_ip_file):
+            with open(master_fe_ip_file, "r") as f:
+                return "{}:{}".format(f.read().strip(), FE_QUERY_PORT)
+        time.sleep(1)
     try:
         cluster = Cluster.load(cluster_name)
+        LOG.info("master file not exist, master ip get from node 1")
         return "{}:{}".format(
             cluster.get_node(Node.TYPE_FE, 1).get_ip(), FE_QUERY_PORT)
     except:
@@ -468,6 +473,7 @@ def get_add_init_config(self):
             for key in ("JAVA_OPTS", "JAVA_OPTS_FOR_JDK_17"):
                 value = parser["dummy_section"].get(key)
                 if value:
+                    value = value.strip().strip('"')
                     cfg.append(
                         f"{key} = \"{value} -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:{FE_JAVA_DBG_PORT}\""
                     )
diff --git a/docker/runtime/doris-compose/command.py b/docker/runtime/doris-compose/command.py
index 7a2f3f3c195f18..638c1c465d75b1 100644
--- a/docker/runtime/doris-compose/command.py
+++ b/docker/runtime/doris-compose/command.py
@@ -183,7 +183,7 @@ def _support_boolean_action(self):
         return sys.version_info.major == 3 and sys.version_info.minor >= 9
 
     def _print_table(self, header, datas):
-        if utils.is_enable_log():
+        if utils.is_log_stdout():
            table = prettytable.PrettyTable(
                [utils.render_green(field) for field in header])
            for row in datas:
@@ -598,13 +598,6 @@ def do_add_node(node_type, add_num, add_ids):
                                related_nodes,
                                output_real_time=output_real_time)
 
-    ls_cmd = "python docker/runtime/doris-compose/doris-compose.py ls " + cluster.name
-    LOG.info("Inspect command: " + utils.render_green(ls_cmd) + "\n")
-    LOG.info(
-        "Master fe query address: " +
-        utils.render_green(CLUSTER.get_master_fe_endpoint(cluster.name)) +
-        "\n")
-
     if not args.start:
         LOG.info(
             utils.render_green(
@@ -618,14 +611,18 @@ def do_add_node(node_type, add_num, add_ids):
         LOG.info("Waiting for FE master to be elected...")
         expire_ts = time.time() + 30
         while expire_ts > time.time():
+            ready = False
             db_mgr = database.get_db_mgr(args.NAME, False)
             for id in add_fe_ids:
                 fe_state = db_mgr.get_fe(id)
                 if fe_state is not None and fe_state.alive:
+                    ready = True
                     break
-            LOG.info("there is no fe ready")
-            time.sleep(5)
-
+            if ready:
+                break
+            LOG.info("there is no fe ready")
+            time.sleep(1)
+        LOG.info("after Waiting for FE master to be elected...")
     if cluster.is_cloud and args.sql_mode_node_mgr:
         db_mgr = database.get_db_mgr(args.NAME, False)
         master_fe_endpoint = CLUSTER.get_master_fe_endpoint(
@@ -635,7 +632,9 @@ def do_add_node(node_type, add_num, add_ids):
             fe_endpoint = f"{fe.get_ip()}:{CLUSTER.FE_EDITLOG_PORT}"
             if fe_endpoint != master_fe_endpoint:
                 try:
-                    db_mgr.add_fe(fe_endpoint)
+                    db_mgr.add_fe(
+                        fe_endpoint, "FOLLOWER"
+                        if cluster.fe_follower else "OBSERVER")
                     LOG.info(f"Added FE {fe_endpoint} successfully.")
                 except Exception as e:
                     LOG.error(
@@ -661,6 +660,12 @@ def do_add_node(node_type, add_num, add_ids):
             "Up cluster {} succ, related node num {}".format(
                 args.NAME, related_node_num)))
 
+    ls_cmd = "python docker/runtime/doris-compose/doris-compose.py ls " + cluster.name
+    LOG.info("Inspect command: " + utils.render_green(ls_cmd) + "\n")
+    LOG.info(
+        "Master fe query address: " +
+        utils.render_green(CLUSTER.get_master_fe_endpoint(cluster.name)) +
+        "\n")
     return {
         "fe": {
             "add_list": add_fe_ids,
@@ -1066,8 +1071,7 @@ def parse_cluster_compose_file(cluster_name):
     if services is None:
         return COMPOSE_BAD, {}
     return COMPOSE_GOOD, {
-        service:
-        ComposeService(
+        service: ComposeService(
             service,
             list(service_conf["networks"].values())[0]
             ["ipv4_address"], service_conf["image"])
diff --git a/docker/runtime/doris-compose/database.py b/docker/runtime/doris-compose/database.py
index 46cdd961c9f888..370f1d5ee2afa7 100644
--- a/docker/runtime/doris-compose/database.py
+++ b/docker/runtime/doris-compose/database.py
@@ -27,12 +27,13 @@
 
 class FEState(object):
 
-    def __init__(self, id, is_master, alive, last_heartbeat, err_msg):
+    def __init__(self, id, is_master, alive, last_heartbeat, err_msg, role):
         self.id = id
         self.is_master = is_master
         self.alive = alive
         self.last_heartbeat = last_heartbeat
         self.err_msg = err_msg
+        self.role = role
 
 
 class BEState(object):
@@ -66,11 +67,11 @@ def load_states(self):
         self._load_fe_states()
         self._load_be_states()
 
-    def add_fe(self, fe_endpoint):
+    def add_fe(self, fe_endpoint, role):
         try:
-            sql = f"ALTER SYSTEM ADD FOLLOWER '{fe_endpoint}'"
+            sql = f"ALTER SYSTEM ADD {role} '{fe_endpoint}'"
             self._exec_query(sql)
-            LOG.info(f"Added FE {fe_endpoint} via SQL successfully.")
+            LOG.info(f"Added {role} FE {fe_endpoint} via SQL successfully.")
         except Exception as e:
             LOG.error(f"Failed to add FE {fe_endpoint} via SQL: {str(e)}")
             raise
@@ -78,8 +79,9 @@ def add_fe(self, fe_endpoint):
     def drop_fe(self, fe_endpoint):
         id = CLUSTER.Node.get_id_from_ip(fe_endpoint[:fe_endpoint.find(":")])
         try:
-            self._exec_query(
-                "ALTER SYSTEM DROP FOLLOWER '{}'".format(fe_endpoint))
+            role = self.get_fe(id).role if self.get_fe(id) else "FOLLOWER"
+            self._exec_query("ALTER SYSTEM DROP {} '{}'".format(
+                role, fe_endpoint))
             LOG.info("Drop fe {} with id {} from db succ.".format(
                 fe_endpoint, id))
         except Exception as e:
@@ -152,7 +154,7 @@ def decommission_be(self, be_endpoint):
                 .format(be_endpoint, be.alive, be.decommissioned,
                         be.tablet_num, old_tablet_num,
                         int(time.time() - start_ts)))
-            time.sleep(5)
+            time.sleep(1)
 
     def create_default_storage_vault(self, cloud_store_config):
         try:
@@ -194,7 +196,7 @@ def _load_fe_states(self):
             id = CLUSTER.Node.get_id_from_ip(ip)
             last_heartbeat = utils.escape_null(record["LastHeartbeat"])
             err_msg = record["ErrMsg"]
-            fe = FEState(id, is_master, alive, last_heartbeat, err_msg)
+            fe = FEState(id, is_master, alive, last_heartbeat, err_msg, role)
             fe_states[id] = fe
             if is_master and alive:
                 alive_master_fe_ip = ip
@@ -223,13 +225,23 @@ def _load_be_states(self):
         self.be_states = be_states
 
     # return rows, and each row is a record map
-    def _exec_query(self, sql):
+    def _exec_query(self, sql, retries=3):
         self._prepare_conn()
-        with self.conn.cursor() as cursor:
-            cursor.execute(sql)
-            fields = [field_md[0] for field_md in cursor.description
-                      ] if cursor.description else []
-            return [dict(zip(fields, row)) for row in cursor.fetchall()]
+        for attempt in range(retries):
+            try:
+                with self.conn.cursor() as cursor:
+                    cursor.execute(sql)
+                    fields = [field_md[0] for field_md in cursor.description
+                              ] if cursor.description else []
+                    return [dict(zip(fields, row)) for row in cursor.fetchall()]
+            except Exception as e:
+                LOG.warn(f"Error occurred: {e}")
+                if "timed out" in str(e).lower() and attempt < retries - 1:
+                    LOG.warn(f"Query timed out. Retrying {attempt + 1}/{retries}...")
Retrying {attempt + 1}/{retries}...") + self._reset_conn() + else: + raise e + raise Exception("Max retries exceeded") def _prepare_conn(self): if self.conn: @@ -257,19 +269,23 @@ def get_db_mgr(cluster_name, required_load_succ=True): if not master_fe_ip: return db_mgr - has_alive_fe = False + alive_fe = None + cluster = CLUSTER.Cluster.load(cluster_name) containers = utils.get_doris_containers(cluster_name).get(cluster_name, []) for container in containers: if utils.is_container_running(container): - _, node_type, _ = utils.parse_service_name(container.name) + _, node_type, id = utils.parse_service_name(container.name) if node_type == CLUSTER.Node.TYPE_FE: - has_alive_fe = True - break - - if not has_alive_fe: + node = cluster.get_node(node_type, id) + if not alive_fe: + alive_fe = node + if node.get_ip() == master_fe_ip: + alive_fe = node + break + if not alive_fe: return db_mgr - db_mgr.master_fe_ip = master_fe_ip + db_mgr.master_fe_ip = alive_fe.get_ip() try: db_mgr.load_states() except Exception as e: diff --git a/docker/runtime/doris-compose/doris-compose.py b/docker/runtime/doris-compose/doris-compose.py index a2d3a517553f71..cf3692d53215e0 100644 --- a/docker/runtime/doris-compose/doris-compose.py +++ b/docker/runtime/doris-compose/doris-compose.py @@ -16,7 +16,9 @@ # under the License. import argparse +import cluster as CLUSTER import command +import os.path import sys import traceback import utils @@ -31,12 +33,12 @@ def parse_args(): return ap.parse_args(), ap.format_help() -def run(args, disable_log, help): +def run(args, disable_log_stdout, help): for cmd in command.ALL_COMMANDS: if args.command == cmd.name: timer = utils.Timer() result = cmd.run(args) - if cmd.print_use_time() and not disable_log: + if cmd.print_use_time() and not disable_log_stdout: timer.show() return result print(help) @@ -48,19 +50,26 @@ def run(args, disable_log, help): verbose = getattr(args, "verbose", False) if verbose: utils.set_log_verbose() - disable_log = getattr(args, "output_json", False) - if disable_log: - utils.set_enable_log(False) + disable_log_stdout = getattr(args, "output_json", False) + if disable_log_stdout: + log_file_name = "" + cluster_name = getattr(args, "NAME", "") + if cluster_name: + if type(cluster_name) == type([]): + cluster_name = cluster_name[0] + log_file_name = os.path.join( + CLUSTER.get_cluster_path(cluster_name), "doris-compose.log") + utils.set_log_to(log_file_name, False) code = None try: - data = run(args, disable_log, help) - if disable_log: + data = run(args, disable_log_stdout, help) + if disable_log_stdout: print(utils.pretty_json({"code": 0, "data": data}), flush=True) code = 0 except: err = traceback.format_exc() - if disable_log: + if disable_log_stdout: print(utils.pretty_json({"code": 1, "err": err}), flush=True) else: print(err, flush=True) diff --git a/docker/runtime/doris-compose/requirements.txt b/docker/runtime/doris-compose/requirements.txt index 2f962ed68d8bf8..1f32223a02e1f4 100644 --- a/docker/runtime/doris-compose/requirements.txt +++ b/docker/runtime/doris-compose/requirements.txt @@ -22,5 +22,6 @@ jsonpickle prettytable pymysql python-dateutil -#pyyaml==5.4.1 +# if mac install pyyaml failed, change pyyaml version +#pyyaml==5.3.1 requests<=2.31.0 diff --git a/docker/runtime/doris-compose/resource/init_fe.sh b/docker/runtime/doris-compose/resource/init_fe.sh index b69ac3a209e409..a58723db1d7b2a 100755 --- a/docker/runtime/doris-compose/resource/init_fe.sh +++ b/docker/runtime/doris-compose/resource/init_fe.sh @@ -102,9 +102,6 @@ start_cloud_fe() 
 
         fe_daemon &
         run_fe
-        if [ "$MY_ID" == "1" ]; then
-            echo $MY_IP >$MASTER_FE_IP_FILE
-        fi
 
         return
     fi
@@ -168,10 +165,6 @@ start_cloud_fe() {
 
     fe_daemon &
     run_fe
-
-    if [ "$MY_ID" == "1" ]; then
-        echo $MY_IP >$MASTER_FE_IP_FILE
-    fi
 }
 
 stop_frontend() {
diff --git a/docker/runtime/doris-compose/utils.py b/docker/runtime/doris-compose/utils.py
index 4332ae6cf48d03..dcb821ddffdd00 100644
--- a/docker/runtime/doris-compose/utils.py
+++ b/docker/runtime/doris-compose/utils.py
@@ -23,6 +23,7 @@
 import pwd
 import socket
 import subprocess
+import sys
 import time
 import yaml
 
@@ -30,7 +31,7 @@
 
 LOG = None
 
-ENABLE_LOG = True
+ENALBE_LOG_STDOUT = True
 
 
 class Timer(object):
@@ -48,39 +49,41 @@ def cancel(self):
         self.canceled = True
 
 
-def set_enable_log(enabled):
-    global ENABLE_LOG
-    ENABLE_LOG = enabled
-    get_logger().disabled = not enabled
-
-
-def is_enable_log():
-    return ENABLE_LOG
+def is_log_stdout():
+    return ENALBE_LOG_STDOUT
 
 
 def set_log_verbose():
     get_logger().setLevel(logging.DEBUG)
 
 
-def get_logger(name=None):
-    global LOG
-    if LOG != None:
-        return LOG
-
-    logger = logging.getLogger(name)
-    if not logger.hasHandlers():
+def set_log_to(log_file_name, is_to_stdout):
+    logger = get_logger()
+    for ch in logger.handlers:
+        logger.removeHandler(ch)
+    if log_file_name:
+        os.makedirs(os.path.dirname(log_file_name), exist_ok=True)
+        logger.addHandler(logging.FileHandler(log_file_name))
+    global ENALBE_LOG_STDOUT
+    ENALBE_LOG_STDOUT = is_to_stdout
+    if is_to_stdout:
+        logger.addHandler(logging.StreamHandler(sys.stdout))
+    for ch in logger.handlers:
         formatter = logging.Formatter(
             '%(asctime)s - %(filename)s - %(lineno)dL - %(levelname)s - %(message)s'
         )
-        ch = logging.StreamHandler()
         ch.setLevel(logging.DEBUG)
         ch.setFormatter(formatter)
-        logger.addHandler(ch)
-        logger.setLevel(logging.INFO)
-    LOG = logger
-    return logger
+def get_logger(name="doris-compose"):
+    global LOG
+    if LOG is None:
+        LOG = logging.getLogger(name)
+        LOG.setLevel(logging.INFO)
+        set_log_to(None, True)
+
+    return LOG
 
 
 get_logger()
@@ -196,15 +199,17 @@ def exec_shell_command(command, ignore_errors=False, output_real_time=False):
     if output_real_time:
         while p.poll() is None:
             s = p.stdout.readline().decode('utf-8')
-            if ENABLE_LOG and s.rstrip():
-                print(s.rstrip())
+            if s.rstrip():
+                for line in s.strip().splitlines():
+                    LOG.info("(docker) " + line)
             out += s
         exitcode = p.wait()
     else:
         out = p.communicate()[0].decode('utf-8')
         exitcode = p.returncode
-    if ENABLE_LOG and out:
-        print(out)
+    if out:
+        for line in out.splitlines():
+            LOG.info("(docker) " + line)
     if not ignore_errors:
         assert exitcode == 0, out
     return exitcode, out
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
index e27339c2aacc14..9468c8acecd032 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
@@ -170,6 +170,10 @@ private void updateStatus(List currentBes, List expec
             String endpoint = addr + ":" + node.getHeartbeatPort();
             Cloud.NodeStatusPB status = node.getStatus();
             Backend be = currentMap.get(endpoint);
+            if (be == null) {
+                LOG.warn("cant get valid be {} from fe mem, ignore it checker will add this be at next", endpoint);
+                continue;
+            }
 
             if (status == Cloud.NodeStatusPB.NODE_STATUS_DECOMMISSIONING) {
                 if (!be.isDecommissioned()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
index f4c6005a0d828a..89338c228fc0b6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
@@ -415,7 +415,8 @@ public void dropFrontend(FrontendNodeType role, String host, int port) throws Dd
 
         Frontend frontend = checkFeExist(host, port);
         if (frontend == null) {
-            throw new DdlException("Frontend does not exist.");
+            throw new DdlException("frontend does not exist[" + NetUtils
+                    .getHostPortInAccessibleFormat(host, port) + "]");
         }
 
         if (frontend.getRole() != role) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/FrontendsProcNode.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/FrontendsProcNode.java
index eb75bc9312aab4..ede8cb56258636 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/FrontendsProcNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/FrontendsProcNode.java
@@ -126,7 +126,9 @@ public static void getFrontendsInfo(Env env, List> infos) {
             selfNode = ConnectContext.get().getCurrentConnectedFEIp();
         }
 
-        for (Frontend fe : env.getFrontends(null /* all */)) {
+        List envFes = env.getFrontends(null /* all */);
+        LOG.info("bdbje fes {}, env fes {}", allFe, envFes);
+        for (Frontend fe : envFes) {
             List info = new ArrayList();
             info.add(fe.getNodeName());
             info.add(fe.getHost());
@@ -211,11 +213,6 @@ private static boolean isJoin(List allFeHosts, Frontend fe) {
             if (fe.getEditLogPort() != addr.getPort()) {
                 continue;
             }
-            if (!Strings.isNullOrEmpty(addr.getHostName())) {
-                if (addr.getHostName().equals(fe.getHost())) {
-                    return true;
-                }
-            }
             // if hostname of InetSocketAddress is ip, addr.getHostName() may be not equal to fe.getIp()
             // so we need to compare fe.getIp() with address.getHostAddress()
             InetAddress address = addr.getAddress();
@@ -227,6 +224,22 @@ private static boolean isJoin(List allFeHosts, Frontend fe) {
                 return true;
             }
         }
+
+        // Avoid calling getHostName multiple times, don't remove it
+        for (InetSocketAddress addr : allFeHosts) {
+            // Avoid calling getHostName multiple times, don't remove it
+            if (fe.getEditLogPort() != addr.getPort()) {
+                continue;
+            }
+            // https://bugs.openjdk.org/browse/JDK-8143378#:~:text=getHostName()%3B%20takes%20about%205,millisecond%20on%20JDK%20update%2051
+            // getHostName sometime has bug, take 5s
+            String host = addr.getHostName();
+            if (!Strings.isNullOrEmpty(host)) {
+                if (host.equals(fe.getHost())) {
+                    return true;
+                }
+            }
+        }
         return false;
     }
 }
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
index 856b0e76956395..e77658793fe5b2 100644
--- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
@@ -23,12 +23,14 @@ import org.apache.doris.regression.util.JdbcUtils
 import org.apache.doris.regression.util.NodeType
 
 import com.google.common.collect.Maps
+import org.awaitility.Awaitility
 import org.slf4j.Logger
 import org.slf4j.LoggerFactory
 
 import groovy.json.JsonSlurper
 import groovy.transform.CompileStatic
 import groovy.util.logging.Slf4j
 
+import static java.util.concurrent.TimeUnit.SECONDS
 import java.util.stream.Collectors
 import java.sql.Connection
@@ -333,7 +335,7 @@ class SuiteCluster {
 
         sqlModeNodeMgr = options.sqlModeNodeMgr
 
-        runCmd(cmd.join(' '), -1)
+        runCmd(cmd.join(' '), 180)
 
         // wait be report disk
         Thread.sleep(5000)
@@ -483,6 +485,9 @@ class SuiteCluster {
             if (followerMode) {
                 sb.append('--fe-follower' + ' ')
             }
+            if (sqlModeNodeMgr) {
+                sb.append('--sql-mode-node-mgr' + ' ')
+            }
         }
         if (beNum > 0) {
             sb.append('--add-be-num ' + beNum + ' ')
@@ -492,7 +497,7 @@ class SuiteCluster {
         }
         sb.append('--wait-timeout 60')
 
-        def data = (Map>) runCmd(sb.toString(), -1)
+        def data = (Map>) runCmd(sb.toString(), 180)
 
         def newFrontends = (List) data.get('fe').get('add_list')
         def newBackends = (List) data.get('be').get('add_list')
@@ -636,17 +641,15 @@ class SuiteCluster {
     }
 
     private Object runCmd(String cmd, int timeoutSecond = 60) throws Exception {
-        def fullCmd = String.format('python -W ignore %s %s --output-json', config.dorisComposePath, cmd)
+        def fullCmd = String.format('python -W ignore %s %s -v --output-json', config.dorisComposePath, cmd)
         logger.info('Run doris compose cmd: {}', fullCmd)
         def proc = fullCmd.execute()
         def outBuf = new StringBuilder()
         def errBuf = new StringBuilder()
-        proc.consumeProcessOutput(outBuf, errBuf)
-        if (timeoutSecond > 0) {
-            proc.waitForOrKill(timeoutSecond * 1000)
-        } else {
-            proc.waitFor()
-        }
+        Awaitility.await().atMost(timeoutSecond, SECONDS).until({
+            proc.waitForProcessOutput(outBuf, errBuf)
+            return true
+        })
         if (proc.exitValue() != 0) {
             throw new Exception(String.format('Exit value: %s != 0, stdout: %s, stderr: %s',
                     proc.exitValue(), outBuf.toString(), errBuf.toString()))
diff --git a/regression-test/suites/cloud_p0/node_mgr/test_sql_mode_node_mgr.groovy b/regression-test/suites/cloud_p0/node_mgr/test_sql_mode_node_mgr.groovy
index 7405cb864d889d..70372f68ab865f 100644
--- a/regression-test/suites/cloud_p0/node_mgr/test_sql_mode_node_mgr.groovy
+++ b/regression-test/suites/cloud_p0/node_mgr/test_sql_mode_node_mgr.groovy
@@ -38,6 +38,7 @@ suite('test_sql_mode_node_mgr', 'multi_cluster,docker,p1') {
         options.sqlModeNodeMgr = true
         options.waitTimeout = 0
         options.feNum = 3
+        options.useFollowersMode = true
         options.feConfigs += ["resource_not_ready_sleep_seconds=1",
             "heartbeat_interval_second=1",]
     }
@@ -121,6 +122,9 @@ suite('test_sql_mode_node_mgr', 'multi_cluster,docker,p1') {
 
         // Check FE number
         def frontendResult = sql_return_maparray """SHOW FRONTENDS;"""
+        // Check that all frontends are alive
+        def aliveCount = frontendResult.count { it['Alive'] == 'true' }
+        assert aliveCount == expectedFeNum, "Expected all $expectedFeNum frontends to be alive, but only ${aliveCount} are alive"
         assert frontendResult.size() == expectedFeNum, "Expected ${expectedFeNum} frontends, but got ${frontendResult.size()}"
         logger.info("FE number check passed: ${frontendResult.size()} FEs found")
 
@@ -272,28 +276,23 @@ suite('test_sql_mode_node_mgr', 'multi_cluster,docker,p1') {
         def feEditLogPort = feToDropMap['EditLogPort']
         def feRole = feToDropMap['Role']
 
-        logger.info("Dropping non-master frontend: {}:{}", feHost, feEditLogPort)
+        def dropFeInx = cluster.getFrontends().find { it.host == feHost }.index
+        logger.info("Dropping non-master frontend: {}:{}, index: {}", feHost, feEditLogPort, dropFeInx)
 
         // Drop the selected non-master frontend
         sql """ ALTER SYSTEM DROP ${feRole} "${feHost}:${feEditLogPort}"; """
-
+        // After drop feHost container will exit
+        cluster.dropFrontends(true, dropFeInx)
+        sleep(3 * 1000)
+        logger.info("Dropping frontend index: {}, remove it from docker compose", dropFeInx)
         // Wait for the frontend to be fully dropped
-        maxWaitSeconds = 300
-        waited = 0
-        while (waited < maxWaitSeconds) {
+
+        dockerAwaitUntil(300) {
             reconnectFe()
             def currentFrontends = sql_return_maparray("SHOW FRONTENDS")
-            if (currentFrontends.size() == frontends.size() - 1) {
-                logger.info("Non-master frontend successfully dropped")
-                break
-            }
-            sleep(10000)
-            waited += 10
+            currentFrontends.size() == frontends.size() - 1
         }
 
-        if (waited >= maxWaitSeconds) {
-            throw new Exception("Timeout waiting for non-master frontend to be dropped")
-        }
 
         checkClusterStatus(2, 3, 4)
 
@@ -309,86 +308,72 @@ suite('test_sql_mode_node_mgr', 'multi_cluster,docker,p1') {
         }
 
         assert droppedFE != null, "Could not find the dropped frontend"
-
-        feHost = droppedFE['Host']
-        feEditLogPort = droppedFE['EditLogPort']
-
-        logger.info("Adding back frontend: {}:{}", feHost, feEditLogPort)
-
-        // Add the frontend back
-        sql """ ALTER SYSTEM ADD FOLLOWER "${feHost}:${feEditLogPort}"; """
+
+        // Up a new follower fe and add to docker compose
+        // ATTN: in addFrontend, sql node mode, will execute `ALTER SYSTEM ADD FOLLOWER "${feHost}:${feEditLogPort}";`
+        boolean fuzzyUpFollower = (getRandomBoolean() == "true") ? true : false
+        logger.info("Want up a new role [{}] frontend", fuzzyUpFollower ? "FOLLOWER" : "OBSERVER")
+        def addList = cluster.addFrontend(1, fuzzyUpFollower)
+        logger.info("Up a new frontend, addList: {}", addList)
+
+        def addFE = cluster.getFeByIndex(addList[0])
+        feHost = addFE['Host']
+        feEditLogPort = addFE['EditLogPort']
+        def showFes = sql """SHOW FRONTENDS"""
+        logger.info("Adding back frontend: {}", showFes)
 
         // Wait for the frontend to be fully added back
-        maxWaitSeconds = 300
-        waited = 0
-        while (waited < maxWaitSeconds) {
+        dockerAwaitUntil(300, 5) {
             def updatedFrontends = sql_return_maparray("SHOW FRONTENDS")
-            if (updatedFrontends.size() == frontends.size()) {
-                logger.info("Frontend successfully added back")
-                break
-            }
-            sleep(10000)
-            waited += 10
+            updatedFrontends.size() == frontends.size()
         }
-
-        if (waited >= maxWaitSeconds) {
-            throw new Exception("Timeout waiting for frontend to be added back")
-        }
-
-        // Verify cluster status after adding the frontend back
+
         checkClusterStatus(3, 3, 5)
 
         logger.info("Frontend successfully added back and cluster status verified")
 
         // CASE 6. Drop frontend and add back again
        logger.info("Dropping frontend and adding back again")
 
-        // Get the frontend to be dropped
-        def frontendToDrop = frontends.find { it['Host'] == feHost && it['EditLogPort'] == feEditLogPort }
+        currentFrontends = sql_return_maparray("SHOW FRONTENDS")
+
+        int obServerCount = currentFrontends.count { it['Role'] == 'OBSERVER' }
+        String fuzzyDropRole
+        if (obServerCount != 0) {
+            fuzzyDropRole = (getRandomBoolean() == "true") ? "FOLLOWER" : "OBSERVER"
"FOLLOWER" : "OBSERVER" + } else { + fuzzyDropRole = "FOLLOWER" + } + + def frontendToDrop = currentFrontends.find {it['IsMaster'] == "false" && it['Role'] == fuzzyDropRole} + logger.info("Find drop again frontend: {}, drop role [{}]", frontendToDrop, fuzzyDropRole) assert frontendToDrop != null, "Could not find the frontend to drop" + def role = frontendToDrop.Role // Drop the frontend - sql """ ALTER SYSTEM DROP FOLLOWER "${feHost}:${feEditLogPort}"; """ - sleep(30000) + sql """ ALTER SYSTEM DROP $role "${frontendToDrop.Host}:${frontendToDrop.EditLogPort}"; """ + dropFeInx = cluster.getFrontends().find { it.host == frontendToDrop.Host }.index + // After drop frontendToDrop.Host container will exit + cluster.dropFrontends(true, dropFeInx) + logger.info("Dropping again frontend index: {}, remove it from docker compose", dropFeInx) + sleep(3 * 1000) reconnectFe() // Wait for the frontend to be fully dropped - maxWaitSeconds = 300 - waited = 0 - while (waited < maxWaitSeconds) { + dockerAwaitUntil(300, 5) { def updatedFrontends = sql_return_maparray("SHOW FRONTENDS") - if (!updatedFrontends.any { it['Host'] == feHost && it['EditLogPort'] == feEditLogPort }) { - logger.info("Frontend successfully dropped") - break - } - sleep(10000) - waited += 10 - } - - if (waited >= maxWaitSeconds) { - throw new Exception("Timeout waiting for frontend to be dropped") + !updatedFrontends.any { it['Host'] == frontendToDrop.Host && it['EditLogPort'] == frontendToDrop.EditLogPort } } - // Add the frontend back - sql """ ALTER SYSTEM ADD FOLLOWER "${feHost}:${feEditLogPort}"; """ + // Up a new follower fe and add to docker compose + // ATTN: in addFrontend, sql node mode, will execute `ALTER SYSTEM ADD FOLLOWER "${feHost}:${feEditLogPort}";` + addList = cluster.addFrontend(1, true) + logger.info("Up a new frontend, addList: {}", addList) - // Wait for the frontend to be fully added back - maxWaitSeconds = 300 - waited = 0 - while (waited < maxWaitSeconds) { + dockerAwaitUntil(300, 5) { def updatedFrontends = sql_return_maparray("SHOW FRONTENDS") - if (updatedFrontends.any { it['Host'] == feHost && it['EditLogPort'] == feEditLogPort }) { - logger.info("Frontend successfully added back") - break - } - sleep(10000) - waited += 10 + updatedFrontends.size() == 3 } - - if (waited >= maxWaitSeconds) { - throw new Exception("Timeout waiting for frontend to be added back") - } - // Verify cluster status after adding the frontend back checkClusterStatus(3, 3, 6)