From f44e0c4d1567dd36236b3e1a3e3520a42b9b0dae Mon Sep 17 00:00:00 2001 From: Thomas Carmet <8408330+tcarmet@users.noreply.github.com> Date: Thu, 23 May 2024 13:40:27 -0700 Subject: [PATCH] PTFE-1738 introducing job that will warn about leaks (#606) --- runner_manager/jobs/leaks.py | 34 +++++++++++++++++++ runner_manager/jobs/startup.py | 17 ++++++++-- tests/unit/jobs/test_healthchecks.py | 51 ---------------------------- tests/unit/jobs/test_leaks.py | 46 +++++++++++++++++++++++++ tests/unit/jobs/test_startup.py | 4 +++ 5 files changed, 99 insertions(+), 53 deletions(-) create mode 100644 runner_manager/jobs/leaks.py create mode 100644 tests/unit/jobs/test_leaks.py diff --git a/runner_manager/jobs/leaks.py b/runner_manager/jobs/leaks.py new file mode 100644 index 00000000..b7b87c31 --- /dev/null +++ b/runner_manager/jobs/leaks.py @@ -0,0 +1,34 @@ +import logging + +from redis_om import NotFoundError + +from runner_manager import RunnerGroup + +log = logging.getLogger(__name__) + + +def runner_leaks(pk: str) -> bool: + """ + Job that will look for leaks in the runner group + and for now it will just log the runner's name that + could be considered as a leak. + + returns: + bool: True if a leak was found, False otherwise + """ + + try: + group: RunnerGroup = RunnerGroup.get(pk) + except NotFoundError: + log.error(f"Runner group {pk} not found") + return False + backend_runners = group.backend.list() + group_runners = group.get_runners() + if len(backend_runners) > len(group_runners): + log.warning(f"Runner group {pk} has leaks") + for runner in backend_runners: + if runner not in group_runners: + log.warning(f"Runner {runner} could be considered as a leak") + return True + else: + return False diff --git a/runner_manager/jobs/startup.py b/runner_manager/jobs/startup.py index 1052acde..40a7d349 100644 --- a/runner_manager/jobs/startup.py +++ b/runner_manager/jobs/startup.py @@ -10,7 +10,7 @@ from runner_manager.clients.github import GitHub from runner_manager.dependencies import get_github, get_scheduler, get_settings -from runner_manager.jobs import healthcheck +from runner_manager.jobs import healthcheck, leaks from runner_manager.models.runner_group import RunnerGroup from runner_manager.models.settings import Settings @@ -54,7 +54,7 @@ def bootstrap_scheduler( for job in jobs: # Cancel any existing healthcheck jobs job_type = job.meta.get("type") - if job_type == "healthcheck" or job_type == "migrator": + if job_type == "healthcheck" or job_type == "migrator" or job_type == "leaks": log.info(f"Canceling {job_type} job: {job.id}") scheduler.cancel(job) @@ -90,6 +90,19 @@ def bootstrap_scheduler( repeat=None, timeout=settings.healthcheck_interval.total_seconds() / 2, ) + log.info(f"Scheduling leaks job for group {group.name}") + scheduler.schedule( + scheduled_time=datetime.utcnow(), + func=leaks.runner_leaks, + args=[group.pk], + meta={ + "type": "leaks", + "group": group.name, + }, + interval=settings.healthcheck_interval.total_seconds() * 4, + result_ttl=60, + repeat=None, + ) def indexing(): diff --git a/tests/unit/jobs/test_healthchecks.py b/tests/unit/jobs/test_healthchecks.py index 01a65074..f31943d8 100644 --- a/tests/unit/jobs/test_healthchecks.py +++ b/tests/unit/jobs/test_healthchecks.py @@ -2,7 +2,6 @@ from hypothesis import given, settings from hypothesis import strategies as st -from pytest import mark from redis_om import Migrator from rq import Queue @@ -135,53 +134,3 @@ def test_healthcheck_job( settings.timeout_runner, ) assert len(runner_group.get_runners()) == 1 - - -@mark.skip(reason="The check was reverted due to killing healthy runners") -def test_healthcheck_backend_leak( - runner_group: RunnerGroup, - settings: Settings, - queue: Queue, - github: GitHub, - monkeypatch, -): - runner_group.save() - runner = runner_group.create_runner(github) - # monkeypatch the runner_group.backend.list method to return a list - # with the recently created runner and a fake leaked runner - # This will simulate a leak in the backend - leaked_runner = Runner( - name="leak", - busy=False, - runner_group_name=runner_group.name, - runner_group_id=runner_group.id, - ) - monkeypatch.setattr( - "runner_manager.backend.base.BaseBackend.list", - lambda self: [runner, leaked_runner], - ) - # ensure the method is patched and that the group is unaware of the leaked runner - assert len(runner_group.get_runners()) == 1 - assert len(runner_group.backend.list()) == 2 - queue.enqueue( - healthcheck.group, - runner_group.pk, - settings.time_to_live, - settings.timeout_runner, - ) - # the healthcheck job should add the runner to the group - # because the timestamp is not yet expired - assert len(runner_group.get_runners()) == 2 - # Now we will expire the leaked runner - leaked_runner.created_at = datetime.now(timezone.utc) - ( - settings.timeout_runner + timedelta(minutes=1) - ) - leaked_runner.save() - queue.enqueue( - healthcheck.group, - runner_group.pk, - settings.time_to_live, - settings.timeout_runner, - ) - # the healthcheck job should remove the leaked runner - assert len(runner_group.get_runners()) == 1 diff --git a/tests/unit/jobs/test_leaks.py b/tests/unit/jobs/test_leaks.py new file mode 100644 index 00000000..2b58a32a --- /dev/null +++ b/tests/unit/jobs/test_leaks.py @@ -0,0 +1,46 @@ +from rq import Queue + +from runner_manager import RunnerGroup +from runner_manager.clients.github import GitHub +from runner_manager.jobs.leaks import runner_leaks +from runner_manager.models.runner import Runner, RunnerStatus + + +def test_leak_job(runner_group: RunnerGroup, queue: Queue, github: GitHub, monkeypatch): + runner_group.save() + runner = runner_group.create_runner(github) + assert runner in runner_group.get_runners() + runner.status = RunnerStatus.online + runner.busy = True + runner.save() + job = queue.enqueue( + runner_leaks, + runner_group.pk, + ) + # No leaks were found + assert job.return_value() is False + fake_runner: Runner = Runner( + name="fake_runner", + instance_id="fake_instance_id", + busy=False, + status=RunnerStatus.offline, + runner_group_id=runner_group.id, + ) + monkeypatch.setattr( + "runner_manager.backend.base.BaseBackend.list", + lambda self: [runner, fake_runner], + ) + job = queue.enqueue( + runner_leaks, + runner_group.pk, + ) + # fake_runner is a leak since it is not in the group + assert job.return_value() is True + + +def test_leak_job_group_not_found(queue: Queue, github: GitHub): + job = queue.enqueue( + runner_leaks, + "fake_id", + ) + assert job.return_value() is False diff --git a/tests/unit/jobs/test_startup.py b/tests/unit/jobs/test_startup.py index 07c233da..0b0a4d25 100644 --- a/tests/unit/jobs/test_startup.py +++ b/tests/unit/jobs/test_startup.py @@ -59,6 +59,7 @@ def test_scheduler( jobs: List[Job] = scheduler.get_jobs() is_indexing: bool = False is_healthcheck: bool = False + is_runner_leaks: bool = False for job in jobs: job_type = job.meta.get("type") if job_type == "indexing": @@ -66,9 +67,12 @@ def test_scheduler( elif job_type == "healthcheck": assert settings.healthcheck_interval.total_seconds() / 2 == job.timeout is_healthcheck = True + elif job_type == "leaks": + is_runner_leaks = True assert is_indexing is True assert is_healthcheck is True + assert is_runner_leaks is True def test_update_group_sync(settings: Settings, github: GitHub):