diff --git a/.gitignore b/.gitignore index 8c37ca7b..23fb528c 100644 --- a/.gitignore +++ b/.gitignore @@ -21,9 +21,10 @@ node_modules/ # openstack ( s3 mock ) tmp/ -# vscode debugger -.vscode/* -!.vscode/launch.json -.env +# env +.env.secret requirements.txt +# vscode debugger +.vscode/* +!.vscode/launch.json \ No newline at end of file diff --git a/app/__init__.py b/app/__init__.py index 998d0c12..5633a245 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -6,14 +6,12 @@ from flask_migrate import Migrate from database.models import db +from app.scripts.load_manager import load_manager load_dotenv() -DATABASE_URI = os.getenv("DATABASE_URI") - def create_app(testing=False): - app = Flask(__name__, static_url_path="", static_folder="static") if testing: @@ -35,4 +33,7 @@ def create_app(testing=False): register_routes(app) + with app.app_context(): + load_manager() + return app diff --git a/app/scripts/load_manager.py b/app/scripts/load_manager.py new file mode 100644 index 00000000..63c78351 --- /dev/null +++ b/app/scripts/load_manager.py @@ -0,0 +1,62 @@ +import os +from database.interface import HarvesterDBInterface +from harvester.utils import CFHandler + +DATABASE_URI = os.getenv("DATABASE_URI") +CF_API_URL = os.getenv("CF_API_URL") +CF_SERVICE_USER = os.getenv("CF_SERVICE_USER") +CF_SERVICE_AUTH = os.getenv("CF_SERVICE_AUTH") +LM_RUNNER_APP_GUID = os.getenv("LM_RUNNER_APP_GUID") +CF_INSTANCE_INDEX = os.getenv("CF_INSTANCE_INDEX") + +LM_MAX_TASKS_COUNT = 3 + +interface = HarvesterDBInterface() +cf_handler = CFHandler(CF_API_URL, CF_SERVICE_USER, CF_SERVICE_AUTH) + + +def create_task(jobId): + return { + "app_guuid": LM_RUNNER_APP_GUID, + "command": f"python harvest.py {jobId}", + "task_id": f"harvest-job-{jobId}", + } + + +def sort_jobs(jobs): + return sorted(jobs, key=lambda x: x["status"], reverse=True) + + +def load_manager(): + # confirm CF_INSTANCE_INDEX == 0 or bail + if os.getenv("CF_INSTANCE_INDEX") != "0": + return + + # filter harvestjobs by pending 
/ pending_manual + jobs = interface.get_harvest_jobs_by_faceted_filter( + "status", ["pending", "pending_manual"] + ) + + # get current list of all tasks + current_tasks = cf_handler.get_all_app_tasks(LM_RUNNER_APP_GUID) + # filter out in_process tasks + running_tasks = cf_handler.get_all_running_tasks(current_tasks) + + # confirm tasks < MAX_JOBS_COUNT or bail + if LM_MAX_TASKS_COUNT < running_tasks: + return + else: + slots = LM_MAX_TASKS_COUNT - running_tasks + + # sort jobs by pending_manual first + sorted_jobs = sort_jobs(jobs) + + # slice off jobs to invoke + jobs_to_invoke = sorted_jobs[:slots] + + # invoke cf_task with next job(s) + # then mark that job(s) as running in the DB + for job in jobs_to_invoke: + task_contract = create_task(job["id"]) + cf_handler.start_task(**task_contract) + interface.update_harvest_job(job["id"], {"status": "in_progress"}) diff --git a/database/interface.py b/database/interface.py index f8dea785..78894bb3 100644 --- a/database/interface.py +++ b/database/interface.py @@ -1,4 +1,5 @@ -from sqlalchemy import create_engine, inspect +import os +from sqlalchemy import create_engine, inspect, or_ from sqlalchemy.exc import NoResultFound from sqlalchemy.orm import scoped_session, sessionmaker @@ -10,7 +11,7 @@ Organization, ) -from . 
import DATABASE_URI +DATABASE_URI = os.getenv("DATABASE_URI") class HarvesterDBInterface: @@ -32,6 +33,8 @@ def __init__(self, session=None): @staticmethod def _to_dict(obj): + if obj is None: + return None return {c.key: getattr(obj, c.key) for c in inspect(obj).mapper.column_attrs} def add_organization(self, org_data): @@ -48,16 +51,10 @@ def add_organization(self, org_data): def get_all_organizations(self): orgs = self.db.query(Organization).all() - if orgs is None: - return None - else: - orgs_data = [HarvesterDBInterface._to_dict(org) for org in orgs] - return orgs_data + return [HarvesterDBInterface._to_dict(org) for org in orgs] def get_organization(self, org_id): result = self.db.query(Organization).filter_by(id=org_id).first() - if result is None: - return None return HarvesterDBInterface._to_dict(result) def update_organization(self, org_id, updates): @@ -99,31 +96,17 @@ def add_harvest_source(self, source_data): def get_all_harvest_sources(self): harvest_sources = self.db.query(HarvestSource).all() - if harvest_sources is None: - return None - else: - harvest_sources_data = [ - HarvesterDBInterface._to_dict(source) for source in harvest_sources - ] - return harvest_sources_data + return [HarvesterDBInterface._to_dict(source) for source in harvest_sources] def get_harvest_source(self, source_id): result = self.db.query(HarvestSource).filter_by(id=source_id).first() - if result is None: - return None return HarvesterDBInterface._to_dict(result) def get_harvest_source_by_org(self, org_id): harvest_source = ( self.db.query(HarvestSource).filter_by(organization_id=org_id).all() ) - if harvest_source is None: - return None - else: - harvest_source_data = [ - HarvesterDBInterface._to_dict(src) for src in harvest_source - ] - return harvest_source_data + return [HarvesterDBInterface._to_dict(src) for src in harvest_source] def update_harvest_source(self, source_id, updates): try: @@ -164,21 +147,24 @@ def add_harvest_job(self, job_data): def 
get_harvest_job(self, job_id): result = self.db.query(HarvestJob).filter_by(id=job_id).first() - if result is None: - return None return HarvesterDBInterface._to_dict(result) + def get_harvest_jobs_by_filter(self, filter): + harvest_jobs = self.db.query(HarvestJob).filter_by(**filter).all() + harvest_jobs_data = [HarvesterDBInterface._to_dict(job) for job in harvest_jobs] + return harvest_jobs_data + + def get_harvest_jobs_by_faceted_filter(self, attr, values): + query_list = [getattr(HarvestJob, attr) == value for value in values] + harvest_jobs = self.db.query(HarvestJob).filter(or_(*query_list)).all() + harvest_jobs_data = [HarvesterDBInterface._to_dict(job) for job in harvest_jobs] + return harvest_jobs_data + def get_harvest_job_by_source(self, source_id): harvest_job = ( self.db.query(HarvestJob).filter_by(harvest_source_id=source_id).all() ) - if harvest_job is None: - return None - else: - harvest_job_data = [ - HarvesterDBInterface._to_dict(job) for job in harvest_job - ] - return harvest_job_data + return [HarvesterDBInterface._to_dict(job) for job in harvest_job] def update_harvest_job(self, job_id, updates): try: @@ -219,19 +205,11 @@ def add_harvest_error(self, error_data): def get_harvest_error(self, error_id): result = self.db.query(HarvestError).filter_by(id=error_id).first() - if result is None: - return None return HarvesterDBInterface._to_dict(result) def get_harvest_error_by_job(self, job_id): harvest_errors = self.db.query(HarvestError).filter_by(harvest_job_id=job_id) - if harvest_errors is None: - return None - else: - harvest_errors_data = [ - HarvesterDBInterface._to_dict(err) for err in harvest_errors - ] - return harvest_errors_data + return [HarvesterDBInterface._to_dict(err) for err in harvest_errors] def add_harvest_record(self, record_data): try: @@ -264,31 +242,17 @@ def add_harvest_records(self, records_data: list) -> bool: def get_harvest_record(self, record_id): result = self.db.query(HarvestRecord).filter_by(id=record_id).first() 
- if result is None: - return None return HarvesterDBInterface._to_dict(result) def get_harvest_record_by_job(self, job_id): harvest_records = self.db.query(HarvestRecord).filter_by(harvest_job_id=job_id) - if harvest_records is None: - return None - else: - harvest_records_data = [ - HarvesterDBInterface._to_dict(rcd) for rcd in harvest_records - ] - return harvest_records_data + return [HarvesterDBInterface._to_dict(rcd) for rcd in harvest_records] def get_harvest_record_by_source(self, source_id): harvest_records = self.db.query(HarvestRecord).filter_by( harvest_source_id=source_id ) - if harvest_records is None: - return None - else: - harvest_records_data = [ - HarvesterDBInterface._to_dict(rcd) for rcd in harvest_records - ] - return harvest_records_data + return [HarvesterDBInterface._to_dict(rcd) for rcd in harvest_records] def get_source_by_jobid(self, jobid): harvest_job = self.db.query(HarvestJob).filter_by(id=jobid).first() diff --git a/harvester/utils.py b/harvester/utils.py index 99d311b2..48d7a924 100644 --- a/harvester/utils.py +++ b/harvester/utils.py @@ -20,7 +20,6 @@ def get_title_from_fgdc(xml_str: str) -> str: def parse_args(args): - parser = argparse.ArgumentParser( prog="Harvest Runner", description="etl harvest sources" ) @@ -114,6 +113,7 @@ def __init__(self, url: str, user: str, password: str): self.url = url self.user = user self.password = password + self.setup() def setup(self): self.client = CloudFoundryClient(self.url) @@ -136,7 +136,6 @@ def get_all_running_tasks(self, tasks): return sum(1 for _ in filter(lambda task: task["state"] == "RUNNING", tasks)) def read_recent_app_logs(self, app_guuid, task_id=None): - app = self.client.v2.apps[app_guuid] logs = filter(lambda lg: task_id in lg, [str(log) for log in app.recent_logs()]) return "\n".join(logs) diff --git a/pyproject.toml b/pyproject.toml index 90c4b127..2751dd43 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "datagov-harvesting-logic" 
-version = "0.4.0" +version = "0.4.1" description = "" # authors = [ # {name = "Jin Sun", email = "jin.sun@gsa.gov"}, diff --git a/tests/conftest.py b/tests/conftest.py index 30097471..109d13a0 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -188,6 +188,24 @@ def source_data_dcatus_invalid_records_job( } +@pytest.fixture +def interface_with_multiple_jobs( + interface, + source_data_dcatus: dict, +): + statuses = ["pending", "pending_manual", "in_progress", "complete"] + source_ids = ["1234", "abcd"] + jobs = [ + {"status": status, "harvest_source_id": source} + for status in statuses + for source in source_ids + ] + for job in jobs: + interface.add_harvest_job(job) + + return interface + + @pytest.fixture def internal_compare_data(job_data_dcatus: dict) -> dict: # ruff: noqa: E501 @@ -249,7 +267,6 @@ def internal_compare_data(job_data_dcatus: dict) -> dict: @pytest.fixture def cf_handler() -> CFHandler: - url = os.getenv("CF_API_URL") user = os.getenv("CF_SERVICE_USER") password = os.getenv("CF_SERVICE_AUTH") diff --git a/tests/unit/database/test_db.py b/tests/unit/database/test_db.py index 8b2e0182..da23fb3e 100644 --- a/tests/unit/database/test_db.py +++ b/tests/unit/database/test_db.py @@ -95,7 +95,6 @@ def test_add_harvest_record( job_data_dcatus, record_data_dcatus, ): - interface.add_organization(organization_data) source = interface.add_harvest_source(source_data_dcatus) harvest_job = interface.add_harvest_job(job_data_dcatus) @@ -113,7 +112,6 @@ def test_add_harvest_records( job_data_dcatus, record_data_dcatus, ): - interface.add_organization(organization_data) interface.add_harvest_source(source_data_dcatus) interface.add_harvest_job(job_data_dcatus) @@ -122,3 +120,33 @@ def test_add_harvest_records( success = interface.add_harvest_records(records) assert success is True assert len(interface.get_all_harvest_records()) == 10 + + def test_add_harvest_job_with_id(self, interface, job_data_dcatus): + job = interface.add_harvest_job(job_data_dcatus) + 
assert job.id == job_data_dcatus["id"] + assert job.status == job_data_dcatus["status"] + assert job.harvest_source_id == job_data_dcatus["harvest_source_id"] + + def test_add_harvest_job_without_id(self, interface, job_data_dcatus): + job_data_dcatus_id = job_data_dcatus["id"] + del job_data_dcatus["id"] + job = interface.add_harvest_job(job_data_dcatus) + assert job.id + assert job.id != job_data_dcatus_id + assert job.status == job_data_dcatus["status"] + assert job.harvest_source_id == job_data_dcatus["harvest_source_id"] + + def test_get_harvest_jobs_by_filter(self, interface_with_multiple_jobs): + filters = {"status": "pending", "harvest_source_id": "1234"} + filtered_list = interface_with_multiple_jobs.get_harvest_jobs_by_filter(filters) + assert len(filtered_list) == 1 + assert filtered_list[0]["status"] == "pending" + assert filtered_list[0]["harvest_source_id"] == "1234" + + def test_filter_jobs_by_faceted_filter(self, interface_with_multiple_jobs): + faceted_list = interface_with_multiple_jobs.get_harvest_jobs_by_faceted_filter( + "status", ["pending", "pending_manual"] + ) + assert len(faceted_list) == 4 + assert len([x for x in faceted_list if x["status"] == "pending"]) == 2 + assert len([x for x in faceted_list if x["harvest_source_id"] == "1234"]) == 2 diff --git a/tests/unit/test_load_manager.py b/tests/unit/test_load_manager.py new file mode 100644 index 00000000..ac80ed58 --- /dev/null +++ b/tests/unit/test_load_manager.py @@ -0,0 +1,150 @@ +import pytest + +from unittest.mock import patch +from app.scripts.load_manager import load_manager, sort_jobs +from app.scripts.load_manager import CFHandler +from app.scripts.load_manager import HarvesterDBInterface + + +@pytest.fixture +def mock_good_cf_index(monkeypatch): + monkeypatch.setenv("CF_INSTANCE_INDEX", "0") + + +@pytest.fixture +def mock_bad_cf_index(monkeypatch): + monkeypatch.setenv("CF_INSTANCE_INDEX", "1") + + +@pytest.fixture(autouse=True) +def mock_lm_config(monkeypatch): + 
monkeypatch.setenv("LM_RUNNER_APP_GUID", "f4ab7f86-bee0-44fd-8806-1dca7f8e215a") + + +class TestLoadManager: + @patch.object(HarvesterDBInterface, "update_harvest_job") + @patch.object(CFHandler, "start_task") + @patch.object(CFHandler, "get_all_running_tasks") + @patch.object(CFHandler, "get_all_app_tasks") + @patch.object(HarvesterDBInterface, "get_harvest_jobs_by_faceted_filter") + def test_load_manager_invokes_tasks( + self, + db_get_harvest_jobs_by_faceted_filter_mock, + cf_get_all_app_tasks_mock, + cf_get_all_running_tasks_mock, + cf_start_task_mock, + db_update_harvest_job_mock, + job_data_dcatus, + job_data_waf, + dhl_cf_task_data, + mock_good_cf_index, + ): + db_get_harvest_jobs_by_faceted_filter_mock.return_value = [ + job_data_dcatus, + job_data_waf, + ] + cf_get_all_app_tasks_mock.return_value = [] + cf_get_all_running_tasks_mock.return_value = 2 + cf_start_task_mock.return_value = "ok" + db_update_harvest_job_mock.return_value = "ok" + load_manager() + + # assert all mocks are called when env vars in place + assert db_get_harvest_jobs_by_faceted_filter_mock.called + assert cf_get_all_app_tasks_mock.called + assert cf_get_all_running_tasks_mock.called + assert cf_start_task_mock.called + assert db_update_harvest_job_mock.called + + # assert derived slot val is same as calls to cf_start_task_mock + assert ( + 3 - cf_get_all_running_tasks_mock.return_value + ) == cf_start_task_mock.call_count + + @patch.object(HarvesterDBInterface, "update_harvest_job") + @patch.object(CFHandler, "start_task") + @patch.object(CFHandler, "get_all_running_tasks") + @patch.object(CFHandler, "get_all_app_tasks") + @patch.object(HarvesterDBInterface, "get_harvest_jobs_by_faceted_filter") + def test_load_manager_hits_task_limit( + self, + db_get_harvest_jobs_by_faceted_filter_mock, + cf_get_all_app_tasks_mock, + cf_get_all_running_tasks_mock, + cf_start_task_mock, + db_update_harvest_job_mock, + job_data_dcatus, + job_data_waf, + dhl_cf_task_data, + mock_good_cf_index, + ): + 
db_get_harvest_jobs_by_faceted_filter_mock.return_value = [ + job_data_dcatus, + job_data_waf, + ] + cf_get_all_app_tasks_mock.return_value = [] + cf_get_all_running_tasks_mock.return_value = 3 + cf_start_task_mock.return_value = "ok" + db_update_harvest_job_mock.return_value = "ok" + load_manager() + + # assert certain mocks are called + assert db_get_harvest_jobs_by_faceted_filter_mock.called + assert cf_get_all_app_tasks_mock.called + assert cf_get_all_running_tasks_mock.called + # assert certain mocks are not called when + # cf_get_all_running_tasks_mock is at limit + assert not cf_start_task_mock.called + assert not db_update_harvest_job_mock.called + + assert ( + 3 - cf_get_all_running_tasks_mock.return_value + ) == cf_start_task_mock.call_count + + @patch.object(HarvesterDBInterface, "update_harvest_job") + @patch.object(CFHandler, "start_task") + @patch.object(CFHandler, "get_all_running_tasks") + @patch.object(CFHandler, "get_all_app_tasks") + @patch.object(HarvesterDBInterface, "get_harvest_jobs_by_faceted_filter") + def test_load_manager_bails_on_incorrect_index( + self, + db_get_harvest_jobs_by_faceted_filter_mock, + cf_get_all_app_tasks_mock, + cf_get_all_running_tasks_mock, + cf_start_task_mock, + db_update_harvest_job_mock, + job_data_dcatus, + job_data_waf, + dhl_cf_task_data, + mock_bad_cf_index, + ): + db_get_harvest_jobs_by_faceted_filter_mock.return_value = [ + job_data_dcatus, + job_data_dcatus, + ] + cf_get_all_app_tasks_mock.return_value = [] + cf_get_all_running_tasks_mock.return_value = 2 + cf_start_task_mock.return_value = "ok" + db_update_harvest_job_mock.return_value = "ok" + load_manager() + + # assert all mocks are NOT called when env vars NOT in place + assert not db_get_harvest_jobs_by_faceted_filter_mock.called + assert not cf_get_all_app_tasks_mock.called + assert not cf_start_task_mock.called + assert not cf_get_all_running_tasks_mock.called + + def test_sort_jobs(self): + jobs = [ + {"status": "pending"}, + {"status":
"pending"}, + {"status": "pending_manual"}, + ] + + sorted_jobs = sort_jobs(jobs) + + assert sorted_jobs == [ + {"status": "pending_manual"}, + {"status": "pending"}, + {"status": "pending"}, + ]