From 45aa326b0b7e2c0c4895c0bd9d9ceed62b61c429 Mon Sep 17 00:00:00 2001 From: Leonid Chernin Date: Mon, 14 Oct 2024 19:38:56 +0000 Subject: [PATCH] works with new data structures --- control/cephutils.py | 9 +++++ control/grpc.py | 90 ++++++++++++++++++++++++-------------------- 2 files changed, 59 insertions(+), 40 deletions(-) diff --git a/control/cephutils.py b/control/cephutils.py index 542fc223..9140efc4 100644 --- a/control/cephutils.py +++ b/control/cephutils.py @@ -24,6 +24,7 @@ def __init__(self, config): self.ceph_conf = config.get_with_default("ceph", "config_file", "/etc/ceph/ceph.conf") self.rados_id = config.get_with_default("ceph", "id", "") self.anagroup_list = [] + self.rebalance_supported = False self.last_sent = time.time() def execute_ceph_monitor_command(self, cmd): @@ -50,6 +51,9 @@ def get_gw_id_owner_ana_group(self, pool, group, anagrp): break return gw_id + def is_rebalance_supported(self): + return self.rebalance_supported + def get_number_created_gateways(self, pool, group): now = time.time() if (now - self.last_sent) < 10 and self.anagroup_list : @@ -64,6 +68,11 @@ def get_number_created_gateways(self, pool, group): rply = self.execute_ceph_monitor_command(str) self.logger.debug(f"reply \"{rply}\"") conv_str = rply[1].decode() + pos = conv_str.find('"LB"') + if pos != -1: + self.rebalance_supported = True + else : + self.rebalance_supported = False pos = conv_str.find("[") if pos != -1: new_str = conv_str[pos + len("[") :] diff --git a/control/grpc.py b/control/grpc.py index b0a3499e..6aa893d6 100644 --- a/control/grpc.py +++ b/control/grpc.py @@ -209,7 +209,28 @@ def get_namespace_count(self, nqn, no_auto_visible = None, min_hosts = 0) -> int return ns_count - def get_namespaces_using_ana_group_id(self, nqn, anagrpid): + def get_all_namespaces_by_ana_group_id(self, anagrpid): + ns_list = [] + # Loop through all nqn values in the namespace list + for nqn in self.namespace_list: + for nsid in self.namespace_list[nqn]: + ns = self.namespace_list[nqn][nsid] + if ns.empty(): + continue + if ns.anagrpid == anagrpid: + ns_list.append((nsid, nqn))#list of tupples + return ns_list + + def get_ana_group_id_by_nsid_subsys(self, nqn, nsid): + if nqn not in self.namespace_list: + return 0 + ns = self.namespace_list[nqn][nsid] + if ns.empty(): + return 0 + anagrp = ns.anagrpid + return anagrp + + def get_subsys_namespaces_by_ana_group_id(self, nqn, anagrpid): ns_list = [] if nqn not in self.namespace_list: return ns_list @@ -322,7 +343,6 @@ def __init__(self, config: GatewayConfig, gateway_state: GatewayStateHandler, rp self.ana_grp_ns_load = {} self.rebalance_period_sec = self.config.getint_with_default("gateway", "rebalance_period_sec", 40) self.rebalance_max_ns_to_change_lb_grp = self.config.getint_with_default("gateway", "max_ns_to_change_lb_grp", 10) - self.subsystem_nsid_anagrp = defaultdict(dict) for i in range(MAX_POSSIBLE_ANA_GRPS): self.ana_grp_ns_load[i] = 0 @@ -1173,7 +1193,6 @@ def create_namespace(self, subsystem_nqn, bdev_name, nsid, anagrpid, uuid, no_au no_auto_visible=no_auto_visible, ) self.subsystem_nsid_bdev_and_uuid.add_namespace(subsystem_nqn, nsid, bdev_name, uuid, anagrpid, no_auto_visible) - self.subsystem_nsid_anagrp[subsystem_nqn][nsid] = anagrpid self.logger.debug(f"subsystem_add_ns: {nsid}") self.ana_grp_ns_load[anagrpid] += 1 except Exception as ex: @@ -1312,6 +1331,9 @@ def find_target_ns_per_group(self, grp_list): def rebalance_logic(self): self.logger.info(f"Called rebalance logic") grps_list = self.ceph_utils.get_number_created_gateways(self.gateway_pool, self.gateway_group) + if not self.ceph_utils.is_rebalance_supported(): + self.logger.info(f"Auto rebalance is not supported") + return for ana_grp in self.ana_grp_state: if self.ana_grp_state[ana_grp] == pb2.ana_state.OPTIMIZED : @@ -1328,11 +1350,12 @@ def rebalance_logic(self): else : #rebalance only if this ana grp is more loaded num_valid_grps, target = self.find_target_ns_per_group(grps_list) + self.logger.info(f"My load {self.ana_grp_ns_load[ana_grp]} ") if self.ana_grp_ns_load[ana_grp] > target : min_ana_grp = self.find_min_loaded_group(grps_list) - self.logger.info(f"My load {self.ana_grp_ns_load[ana_grp]} min load {min_ana_grp}") - if self.ana_grp_ns_load[ana_grp] >= (self.ana_grp_ns_load[min_ana_grp] + 2 * self.rebalance_max_ns_to_change_lb_grp): - self.logger.info(f" found destination ana group {min_ana_grp} load = {self.ana_grp_ns_load[min_ana_grp]} for source ana {ana_grp} Start NS rebalance") + self.logger.info(f"My load {self.ana_grp_ns_load[ana_grp]}, min loaded group {min_ana_grp}, min load {self.ana_grp_ns_load[min_ana_grp]}") + if self.ana_grp_ns_load[ana_grp] >= (self.ana_grp_ns_load[min_ana_grp] + self.rebalance_max_ns_to_change_lb_grp + 5):# was 2* + self.logger.info(f"Found destination ana group {min_ana_grp}: load = {self.ana_grp_ns_load[min_ana_grp]} for source ana {ana_grp} Start NS rebalance") num_2_move = self.rebalance_max_ns_to_change_lb_grp/(num_valid_grps -1) if num_valid_grps > 1 else self.rebalance_max_ns_to_change_lb_grp if num_2_move == 0 :num_2_move = 1 self.ns_rebalance(ana_grp, min_ana_grp, num_2_move) @@ -1342,25 +1365,18 @@ def ns_rebalance(self, ana_id, dest_ana_id, num) ->int: now = time.time() num_rebalanced = 0 self.logger.info(f"== rebalance started == for anagrp {ana_id} destination anagrp {dest_ana_id} num ns {num} time {now} ") - self.logger.info(f"Doing loop on {ana_id} map; subsystem_nsid_anagrp:") - for subsys, inner_dict in self.subsystem_nsid_anagrp.items(): - for ns_key, ana_value in inner_dict.items(): - self.logger.info(f"nsid: {ns_key} ana_val: {ana_value}") - if ana_value == ana_id : - self.logger.info(f"nsid {ns_key} for nqn {subsys} to rebalance:") - nsid = ns_key - #bdev_name = self.subsystem_nsid_bdev[subsys][nsid] - #assert bdev_name, f"Can't find bdev for subsystem {subsys}, namespace {nsid}" - #self.logger.info(f"nsid for change_load_balancing :{nsid}, {subsys}, pool_name: {self.bdev_params[bdev_name]['pool_name']}, rbd_name: {self.bdev_params[bdev_name]['image_name']}, block_size: {self.bdev_params[bdev_name]['block_size']}, uuid:{self.bdev_params[bdev_name]['uuid']}, anagrpid:{ana_id}") - self.logger.info(f"nsid for change_load_balancing :{nsid}, {subsys}, anagrpid:{ana_id}") - - change_lb_group_req = pb2.namespace_change_load_balancing_group_req(subsystem_nqn=subsys, nsid= nsid, anagrpid=dest_ana_id, auto_lb_logic=True) - ret = self.namespace_change_load_balancing_group(change_lb_group_req, "context") - self.logger.info(f" ret namespace_change_load_balancing_group {ret}") - num_rebalanced += 1 - if num_rebalanced >= num : - self.logger.info(f"== Completed rebalance in {time.time() - now } sec for {num} namespaces from anagrp {ana_id} to {dest_ana_id} ") - return 0 + ns_list = self.subsystem_nsid_bdev_and_uuid.get_all_namespaces_by_ana_group_id(ana_id) + self.logger.info(f"Doing loop on {ana_id} ") + for nsid, subsys in ns_list: + #self.logger.info(f"nsid {nsid} for nqn {subsys} to rebalance:") + self.logger.info(f"nsid for change_load_balancing : {nsid}, {subsys}, anagrpid:{ana_id}") + change_lb_group_req = pb2.namespace_change_load_balancing_group_req(subsystem_nqn=subsys, nsid= nsid, anagrpid=dest_ana_id, auto_lb_logic=True) + ret = self.namespace_change_load_balancing_group(change_lb_group_req, "context") + self.logger.info(f" ret namespace_change_load_balancing_group {ret}") + num_rebalanced += 1 + if num_rebalanced >= num : + self.logger.info(f"== Completed rebalance in {time.time() - now } sec for {num} namespaces from anagrp {ana_id} to {dest_ana_id} ") + return 0 return 0 def namespace_add_safe(self, request, context): @@ -1552,6 +1568,12 @@ def namespace_change_load_balancing_group_safe(self, request, context): transit_anagrpid=0 ) self.logger.debug(f"nvmf_subsystem_set_ns_ana_group: {ret}") + anagrpid = self.subsystem_nsid_bdev_and_uuid.get_ana_group_id_by_nsid_subsys(request.subsystem_nqn, request.nsid) + self.ana_grp_ns_load[anagrpid] -= 1 #decrease loading of previous "old" ana group + self.logger.debug(f"updated load in grp {anagrpid} = {self.ana_grp_ns_load[anagrpid]} ") + self.ana_grp_ns_load[request.anagrpid] += 1 + self.logger.debug(f"updated load in grp {request.anagrpid} = {self.ana_grp_ns_load[request.anagrpid]} ") + #here update find_ret.set_ana_group_id(request.anagrpid) if not find_ret.empty(): find_ret.set_ana_group_id(request.anagrpid) except Exception as ex: @@ -1592,16 +1614,6 @@ def namespace_change_load_balancing_group_safe(self, request, context): errmsg = f"{errmsg}:\n{ex}" return pb2.req_status(status=errno.EINVAL, error_message=errmsg) - for subsys, inner_dict in self.subsystem_nsid_anagrp.items(): - for ns_key, ana_value in inner_dict.items(): - if ns_key == request.nsid and subsys == request.subsystem_nqn : - self.ana_grp_ns_load[ana_value] -= 1 #decrease loading of previous "old" ana group - self.logger.debug(f"updated load in grp {ana_value} = {self.ana_grp_ns_load[ana_value]} ") - break - self.ana_grp_ns_load[request.anagrpid] += 1 - self.logger.debug(f"updated load in grp {request.anagrpid} = {self.ana_grp_ns_load[request.anagrpid]} ") - self.subsystem_nsid_anagrp[request.subsystem_nqn][request.nsid] = request.anagrpid - return pb2.req_status(status=0, error_message=os.strerror(0)) def namespace_change_load_balancing_group(self, request, context=None): @@ -1660,11 +1672,9 @@ def remove_namespace(self, subsystem_nqn, nsid, context): nsid=nsid, ) self.logger.debug(f"remove_namespace {nsid}: {ret}") - for subsys, inner_dict in self.subsystem_nsid_anagrp.items(): - for ns_key, ana_value in inner_dict.items(): - if ns_key == nsid: - self.ana_grp_ns_load[ana_value] -= 1 - break + anagrpid = self.subsystem_nsid_bdev_and_uuid.get_ana_group_id_by_nsid_subsys(subsystem_nqn, nsid) + self.ana_grp_ns_load[anagrpid] -= 1 + except Exception as ex: self.logger.exception(namespace_failure_prefix) errmsg = f"{namespace_failure_prefix}:\n{ex}"