From 99f85d0daa16d886d4c7d1ebb2260e91ff00e8ca Mon Sep 17 00:00:00 2001 From: wojtekzyla <108660584+wojtekzyla@users.noreply.github.com> Date: Thu, 23 May 2024 14:50:57 +0200 Subject: [PATCH] feat: change workflow after clicking 'Apply changes' button (#82) --- CHANGELOG.md | 1 + .../apply_changes/apply_changes.py | 6 +- .../apply_changes/handling_chain.py | 69 +++++++--- .../SC4SNMP_UI_backend/apply_changes/tasks.py | 23 +++- .../post_endpoints/test_post_apply_changes.py | 128 ++++++++++++------ 5 files changed, 158 insertions(+), 69 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 223178b..2b00ec5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ ### Changed - add error handling for apply changes action +- after clicking 'Apply changes' workflow is initially attempting to create new job immediately, if it is impossible, schedule it for the future ## [1.0.2] diff --git a/backend/SC4SNMP_UI_backend/apply_changes/apply_changes.py b/backend/SC4SNMP_UI_backend/apply_changes/apply_changes.py index b220f9d..d6ba532 100644 --- a/backend/SC4SNMP_UI_backend/apply_changes/apply_changes.py +++ b/backend/SC4SNMP_UI_backend/apply_changes/apply_changes.py @@ -43,11 +43,13 @@ def __init__(self) -> None: mongo_config_collection.update_one( { "previous_job_start_time": {"$exists": True}, - "currently_scheduled": {"$exists": True}} + "currently_scheduled": {"$exists": True}, + "task_id": {"$exists": True}} ,{ "$set":{ "previous_job_start_time": None, - "currently_scheduled": False + "currently_scheduled": False, + "task_id": None } }, upsert=True diff --git a/backend/SC4SNMP_UI_backend/apply_changes/handling_chain.py b/backend/SC4SNMP_UI_backend/apply_changes/handling_chain.py index 595933c..297b15b 100644 --- a/backend/SC4SNMP_UI_backend/apply_changes/handling_chain.py +++ b/backend/SC4SNMP_UI_backend/apply_changes/handling_chain.py @@ -2,7 +2,9 @@ import ruamel.yaml from flask import current_app from SC4SNMP_UI_backend import mongo_client -from SC4SNMP_UI_backend.apply_changes.tasks import run_job +from SC4SNMP_UI_backend.apply_changes.tasks import run_job, get_job_config +from SC4SNMP_UI_backend.apply_changes.kubernetes_job import create_job +from kubernetes.client import ApiException import datetime import os @@ -13,6 +15,7 @@ VALUES_DIRECTORY = os.getenv("VALUES_DIRECTORY", "") VALUES_FILE = os.getenv("VALUES_FILE", "") KEEP_TEMP_FILES = os.getenv("KEEP_TEMP_FILES", "false") +JOB_NAMESPACE = os.getenv("JOB_NAMESPACE", "sc4snmp") mongo_config_collection = mongo_client.sc4snmp.config_collection mongo_groups = mongo_client.sc4snmp.groups_ui mongo_inventory = mongo_client.sc4snmp.inventory_ui @@ -138,31 +141,55 @@ def handle(self, request: dict = None): :return: pass dictionary with job_delay in seconds to the next handler """ record = list(mongo_config_collection.find())[0] - last_update = record["previous_job_start_time"] - if last_update is None: - # If it's the first time that the job is run (record in mongo_config_collection has been created - # in ApplyChanges class and last_update attribute is None) then job delay should be equal to - # CHANGES_INTERVAL_SECONDS. Update the mongo record with job state accordingly. - job_delay = CHANGES_INTERVAL_SECONDS + schedule_new_job = True + # get_job_config return job configuration in "job" variable and BatchV1Api from kubernetes client + job, batch_v1 = get_job_config() + if job is None or batch_v1 is None: + raise ValueError("CheckJobHandler: Job configuration is empty") + try: + # Try creating a new kubernetes job immediately. If the previous job is still present in the namespace, + # ApiException will be thrown. + create_job(batch_v1, job, JOB_NAMESPACE) + task_id = record["task_id"] + if task_id is not None: + # revoke existing Celery task with the previously scheduled job + current_app.extensions["celery"].control.revoke(task_id, + terminate=True, signal='SIGKILL') mongo_config_collection.update_one({"_id": record["_id"]}, - {"$set": {"previous_job_start_time": datetime.datetime.utcnow()}}) - # time from the last update + {"$set": {"previous_job_start_time": datetime.datetime.utcnow(), + "currently_scheduled": False, + "task_id": None}}) + job_delay = 1 time_difference = 0 - else: + schedule_new_job = False + except ApiException: # Check how many seconds have elapsed since the last time that the job was run. If the time difference - # is greater than CHANGES_INTERVAL_SECONDS then job can be run immediately. Otherwise, calculate how + # is greater than CHANGES_INTERVAL_SECONDS then job can be scheduled within 1 second. Otherwise, calculate how # many seconds are left until minimum time difference between updates (CHANGES_INTERVAL_SECONDS). - current_time = datetime.datetime.utcnow() - delta = current_time - last_update - time_difference = delta.total_seconds() - if time_difference > CHANGES_INTERVAL_SECONDS: - job_delay = 1 + last_update = record["previous_job_start_time"] + if last_update is None: + # If it's the first time that the job is run (record in mongo_config_collection has been created + # in ApplyChanges class and last_update attribute is None) but the previous job is still in the namespace + # then job delay should be equal to CHANGES_INTERVAL_SECONDS. + # Update the mongo record with job state accordingly. + job_delay = CHANGES_INTERVAL_SECONDS + mongo_config_collection.update_one({"_id": record["_id"]}, + {"$set": {"previous_job_start_time": datetime.datetime.utcnow()}}) + # time from the last update + time_difference = 0 else: - job_delay = int(CHANGES_INTERVAL_SECONDS - time_difference) + current_time = datetime.datetime.utcnow() + delta = current_time - last_update + time_difference = delta.total_seconds() + if time_difference > CHANGES_INTERVAL_SECONDS: + job_delay = 1 + else: + job_delay = int(CHANGES_INTERVAL_SECONDS - time_difference) result = { "job_delay": job_delay, - "time_from_last_update": time_difference + "time_from_last_update": time_difference, + "schedule_new_job": schedule_new_job } current_app.logger.info(f"CheckJobHandler: {result}") @@ -175,11 +202,11 @@ def handle(self, request: dict): ScheduleHandler schedules the kubernetes job with updated sc4snmp configuration """ record = list(mongo_config_collection.find())[0] - if not record["currently_scheduled"]: + if not record["currently_scheduled"] and request["schedule_new_job"]: # If the task isn't currently scheduled, schedule it and update its state in mongo. + async_result = run_job.apply_async(countdown=request["job_delay"], queue='apply_changes') mongo_config_collection.update_one({"_id": record["_id"]}, - {"$set": {"currently_scheduled": True}}) - run_job.apply_async(countdown=request["job_delay"], queue='apply_changes') + {"$set": {"currently_scheduled": True, "task_id": async_result.id}}) current_app.logger.info( f"ScheduleHandler: scheduling new task with the delay of {request['job_delay']} seconds.") else: diff --git a/backend/SC4SNMP_UI_backend/apply_changes/tasks.py b/backend/SC4SNMP_UI_backend/apply_changes/tasks.py index 2e5bfed..4428d25 100644 --- a/backend/SC4SNMP_UI_backend/apply_changes/tasks.py +++ b/backend/SC4SNMP_UI_backend/apply_changes/tasks.py @@ -15,8 +15,11 @@ JOB_CONFIG_PATH = os.getenv("JOB_CONFIG_PATH", "/config/job_config.yaml") celery_logger = get_task_logger(__name__) -@shared_task() -def run_job(): +def get_job_config(): + """ + :return: job - configuration of the job + batch_v1 - BatchV1Api object from kubernetes client + """ job = None batch_v1 = None with open(JOB_CONFIG_PATH, encoding="utf-8") as file: @@ -26,6 +29,13 @@ def run_job(): config.load_incluster_config() batch_v1 = client.BatchV1Api() job = create_job_object(config_file) + return job, batch_v1 + +@shared_task() +def run_job(): + job, batch_v1 = get_job_config() + if job is None or batch_v1 is None: + raise ValueError("Scheduled kubernetes job: Job configuration is empty") with MongoClient(MONGO_URI) as connection: try_creating = True @@ -39,8 +49,9 @@ def run_job(): try: record = list(connection.sc4snmp.config_collection.find())[0] connection.sc4snmp.config_collection.update_one({"_id": record["_id"]}, - {"$set": {"previous_job_start_time": datetime.datetime.utcnow(), - "currently_scheduled": False}}) + {"$set": {"previous_job_start_time": datetime.datetime.utcnow(), + "currently_scheduled": False, + "task_id": None}}) except Exception as e: celery_logger.info(f"Error occurred while updating job state after job creation: {str(e)}") except ApiException: @@ -50,6 +61,6 @@ def run_job(): celery_logger.info(f"Kubernetes job was not created. Max retries ({JOB_CREATION_RETRIES}) exceeded.") record = list(connection.sc4snmp.config_collection.find())[0] connection.sc4snmp.config_collection.update_one({"_id": record["_id"]}, - {"$set": {"currently_scheduled": False}}) + {"$set": {"currently_scheduled": False, "task_id": None}}) else: - time.sleep(10) \ No newline at end of file + time.sleep(10) diff --git a/backend/tests/ui_handling/post_endpoints/test_post_apply_changes.py b/backend/tests/ui_handling/post_endpoints/test_post_apply_changes.py index 58cebec..7da516a 100644 --- a/backend/tests/ui_handling/post_endpoints/test_post_apply_changes.py +++ b/backend/tests/ui_handling/post_endpoints/test_post_apply_changes.py @@ -1,11 +1,14 @@ from unittest import mock -from unittest.mock import call +from unittest.mock import call, Mock from bson import ObjectId from copy import copy import ruamel import datetime import os +from kubernetes.client import ApiException from SC4SNMP_UI_backend.apply_changes.handling_chain import TMP_FILE_PREFIX +import pytest +from SC4SNMP_UI_backend.apply_changes.apply_changes import SingletonMeta VALUES_TEST_DIRECTORY = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../../yamls_for_tests/values_test") @@ -58,6 +61,12 @@ def reset_generated_values(): yaml.dump(original_data, file) +@pytest.fixture(autouse=True) +def reset_singleton(): + yield # The code after yield is executed after the test + SingletonMeta._instances = {} + + common_id = "635916b2c8cb7a15f28af40a" groups_collection = [ @@ -171,24 +180,26 @@ def reset_generated_values(): @mock.patch("SC4SNMP_UI_backend.apply_changes.handling_chain.VALUES_FILE", "values.yaml") @mock.patch("SC4SNMP_UI_backend.apply_changes.handling_chain.KEEP_TEMP_FILES", "true") @mock.patch("datetime.datetime") +@mock.patch("SC4SNMP_UI_backend.apply_changes.handling_chain.create_job") +@mock.patch("SC4SNMP_UI_backend.apply_changes.handling_chain.get_job_config") @mock.patch("SC4SNMP_UI_backend.apply_changes.handling_chain.run_job") @mock.patch("pymongo.collection.Collection.update_one") @mock.patch("pymongo.collection.Collection.find") -def test_apply_changes_first_call(m_find, m_update, m_run_job, m_datetime, client): +def test_apply_changes_first_call_no_job_in_namespace(m_find, m_update, m_run_job, m_get_job_config, m_create_job, m_datetime, client): datetime_object = datetime.datetime(2020, 7, 10, 10, 30, 0, 0) m_datetime.utcnow = mock.Mock(return_value=datetime_object) collection = { "_id": ObjectId(common_id), "previous_job_start_time": None, - "currently_scheduled": False + "currently_scheduled": False, + "task_id": None } m_find.side_effect = [ groups_collection, # call from SaveConfigToFileHandler profiles_collection, # call from SaveConfigToFileHandler inventory_collection, # call from SaveConfigToFileHandler - [collection], - [collection], - [collection] + [collection], # call from CheckJobHandler + [collection], # call from ScheduleHandler ] calls_find = [ call(), @@ -196,104 +207,141 @@ def test_apply_changes_first_call(m_find, m_update, m_run_job, m_datetime, clien call() ] calls_update = [ - call({"_id": ObjectId(common_id)},{"$set": {"previous_job_start_time": datetime_object}}), - call({"_id": ObjectId(common_id)},{"$set": {"currently_scheduled": True}}) + call({'previous_job_start_time': {'$exists': True}, 'currently_scheduled': {'$exists': True}, + 'task_id': {'$exists': True}}, + {'$set': {'previous_job_start_time': None, 'currently_scheduled': False, 'task_id': None}}, upsert=True), # call from ApplyChanges + call({"_id": ObjectId(common_id)}, {"$set": {"previous_job_start_time": datetime_object, "currently_scheduled": False, "task_id": None}}) # call from CheckJobHandler ] - apply_async_calls = [ - call(countdown=300, queue='apply_changes') + create_job_calls = [ + call("val1", "val2", "sc4snmp") ] + m_get_job_config.return_value = ("val2", "val1") + m_create_job.return_value = None m_run_job.apply_async.return_value = None m_update.return_value = None response = client.post("/apply-changes") m_find.assert_has_calls(calls_find) + assert m_get_job_config.called m_update.assert_has_calls(calls_update) - m_run_job.apply_async.assert_has_calls(apply_async_calls) - assert response.json == {"message": "Configuration will be updated in approximately 300 seconds."} + m_create_job.assert_has_calls(create_job_calls) + assert not m_run_job.apply_async.called + assert response.json == {"message": "Configuration will be updated in approximately 1 seconds."} reference_files, generated_files = return_generated_and_reference_files() for ref_f, gen_f in zip(reference_files, generated_files): assert ref_f == gen_f delete_generated_files() reset_generated_values() + @mock.patch("SC4SNMP_UI_backend.apply_changes.handling_chain.VALUES_DIRECTORY", VALUES_TEST_DIRECTORY) @mock.patch("SC4SNMP_UI_backend.apply_changes.handling_chain.TMP_DIR", VALUES_TEST_DIRECTORY) -@mock.patch("SC4SNMP_UI_backend.apply_changes.handling_chain.datetime") +@mock.patch("SC4SNMP_UI_backend.apply_changes.handling_chain.VALUES_FILE", "values.yaml") +@mock.patch("SC4SNMP_UI_backend.apply_changes.handling_chain.KEEP_TEMP_FILES", "true") +@mock.patch("datetime.datetime") +@mock.patch("SC4SNMP_UI_backend.apply_changes.handling_chain.create_job") +@mock.patch("SC4SNMP_UI_backend.apply_changes.handling_chain.get_job_config") @mock.patch("SC4SNMP_UI_backend.apply_changes.handling_chain.run_job") @mock.patch("pymongo.collection.Collection.update_one") @mock.patch("pymongo.collection.Collection.find") -def test_apply_changes_job_currently_scheduled(m_find, m_update, m_run_job, m_datetime, client): - datetime_object_old = datetime.datetime(2020, 7, 10, 10, 27, 10, 0) - datetime_object_new = datetime.datetime(2020, 7, 10, 10, 30, 0, 0) - m_datetime.datetime.utcnow = mock.Mock(return_value=datetime_object_new) +def test_apply_changes_first_call_job_present_in_namespace(m_find, m_update, m_run_job, m_get_job_config, m_create_job, m_datetime, client): + datetime_object = datetime.datetime(2020, 7, 10, 10, 30, 0, 0) + m_datetime.utcnow = mock.Mock(return_value=datetime_object) collection = { "_id": ObjectId(common_id), - "previous_job_start_time": datetime_object_old, - "currently_scheduled": True + "previous_job_start_time": None, + "currently_scheduled": False, + "task_id": None } m_find.side_effect = [ - groups_collection, # call from SaveConfigToFileHandler + groups_collection, # call from SaveConfigToFileHandler profiles_collection, # call from SaveConfigToFileHandler inventory_collection, # call from SaveConfigToFileHandler - [collection], - [collection], - [collection] + [collection], # call from CheckJobHandler + [collection], # call from ScheduleHandler ] calls_find = [ call(), call(), call() ] - m_run_job.apply_async.return_value = None + calls_update = [ + call({'previous_job_start_time': {'$exists': True}, 'currently_scheduled': {'$exists': True}, + 'task_id': {'$exists': True}}, + {'$set': {'previous_job_start_time': None, 'currently_scheduled': False, 'task_id': None}}, upsert=True), # call from ApplyChanges + call({"_id": ObjectId(common_id)},{"$set": {"previous_job_start_time": datetime_object}}), # call from CheckJobHandler + call({"_id": ObjectId(common_id)}, {"$set": {"currently_scheduled": True, "task_id": "id_val"}}) # call from ScheduleHandler + + ] + apply_async_calls = [ + call(countdown=300, queue='apply_changes') + ] + create_job_calls = [ + call("val1", "val2", "sc4snmp") + ] + + m_get_job_config.return_value = ("val2", "val1") + m_create_job.side_effect = ApiException() + + apply_async_result = Mock() + apply_async_result.id = "id_val" + m_run_job.apply_async.return_value = apply_async_result m_update.return_value = None response = client.post("/apply-changes") m_find.assert_has_calls(calls_find) - assert not m_run_job.apply_async.called - assert response.json == {"message": "Configuration will be updated in approximately 130 seconds."} + assert m_get_job_config.called + m_update.assert_has_calls(calls_update) + m_create_job.assert_has_calls(create_job_calls) + m_run_job.apply_async.assert_has_calls(apply_async_calls) + assert response.json == {"message": "Configuration will be updated in approximately 300 seconds."} + reference_files, generated_files = return_generated_and_reference_files() + for ref_f, gen_f in zip(reference_files, generated_files): + assert ref_f == gen_f delete_generated_files() reset_generated_values() - @mock.patch("SC4SNMP_UI_backend.apply_changes.handling_chain.VALUES_DIRECTORY", VALUES_TEST_DIRECTORY) @mock.patch("SC4SNMP_UI_backend.apply_changes.handling_chain.TMP_DIR", VALUES_TEST_DIRECTORY) @mock.patch("SC4SNMP_UI_backend.apply_changes.handling_chain.datetime") +@mock.patch("SC4SNMP_UI_backend.apply_changes.handling_chain.create_job") +@mock.patch("SC4SNMP_UI_backend.apply_changes.handling_chain.get_job_config") @mock.patch("SC4SNMP_UI_backend.apply_changes.handling_chain.run_job") @mock.patch("pymongo.collection.Collection.update_one") @mock.patch("pymongo.collection.Collection.find") -def test_apply_changes_new_job_delay_1(m_find, m_update, m_run_job, m_datetime, client): - datetime_object_old = datetime.datetime(2020, 7, 10, 10, 20, 0, 0) +def test_apply_changes_job_currently_scheduled_job_present_in_namespace(m_find, m_update, m_run_job, m_get_job_config, m_create_job, m_datetime, client): + datetime_object_old = datetime.datetime(2020, 7, 10, 10, 27, 10, 0) datetime_object_new = datetime.datetime(2020, 7, 10, 10, 30, 0, 0) m_datetime.datetime.utcnow = mock.Mock(return_value=datetime_object_new) collection = { "_id": ObjectId(common_id), "previous_job_start_time": datetime_object_old, - "currently_scheduled": False + "currently_scheduled": True, + "task_id": "test_id" } m_find.side_effect = [ groups_collection, # call from SaveConfigToFileHandler profiles_collection, # call from SaveConfigToFileHandler inventory_collection, # call from SaveConfigToFileHandler - [collection], - [collection], - [collection] + [collection], # call from CheckJobHandler + [collection], # call from ScheduleHandler ] calls_find = [ call(), call(), call() ] - apply_async_calls = [ - call(countdown=1, queue='apply_changes') + create_job_calls = [ + call("val1", "val2", "sc4snmp") ] - - m_run_job.apply_async.return_value = None - m_update.return_value = None + m_get_job_config.return_value = ("val2", "val1") + m_create_job.side_effect = ApiException() response = client.post("/apply-changes") m_find.assert_has_calls(calls_find) - m_run_job.apply_async.assert_has_calls(apply_async_calls) - assert response.json == {"message": "Configuration will be updated in approximately 1 seconds."} + m_create_job.assert_has_calls(create_job_calls) + assert not m_run_job.apply_async.called + assert response.json == {"message": "Configuration will be updated in approximately 130 seconds."} delete_generated_files() reset_generated_values()