Implementing retries with backoff with rq-scheduler #309

Open
shreyasp opened this issue May 12, 2024 · 0 comments

@shreyasp

I am trying to mimic python-rq's feature of retrying a failed job at specified intervals, using rq-scheduler's success and failure callbacks. However, something goes wrong when the job succeeds after one or more failures.
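For comparison, python-rq's built-in version of this is the `Retry` class, used as `queue.enqueue(func, retry=Retry(max=3, interval=[10, 30, 60]))`. As far as I can tell from rq's source, the delay for each retry is read from `retry_intervals` at an index that advances as `retries_left` counts down, and `Job.retry()` decrements `retries_left` after reading it. The plain-Python sketch below models that selection so it runs without Redis; `pick_retry_interval` and `retry_delays` are hypothetical helpers, not rq functions.

```python
from typing import List, Optional


def pick_retry_interval(retry_intervals: Optional[List[int]], retries_left: int) -> int:
    """Delay in seconds before the next retry.

    Hypothetical helper modelled on rq's Job.get_retry_interval(): the
    index moves forward through the list as retries_left counts down.
    """
    if not retry_intervals:
        return 0
    if retries_left < 1:
        # Assumption: callers check for exhausted retries first; with
        # retries_left == 0 the index would run past the end of the list.
        raise ValueError("no retries left")
    index = max(len(retry_intervals) - retries_left, 0)
    return retry_intervals[index]


def retry_delays(retry_intervals: List[int]) -> List[int]:
    """Delays seen if every attempt fails, decrementing retries_left after
    each retry the way rq's Job.retry() appears to."""
    retries_left = len(retry_intervals)
    delays = []
    while retries_left > 0:
        delays.append(pick_retry_interval(retry_intervals, retries_left))
        retries_left -= 1
    return delays


print(retry_delays([10, 20]))            # [10, 20]
print(pick_retry_interval([10, 20], 2))  # 10
```

The second print shows why the counter has to be decremented somewhere: if `retries_left` stayed at 2, every retry would wait the first interval.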

When the job succeeds after one or more failures, the script below resets `retry_intervals` and `retries_left` to `None` in the success callback, so that a later failure should start again with the full number of retries. Instead, the job gets canceled, because `retries_left` is never actually reset.
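The intended behaviour amounts to a small state machine: a failure streak starts with the full retry budget, each failure consumes one retry, and a success ends the streak. The sketch below models that in plain Python, independent of Redis and rq; `RetryState`, `on_failure` and `on_success` are hypothetical names, and the `[10, 20]` default is the backoff list from the script.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class RetryState:
    """Hypothetical per-job retry bookkeeping for a repeating job."""

    intervals: List[int] = field(default_factory=lambda: [10, 20])
    retries_left: Optional[int] = None  # None means "not in a failure streak"

    def on_failure(self) -> Optional[int]:
        """Return the delay before the next retry, or None to cancel."""
        if self.retries_left is None:
            self.retries_left = len(self.intervals)  # new streak: full budget
        if self.retries_left == 0:
            return None  # budget used up: the caller should cancel the job
        delay = self.intervals[len(self.intervals) - self.retries_left]
        self.retries_left -= 1  # each failure consumes one retry
        return delay

    def on_success(self) -> None:
        """End the streak so the next failure starts with a full budget."""
        self.retries_left = None


state = RetryState()
print(state.on_failure(), state.on_failure())  # 10 20
state.on_success()
print(state.on_failure())  # 10
```

The property that matters is that `on_success` truly discards the old counter; in memory that is trivially true, so any problem would come from how the state is persisted between runs.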

I would be grateful for any help or advice. Thanks.

```python
import random
from datetime import datetime, timedelta, timezone

from redis import Redis
from rq import Queue
from rq.job import Job
from rq_scheduler import Scheduler

queue = Queue("bar", connection=Redis())
scheduler = Scheduler(queue=queue, connection=queue.connection)


def Hello(*args, **kwargs):
    # Succeeds when i is 0, 3, 6 or 9 (4 times in 10); otherwise raises.
    i = random.randrange(0, 10, 1)
    if i % 3 != 0:
        raise Exception("failed job")
    else:
        print("runs successfully")


def HelloSuccess(job: Job, connection, result, *args, **kwargs):
    print("succeeded", job.ended_at, job.func_name)

    # Only act if the job is in a retry streak (some retries already used up).
    if (
        job.retries_left is not None
        and job.retry_intervals is not None
        and job.retries_left < len(job.retry_intervals)
    ):
        # Put the next run back on the regular repeat interval, which
        # rq-scheduler keeps in job.meta["interval"]...
        d = job.ended_at.replace(tzinfo=timezone.utc)
        scheduler.change_execution_time(
            job,
            d + timedelta(seconds=job.meta["interval"]),
        )
        # ...and clear the retry state so a later failure starts fresh.
        job.retry_intervals = None
        job.retries_left = None

        job.save(include_meta=True, include_result=True)


def HelloFailure(job: Job, connection, _, value, traceback):
    print("failed", job.ended_at, job.func_name)

    # First failure of a streak: set up the backoff schedule.
    if job.retries_left is None and job.retry_intervals is None:
        backoff_intervals = [10, 20]
        job.retry_intervals = backoff_intervals
        job.retries_left = len(backoff_intervals)
        job.save(include_meta=True)

    d = job.ended_at.replace(tzinfo=timezone.utc)

    if job.retries_left != 0:
        # Retries remain: move the next scheduled run to after the backoff delay.
        scheduler.change_execution_time(
            job, d + timedelta(seconds=job.get_retry_interval())
        )

        job.save(include_meta=True)

    else:
        # Retries exhausted: stop the repeating job.
        print("cancelling the job")
        scheduler.cancel(job)

    return


if __name__ == "__main__":

    scheduled_time = datetime.now(tz=timezone.utc) + timedelta(seconds=10)
    print("scheduling now ... to run at {}".format(scheduled_time))

    # Functions are passed as dotted paths, so this file must be importable
    # as test_scheduler. repeat=None repeats the job forever, every 5 seconds.
    j: Job = scheduler.schedule(
        scheduled_time=scheduled_time,
        func="test_scheduler.Hello",
        args=[],
        kwargs={},
        queue_name="bar",
        repeat=None,
        interval=5,
        on_success="test_scheduler.HelloSuccess",
        on_failure="test_scheduler.HelloFailure",
    )

    print(j.retry_intervals)
    print(j.retries_left)

    print("scheduled job {}".format(j.get_id()))

    # Cancel leftover jobs from earlier runs, keeping only the one just scheduled.
    for job in scheduler.get_jobs():
        if job.id != j.id:
            scheduler.cancel(job)
            continue

        print(job)
```
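One possible explanation, based on my reading of rq's source (worth checking against the installed rq version): `Job.save()` writes the job hash with Redis `HSET`, and `Job.to_dict()` only includes `retries_left` and `retry_intervals` when they are not `None`. Because `HSET` leaves fields it is not given untouched, assigning `None` and saving would keep the previous values in Redis, and the next fetch of the job would read them back. In addition, rq's worker appears to treat a failed job with `retries_left > 0` as retryable and call `Job.retry()`, which decrements the counter, so the stale value can be below the full budget. The simulation below models only the `HSET` part with a plain dict; `job_fields`, `hset` and `hdel` are hypothetical stand-ins, not rq or redis-py functions.

```python
import json
from typing import Dict, List, Optional


def job_fields(retries_left: Optional[int], retry_intervals: Optional[List[int]]) -> Dict[str, str]:
    """Hypothetical stand-in for the retry part of Job.to_dict(): None values are omitted."""
    fields: Dict[str, str] = {}
    if retries_left is not None:
        fields["retries_left"] = str(retries_left)
    if retry_intervals is not None:
        fields["retry_intervals"] = json.dumps(retry_intervals)
    return fields


def hset(redis_hash: Dict[str, str], mapping: Dict[str, str]) -> None:
    """Mimics Redis HSET with a mapping: only the given fields are written."""
    redis_hash.update(mapping)


def hdel(redis_hash: Dict[str, str], *names: str) -> None:
    """Mimics Redis HDEL: removes the named fields if present."""
    for name in names:
        redis_hash.pop(name, None)


stored: Dict[str, str] = {}
hset(stored, job_fields(1, [10, 20]))  # saved during a failure streak
hset(stored, job_fields(None, None))   # success callback "resets" to None and saves
print(stored)  # {'retries_left': '1', 'retry_intervals': '[10, 20]'}

hdel(stored, "retries_left", "retry_intervals")  # explicit deletion clears them
print(stored)  # {}
```

If this is the cause, two untested options are deleting the fields explicitly, for example `connection.hdel(job.key, "retries_left", "retry_intervals")` with redis-py, or keeping the retry counter in `job.meta` and persisting it with `job.save_meta()`, which rewrites the whole `meta` field and does not touch rq's own retry fields.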