Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Proposing removal of nested protocol managers #2169

Draft
wants to merge 1 commit into
base: alexleung/dev_v070_for_refactor
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -297,9 +297,6 @@ def remove_message_listener(self, topic):
self.listener_topics.remove(topic)
self.listener_handler_funcs.pop(topic)

def get_message_runner(self):
return None

def get_listener_message_queue(self):
return self.listener_message_queue

Expand All @@ -321,19 +318,17 @@ def start_listener(
self.listener_message_event = multiprocessing.Event()
self.listener_message_event.clear()
self.listener_agent_config = agent_config
message_runner = self.get_message_runner()
message_runner.listener_agent_config = agent_config
if platform.system() == "Windows":
self.listener_message_center_process = multiprocessing.Process(
target=message_runner.run_listener_dispatcher, args=(
target=self.run_listener_dispatcher, args=(
self.listener_message_event, self.listener_message_queue,
self.listener_handler_funcs, sender_message_queue,
message_center_name, extra_queues
)
)
else:
self.listener_message_center_process = fedml.get_process(
target=message_runner.run_listener_dispatcher, args=(
target=self.run_listener_dispatcher, args=(
self.listener_message_event, self.listener_message_queue,
self.listener_handler_funcs, sender_message_queue,
message_center_name, extra_queues
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,32 +256,6 @@ def generate_status_report(self, run_id, edge_id, server_agent_id=None):
status_reporter.server_agent_id = server_agent_id
return status_reporter

@abstractmethod
def generate_protocol_manager(self):
# Generate the protocol manager instance and set the attribute values.
return None

def get_message_runner(self):
if self.message_status_runner is not None:
return self.message_status_runner

self.message_status_runner = self.generate_protocol_manager()
self.message_status_runner.status_queue = self.get_status_queue()

return self.message_status_runner

def get_status_runner(self):
if self.message_status_runner is None:
self.get_message_runner()
if self.message_status_runner is not None:
self.message_status_runner.sender_message_queue = self.message_center.get_sender_message_queue()

if self.message_status_runner is not None:
self.message_status_runner.sender_message_queue = self.message_center.get_sender_message_queue()
return self.message_status_runner

return None

def get_get_protocol_communication_manager(self):
return self.communication_mgr

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ def __init__(self, message_queue=None):
self.status_listener_message_center_queue = None
self.status_message_center = None
self.status_manager_instance = None
self.status_runner = None
self.is_deployment_status_center = False

def __repr__(self):
Expand All @@ -103,19 +102,15 @@ def __repr__(self):
attrs=" ".join("{}={!r}".format(k, v) for k, v in self.__dict__.items()),
)

def get_status_runner(self):
return None

def start_status_center(self, sender_message_center_queue=None,
listener_message_center_queue=None, is_slave_agent=False):
self.status_queue = multiprocessing.Manager().Queue(-1)
self.status_event = multiprocessing.Event()
self.status_event.clear()
self.status_sender_message_center_queue = sender_message_center_queue
self.status_listener_message_center_queue = listener_message_center_queue
self.status_runner = self.get_status_runner()
target_func = self.status_runner.run_status_dispatcher if not is_slave_agent else \
self.status_runner.run_status_dispatcher_in_slave
target_func = self.run_status_dispatcher if not is_slave_agent else \
self.run_status_dispatcher_in_slave
if platform.system() == "Windows":
self.status_center_process = multiprocessing.Process(
target=target_func, args=(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -435,32 +435,6 @@ def callback_broadcasted_job_status(self, topic, payload):
logging.info("process status in the broadcast job status callback.")
self.process_status(run_id, job_status, self.edge_id)

def generate_protocol_manager(self):
message_status_runner = self._generate_protocol_manager_instance(
self.args, agent_config=self.agent_config
)
message_status_runner.request_json = self.request_json
message_status_runner.disable_client_login = self.disable_client_login
message_status_runner.message_center_name = self.message_center_name
message_status_runner.run_id = self.run_id
message_status_runner.edge_id = self.edge_id
message_status_runner.edge_user_name = self.edge_user_name
message_status_runner.edge_extra_url = self.edge_extra_url
message_status_runner.server_agent_id = self.server_agent_id
message_status_runner.current_device_id = self.current_device_id
message_status_runner.unique_device_id = self.unique_device_id
message_status_runner.subscribed_topics = self.subscribed_topics
message_status_runner.running_request_json = self.running_request_json
message_status_runner.request_json = self.start_request_json
message_status_runner.user_name = self.user_name
message_status_runner.general_edge_id = self.general_edge_id
message_status_runner.server_id = self.server_id
message_status_runner.model_device_server_id = self.model_device_server_id
message_status_runner.model_device_client_edge_id_list = self.model_device_client_edge_id_list
message_status_runner.status_queue = self.get_status_queue()

return message_status_runner

def process_status(self, run_id, status, edge_id, master_id=None):
run_id_str = str(run_id)

Expand Down