Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup topology handling logic #346

Merged
merged 3 commits into from
Oct 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 3 additions & 13 deletions sdx_controller/handlers/lc_message_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ def process_lc_json_msg(
msg,
latest_topo,
domain_list,
num_domain_topos,
):
logger.info("MQ received message:" + str(msg))
msg_json = json.loads(msg)
Expand All @@ -40,6 +39,7 @@ def process_lc_json_msg(
logger.info("Updating topo")
logger.debug(msg_json)
self.te_manager.update_topology(msg_json)
logger.info("Updating topology in TE manager")
failed_links = self.te_manager.get_failed_links()
if failed_links:
logger.info("Processing link failure.")
Expand All @@ -52,22 +52,12 @@ def process_lc_json_msg(
self.db_instance.add_key_value_pair_to_db(
"domains", "domain_list", domain_list
)

logger.info("Adding topology to TE manager")
self.te_manager.add_topology(msg_json)

if self.db_instance.read_from_db("topologies", "num_domain_topos") is None:
num_domain_topos = 1
self.db_instance.add_key_value_pair_to_db(
"topologies", "num_domain_topos", num_domain_topos
)
else:
num_domain_topos = len(domain_list)

db_key = "LC-" + str(num_domain_topos)
logger.info(f"Adding topology {db_key} to db.")
logger.info(f"Adding topology {domain_name} to db.")
self.db_instance.add_key_value_pair_to_db(
"topologies", db_key, json.dumps(msg_json)
"topologies", domain_name, json.dumps(msg_json)
)

# TODO: use TEManager API directly; but TEManager does not
Expand Down
63 changes: 29 additions & 34 deletions sdx_controller/messaging/rpc_queue_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ def start_consumer(self):
self.channel.start_consuming()

def start_sdx_consumer(self, thread_queue, db_instance):
MESSAGE_ID = 0
HEARTBEAT_ID = 0

rpc = RpcConsumer(thread_queue, "", self.te_manager)
Expand All @@ -86,41 +85,36 @@ def start_sdx_consumer(self, thread_queue, db_instance):

latest_topo = {}
domain_list = []
num_domain_topos = 0

# This part reads from DB when SDX controller initially starts.
# It looks for domain_list, and num_domain_topos, if they are already in DB,
# It looks for domain_list, if already in DB,
# Then use the existing ones from DB.
domain_list_from_db = db_instance.read_from_db("domains", "domain_list")
latest_topo_from_db = db_instance.read_from_db("topologies", "latest_topo")
num_domain_topos_from_db = db_instance.read_from_db(
"topologies", "num_domain_topos"
)

if domain_list_from_db:
domain_list = domain_list_from_db["domain_list"]
logger.debug("Read domain_list from db: ")
logger.debug("Domain list already exists in db: ")
logger.debug(domain_list)

if latest_topo_from_db:
latest_topo = latest_topo_from_db["latest_topo"]
logger.debug("Read latest_topo from db: ")
logger.debug("Topology already exists in db: ")
logger.debug(latest_topo)

if num_domain_topos_from_db:
num_domain_topos = num_domain_topos_from_db["num_domain_topos"]
logger.debug("Read num_domain_topos from db: ")
logger.debug(num_domain_topos)
for topo in range(1, num_domain_topos + 1):
db_key = f"LC-{topo}"
topology = db_instance.read_from_db("topologies", db_key)

if topology:
# Get the actual thing minus the Mongo ObjectID.
topology = topology[db_key]
topo_json = json.loads(topology)
self.te_manager.add_topology(topo_json)
logger.debug(f"Read {db_key}: {topology}")
# If topologies already saved in db, use them to initialize te_manager
if domain_list:
for domain in domain_list:
topology = db_instance.read_from_db("topologies", domain)

if not topology:
continue

# Get the actual thing minus the Mongo ObjectID.
topology = topology[domain]
topo_json = json.loads(topology)
self.te_manager.add_topology(topo_json)
logger.debug(f"Read {domain}: {topology}")

while not self._exit_event.is_set():
# Queue.get() will block until there's an item in the queue.
Expand All @@ -130,18 +124,19 @@ def start_sdx_consumer(self, thread_queue, db_instance):
if "Heart Beat" in str(msg):
HEARTBEAT_ID += 1
logger.debug("Heart beat received. ID: " + str(HEARTBEAT_ID))
else:
logger.info("Saving to database.")
if parse_helper.is_json(msg):
if "version" in str(msg):
lc_message_handler.process_lc_json_msg(
msg,
latest_topo,
domain_list,
num_domain_topos,
)
else:
logger.info("got message from MQ: " + str(msg))
continue

if not parse_helper.is_json(msg):
continue

if "version" not in str(msg):
logger.info("Got message (NO VERSION) from MQ: " + str(msg))

lc_message_handler.process_lc_json_msg(
msg,
latest_topo,
domain_list,
)

def stop_threads(self):
"""
Expand Down