Skip to content

Commit

Permalink
works with new data structures
Browse files Browse the repository at this point in the history
  • Loading branch information
Leonid Chernin committed Oct 20, 2024
1 parent 47624e0 commit 45aa326
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 40 deletions.
9 changes: 9 additions & 0 deletions control/cephutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ def __init__(self, config):
self.ceph_conf = config.get_with_default("ceph", "config_file", "/etc/ceph/ceph.conf")
self.rados_id = config.get_with_default("ceph", "id", "")
self.anagroup_list = []
self.rebalance_supported = False
self.last_sent = time.time()

def execute_ceph_monitor_command(self, cmd):
Expand All @@ -50,6 +51,9 @@ def get_gw_id_owner_ana_group(self, pool, group, anagrp):
break
return gw_id

def is_rebalance_supported(self):
return self.rebalance_supported

def get_number_created_gateways(self, pool, group):
now = time.time()
if (now - self.last_sent) < 10 and self.anagroup_list :
Expand All @@ -64,6 +68,11 @@ def get_number_created_gateways(self, pool, group):
rply = self.execute_ceph_monitor_command(str)
self.logger.debug(f"reply \"{rply}\"")
conv_str = rply[1].decode()
pos = conv_str.find('"LB"')
if pos != -1:
self.rebalance_supported = True
else :
self.rebalance_supported = False
pos = conv_str.find("[")
if pos != -1:
new_str = conv_str[pos + len("[") :]
Expand Down
90 changes: 50 additions & 40 deletions control/grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,28 @@ def get_namespace_count(self, nqn, no_auto_visible = None, min_hosts = 0) -> int

return ns_count

def get_namespaces_using_ana_group_id(self, nqn, anagrpid):
def get_all_namespaces_by_ana_group_id(self, anagrpid):
ns_list = []
# Loop through all nqn values in the namespace list
for nqn in self.namespace_list:
for nsid in self.namespace_list[nqn]:
ns = self.namespace_list[nqn][nsid]
if ns.empty():
continue
if ns.anagrpid == anagrpid:
ns_list.append((nsid, nqn))#list of tupples
return ns_list

def get_ana_group_id_by_nsid_subsys(self, nqn, nsid):
if nqn not in self.namespace_list:
return 0
ns = self.namespace_list[nqn][nsid]
if ns.empty():
return 0
anagrp = ns.anagrpid
return anagrp

def get_subsys_namespaces_by_ana_group_id(self, nqn, anagrpid):
ns_list = []
if nqn not in self.namespace_list:
return ns_list
Expand Down Expand Up @@ -322,7 +343,6 @@ def __init__(self, config: GatewayConfig, gateway_state: GatewayStateHandler, rp
self.ana_grp_ns_load = {}
self.rebalance_period_sec = self.config.getint_with_default("gateway", "rebalance_period_sec", 40)
self.rebalance_max_ns_to_change_lb_grp = self.config.getint_with_default("gateway", "max_ns_to_change_lb_grp", 10)
self.subsystem_nsid_anagrp = defaultdict(dict)

for i in range(MAX_POSSIBLE_ANA_GRPS):
self.ana_grp_ns_load[i] = 0
Expand Down Expand Up @@ -1173,7 +1193,6 @@ def create_namespace(self, subsystem_nqn, bdev_name, nsid, anagrpid, uuid, no_au
no_auto_visible=no_auto_visible,
)
self.subsystem_nsid_bdev_and_uuid.add_namespace(subsystem_nqn, nsid, bdev_name, uuid, anagrpid, no_auto_visible)
self.subsystem_nsid_anagrp[subsystem_nqn][nsid] = anagrpid
self.logger.debug(f"subsystem_add_ns: {nsid}")
self.ana_grp_ns_load[anagrpid] += 1
except Exception as ex:
Expand Down Expand Up @@ -1312,6 +1331,9 @@ def find_target_ns_per_group(self, grp_list):
def rebalance_logic(self):
self.logger.info(f"Called rebalance logic")
grps_list = self.ceph_utils.get_number_created_gateways(self.gateway_pool, self.gateway_group)
if not self.ceph_utils.is_rebalance_supported():
self.logger.info(f"Auto rebalance is not supported")
return

for ana_grp in self.ana_grp_state:
if self.ana_grp_state[ana_grp] == pb2.ana_state.OPTIMIZED :
Expand All @@ -1328,11 +1350,12 @@ def rebalance_logic(self):
else :
#rebalance only if this ana grp is more loaded
num_valid_grps, target = self.find_target_ns_per_group(grps_list)
self.logger.info(f"My load {self.ana_grp_ns_load[ana_grp]} ")
if self.ana_grp_ns_load[ana_grp] > target :
min_ana_grp = self.find_min_loaded_group(grps_list)
self.logger.info(f"My load {self.ana_grp_ns_load[ana_grp]} min load {min_ana_grp}")
if self.ana_grp_ns_load[ana_grp] >= (self.ana_grp_ns_load[min_ana_grp] + 2 * self.rebalance_max_ns_to_change_lb_grp):
self.logger.info(f" found destination ana group {min_ana_grp} load = {self.ana_grp_ns_load[min_ana_grp]} for source ana {ana_grp} Start NS rebalance")
self.logger.info(f"My load {self.ana_grp_ns_load[ana_grp]}, min loaded group {min_ana_grp}, min load {self.ana_grp_ns_load[min_ana_grp]}")
if self.ana_grp_ns_load[ana_grp] >= (self.ana_grp_ns_load[min_ana_grp] + self.rebalance_max_ns_to_change_lb_grp + 5):# was 2*
self.logger.info(f"Found destination ana group {min_ana_grp}: load = {self.ana_grp_ns_load[min_ana_grp]} for source ana {ana_grp} Start NS rebalance")
num_2_move = self.rebalance_max_ns_to_change_lb_grp/(num_valid_grps -1) if num_valid_grps > 1 else self.rebalance_max_ns_to_change_lb_grp
if num_2_move == 0 :num_2_move = 1
self.ns_rebalance(ana_grp, min_ana_grp, num_2_move)
Expand All @@ -1342,25 +1365,18 @@ def ns_rebalance(self, ana_id, dest_ana_id, num) ->int:
now = time.time()
num_rebalanced = 0
self.logger.info(f"== rebalance started == for anagrp {ana_id} destination anagrp {dest_ana_id} num ns {num} time {now} ")
self.logger.info(f"Doing loop on {ana_id} map; subsystem_nsid_anagrp:")
for subsys, inner_dict in self.subsystem_nsid_anagrp.items():
for ns_key, ana_value in inner_dict.items():
self.logger.info(f"nsid: {ns_key} ana_val: {ana_value}")
if ana_value == ana_id :
self.logger.info(f"nsid {ns_key} for nqn {subsys} to rebalance:")
nsid = ns_key
#bdev_name = self.subsystem_nsid_bdev[subsys][nsid]
#assert bdev_name, f"Can't find bdev for subsystem {subsys}, namespace {nsid}"
#self.logger.info(f"nsid for change_load_balancing :{nsid}, {subsys}, pool_name: {self.bdev_params[bdev_name]['pool_name']}, rbd_name: {self.bdev_params[bdev_name]['image_name']}, block_size: {self.bdev_params[bdev_name]['block_size']}, uuid:{self.bdev_params[bdev_name]['uuid']}, anagrpid:{ana_id}")
self.logger.info(f"nsid for change_load_balancing :{nsid}, {subsys}, anagrpid:{ana_id}")

change_lb_group_req = pb2.namespace_change_load_balancing_group_req(subsystem_nqn=subsys, nsid= nsid, anagrpid=dest_ana_id, auto_lb_logic=True)
ret = self.namespace_change_load_balancing_group(change_lb_group_req, "context")
self.logger.info(f" ret namespace_change_load_balancing_group {ret}")
num_rebalanced += 1
if num_rebalanced >= num :
self.logger.info(f"== Completed rebalance in {time.time() - now } sec for {num} namespaces from anagrp {ana_id} to {dest_ana_id} ")
return 0
ns_list = self.subsystem_nsid_bdev_and_uuid.get_all_namespaces_by_ana_group_id(ana_id)
self.logger.info(f"Doing loop on {ana_id} ")
for nsid, subsys in ns_list:
#self.logger.info(f"nsid {nsid} for nqn {subsys} to rebalance:")
self.logger.info(f"nsid for change_load_balancing : {nsid}, {subsys}, anagrpid:{ana_id}")
change_lb_group_req = pb2.namespace_change_load_balancing_group_req(subsystem_nqn=subsys, nsid= nsid, anagrpid=dest_ana_id, auto_lb_logic=True)
ret = self.namespace_change_load_balancing_group(change_lb_group_req, "context")
self.logger.info(f" ret namespace_change_load_balancing_group {ret}")
num_rebalanced += 1
if num_rebalanced >= num :
self.logger.info(f"== Completed rebalance in {time.time() - now } sec for {num} namespaces from anagrp {ana_id} to {dest_ana_id} ")
return 0
return 0

def namespace_add_safe(self, request, context):
Expand Down Expand Up @@ -1552,6 +1568,12 @@ def namespace_change_load_balancing_group_safe(self, request, context):
transit_anagrpid=0
)
self.logger.debug(f"nvmf_subsystem_set_ns_ana_group: {ret}")
anagrpid = self.subsystem_nsid_bdev_and_uuid.get_ana_group_id_by_nsid_subsys(request.subsystem_nqn, request.nsid)
self.ana_grp_ns_load[anagrpid] -= 1 #decrease loading of previous "old" ana group
self.logger.debug(f"updated load in grp {anagrpid} = {self.ana_grp_ns_load[anagrpid]} ")
self.ana_grp_ns_load[request.anagrpid] += 1
self.logger.debug(f"updated load in grp {request.anagrpid} = {self.ana_grp_ns_load[request.anagrpid]} ")
#here update find_ret.set_ana_group_id(request.anagrpid)
if not find_ret.empty():
find_ret.set_ana_group_id(request.anagrpid)
except Exception as ex:
Expand Down Expand Up @@ -1592,16 +1614,6 @@ def namespace_change_load_balancing_group_safe(self, request, context):
errmsg = f"{errmsg}:\n{ex}"
return pb2.req_status(status=errno.EINVAL, error_message=errmsg)

for subsys, inner_dict in self.subsystem_nsid_anagrp.items():
for ns_key, ana_value in inner_dict.items():
if ns_key == request.nsid and subsys == request.subsystem_nqn :
self.ana_grp_ns_load[ana_value] -= 1 #decrease loading of previous "old" ana group
self.logger.debug(f"updated load in grp {ana_value} = {self.ana_grp_ns_load[ana_value]} ")
break
self.ana_grp_ns_load[request.anagrpid] += 1
self.logger.debug(f"updated load in grp {request.anagrpid} = {self.ana_grp_ns_load[request.anagrpid]} ")
self.subsystem_nsid_anagrp[request.subsystem_nqn][request.nsid] = request.anagrpid

return pb2.req_status(status=0, error_message=os.strerror(0))

def namespace_change_load_balancing_group(self, request, context=None):
Expand Down Expand Up @@ -1660,11 +1672,9 @@ def remove_namespace(self, subsystem_nqn, nsid, context):
nsid=nsid,
)
self.logger.debug(f"remove_namespace {nsid}: {ret}")
for subsys, inner_dict in self.subsystem_nsid_anagrp.items():
for ns_key, ana_value in inner_dict.items():
if ns_key == nsid:
self.ana_grp_ns_load[ana_value] -= 1
break
anagrpid = self.subsystem_nsid_bdev_and_uuid.get_ana_group_id_by_nsid_subsys(subsystem_nqn, nsid)
self.ana_grp_ns_load[anagrpid] -= 1

except Exception as ex:
self.logger.exception(namespace_failure_prefix)
errmsg = f"{namespace_failure_prefix}:\n{ex}"
Expand Down

0 comments on commit 45aa326

Please sign in to comment.