From 4c4483c03576b750627e246cd6c7df7b860c07ae Mon Sep 17 00:00:00 2001 From: Tyler Burton Date: Tue, 5 Nov 2024 16:42:37 -0600 Subject: [PATCH] Revert "adds job cancel methods, route and template UI" This reverts commit 0a32ddf7e2af05b800fc609ec0988778856aa6f2. --- app/__init__.py | 6 +- app/routes.py | 23 +-- app/scripts/load_manager.py | 124 +++++++++++++++ app/templates/view_job_data.html | 13 +- app/templates/view_source_data.html | 3 +- harvester/lib/load_manager.py | 148 ------------------ tests/conftest.py | 8 +- .../app/fixtures/task_manager_all_tasks.json | 44 ------ tests/integration/app/test_load_manager.py | 105 +++---------- 9 files changed, 156 insertions(+), 318 deletions(-) create mode 100644 app/scripts/load_manager.py delete mode 100644 harvester/lib/load_manager.py delete mode 100644 tests/integration/app/fixtures/task_manager_all_tasks.json diff --git a/app/__init__.py b/app/__init__.py index f50b345..2c9dc54 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -7,11 +7,9 @@ from flask_migrate import Migrate from app.filters import usa_icon -from harvester.lib.load_manager import LoadManager +from app.scripts.load_manager import load_manager from database.models import db -load_manager = LoadManager() - load_dotenv() @@ -37,7 +35,7 @@ def create_app(): with app.app_context(): db.create_all() - load_manager.start() + load_manager() return app diff --git a/app/routes.py b/app/routes.py index ced5e2a..c004984 100644 --- a/app/routes.py +++ b/app/routes.py @@ -14,7 +14,7 @@ from flask import Blueprint, flash, redirect, render_template, request, session, url_for from jinja2_fragments.flask import render_block -from harvester.lib.load_manager import LoadManager +from app.scripts.load_manager import schedule_first_job, trigger_manual_job from database.interface import HarvesterDBInterface from . import htmx @@ -32,8 +32,6 @@ db = HarvesterDBInterface() -load_manager = LoadManager() - # Login authentication load_dotenv() CLIENT_ID = os.getenv("CLIENT_ID") @@ -431,7 +429,7 @@ def add_harvest_source(): if request.is_json: source = db.add_harvest_source(request.json) - job_message = load_manager.schedule_first_job(source.id) + job_message = schedule_first_job(source.id) if source and job_message: return { "message": f"Added new harvest source with ID: {source.id}. {job_message}" @@ -442,7 +440,7 @@ def add_harvest_source(): if form.validate_on_submit(): new_source = make_new_source_contract(form) source = db.add_harvest_source(new_source) - job_message = load_manager.schedule_first_job(source.id) + job_message = schedule_first_job(source.id) if source and job_message: flash(f"Updated source with ID: {source.id}. {job_message}") else: @@ -554,7 +552,7 @@ def edit_harvest_source(source_id: str): if form.validate_on_submit(): new_source_data = make_new_source_contract(form) source = db.update_harvest_source(source_id, new_source_data) - job_message = load_manager.schedule_first_job(source.id) + job_message = schedule_first_job(source.id) if source and job_message: flash(f"Updated source with ID: {source.id}. {job_message}") else: @@ -620,7 +618,7 @@ def delete_harvest_source(source_id): ### Trigger Harvest @mod.route("/harvest_source/harvest/", methods=["GET"]) def trigger_harvest_source(source_id): - message = load_manager.trigger_manual_job(source_id) + message = trigger_manual_job(source_id) flash(message) return redirect(f"/harvest_source/{source_id}") @@ -710,15 +708,6 @@ def delete_harvest_job(job_id): return result -@mod.route("/harvest_job/cancel/", methods=["GET", "POST"]) -@login_required -def cancel_harvest_job(job_id): - """Cancels a harvest job""" - message = load_manager.stop_job(job_id) - flash(message) - return redirect(f"/harvest_job/{job_id}") - - ### Get Job Errors by Type @mod.route("/harvest_job//errors/", methods=["GET"]) def get_harvest_errors_by_job(job_id, error_type): @@ -758,7 +747,6 @@ def get_harvest_records(): records = db.pget_harvest_records(page) return db._to_dict(records) - @mod.route("/harvest_record//raw", methods=["GET"]) def get_harvest_record_raw(record_id=None): record = db.get_harvest_record(record_id) @@ -771,7 +759,6 @@ def get_harvest_record_raw(record_id=None): else: return {"error": "Not Found"}, 404 - ### Add record @mod.route("/harvest_record/add", methods=["POST", "GET"]) def add_harvest_record(): diff --git a/app/scripts/load_manager.py b/app/scripts/load_manager.py new file mode 100644 index 0000000..d5b4710 --- /dev/null +++ b/app/scripts/load_manager.py @@ -0,0 +1,124 @@ +import logging +import os +from datetime import datetime + +from database.interface import HarvesterDBInterface +from harvester.lib.cf_handler import CFHandler +from harvester.utils.general_utils import create_future_date + +CF_API_URL = os.getenv("CF_API_URL") +CF_SERVICE_USER = os.getenv("CF_SERVICE_USER") +CF_SERVICE_AUTH = os.getenv("CF_SERVICE_AUTH") +HARVEST_RUNNER_APP_GUID = os.getenv("HARVEST_RUNNER_APP_GUID") + +MAX_TASKS_COUNT = 3 + +interface = HarvesterDBInterface() + +logger = logging.getLogger("harvest_admin") + + +def create_cf_handler(): + # check for correct env vars to init CFHandler + if not CF_API_URL or not CF_SERVICE_USER or not CF_SERVICE_AUTH: + logger.info("CFHandler is not configured correctly. Check your env vars.") + return + return CFHandler(CF_API_URL, CF_SERVICE_USER, CF_SERVICE_AUTH) + + +def create_task(job_id, cf_handler=None): + task_contract = { + "app_guuid": HARVEST_RUNNER_APP_GUID, + "command": f"python harvester/harvest.py {job_id}", + "task_id": f"harvest-job-{job_id}", + } + if cf_handler is None: + cf_handler = create_cf_handler() + + cf_handler.start_task(**task_contract) + updated_job = interface.update_harvest_job(job_id, {"status": "in_progress"}) + message = f"Updated job {updated_job.id} to in_progress" + logger.info(message) + return message + + +def trigger_manual_job(source_id): + source = interface.get_harvest_source(source_id) + jobs_in_progress = interface.get_all_harvest_jobs_by_filter( + {"harvest_source_id": source.id, "status": "in_progress"} + ) + if len(jobs_in_progress): + return ( + f"Can't trigger harvest. Job {jobs_in_progress[0].id} already in progress." + ) + job_data = interface.add_harvest_job( + { + "harvest_source_id": source.id, + "status": "new", + "date_created": datetime.now(), + } + ) + if job_data: + logger.info( + f"Created new manual harvest job: for {job_data.harvest_source_id}." + ) + return create_task(job_data.id) + + +def schedule_first_job(source_id): + future_jobs = interface.get_new_harvest_jobs_by_source_in_future(source_id) + # delete any future scheduled jobs + for job in future_jobs: + interface.delete_harvest_job(job.id) + logger.info(f"Deleted harvest job: {job.id} for source {source_id}.") + # then schedule next job + return schedule_next_job(source_id) + + +def schedule_next_job(source_id): + source = interface.get_harvest_source(source_id) + if source.frequency != "manual": + # schedule new future job + job_data = interface.add_harvest_job( + { + "harvest_source_id": source.id, + "status": "new", + "date_created": create_future_date(source.frequency), + } + ) + message = f"Scheduled new harvest job: for {job_data.harvest_source_id} at {job_data.date_created}." # noqa E501 + logger.info(message) + return message + else: + return "No job scheduled for manual source." + + +def load_manager(): + # confirm CF_INSTANCE_INDEX == 0. we don't want multiple instances starting jobs + if os.getenv("CF_INSTANCE_INDEX") != "0": + logger.info("CF_INSTANCE_INDEX is not set or not equal to zero") + return + + cf_handler = create_cf_handler() + + # get new jobs older than now + jobs = interface.get_new_harvest_jobs_in_past() + + # get list of running tasks + running_tasks = cf_handler.get_all_running_app_tasks(HARVEST_RUNNER_APP_GUID) + + # confirm tasks < MAX_JOBS_COUNT or bail + if running_tasks >= MAX_TASKS_COUNT: + logger.info( + f"{running_tasks} running_tasks >= max tasks count ({MAX_TASKS_COUNT})." + ) + return + else: + slots = MAX_TASKS_COUNT - running_tasks + + # invoke cf_task with next job(s) + # then mark that job(s) as running in the DB + logger.info("Load Manager :: Updated Harvest Jobs") + for job in jobs[:slots]: + create_task(job.id, cf_handler) + schedule_next_job(job.harvest_source_id) diff --git a/app/templates/view_job_data.html b/app/templates/view_job_data.html index ec6ea8b..718bb14 100644 --- a/app/templates/view_job_data.html +++ b/app/templates/view_job_data.html @@ -27,18 +27,7 @@

Job Info

{% endfor %} - {% if session['user'] and data.harvest_job.status == "in_progress" %} -
- -
- {% endif %} -
+

Job Error Table

{% if not data.harvest_job.errors %} No job errors found diff --git a/app/templates/view_source_data.html b/app/templates/view_source_data.html index 19e442c..2ad8f8c 100644 --- a/app/templates/view_source_data.html +++ b/app/templates/view_source_data.html @@ -10,8 +10,7 @@ {% endblock %} {% block content %} -