Skip to content

Commit

Permalink
PTFE-1738 introducing job that will warn about leaks (#606)
Browse files Browse the repository at this point in the history
  • Loading branch information
tcarmet authored May 23, 2024
1 parent 1f6fa4a commit f44e0c4
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 53 deletions.
34 changes: 34 additions & 0 deletions runner_manager/jobs/leaks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import logging

from redis_om import NotFoundError

from runner_manager import RunnerGroup

log = logging.getLogger(__name__)


def runner_leaks(pk: str) -> bool:
"""
Job that will look for leaks in the runner group
and for now it will just log the runner's name that
could be considered as a leak.
returns:
bool: True if a leak was found, False otherwise
"""

try:
group: RunnerGroup = RunnerGroup.get(pk)
except NotFoundError:
log.error(f"Runner group {pk} not found")
return False
backend_runners = group.backend.list()
group_runners = group.get_runners()
if len(backend_runners) > len(group_runners):
log.warning(f"Runner group {pk} has leaks")
for runner in backend_runners:
if runner not in group_runners:
log.warning(f"Runner {runner} could be considered as a leak")
return True
else:
return False
17 changes: 15 additions & 2 deletions runner_manager/jobs/startup.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from runner_manager.clients.github import GitHub
from runner_manager.dependencies import get_github, get_scheduler, get_settings
from runner_manager.jobs import healthcheck
from runner_manager.jobs import healthcheck, leaks
from runner_manager.models.runner_group import RunnerGroup
from runner_manager.models.settings import Settings

Expand Down Expand Up @@ -54,7 +54,7 @@ def bootstrap_scheduler(
for job in jobs:
# Cancel any existing healthcheck jobs
job_type = job.meta.get("type")
if job_type == "healthcheck" or job_type == "migrator":
if job_type == "healthcheck" or job_type == "migrator" or job_type == "leaks":
log.info(f"Canceling {job_type} job: {job.id}")
scheduler.cancel(job)

Expand Down Expand Up @@ -90,6 +90,19 @@ def bootstrap_scheduler(
repeat=None,
timeout=settings.healthcheck_interval.total_seconds() / 2,
)
log.info(f"Scheduling leaks job for group {group.name}")
scheduler.schedule(
scheduled_time=datetime.utcnow(),
func=leaks.runner_leaks,
args=[group.pk],
meta={
"type": "leaks",
"group": group.name,
},
interval=settings.healthcheck_interval.total_seconds() * 4,
result_ttl=60,
repeat=None,
)


def indexing():
Expand Down
51 changes: 0 additions & 51 deletions tests/unit/jobs/test_healthchecks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

from hypothesis import given, settings
from hypothesis import strategies as st
from pytest import mark
from redis_om import Migrator
from rq import Queue

Expand Down Expand Up @@ -135,53 +134,3 @@ def test_healthcheck_job(
settings.timeout_runner,
)
assert len(runner_group.get_runners()) == 1


@mark.skip(reason="The check was reverted due to killing healthy runners")
def test_healthcheck_backend_leak(
runner_group: RunnerGroup,
settings: Settings,
queue: Queue,
github: GitHub,
monkeypatch,
):
runner_group.save()
runner = runner_group.create_runner(github)
# monkeypatch the runner_group.backend.list method to return a list
# with the recently created runner and a fake leaked runner
# This will simulate a leak in the backend
leaked_runner = Runner(
name="leak",
busy=False,
runner_group_name=runner_group.name,
runner_group_id=runner_group.id,
)
monkeypatch.setattr(
"runner_manager.backend.base.BaseBackend.list",
lambda self: [runner, leaked_runner],
)
# ensure the method is patched and that the group is unaware of the leaked runner
assert len(runner_group.get_runners()) == 1
assert len(runner_group.backend.list()) == 2
queue.enqueue(
healthcheck.group,
runner_group.pk,
settings.time_to_live,
settings.timeout_runner,
)
# the healthcheck job should add the runner to the group
# because the timestamp is not yet expired
assert len(runner_group.get_runners()) == 2
# Now we will expire the leaked runner
leaked_runner.created_at = datetime.now(timezone.utc) - (
settings.timeout_runner + timedelta(minutes=1)
)
leaked_runner.save()
queue.enqueue(
healthcheck.group,
runner_group.pk,
settings.time_to_live,
settings.timeout_runner,
)
# the healthcheck job should remove the leaked runner
assert len(runner_group.get_runners()) == 1
46 changes: 46 additions & 0 deletions tests/unit/jobs/test_leaks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
from rq import Queue

from runner_manager import RunnerGroup
from runner_manager.clients.github import GitHub
from runner_manager.jobs.leaks import runner_leaks
from runner_manager.models.runner import Runner, RunnerStatus


def test_leak_job(runner_group: RunnerGroup, queue: Queue, github: GitHub, monkeypatch):
runner_group.save()
runner = runner_group.create_runner(github)
assert runner in runner_group.get_runners()
runner.status = RunnerStatus.online
runner.busy = True
runner.save()
job = queue.enqueue(
runner_leaks,
runner_group.pk,
)
# No leaks were found
assert job.return_value() is False
fake_runner: Runner = Runner(
name="fake_runner",
instance_id="fake_instance_id",
busy=False,
status=RunnerStatus.offline,
runner_group_id=runner_group.id,
)
monkeypatch.setattr(
"runner_manager.backend.base.BaseBackend.list",
lambda self: [runner, fake_runner],
)
job = queue.enqueue(
runner_leaks,
runner_group.pk,
)
# fake_runner is a leak since it is not in the group
assert job.return_value() is True


def test_leak_job_group_not_found(queue: Queue, github: GitHub):
job = queue.enqueue(
runner_leaks,
"fake_id",
)
assert job.return_value() is False
4 changes: 4 additions & 0 deletions tests/unit/jobs/test_startup.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,20 @@ def test_scheduler(
jobs: List[Job] = scheduler.get_jobs()
is_indexing: bool = False
is_healthcheck: bool = False
is_runner_leaks: bool = False
for job in jobs:
job_type = job.meta.get("type")
if job_type == "indexing":
is_indexing = True
elif job_type == "healthcheck":
assert settings.healthcheck_interval.total_seconds() / 2 == job.timeout
is_healthcheck = True
elif job_type == "leaks":
is_runner_leaks = True

assert is_indexing is True
assert is_healthcheck is True
assert is_runner_leaks is True


def test_update_group_sync(settings: Settings, github: GitHub):
Expand Down

0 comments on commit f44e0c4

Please sign in to comment.