diff --git a/app/routes.py b/app/routes.py
index ba6362b..ba5bcb3 100644
--- a/app/routes.py
+++ b/app/routes.py
@@ -248,13 +248,13 @@ def cli_remove_harvest_source(id):
 
 # Helper Functions
 def make_new_source_contract(form):
     return {
+        "organization_id": form.organization_id.data,
         "name": form.name.data,
+        "url": form.url.data,
         "notification_emails": form.notification_emails.data,
         "frequency": form.frequency.data,
-        "url": form.url.data,
         "schema_type": form.schema_type.data,
         "source_type": form.source_type.data,
-        "organization_id": form.organization_id.data,
     }
 
diff --git a/app/scripts/load_manager.py b/app/scripts/load_manager.py
index fe0bb30..d5b4710 100644
--- a/app/scripts/load_manager.py
+++ b/app/scripts/load_manager.py
@@ -10,7 +10,6 @@
 CF_SERVICE_USER = os.getenv("CF_SERVICE_USER")
 CF_SERVICE_AUTH = os.getenv("CF_SERVICE_AUTH")
 HARVEST_RUNNER_APP_GUID = os.getenv("HARVEST_RUNNER_APP_GUID")
-CF_INSTANCE_INDEX = os.getenv("CF_INSTANCE_INDEX")
 
 MAX_TASKS_COUNT = 3
 
@@ -18,11 +17,6 @@
 
 logger = logging.getLogger("harvest_admin")
 
-TASK_SIZE_ENUM = {
-    "medium": (2048, 768),
-    "large": (4096, 1536),
-}
-
 
 def create_cf_handler():
     # check for correct env vars to init CFHandler
@@ -32,7 +26,7 @@
     return CFHandler(CF_API_URL, CF_SERVICE_USER, CF_SERVICE_AUTH)
 
 
-def create_task(job_id, size, cf_handler=None):
+def create_task(job_id, cf_handler=None):
     task_contract = {
         "app_guuid": HARVEST_RUNNER_APP_GUID,
         "command": f"python harvester/harvest.py {job_id}",
@@ -41,10 +35,6 @@
     if cf_handler is None:
         cf_handler = create_cf_handler()
 
-    if size != "small" and size in TASK_SIZE_ENUM:
-        task_contract["memory_in_mb"] = TASK_SIZE_ENUM[size][0]
-        task_contract["disk_in_mb"] = TASK_SIZE_ENUM[size][1]
-
     cf_handler.start_task(**task_contract)
     updated_job = interface.update_harvest_job(job_id, {"status": "in_progress"})
     message = f"Updated job {updated_job.id} to in_progress"
@@ -72,7 +62,7 @@
     logger.info(
         f"Created new manual harvest job: for {job_data.harvest_source_id}."
     )
-    return create_task(job_data.id, source.size)
+    return create_task(job_data.id)
 
 
 def schedule_first_job(source_id):
@@ -130,6 +120,5 @@ def load_manager():
     # then mark that job(s) as running in the DB
     logger.info("Load Manager :: Updated Harvest Jobs")
     for job in jobs[:slots]:
-        source = interface.get_harvest_source(job.harvest_source_id)
-        create_task(job.id, source.size, cf_handler)
+        create_task(job.id, cf_handler)
         schedule_next_job(job.harvest_source_id)
diff --git a/database/models.py b/database/models.py
index 8a37455..2c33af8 100644
--- a/database/models.py
+++ b/database/models.py
@@ -34,11 +34,12 @@ class Organization(db.Model):
 class HarvestSource(db.Model):
     __tablename__ = "harvest_source"
 
-    name = db.Column(db.String, nullable=False)
-    notification_emails = db.Column(db.ARRAY(db.String))
     organization_id = db.Column(
         db.String(36), db.ForeignKey("organization.id"), nullable=False
     )
+    name = db.Column(db.String, nullable=False)
+    url = db.Column(db.String, nullable=False, unique=True)
+    notification_emails = db.Column(db.ARRAY(db.String))
     frequency = db.Column(
         Enum(
             "manual",
@@ -51,11 +52,8 @@
         nullable=False,
         index=True,
     )
-    url = db.Column(db.String, nullable=False, unique=True)
-    size = db.Column(Enum("small", "medium", "large", name="size"))
     schema_type = db.Column(db.String, nullable=False)
     source_type = db.Column(db.String, nullable=False)
-    status = db.Column(db.String)
     jobs = db.relationship(
         "HarvestJob", backref="source", cascade="all, delete-orphan", lazy=True
     )
diff --git a/harvester/lib/cf_handler.py b/harvester/lib/cf_handler.py
index be0020f..f5d2eef 100644
--- a/harvester/lib/cf_handler.py
+++ b/harvester/lib/cf_handler.py
@@ -1,3 +1,4 @@
+import os
 from cloudfoundry_client.client import CloudFoundryClient
 from cloudfoundry_client.v3.tasks import TaskManager
 
@@ -14,10 +15,10 @@ def setup(self):
         self.client.init_with_user_credentials(self.user, self.password)
         self.task_mgr = TaskManager(self.url, self.client)
 
-    def start_task(self, app_guuid, command, task_id, memory_in_mb=512, disk_in_mb=512):
-        return self.task_mgr.create(
-            app_guuid, command, task_id, memory_in_mb, disk_in_mb
-        )
+    def start_task(self, app_guuid, command, task_id):
+        TASK_MEMORY = os.getenv("HARVEST_RUNNER_TASK_MEM", "4096")
+        TASK_DISK = os.getenv("HARVEST_RUNNER_TASK_DISK", "1536")
+        return self.task_mgr.create(app_guuid, command, task_id, TASK_MEMORY, TASK_DISK)
 
     def stop_task(self, task_id):
         return self.task_mgr.cancel(task_id)
diff --git a/tests/conftest.py b/tests/conftest.py
index 12ba78a..b55217f 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -104,11 +104,9 @@ def source_data_dcatus(organization_data: dict) -> dict:
         "notification_emails": "email@example.com",
         "organization_id": organization_data["id"],
         "frequency": "daily",
-        "size": "small",
         "url": f"{HARVEST_SOURCE_URL}/dcatus/dcatus.json",
         "schema_type": "type1",
         "source_type": "dcatus",
-        "status": "active",
     }
 
 
@@ -120,11 +118,9 @@ def source_data_dcatus_2(organization_data: dict) -> dict:
         "notification_emails": "email@example.com",
         "organization_id": organization_data["id"],
         "frequency": "daily",
-        "size": "medium",
         "url": f"{HARVEST_SOURCE_URL}/dcatus/dcatus_2.json",
         "schema_type": "type1",
         "source_type": "dcatus",
-        "status": "active",
     }
 
 
@@ -136,11 +132,9 @@ def source_data_dcatus_same_title(organization_data: dict) -> dict:
         "notification_emails": "email@example.com",
         "organization_id": organization_data["id"],
         "frequency": "daily",
-        "size": "small",
         "url": f"{HARVEST_SOURCE_URL}/dcatus/dcatus_same_title.json",
         "schema_type": "type1",
         "source_type": "dcatus",
-        "status": "active",
     }
 
 
@@ -157,11 +151,9 @@ def source_data_waf(organization_data: dict) -> dict:
         "notification_emails": "wafl@example.com",
         "organization_id": organization_data["id"],
         "frequency": "daily",
-        "size": "small",
         "url": f"{HARVEST_SOURCE_URL}/waf/",
         "schema_type": "type1",
         "source_type": "waf",
-        "status": "active",
     }
 
 
@@ -176,7 +168,6 @@ def source_data_dcatus_invalid(organization_data: dict) -> dict:
         "url": f"{HARVEST_SOURCE_URL}/dcatus/missing_title.json",
         "schema_type": "type1",
         "source_type": "dcatus",
-        "status": "active",
     }
 
 
@@ -242,7 +233,6 @@ def source_data_dcatus_single_record(organization_data: dict) -> dict:
         "url": f"{HARVEST_SOURCE_URL}/dcatus/dcatus_single_record.json",
         "schema_type": "type1",
         "source_type": "dcatus",
-        "status": "active",
     }
 
 
@@ -279,7 +269,6 @@ def source_data_dcatus_bad_url(organization_data: dict) -> dict:
         "url": f"{HARVEST_SOURCE_URL}/dcatus/bad_url.json",
         "schema_type": "type1",
         "source_type": "dcatus",
-        "status": "active",
     }
 
 
@@ -303,7 +292,6 @@ def source_data_dcatus_invalid_records(organization_data) -> dict:
         "url": "http://localhost/dcatus/missing_title.json",
         "schema_type": "type1",
         "source_type": "dcatus",
-        "status": "active",
     }
 
diff --git a/tests/integration/app/test_load_manager.py b/tests/integration/app/test_load_manager.py
index d885d64..8252c31 100644
--- a/tests/integration/app/test_load_manager.py
+++ b/tests/integration/app/test_load_manager.py
@@ -168,7 +168,6 @@ def test_manual_job_doesnt_affect_scheduled_jobs(
         jobs = interface_no_jobs.get_new_harvest_jobs_by_source_in_future(
             source_data_dcatus["id"]
         )
-        assert len(jobs) == 1
         assert source_data_dcatus["frequency"] == "daily"
         assert jobs[0].date_created == datetime.now() + timedelta(days=1)
 
@@ -176,7 +175,6 @@
         jobs = interface_no_jobs.get_all_harvest_jobs_by_filter(
             {"harvest_source_id": source_data_dcatus["id"]}
         )
-        assert len(jobs) == 2
         assert jobs[0].date_created == datetime.now() + timedelta(days=1)
         assert jobs[0].status == "new"
 
@@ -215,3 +213,28 @@
 
         assert jobs[1].date_created == datetime.now()
         assert jobs[1].status == "in_progress"
+
+    @patch("harvester.lib.cf_handler.CloudFoundryClient")
+    @patch("harvester.lib.cf_handler.TaskManager")
+    def test_assert_env_var_changes_task_size(
+        self, TMMock, CFCMock, interface_no_jobs, source_data_dcatus, monkeypatch
+    ):
+        trigger_manual_job(source_data_dcatus["id"])
+        start_task_mock = TMMock.return_value.create
+        assert start_task_mock.call_args[0][3] == "4096"
+        assert start_task_mock.call_args[0][4] == "1536"
+
+        # clear out in progress jobs
+        jobs = interface_no_jobs.get_all_harvest_jobs_by_filter(
+            {"harvest_source_id": source_data_dcatus["id"]}
+        )
+        interface_no_jobs.delete_harvest_job(jobs[0].id)
+
+        # set custom env vars
+        monkeypatch.setenv("HARVEST_RUNNER_TASK_MEM", "1234")
+        monkeypatch.setenv("HARVEST_RUNNER_TASK_DISK", "1234")
+
+        trigger_manual_job(source_data_dcatus["id"])
+        start_task_mock = TMMock.return_value.create
+        assert start_task_mock.call_args[0][3] == "1234"
+        assert start_task_mock.call_args[0][4] == "1234"