From 039852fd88d58c7175ca07ad7f928e6ea988418a Mon Sep 17 00:00:00 2001 From: benoit74 Date: Mon, 27 Nov 2023 09:56:38 +0100 Subject: [PATCH 1/3] Fix update of worker last_seen property --- .../routes/requested_tasks/requested_task.py | 17 +- .../integration/routes/workers/test_worker.py | 160 +++++++++++++----- 2 files changed, 134 insertions(+), 43 deletions(-) diff --git a/dispatcher/backend/src/routes/requested_tasks/requested_task.py b/dispatcher/backend/src/routes/requested_tasks/requested_task.py index 68805193..6efd6323 100644 --- a/dispatcher/backend/src/routes/requested_tasks/requested_task.py +++ b/dispatcher/backend/src/routes/requested_tasks/requested_task.py @@ -223,20 +223,27 @@ def get(self, session: so.Session, token: AccessToken.Payload): # record we've seen a worker, if applicable if token and worker_name: worker = dbm.Worker.get(session, worker_name, WorkerNotFound) + + # Update worker properties only if called as the worker itself, not as an + # admin if worker.user.username == token.username: worker.last_seen = getnow() # IP changed since last encounter - if str(worker.last_ip) != worker_ip: + ip_changed = str(worker.last_ip) != worker_ip + if ip_changed: logger.info( f"Worker IP changed detected for {worker_name}: " f"IP changed from {worker.last_ip} to {worker_ip}" ) worker.last_ip = worker_ip - # commit explicitely since we are not using an explicit transaction, - # and do it before calling Wasabi so that changes are propagated - # quickly and transaction is not blocking - session.commit() + + # commit explicitely since we are not using an explicit transaction, + # and do it before calling Wasabi so that changes are propagated + # quickly and transaction is not blocking + session.commit() + + if ip_changed: if constants.USES_WORKERS_IPS_WHITELIST: try: record_ip_change(session=session, worker_name=worker_name) diff --git a/dispatcher/backend/src/tests/integration/routes/workers/test_worker.py b/dispatcher/backend/src/tests/integration/routes/workers/test_worker.py index 4767bdc6..57a8c3c9 100644 --- a/dispatcher/backend/src/tests/integration/routes/workers/test_worker.py +++ b/dispatcher/backend/src/tests/integration/routes/workers/test_worker.py @@ -1,7 +1,8 @@ -from typing import List +from typing import Any, Dict, List import pytest +import db.models as dbm from common import constants from common.external import ExternalIpUpdater, build_workers_whitelist @@ -212,32 +213,130 @@ def test_checkin_another_user( class TestWorkerRequestedTasks: - def test_requested_task_worker_as_admin(self, client, access_token, worker): + @pytest.fixture() + def req_task_query_string(self, worker): + return { + "worker": worker["name"], + "avail_cpu": 4, + "avail_memory": 2048, + "avail_disk": 4096, + } + + @pytest.fixture + def make_headers(self): + def _make_headers(access_token: str, client_ip: str) -> Dict[str, Any]: + return { + "Authorization": access_token, + "X-Forwarded-For": client_ip, + } + + return _make_headers + + @pytest.fixture + def admin_headers(self, make_headers, access_token, default_ip): + def _admin_headers( + access_token: str = access_token, client_ip: str = default_ip + ) -> Dict[str, Any]: + return make_headers(access_token=access_token, client_ip=client_ip) + + return _admin_headers + + @pytest.fixture + def worker_headers(self, make_headers, make_access_token, worker, default_ip): + def _worker_headers( + access_token: str = make_access_token(worker["username"], "worker"), + client_ip: str = default_ip, + ) -> Dict[str, Any]: + return make_headers(access_token=access_token, client_ip=client_ip) + + return _worker_headers + + @pytest.fixture + def default_ip(self): + return "192.168.1.1" + + @pytest.fixture + def increase_ip(self): + def _increase_ip(prev_ip): + return f"{str(prev_ip)[:-1]}{int(str(prev_ip)[-1])+1}" + + return _increase_ip + + def test_requested_task_worker_as_admin( + self, + client, + worker, + req_task_query_string, + admin_headers, + dbsession, + increase_ip, + ): + # Retrieve current object from DB + db_worker = dbm.Worker.get(dbsession, worker["name"]) + last_seen = db_worker.last_seen + last_ip = db_worker.last_ip + + response = client.get( + "/requested-tasks/worker", + query_string=req_task_query_string, + headers=admin_headers(client_ip=increase_ip(last_ip)), + ) + assert response.status_code == 200 + + # Refresh current object from DB + dbsession.expire(db_worker) + db_worker = dbm.Worker.get(dbsession, worker["name"]) + # last_seen and last_ip are not updated when endpoint is called as admin + assert last_seen == db_worker.last_seen + assert last_ip == db_worker.last_ip + + def test_requested_task_worker_as_worker( + self, + client, + worker, + worker_headers, + req_task_query_string, + increase_ip, + dbsession, + ): + # Retrieve current object from DB + db_worker = dbm.Worker.get(dbsession, worker["name"]) + last_seen = db_worker.last_seen + last_ip = db_worker.last_ip + new_ip = increase_ip(last_ip) + # Worker checks for requested tasks response = client.get( "/requested-tasks/worker", - query_string={ - "worker": worker["name"], - "avail_cpu": 4, - "avail_memory": 2048, - "avail_disk": 4096, - }, - headers={"Authorization": access_token}, + query_string=req_task_query_string, + headers=worker_headers(client_ip=new_ip), ) assert response.status_code == 200 - def test_requested_task_worker_as_worker(self, client, make_access_token, worker): + # Refresh current object from DB + dbsession.expire(db_worker) + db_worker = dbm.Worker.get(dbsession, worker["name"]) + # last_seen and last_ip are updated in DB when endpoint is called as worker + assert last_seen != db_worker.last_seen + assert last_ip != db_worker.last_ip + assert str(db_worker.last_ip) == new_ip + + # second call will update only the last_seen attribute + last_seen = db_worker.last_seen + last_ip = db_worker.last_ip response = client.get( "/requested-tasks/worker", - query_string={ - "worker": worker["name"], - "avail_cpu": 4, - "avail_memory": 2048, - "avail_disk": 4096, - }, - headers={"Authorization": make_access_token(worker["username"], "worker")}, + query_string=req_task_query_string, + headers=worker_headers(client_ip=new_ip), ) assert response.status_code == 200 + # Refresh current object from DB again + dbsession.expire(db_worker) + db_worker = dbm.Worker.get(dbsession, worker["name"]) + # last_seen has been updated again but not last_ip which did not changed + assert last_seen != db_worker.last_seen + assert str(db_worker.last_ip) == new_ip + @pytest.mark.parametrize( "prev_ip, new_ip, external_update_enabled, external_update_fails," " external_update_called", @@ -251,10 +350,11 @@ def test_requested_task_worker_as_worker(self, client, make_access_token, worker def test_requested_task_worker_update_ip_whitelist( self, client, - make_access_token, worker, + req_task_query_string, prev_ip, new_ip, + worker_headers, external_update_enabled, external_update_fails, external_update_called, @@ -262,16 +362,8 @@ def test_requested_task_worker_update_ip_whitelist( # call it once to set prev_ip response = client.get( "/requested-tasks/worker", - query_string={ - "worker": worker["name"], - "avail_cpu": 4, - "avail_memory": 2048, - "avail_disk": 4096, - }, - headers={ - "Authorization": make_access_token(worker["username"], "worker"), - "X-Forwarded-For": prev_ip, - }, + query_string=req_task_query_string, + headers=worker_headers(client_ip=prev_ip), ) assert response.status_code == 200 @@ -293,16 +385,8 @@ def test_requested_task_worker_update_ip_whitelist( # call it once to set next_ip response = client.get( "/requested-tasks/worker", - query_string={ - "worker": worker["name"], - "avail_cpu": 4, - "avail_memory": 2048, - "avail_disk": 4096, - }, - headers={ - "Authorization": make_access_token(worker["username"], "worker"), - "X-Forwarded-For": new_ip, - }, + query_string=req_task_query_string, + headers=worker_headers(client_ip=new_ip), ) if external_update_fails: assert response.status_code == 503 From 9671321f0dcb79cbeaa1cecc240b11fe659e6254 Mon Sep 17 00:00:00 2001 From: benoit74 Date: Mon, 27 Nov 2023 14:53:41 +0100 Subject: [PATCH 2/3] Split test file --- .../integration/routes/workers/test_worker.py | 216 +---------------- .../routes/workers/test_worker_req_tasks.py | 221 ++++++++++++++++++ 2 files changed, 222 insertions(+), 215 deletions(-) create mode 100644 dispatcher/backend/src/tests/integration/routes/workers/test_worker_req_tasks.py diff --git a/dispatcher/backend/src/tests/integration/routes/workers/test_worker.py b/dispatcher/backend/src/tests/integration/routes/workers/test_worker.py index 57a8c3c9..2d532ef5 100644 --- a/dispatcher/backend/src/tests/integration/routes/workers/test_worker.py +++ b/dispatcher/backend/src/tests/integration/routes/workers/test_worker.py @@ -1,10 +1,6 @@ -from typing import Any, Dict, List - import pytest -import db.models as dbm -from common import constants -from common.external import ExternalIpUpdater, build_workers_whitelist +from common.external import build_workers_whitelist class TestWorkersCommon: @@ -210,213 +206,3 @@ def test_checkin_another_user( # response.get_json()["error"] # == "worker with same name already exists for another user" # ) - - -class TestWorkerRequestedTasks: - @pytest.fixture() - def req_task_query_string(self, worker): - return { - "worker": worker["name"], - "avail_cpu": 4, - "avail_memory": 2048, - "avail_disk": 4096, - } - - @pytest.fixture - def make_headers(self): - def _make_headers(access_token: str, client_ip: str) -> Dict[str, Any]: - return { - "Authorization": access_token, - "X-Forwarded-For": client_ip, - } - - return _make_headers - - @pytest.fixture - def admin_headers(self, make_headers, access_token, default_ip): - def _admin_headers( - access_token: str = access_token, client_ip: str = default_ip - ) -> Dict[str, Any]: - return make_headers(access_token=access_token, client_ip=client_ip) - - return _admin_headers - - @pytest.fixture - def worker_headers(self, make_headers, make_access_token, worker, default_ip): - def _worker_headers( - access_token: str = make_access_token(worker["username"], "worker"), - client_ip: str = default_ip, - ) -> Dict[str, Any]: - return make_headers(access_token=access_token, client_ip=client_ip) - - return _worker_headers - - @pytest.fixture - def default_ip(self): - return "192.168.1.1" - - @pytest.fixture - def increase_ip(self): - def _increase_ip(prev_ip): - return f"{str(prev_ip)[:-1]}{int(str(prev_ip)[-1])+1}" - - return _increase_ip - - def test_requested_task_worker_as_admin( - self, - client, - worker, - req_task_query_string, - admin_headers, - dbsession, - increase_ip, - ): - # Retrieve current object from DB - db_worker = dbm.Worker.get(dbsession, worker["name"]) - last_seen = db_worker.last_seen - last_ip = db_worker.last_ip - - response = client.get( - "/requested-tasks/worker", - query_string=req_task_query_string, - headers=admin_headers(client_ip=increase_ip(last_ip)), - ) - assert response.status_code == 200 - - # Refresh current object from DB - dbsession.expire(db_worker) - db_worker = dbm.Worker.get(dbsession, worker["name"]) - # last_seen and last_ip are not updated when endpoint is called as admin - assert last_seen == db_worker.last_seen - assert last_ip == db_worker.last_ip - - def test_requested_task_worker_as_worker( - self, - client, - worker, - worker_headers, - req_task_query_string, - increase_ip, - dbsession, - ): - # Retrieve current object from DB - db_worker = dbm.Worker.get(dbsession, worker["name"]) - last_seen = db_worker.last_seen - last_ip = db_worker.last_ip - new_ip = increase_ip(last_ip) - # Worker checks for requested tasks - response = client.get( - "/requested-tasks/worker", - query_string=req_task_query_string, - headers=worker_headers(client_ip=new_ip), - ) - assert response.status_code == 200 - - # Refresh current object from DB - dbsession.expire(db_worker) - db_worker = dbm.Worker.get(dbsession, worker["name"]) - # last_seen and last_ip are updated in DB when endpoint is called as worker - assert last_seen != db_worker.last_seen - assert last_ip != db_worker.last_ip - assert str(db_worker.last_ip) == new_ip - - # second call will update only the last_seen attribute - last_seen = db_worker.last_seen - last_ip = db_worker.last_ip - response = client.get( - "/requested-tasks/worker", - query_string=req_task_query_string, - headers=worker_headers(client_ip=new_ip), - ) - assert response.status_code == 200 - - # Refresh current object from DB again - dbsession.expire(db_worker) - db_worker = dbm.Worker.get(dbsession, worker["name"]) - # last_seen has been updated again but not last_ip which did not changed - assert last_seen != db_worker.last_seen - assert str(db_worker.last_ip) == new_ip - - @pytest.mark.parametrize( - "prev_ip, new_ip, external_update_enabled, external_update_fails," - " external_update_called", - [ - ("77.77.77.77", "88.88.88.88", False, False, False), # ip update disabled - ("77.77.77.77", "77.77.77.77", True, False, False), # ip did not changed - ("77.77.77.77", "88.88.88.88", True, False, True), # ip should be updated - ("77.77.77.77", "88.88.88.88", True, True, False), # ip update fails - ], - ) - def test_requested_task_worker_update_ip_whitelist( - self, - client, - worker, - req_task_query_string, - prev_ip, - new_ip, - worker_headers, - external_update_enabled, - external_update_fails, - external_update_called, - ): - # call it once to set prev_ip - response = client.get( - "/requested-tasks/worker", - query_string=req_task_query_string, - headers=worker_headers(client_ip=prev_ip), - ) - assert response.status_code == 200 - - # check prev_ip has been set - response = client.get("/workers/") - assert response.status_code == 200 - response_data = response.get_json() - for item in response_data["items"]: - if item["name"] != worker["name"]: - continue - assert item["last_ip"] == prev_ip - - # setup custom ip updater to intercept Wasabi operations - updater = IpUpdaterAndChecker(should_fail=external_update_fails) - assert new_ip not in updater.ip_addresses - ExternalIpUpdater.update = updater.ip_update - constants.USES_WORKERS_IPS_WHITELIST = external_update_enabled - - # call it once to set next_ip - response = client.get( - "/requested-tasks/worker", - query_string=req_task_query_string, - headers=worker_headers(client_ip=new_ip), - ) - if external_update_fails: - assert response.status_code == 503 - else: - assert response.status_code == 200 - assert updater.ips_updated == external_update_called - if external_update_called: - assert new_ip in updater.ip_addresses - - # check new_ip has been set (even if ip update is disabled or has failed) - response = client.get("/workers/") - assert response.status_code == 200 - response_data = response.get_json() - for item in response_data["items"]: - if item["name"] != worker["name"]: - continue - assert item["last_ip"] == new_ip - - -class IpUpdaterAndChecker: - """Helper class to intercept Wasabi operations and perform assertions""" - - def __init__(self, should_fail: bool) -> None: - self.ips_updated = False - self.should_fail = should_fail - self.ip_addresses = [] - - def ip_update(self, ip_addresses: List): - if self.should_fail: - raise Exception() - else: - self.ips_updated = True - self.ip_addresses = ip_addresses diff --git a/dispatcher/backend/src/tests/integration/routes/workers/test_worker_req_tasks.py b/dispatcher/backend/src/tests/integration/routes/workers/test_worker_req_tasks.py new file mode 100644 index 00000000..4970a028 --- /dev/null +++ b/dispatcher/backend/src/tests/integration/routes/workers/test_worker_req_tasks.py @@ -0,0 +1,221 @@ +from typing import Any, Dict, List + +import pytest + +import db.models as dbm +from common import constants +from common.external import ExternalIpUpdater + + +class IpUpdaterAndChecker: + """Helper class to intercept Wasabi operations and perform assertions""" + + def __init__(self, should_fail: bool) -> None: + self.ips_updated = False + self.should_fail = should_fail + self.ip_addresses = [] + + def ip_update(self, ip_addresses: List): + if self.should_fail: + raise Exception() + else: + self.ips_updated = True + self.ip_addresses = ip_addresses + + +@pytest.fixture() +def req_task_query_string(worker): + return { + "worker": worker["name"], + "avail_cpu": 4, + "avail_memory": 2048, + "avail_disk": 4096, + } + + +@pytest.fixture +def make_headers(): + def _make_headers(access_token: str, client_ip: str) -> Dict[str, Any]: + return { + "Authorization": access_token, + "X-Forwarded-For": client_ip, + } + + return _make_headers + + +@pytest.fixture +def admin_headers(make_headers, access_token, default_ip): + def _admin_headers( + access_token: str = access_token, client_ip: str = default_ip + ) -> Dict[str, Any]: + return make_headers(access_token=access_token, client_ip=client_ip) + + return _admin_headers + + +@pytest.fixture +def worker_headers(make_headers, make_access_token, worker, default_ip): + def _worker_headers( + access_token: str = make_access_token(worker["username"], "worker"), + client_ip: str = default_ip, + ) -> Dict[str, Any]: + return make_headers(access_token=access_token, client_ip=client_ip) + + return _worker_headers + + +@pytest.fixture +def default_ip(): + return "192.168.1.1" + + +@pytest.fixture +def increase_ip(): + def _increase_ip(prev_ip): + return f"{str(prev_ip)[:-1]}{int(str(prev_ip)[-1])+1}" + + return _increase_ip + + +def test_requested_task_worker_as_admin( + client, + worker, + req_task_query_string, + admin_headers, + dbsession, + increase_ip, +): + # Retrieve current object from DB + db_worker = dbm.Worker.get(dbsession, worker["name"]) + last_seen = db_worker.last_seen + last_ip = db_worker.last_ip + + response = client.get( + "/requested-tasks/worker", + query_string=req_task_query_string, + headers=admin_headers(client_ip=increase_ip(last_ip)), + ) + assert response.status_code == 200 + + # Refresh current object from DB + dbsession.expire(db_worker) + db_worker = dbm.Worker.get(dbsession, worker["name"]) + # last_seen and last_ip are not updated when endpoint is called as admin + assert last_seen == db_worker.last_seen + assert last_ip == db_worker.last_ip + + +def test_requested_task_worker_as_worker( + client, + worker, + worker_headers, + req_task_query_string, + increase_ip, + dbsession, +): + # Retrieve current object from DB + db_worker = dbm.Worker.get(dbsession, worker["name"]) + last_seen = db_worker.last_seen + last_ip = db_worker.last_ip + new_ip = increase_ip(last_ip) + # Worker checks for requested tasks + response = client.get( + "/requested-tasks/worker", + query_string=req_task_query_string, + headers=worker_headers(client_ip=new_ip), + ) + assert response.status_code == 200 + + # Refresh current object from DB + dbsession.expire(db_worker) + db_worker = dbm.Worker.get(dbsession, worker["name"]) + # last_seen and last_ip are updated in DB when endpoint is called as worker + assert last_seen != db_worker.last_seen + assert last_ip != db_worker.last_ip + assert str(db_worker.last_ip) == new_ip + + # second call will update only the last_seen attribute + last_seen = db_worker.last_seen + last_ip = db_worker.last_ip + response = client.get( + "/requested-tasks/worker", + query_string=req_task_query_string, + headers=worker_headers(client_ip=new_ip), + ) + assert response.status_code == 200 + + # Refresh current object from DB again + dbsession.expire(db_worker) + db_worker = dbm.Worker.get(dbsession, worker["name"]) + # last_seen has been updated again but not last_ip which did not changed + assert last_seen != db_worker.last_seen + assert str(db_worker.last_ip) == new_ip + + +@pytest.mark.parametrize( + "prev_ip, new_ip, external_update_enabled, external_update_fails," + " external_update_called", + [ + ("77.77.77.77", "88.88.88.88", False, False, False), # ip update disabled + ("77.77.77.77", "77.77.77.77", True, False, False), # ip did not changed + ("77.77.77.77", "88.88.88.88", True, False, True), # ip should be updated + ("77.77.77.77", "88.88.88.88", True, True, False), # ip update fails + ], +) +def test_requested_task_worker_update_ip_whitelist( + client, + worker, + req_task_query_string, + prev_ip, + new_ip, + worker_headers, + external_update_enabled, + external_update_fails, + external_update_called, +): + # call it once to set prev_ip + response = client.get( + "/requested-tasks/worker", + query_string=req_task_query_string, + headers=worker_headers(client_ip=prev_ip), + ) + assert response.status_code == 200 + + # check prev_ip has been set + response = client.get("/workers/") + assert response.status_code == 200 + response_data = response.get_json() + for item in response_data["items"]: + if item["name"] != worker["name"]: + continue + assert item["last_ip"] == prev_ip + + # setup custom ip updater to intercept Wasabi operations + updater = IpUpdaterAndChecker(should_fail=external_update_fails) + assert new_ip not in updater.ip_addresses + ExternalIpUpdater.update = updater.ip_update + constants.USES_WORKERS_IPS_WHITELIST = external_update_enabled + + # call it once to set next_ip + response = client.get( + "/requested-tasks/worker", + query_string=req_task_query_string, + headers=worker_headers(client_ip=new_ip), + ) + if external_update_fails: + assert response.status_code == 503 + else: + assert response.status_code == 200 + assert updater.ips_updated == external_update_called + if external_update_called: + assert new_ip in updater.ip_addresses + + # check new_ip has been set (even if ip update is disabled or has failed) + response = client.get("/workers/") + assert response.status_code == 200 + response_data = response.get_json() + for item in response_data["items"]: + if item["name"] != worker["name"]: + continue + assert item["last_ip"] == new_ip From b096dfca429a77089e3dc5bfcebcddb399a9e9d7 Mon Sep 17 00:00:00 2001 From: benoit74 Date: Mon, 27 Nov 2023 14:55:33 +0100 Subject: [PATCH 3/3] Enhance comment following review --- .../backend/src/routes/requested_tasks/requested_task.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dispatcher/backend/src/routes/requested_tasks/requested_task.py b/dispatcher/backend/src/routes/requested_tasks/requested_task.py index 6efd6323..77fbd043 100644 --- a/dispatcher/backend/src/routes/requested_tasks/requested_task.py +++ b/dispatcher/backend/src/routes/requested_tasks/requested_task.py @@ -238,9 +238,9 @@ def get(self, session: so.Session, token: AccessToken.Payload): ) worker.last_ip = worker_ip - # commit explicitely since we are not using an explicit transaction, - # and do it before calling Wasabi so that changes are propagated - # quickly and transaction is not blocking + # commit explicitely last_ip and last_seen changes, since we are not + # using an explicit transaction, and do it before calling Wasabi so + # that changes are propagated quickly and transaction is not blocking session.commit() if ip_changed: