diff --git a/dispatcher/backend/src/common/constants.py b/dispatcher/backend/src/common/constants.py index 43d6c5e9f..fc8ac106c 100644 --- a/dispatcher/backend/src/common/constants.py +++ b/dispatcher/backend/src/common/constants.py @@ -3,6 +3,12 @@ from common.enum import SchedulePeriodicity + +def refreshable_constant(fn): + """Refreshable constants helper for those we have interest to live update""" + return fn + + OPENSSL_BIN = os.getenv("OPENSSL_BIN", "/usr/bin/openssl") MESSAGE_VALIDITY = 60 # number of seconds before a message expire @@ -66,7 +72,11 @@ # using the following, it is possible to automate # the update of a whitelist of workers IPs on Wasabi (S3 provider) # enable this feature (default is off) -USES_WORKERS_IPS_WHITELIST = bool(os.getenv("USES_WORKERS_IPS_WHITELIST", "")) +# Nota: this is a refreshable constant so that it can be dynamically updated +# (including in tests) +USES_WORKERS_IPS_WHITELIST = refreshable_constant( + lambda: bool(os.getenv("USES_WORKERS_IPS_WHITELIST", "")) +) MAX_WORKER_IP_CHANGES_PER_DAY = 4 # wasabi URL with credentials to update policy WASABI_URL = os.getenv("WASABI_URL", "") diff --git a/dispatcher/backend/src/common/external.py b/dispatcher/backend/src/common/external.py index 8a4d76fd3..5df271ab5 100644 --- a/dispatcher/backend/src/common/external.py +++ b/dispatcher/backend/src/common/external.py @@ -27,12 +27,11 @@ logger = logging.getLogger(__name__) -def update_workers_whitelist(): +def update_workers_whitelist(session: so.Session): """update whitelist of workers on external services""" - update_wasabi_whitelist(build_workers_whitelist()) + IpUpdater.update_fn(build_workers_whitelist(session=session)) -@dbsession def build_workers_whitelist(session: so.Session) -> typing.List[str]: """list of worker IP adresses and networks (text) to use as whitelist""" wl_networks = [] @@ -150,6 +149,10 @@ def get_statement(): ) +class IpUpdater: + update_fn = update_wasabi_whitelist + + @dbsession def advertise_books_to_cms(task_id: UUID, session: so.Session): """inform openZIM CMS of all created ZIMs in the farm for this task diff --git a/dispatcher/backend/src/routes/requested_tasks/requested_task.py b/dispatcher/backend/src/routes/requested_tasks/requested_task.py index 347192bc3..c58080ddd 100644 --- a/dispatcher/backend/src/routes/requested_tasks/requested_task.py +++ b/dispatcher/backend/src/routes/requested_tasks/requested_task.py @@ -35,14 +35,14 @@ logger = logging.getLogger(__name__) -def record_ip_change(worker_name): +def record_ip_change(session: so.Session, worker_name: str): """record that this worker changed its IP and trigger whitelist changes""" today = datetime.date.today() # counts and limits are per-day so reset it if date changed if today != WorkersIpChangesCounts.today: WorkersIpChangesCounts.reset() if WorkersIpChangesCounts.add(worker_name) <= MAX_WORKER_IP_CHANGES_PER_DAY: - update_workers_whitelist() + update_workers_whitelist(session) else: logger.error( f"Worker {worker_name} IP changes for {today} " @@ -229,15 +229,16 @@ def get(self, session: so.Session, token: AccessToken.Payload): worker = dbm.Worker.get(session, worker_name, WorkerNotFound) if worker.user.username == token.username: worker.last_seen = getnow() - previous_ip = str(worker.last_ip) - worker.last_ip = worker_ip - - # flush to DB so that record_ip_change has access to updated IP - session.flush() # IP changed since last encounter - if USES_WORKERS_IPS_WHITELIST and previous_ip != worker_ip: - record_ip_change(worker_name) + if str(worker.last_ip) != worker_ip: + logger.info( + f"Worker IP changed detected for {worker_name}: " + f"IP changed from {worker.last_ip} to {worker_ip}" + ) + worker.last_ip = worker_ip + if USES_WORKERS_IPS_WHITELIST(): + record_ip_change(session=session, worker_name=worker_name) request_args = WorkerRequestedTaskSchema().load(request_args) diff --git a/dispatcher/backend/src/tests/conftest.py b/dispatcher/backend/src/tests/conftest.py new file mode 100644 index 000000000..230be5dae --- /dev/null +++ b/dispatcher/backend/src/tests/conftest.py @@ -0,0 +1,12 @@ +from typing import Generator + +import pytest +from sqlalchemy.orm import Session as OrmSession + +from db import Session + + +@pytest.fixture +def dbsession() -> Generator[OrmSession, None, None]: + with Session.begin() as session: + yield session diff --git a/dispatcher/backend/src/tests/integration/routes/workers/test_worker.py b/dispatcher/backend/src/tests/integration/routes/workers/test_worker.py index 48b1971c9..fac50461a 100644 --- a/dispatcher/backend/src/tests/integration/routes/workers/test_worker.py +++ b/dispatcher/backend/src/tests/integration/routes/workers/test_worker.py @@ -1,11 +1,14 @@ +import os +from typing import List + import pytest -from common.external import build_workers_whitelist +from common.external import IpUpdater, build_workers_whitelist class TestWorkersCommon: - def test_build_workers_whitelist(self, workers): - whitelist = build_workers_whitelist() + def test_build_workers_whitelist(self, workers, dbsession): + whitelist = build_workers_whitelist(session=dbsession) # - 4 because: # 2 workers have a duplicate IP # 1 worker has an IP missing @@ -206,3 +209,59 @@ def test_checkin_another_user( # response.get_json()["error"] # == "worker with same name already exists for another user" # ) + + +class TestWorkerRequestedTasks: + def test_requested_task_worker_as_admin(self, client, access_token, worker): + response = client.get( + "/requested-tasks/worker", + query_string={ + "worker": worker["name"], + "avail_cpu": 4, + "avail_memory": 2048, + "avail_disk": 4096, + }, + headers={"Authorization": access_token}, + ) + assert response.status_code == 200 + + def test_requested_task_worker_as_worker(self, client, make_access_token, worker): + response = client.get( + "/requested-tasks/worker", + query_string={ + "worker": worker["name"], + "avail_cpu": 4, + "avail_memory": 2048, + "avail_disk": 4096, + }, + headers={"Authorization": make_access_token(worker["username"], "worker")}, + ) + assert response.status_code == 200 + + new_ip_address = "88.88.88.88" + + def custom_ip_update(self, ip_addresses: List): + self.ip_updated = True + assert TestWorkerRequestedTasks.new_ip_address in ip_addresses + + def test_requested_task_worker_update_ip_whitelist( + self, client, make_access_token, worker + ): + self.ip_updated = False + IpUpdater.update_fn = self.custom_ip_update + os.environ["USES_WORKERS_IPS_WHITELIST"] = "1" + response = client.get( + "/requested-tasks/worker", + query_string={ + "worker": worker["name"], + "avail_cpu": 4, + "avail_memory": 2048, + "avail_disk": 4096, + }, + headers={ + "Authorization": make_access_token(worker["username"], "worker"), + "X-Forwarded-For": TestWorkerRequestedTasks.new_ip_address, + }, + ) + assert response.status_code == 200 + assert self.ip_updated