Skip to content

Commit

Permalink
[fix](docker case) Fix test_sql_mode_node_mgr and add cloud multi follower fuzzy test (#44124)
Browse files Browse the repository at this point in the history

### What problem does this PR solve?
1. Fix slow docker cluster startup: the wait logic is a nested loop but used
a single `break`, so it could never break out of the outer loop.
2. Fix the test_sql_mode_node_mgr case, which failed to meet its testing
objectives because the container exits after the follower is dropped.
3. Add fuzzy add/drop follower/observer cases on the cloud.
4. Fix docker compose error `java.lang.Exception: doris compose output
is empty, err:`
5. Fix slow `show frontends` queries when FE nodes change: `show
frontends` uses the isJoin function, which calls getHostName
(see https://bugs.openjdk.org/browse/JDK-8143378#:~:text=getHostName()%3B%20takes%20about%205,millisecond%20on%20JDK%20update%2051.)
6. Fix the following NullPointerException in CloudClusterChecker:
```
2024-11-28 15:50:05,272 WARN (cloud cluster check|302) [CloudClusterChecker.checkCloudBackends():518] diff cluster has exception, Cannot invoke "org.apache.doris.system.Backend.isDecommissioned()" because "be" is null
java.lang.NullPointerException: Cannot invoke "org.apache.doris.system.Backend.isDecommissioned()" because "be" is null
        at org.apache.doris.cloud.catalog.CloudClusterChecker.updateStatus(CloudClusterChecker.java:175) ~[doris-fe.jar:1.2-SNAPSHOT]
        at org.apache.doris.cloud.catalog.CloudClusterChecker.checkDiffNode(CloudClusterChecker.java:263) ~[doris-fe.jar:1.2-SNAPSHOT]
        at org.apache.doris.cloud.catalog.CloudClusterChecker.checkCloudBackends(CloudClusterChecker.java:513) ~[doris-fe.jar:1.2-SNAPSHOT]
        at org.apache.doris.cloud.catalog.CloudClusterChecker.runAfterCatalogReady(CloudClusterChecker.java:320) ~[doris-fe.jar:1.2-SNAPSHOT]
        at org.apache.doris.common.util.MasterDaemon.runOneCycle(MasterDaemon.java:58) ~[doris-fe.jar:1.2-SNAPSHOT]
        at org.apache.doris.common.util.Daemon.run(Daemon.java:119) ~[doris-fe.jar:1.2-SNAPSHOT]
```
  • Loading branch information
deardeng authored Dec 10, 2024
1 parent df90de9 commit 5e655d4
Show file tree
Hide file tree
Showing 12 changed files with 206 additions and 166 deletions.
12 changes: 9 additions & 3 deletions docker/runtime/doris-compose/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import os
import os.path
import utils
import time

DOCKER_DORIS_PATH = "/opt/apache-doris"
LOCAL_DORIS_PATH = os.getenv("LOCAL_DORIS_PATH", "/tmp/doris")
Expand Down Expand Up @@ -139,11 +140,15 @@ def gen_subnet_prefix16():

def get_master_fe_endpoint(cluster_name):
master_fe_ip_file = get_cluster_path(cluster_name) + "/status/master_fe_ip"
if os.path.exists(master_fe_ip_file):
with open(master_fe_ip_file, "r") as f:
return "{}:{}".format(f.read().strip(), FE_QUERY_PORT)
max_retries = 10
for attempt in range(max_retries):
if os.path.exists(master_fe_ip_file):
with open(master_fe_ip_file, "r") as f:
return "{}:{}".format(f.read().strip(), FE_QUERY_PORT)
time.sleep(1)
try:
cluster = Cluster.load(cluster_name)
LOG.info("master file not exist, master ip get from node 1")
return "{}:{}".format(
cluster.get_node(Node.TYPE_FE, 1).get_ip(), FE_QUERY_PORT)
except:
Expand Down Expand Up @@ -468,6 +473,7 @@ def get_add_init_config(self):
for key in ("JAVA_OPTS", "JAVA_OPTS_FOR_JDK_17"):
value = parser["dummy_section"].get(key)
if value:
value = value.strip().strip('"')
cfg.append(
f"{key} = \"{value} -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:{FE_JAVA_DBG_PORT}\""
)
Expand Down
32 changes: 18 additions & 14 deletions docker/runtime/doris-compose/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ def _support_boolean_action(self):
return sys.version_info.major == 3 and sys.version_info.minor >= 9

def _print_table(self, header, datas):
if utils.is_enable_log():
if utils.is_log_stdout():
table = prettytable.PrettyTable(
[utils.render_green(field) for field in header])
for row in datas:
Expand Down Expand Up @@ -598,13 +598,6 @@ def do_add_node(node_type, add_num, add_ids):
related_nodes,
output_real_time=output_real_time)

ls_cmd = "python docker/runtime/doris-compose/doris-compose.py ls " + cluster.name
LOG.info("Inspect command: " + utils.render_green(ls_cmd) + "\n")
LOG.info(
"Master fe query address: " +
utils.render_green(CLUSTER.get_master_fe_endpoint(cluster.name)) +
"\n")

if not args.start:
LOG.info(
utils.render_green(
Expand All @@ -618,14 +611,18 @@ def do_add_node(node_type, add_num, add_ids):
LOG.info("Waiting for FE master to be elected...")
expire_ts = time.time() + 30
while expire_ts > time.time():
ready = False
db_mgr = database.get_db_mgr(args.NAME, False)
for id in add_fe_ids:
fe_state = db_mgr.get_fe(id)
if fe_state is not None and fe_state.alive:
ready = True
break
LOG.info("there is no fe ready")
time.sleep(5)

if ready:
break
LOG.info("there is no fe ready")
time.sleep(1)
LOG.info("after Waiting for FE master to be elected...")
if cluster.is_cloud and args.sql_mode_node_mgr:
db_mgr = database.get_db_mgr(args.NAME, False)
master_fe_endpoint = CLUSTER.get_master_fe_endpoint(
Expand All @@ -635,7 +632,9 @@ def do_add_node(node_type, add_num, add_ids):
fe_endpoint = f"{fe.get_ip()}:{CLUSTER.FE_EDITLOG_PORT}"
if fe_endpoint != master_fe_endpoint:
try:
db_mgr.add_fe(fe_endpoint)
db_mgr.add_fe(
fe_endpoint, "FOLLOWER"
if cluster.fe_follower else "OBSERVER")
LOG.info(f"Added FE {fe_endpoint} successfully.")
except Exception as e:
LOG.error(
Expand All @@ -661,6 +660,12 @@ def do_add_node(node_type, add_num, add_ids):
"Up cluster {} succ, related node num {}".format(
args.NAME, related_node_num)))

ls_cmd = "python docker/runtime/doris-compose/doris-compose.py ls " + cluster.name
LOG.info("Inspect command: " + utils.render_green(ls_cmd) + "\n")
LOG.info(
"Master fe query address: " +
utils.render_green(CLUSTER.get_master_fe_endpoint(cluster.name)) +
"\n")
return {
"fe": {
"add_list": add_fe_ids,
Expand Down Expand Up @@ -1066,8 +1071,7 @@ def parse_cluster_compose_file(cluster_name):
if services is None:
return COMPOSE_BAD, {}
return COMPOSE_GOOD, {
service:
ComposeService(
service: ComposeService(
service,
list(service_conf["networks"].values())[0]
["ipv4_address"], service_conf["image"])
Expand Down
58 changes: 37 additions & 21 deletions docker/runtime/doris-compose/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,13 @@

class FEState(object):

def __init__(self, id, is_master, alive, last_heartbeat, err_msg):
def __init__(self, id, is_master, alive, last_heartbeat, err_msg, role):
self.id = id
self.is_master = is_master
self.alive = alive
self.last_heartbeat = last_heartbeat
self.err_msg = err_msg
self.role = role


class BEState(object):
Expand Down Expand Up @@ -66,20 +67,21 @@ def load_states(self):
self._load_fe_states()
self._load_be_states()

def add_fe(self, fe_endpoint):
def add_fe(self, fe_endpoint, role):
try:
sql = f"ALTER SYSTEM ADD FOLLOWER '{fe_endpoint}'"
sql = f"ALTER SYSTEM ADD {role} '{fe_endpoint}'"
self._exec_query(sql)
LOG.info(f"Added FE {fe_endpoint} via SQL successfully.")
LOG.info(f"Added {role} FE {fe_endpoint} via SQL successfully.")
except Exception as e:
LOG.error(f"Failed to add FE {fe_endpoint} via SQL: {str(e)}")
raise

def drop_fe(self, fe_endpoint):
id = CLUSTER.Node.get_id_from_ip(fe_endpoint[:fe_endpoint.find(":")])
try:
self._exec_query(
"ALTER SYSTEM DROP FOLLOWER '{}'".format(fe_endpoint))
role = self.get_fe(id).role if self.get_fe(id) else "FOLLOWER"
self._exec_query("ALTER SYSTEM DROP {} '{}'".format(
role, fe_endpoint))
LOG.info("Drop fe {} with id {} from db succ.".format(
fe_endpoint, id))
except Exception as e:
Expand Down Expand Up @@ -152,7 +154,7 @@ def decommission_be(self, be_endpoint):
.format(be_endpoint, be.alive, be.decommissioned, be.tablet_num, old_tablet_num,
int(time.time() - start_ts)))

time.sleep(5)
time.sleep(1)

def create_default_storage_vault(self, cloud_store_config):
try:
Expand Down Expand Up @@ -194,7 +196,7 @@ def _load_fe_states(self):
id = CLUSTER.Node.get_id_from_ip(ip)
last_heartbeat = utils.escape_null(record["LastHeartbeat"])
err_msg = record["ErrMsg"]
fe = FEState(id, is_master, alive, last_heartbeat, err_msg)
fe = FEState(id, is_master, alive, last_heartbeat, err_msg, role)
fe_states[id] = fe
if is_master and alive:
alive_master_fe_ip = ip
Expand Down Expand Up @@ -223,13 +225,23 @@ def _load_be_states(self):
self.be_states = be_states

# return rows, and each row is a record map
def _exec_query(self, sql):
def _exec_query(self, sql, retries=3):
self._prepare_conn()
with self.conn.cursor() as cursor:
cursor.execute(sql)
fields = [field_md[0] for field_md in cursor.description
] if cursor.description else []
return [dict(zip(fields, row)) for row in cursor.fetchall()]
for attempt in range(retries):
try:
with self.conn.cursor() as cursor:
cursor.execute(sql)
fields = [field_md[0] for field_md in cursor.description
] if cursor.description else []
return [dict(zip(fields, row)) for row in cursor.fetchall()]
except Exception as e:
LOG.warn(f"Error occurred: {e}")
if "timed out" in str(e).lower() and attempt < retries - 1:
LOG.warn(f"Query timed out. Retrying {attempt + 1}/{retries}...")
self._reset_conn()
else:
raise e
raise Exception("Max retries exceeded")

def _prepare_conn(self):
if self.conn:
Expand Down Expand Up @@ -257,19 +269,23 @@ def get_db_mgr(cluster_name, required_load_succ=True):
if not master_fe_ip:
return db_mgr

has_alive_fe = False
alive_fe = None
cluster = CLUSTER.Cluster.load(cluster_name)
containers = utils.get_doris_containers(cluster_name).get(cluster_name, [])
for container in containers:
if utils.is_container_running(container):
_, node_type, _ = utils.parse_service_name(container.name)
_, node_type, id = utils.parse_service_name(container.name)
if node_type == CLUSTER.Node.TYPE_FE:
has_alive_fe = True
break

if not has_alive_fe:
node = cluster.get_node(node_type, id)
if not alive_fe:
alive_fe = node
if node.get_ip() == master_fe_ip:
alive_fe = node
break
if not alive_fe:
return db_mgr

db_mgr.master_fe_ip = master_fe_ip
db_mgr.master_fe_ip = alive_fe.get_ip()
try:
db_mgr.load_states()
except Exception as e:
Expand Down
25 changes: 17 additions & 8 deletions docker/runtime/doris-compose/doris-compose.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
# under the License.

import argparse
import cluster as CLUSTER
import command
import os.path
import sys
import traceback
import utils
Expand All @@ -31,12 +33,12 @@ def parse_args():
return ap.parse_args(), ap.format_help()


def run(args, disable_log, help):
def run(args, disable_log_stdout, help):
for cmd in command.ALL_COMMANDS:
if args.command == cmd.name:
timer = utils.Timer()
result = cmd.run(args)
if cmd.print_use_time() and not disable_log:
if cmd.print_use_time() and not disable_log_stdout:
timer.show()
return result
print(help)
Expand All @@ -48,19 +50,26 @@ def run(args, disable_log, help):
verbose = getattr(args, "verbose", False)
if verbose:
utils.set_log_verbose()
disable_log = getattr(args, "output_json", False)
if disable_log:
utils.set_enable_log(False)
disable_log_stdout = getattr(args, "output_json", False)
if disable_log_stdout:
log_file_name = ""
cluster_name = getattr(args, "NAME", "")
if cluster_name:
if type(cluster_name) == type([]):
cluster_name = cluster_name[0]
log_file_name = os.path.join(
CLUSTER.get_cluster_path(cluster_name), "doris-compose.log")
utils.set_log_to(log_file_name, False)

code = None
try:
data = run(args, disable_log, help)
if disable_log:
data = run(args, disable_log_stdout, help)
if disable_log_stdout:
print(utils.pretty_json({"code": 0, "data": data}), flush=True)
code = 0
except:
err = traceback.format_exc()
if disable_log:
if disable_log_stdout:
print(utils.pretty_json({"code": 1, "err": err}), flush=True)
else:
print(err, flush=True)
Expand Down
3 changes: 2 additions & 1 deletion docker/runtime/doris-compose/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,6 @@ jsonpickle
prettytable
pymysql
python-dateutil
#pyyaml==5.4.1
# If installing pyyaml fails on macOS, change the pyyaml version
#pyyaml==5.3.1
requests<=2.31.0
7 changes: 0 additions & 7 deletions docker/runtime/doris-compose/resource/init_fe.sh
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,6 @@ start_cloud_fe() {
fe_daemon &
run_fe

if [ "$MY_ID" == "1" ]; then
echo $MY_IP >$MASTER_FE_IP_FILE
fi
return
fi

Expand Down Expand Up @@ -168,10 +165,6 @@ start_cloud_fe() {

fe_daemon &
run_fe

if [ "$MY_ID" == "1" ]; then
echo $MY_IP >$MASTER_FE_IP_FILE
fi
}

stop_frontend() {
Expand Down
Loading

0 comments on commit 5e655d4

Please sign in to comment.