From 671441989ce45434945638f3cac8ef087f3c2864 Mon Sep 17 00:00:00 2001 From: Tom Szendrey Date: Mon, 9 Sep 2024 15:34:26 -0400 Subject: [PATCH 1/6] Add optimize hook, rename calibrate's --- service/models/operations/calibrate.py | 4 +- .../models/operations/ensemble_calibrate.py | 4 +- service/models/operations/optimize.py | 7 ++++ service/utils/rabbitmq.py | 39 ++++++++++++++++++- 4 files changed, 49 insertions(+), 5 deletions(-) diff --git a/service/models/operations/calibrate.py b/service/models/operations/calibrate.py index c07f834..bcdb7b9 100644 --- a/service/models/operations/calibrate.py +++ b/service/models/operations/calibrate.py @@ -13,7 +13,7 @@ fetch_and_convert_static_interventions, fetch_and_convert_dynamic_interventions, ) -from utils.rabbitmq import gen_rabbitmq_hook +from utils.rabbitmq import gen_calibrate_rabbitmq_hook from utils.tds import fetch_dataset, fetch_model @@ -77,7 +77,7 @@ def gen_pyciemss_args(self, job_id): # TODO: Test RabbitMQ try: - hook = gen_rabbitmq_hook(job_id) + hook = gen_calibrate_rabbitmq_hook(job_id) except (socket.gaierror, AMQPConnectionError): logging.warning( "%s: Failed to connect to RabbitMQ. Unable to log progress", job_id diff --git a/service/models/operations/ensemble_calibrate.py b/service/models/operations/ensemble_calibrate.py index 3ecf549..4e18363 100644 --- a/service/models/operations/ensemble_calibrate.py +++ b/service/models/operations/ensemble_calibrate.py @@ -11,7 +11,7 @@ from models.base import Dataset, OperationRequest, Timespan, ModelConfig from models.converters import convert_to_solution_mapping -from utils.rabbitmq import gen_rabbitmq_hook +from utils.rabbitmq import gen_calibrate_rabbitmq_hook from utils.tds import fetch_dataset, fetch_model @@ -52,7 +52,7 @@ def gen_pyciemss_args(self, job_id): dataset_path = fetch_dataset(self.dataset.dict(), job_id) try: - hook = gen_rabbitmq_hook(job_id) + hook = gen_calibrate_rabbitmq_hook(job_id) except (socket.gaierror, AMQPConnectionError): logging.warning( "%s: Failed to connect to RabbitMQ. Unable to log progress", job_id diff --git a/service/models/operations/optimize.py b/service/models/operations/optimize.py index 7edc414..01e36fc 100644 --- a/service/models/operations/optimize.py +++ b/service/models/operations/optimize.py @@ -2,6 +2,7 @@ from typing import ClassVar, List, Optional from enum import Enum +from utils.rabbitmq import OptimizeHook import numpy as np import torch @@ -151,6 +152,11 @@ def gen_pyciemss_args(self, job_id): if step_size is not None and solver_method == "euler": solver_options["step_size"] = step_size + total_possible_iterations = extra_options.get("maxiter") * extra_options.get( + "maxfeval" + ) + progress_hook = OptimizeHook(job_id, total_possible_iterations) + return { "model_path_or_json": amr_path, "logging_step_size": self.logging_step_size, @@ -173,6 +179,7 @@ def gen_pyciemss_args(self, job_id): "n_samples_ouu": n_samples_ouu, "solver_method": solver_method, "solver_options": solver_options, + "progress_hook": progress_hook, **extra_options, } diff --git a/service/utils/rabbitmq.py b/service/utils/rabbitmq.py index b8b0120..b26402b 100644 --- a/service/utils/rabbitmq.py +++ b/service/utils/rabbitmq.py @@ -52,7 +52,7 @@ def callback(ch, method, properties, body): channel.start_consuming() -def gen_rabbitmq_hook(job_id): +def gen_calibrate_rabbitmq_hook(job_id): connection = pika.BlockingConnection( conn_config, ) @@ -68,3 +68,40 @@ def hook(progress, loss): ) return hook + + +class OptimizeHook: + def __init__(self, job_id, total_possible_iterations): + connection = pika.BlockingConnection( + conn_config, + ) + self.channel = connection.channel() + self.job_id = job_id + self.result = [] + self.step = 0 + self.total_possible_iterations = total_possible_iterations + + def __call__(self, current_results): + self.step += 1 + print( + "job_id: ", + self.job_id, + " progress: ", + self.step, + " current_results: ", + current_results.tolist(), + " total_possible_iterations: ", + self.total_possible_iterations, + ) + self.channel.basic_publish( + exchange="", + routing_key="simulation-status", + body=json.dumps( + { + "job_id": self.job_id, + "progress": self.step, + "current_results": current_results.tolist(), + "total_possible_iterations": self.total_possible_iterations, + } + ), + ) From ff4666f0feb03406376479b55a087e3e941e421d Mon Sep 17 00:00:00 2001 From: Tom Szendrey Date: Tue, 10 Sep 2024 09:41:35 -0400 Subject: [PATCH 2/6] Correction to total_possible_iterations --- service/models/operations/optimize.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/service/models/operations/optimize.py b/service/models/operations/optimize.py index 01e36fc..323ca95 100644 --- a/service/models/operations/optimize.py +++ b/service/models/operations/optimize.py @@ -152,9 +152,9 @@ def gen_pyciemss_args(self, job_id): if step_size is not None and solver_method == "euler": solver_options["step_size"] = step_size - total_possible_iterations = extra_options.get("maxiter") * extra_options.get( - "maxfeval" - ) + total_possible_iterations = ( + extra_options.get("maxiter") + 1 + ) * extra_options.get("maxfeval") progress_hook = OptimizeHook(job_id, total_possible_iterations) return { From c7aa7a96487eeb60df1b7f587dacade5d4d23bd9 Mon Sep 17 00:00:00 2001 From: Tom Szendrey Date: Tue, 10 Sep 2024 14:52:39 -0400 Subject: [PATCH 3/6] removing hook's print --- service/utils/rabbitmq.py | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/service/utils/rabbitmq.py b/service/utils/rabbitmq.py index b26402b..0edbba5 100644 --- a/service/utils/rabbitmq.py +++ b/service/utils/rabbitmq.py @@ -83,16 +83,6 @@ def __init__(self, job_id, total_possible_iterations): def __call__(self, current_results): self.step += 1 - print( - "job_id: ", - self.job_id, - " progress: ", - self.step, - " current_results: ", - current_results.tolist(), - " total_possible_iterations: ", - self.total_possible_iterations, - ) self.channel.basic_publish( exchange="", routing_key="simulation-status", From 4b502b232ccfb17ebce054205819568d7730fe32 Mon Sep 17 00:00:00 2001 From: Tom Szendrey Date: Wed, 11 Sep 2024 14:44:13 -0400 Subject: [PATCH 4/6] Add types to rabbitmq messages. --- service/utils/rabbitmq.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/service/utils/rabbitmq.py b/service/utils/rabbitmq.py index 0edbba5..9a1667e 100644 --- a/service/utils/rabbitmq.py +++ b/service/utils/rabbitmq.py @@ -63,7 +63,12 @@ def hook(progress, loss): exchange="", routing_key="simulation-status", body=json.dumps( - {"job_id": job_id, "progress": progress, "loss": str(loss)} + { + "job_id": job_id, + "type": "calibrate", + "progress": progress, + "loss": str(loss), + } ), ) @@ -77,6 +82,7 @@ def __init__(self, job_id, total_possible_iterations): ) self.channel = connection.channel() self.job_id = job_id + self.type = "optimize" self.result = [] self.step = 0 self.total_possible_iterations = total_possible_iterations @@ -90,6 +96,7 @@ def __call__(self, current_results): { "job_id": self.job_id, "progress": self.step, + "type": self.type, "current_results": current_results.tolist(), "total_possible_iterations": self.total_possible_iterations, } From fc490c14b02cdf59f9d4d09fc4a3f2c057b98f5b Mon Sep 17 00:00:00 2001 From: Tom Szendrey Date: Wed, 11 Sep 2024 16:23:43 -0400 Subject: [PATCH 5/6] surround optimize hook in try --- service/models/operations/optimize.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/service/models/operations/optimize.py b/service/models/operations/optimize.py index 323ca95..fe64fff 100644 --- a/service/models/operations/optimize.py +++ b/service/models/operations/optimize.py @@ -3,6 +3,10 @@ from typing import ClassVar, List, Optional from enum import Enum from utils.rabbitmq import OptimizeHook +from pika.exceptions import AMQPConnectionError +import socket +import logging + import numpy as np import torch @@ -155,7 +159,12 @@ def gen_pyciemss_args(self, job_id): total_possible_iterations = ( extra_options.get("maxiter") + 1 ) * extra_options.get("maxfeval") - progress_hook = OptimizeHook(job_id, total_possible_iterations) + try: + progress_hook = OptimizeHook(job_id, total_possible_iterations) + except (socket.gaierror, AMQPConnectionError): + logging.warning( + "%s: Failed to connect to RabbitMQ. Unable to log progress", job_id + ) return { "model_path_or_json": amr_path, From 7af125be8ba249018a7fa5548e628390e0530e8e Mon Sep 17 00:00:00 2001 From: Tom Szendrey Date: Wed, 11 Sep 2024 16:39:12 -0400 Subject: [PATCH 6/6] defining progress_hook in exception case. --- service/models/operations/optimize.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/service/models/operations/optimize.py b/service/models/operations/optimize.py index fe64fff..b63695b 100644 --- a/service/models/operations/optimize.py +++ b/service/models/operations/optimize.py @@ -166,6 +166,10 @@ def gen_pyciemss_args(self, job_id): "%s: Failed to connect to RabbitMQ. Unable to log progress", job_id ) + # Log progress hook when unable to connect - for testing purposes. + def progress_hook(current_results): + logging.info(f"Optimize current results: {current_results.tolist()}") + return { "model_path_or_json": amr_path, "logging_step_size": self.logging_step_size,