Skip to content

Commit

Permalink
Merge pull request #12 from quirrel-dev/retry-backoff
Browse files Browse the repository at this point in the history
Add support for Retry / Backoff
  • Loading branch information
Skn0tt authored Jan 27, 2021
2 parents 48e1ee7 + 369036a commit 4036188
Show file tree
Hide file tree
Showing 13 changed files with 3,085 additions and 27 deletions.
2,861 changes: 2,860 additions & 1 deletion package-lock.json

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions src/Job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ export interface Job<ScheduleType extends string = string> {
runAt: Date;
exclusive: boolean;

retry: number[];

schedule?: {
type: ScheduleType;
meta: string;
Expand Down Expand Up @@ -33,6 +35,12 @@ export interface JobEnqueue<ScheduleType extends string = string> {
*/
override?: boolean;

/**
* Retry a job on the specified schedule.
* @example [ 10, 100, 1000 ] a job was scheduled for t=0ms. It fails, so it's scheduled for retry t=10ms. It fails again, so it's scheduled for retry at t=100ms, and so forth.
*/
retry?: number[];

/**
* Optional: Schedule options.
*/
Expand Down
11 changes: 9 additions & 2 deletions src/activity/activity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,15 @@ export class Activity<ScheduleType extends string> implements Closable {
}

private async handleMessage(channel: string, message: string) {
const [_type, ...args] = splitEvent(message, 8);
const [_type, ...args] = splitEvent(message, 9);
const type = _type as OnActivityEvent["type"];

const [queue, id] = channel.split(":");
const channelParts = channel.split(":");
if (channelParts.length !== 2) {
return;
}

const [queue, id] = channelParts;

if (type === "scheduled") {
const [
Expand All @@ -119,6 +124,7 @@ export class Activity<ScheduleType extends string> implements Closable {
max_times,
exclusive,
count,
retryJson,
payload,
] = args;
await this.onEvent({
Expand All @@ -130,6 +136,7 @@ export class Activity<ScheduleType extends string> implements Closable {
runAt: new Date(+runDate),
count: Number(count),
exclusive: exclusive === "true",
retry: JSON.parse(retryJson),
schedule: schedule_type
? {
type: schedule_type,
Expand Down
21 changes: 16 additions & 5 deletions src/producer/producer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ declare module "ioredis" {
schedule_meta: string | undefined,
times: number | undefined,
overwrite: boolean,
exclusive: boolean
exclusive: boolean,
retryIntervals: string
): Promise<0 | 1>;

delete(
Expand Down Expand Up @@ -72,6 +73,12 @@ export class Producer<ScheduleType extends string> implements Closable {
job.runAt = new Date();
}

const { retry = [], schedule } = job;

if (retry.length && schedule) {
throw new Error("retry and schedule cannot be used together");
}

await this.redis.schedule(
`jobs:${job.queue}:${job.id}`,
`queues:${job.queue}`,
Expand All @@ -84,7 +91,8 @@ export class Producer<ScheduleType extends string> implements Closable {
job.schedule?.meta,
job.schedule?.times,
job.override ?? false,
job.exclusive ?? false
job.exclusive ?? false,
JSON.stringify(retry)
);
debug("job #%o: enqueued", job.id);

Expand All @@ -96,6 +104,7 @@ export class Producer<ScheduleType extends string> implements Closable {
runAt: job.runAt,
exclusive: job.exclusive ?? false,
schedule: job.schedule,
retry,
};
}

Expand Down Expand Up @@ -139,9 +148,9 @@ export class Producer<ScheduleType extends string> implements Closable {

return {
newCursor: +newCursor,
jobs: (await this.findJobs(jobIds)).filter((j) => !!j) as Job<
ScheduleType
>[],
jobs: (await this.findJobs(jobIds)).filter(
(j) => !!j
) as Job<ScheduleType>[],
};
}

Expand Down Expand Up @@ -178,6 +187,7 @@ export class Producer<ScheduleType extends string> implements Closable {
count,
max_times,
exclusive,
retry,
} = hgetallResult;

if (typeof payload === "undefined") {
Expand All @@ -201,6 +211,7 @@ export class Producer<ScheduleType extends string> implements Closable {
}
: undefined,
count: +count,
retry: JSON.parse(retry),
});
}

Expand Down
5 changes: 3 additions & 2 deletions src/producer/schedule.lua
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
ARGV[7] maximum execution times
ARGV[8] override if id already exists
ARGV[9] exclusive
ARGV[10] retryIntervals (as JSON array)
Output:
0 if everything went fine
Expand All @@ -27,15 +28,15 @@ if ARGV[8] == "false" then
end
end

redis.call("HSET", KEYS[1], "payload", ARGV[3], "schedule_type", ARGV[5], "schedule_meta", ARGV[6], "max_times", ARGV[7], "count", 1, "exclusive", ARGV[9])
redis.call("HSET", KEYS[1], "payload", ARGV[3], "schedule_type", ARGV[5], "schedule_meta", ARGV[6], "max_times", ARGV[7], "count", 1, "exclusive", ARGV[9], "retry", ARGV[10])

redis.call("SADD", KEYS[2], ARGV[1])

-- enqueues it
redis.call("ZADD", KEYS[3], ARGV[4], ARGV[2] .. ":" .. ARGV[1])

-- publishes "scheduled" to "<queue>:<id>"
redis.call("PUBLISH", ARGV[2] .. ":" .. ARGV[1], "scheduled" .. ":" .. ARGV[4] .. ":" .. ARGV[5] .. ":" .. ARGV[6] .. ":" .. ARGV[7] .. ":" .. ARGV[9] .. ":" .. 1 .. ":" .. ARGV[3])
redis.call("PUBLISH", ARGV[2] .. ":" .. ARGV[1], "scheduled" .. ":" .. ARGV[4] .. ":" .. ARGV[5] .. ":" .. ARGV[6] .. ":" .. ARGV[7] .. ":" .. ARGV[9] .. ":" .. 1 .. ":" .. ARGV[10] .. ":" .. ARGV[3])
-- publishes "<queue>:<id>" to "scheduled"
redis.call("PUBLISH", "scheduled", ARGV[2] .. ":" .. ARGV[1])

Expand Down
5 changes: 3 additions & 2 deletions src/worker/request.lua
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,15 @@ if redis.call("SISMEMBER", KEYS[3], queue) == 1 then
return -1
end

local jobData = redis.call("HMGET", ARGV[1] .. ":" .. queueAndId, "payload", "schedule_type", "schedule_meta", "count", "max_times", "exclusive")
local jobData = redis.call("HMGET", ARGV[1] .. ":" .. queueAndId, "payload", "schedule_type", "schedule_meta", "count", "max_times", "exclusive", "retry")

local payload = jobData[1]
local schedule_type = jobData[2]
local schedule_meta = jobData[3]
local count = jobData[4]
local max_times = jobData[5]
local exclusive = jobData[6]
local retry = jobData[7]

if exclusive == "true" then
if redis.call("HGET", KEYS[4], queue) ~= "0" then
Expand All @@ -78,4 +79,4 @@ redis.call("PUBLISH", queue .. ":" .. id, "requested")
-- publishes "<queue>:<id>" to "requested"
redis.call("PUBLISH", "requested", queue .. ":" .. id)

return { queue, id, payload, score, schedule_type, schedule_meta, count, max_times, exclusive }
return { queue, id, payload, score, schedule_type, schedule_meta, count, max_times, exclusive, retry }
12 changes: 12 additions & 0 deletions src/worker/retry.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
export function computeTimestampForNextRetry(
currentRunAt: Date,
retryIntervals: number[],
currentTry: number
): number | undefined {
if (currentTry > retryIntervals.length) {
return undefined;
}

const durationOfPreviousRun = retryIntervals[currentTry - 2] ?? 0;
return +currentRunAt + retryIntervals[currentTry - 1] - durationOfPreviousRun;
}
34 changes: 27 additions & 7 deletions src/worker/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import * as path from "path";
import type { ScheduleMap } from "../index";
import createDebug from "debug";
import { EggTimer } from "./egg-timer";
import { computeTimestampForNextRetry } from "./retry";

const debug = createDebug("owl:worker");

Expand All @@ -30,7 +31,8 @@ declare module "ioredis" {
schedule_meta: string,
count: string,
max_times: string,
exclusive: "true" | "false"
exclusive: "true" | "false",
retry: string
]
| null
| -1
Expand All @@ -55,7 +57,7 @@ export interface ProcessorMeta {
dontReschedule(): void;
}
export type Processor = (
job: Job,
job: Readonly<Job>,
processorMeta: ProcessorMeta
) => Promise<void>;
export type OnError = (job: Job, error: Error) => void;
Expand Down Expand Up @@ -183,8 +185,10 @@ export class Worker implements Closable {
count,
max_times,
exclusive,
retryJSON = "[]",
] = result;
const runAt = new Date(+runAtTimestamp);
const retry = JSON.parse(retryJSON) as number[];

const job: Job = {
queue,
Expand All @@ -200,6 +204,7 @@ export class Worker implements Closable {
times: max_times ? +max_times : undefined,
}
: undefined,
retry,
};

let dontReschedule = false;
Expand All @@ -214,16 +219,27 @@ export class Worker implements Closable {
} catch (error) {
debug(`requestNextJobs(): job #${id} - failed`);

const isRetryable = !!computeTimestampForNextRetry(
runAt,
retry,
+count
);

const event = isRetryable ? "retry" : "fail";
const pipeline = this.redis.pipeline();

pipeline.publish("fail", `${queue}:${id}:${error}`);
pipeline.publish(queue, `fail:${id}:${error}`);
pipeline.publish(`${queue}:${id}`, `fail:${error}`);
pipeline.publish(`${queue}:${id}:fail`, error);
const errorString = encodeURIComponent(error);

pipeline.publish(event, `${queue}:${id}:${errorString}`);
pipeline.publish(queue, `${event}:${id}:${errorString}`);
pipeline.publish(`${queue}:${id}`, `${event}:${errorString}`);
pipeline.publish(`${queue}:${id}:${event}`, errorString);

await pipeline.exec();

this.onError?.(job, error);
if (!isRetryable) {
this.onError?.(job, error);
}
} finally {
let nextExecDate: number | undefined = undefined;

Expand All @@ -239,6 +255,10 @@ export class Worker implements Closable {
nextExecDate = undefined;
}

if (retry.length) {
nextExecDate = computeTimestampForNextRetry(runAt, retry, +count);
}

await this.redis.acknowledge(
`jobs:${queue}:${id}`,
`queues:${queue}`,
Expand Down
7 changes: 5 additions & 2 deletions test/functional/activity.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ function test(backend: "Redis" | "In-Memory") {
runAt: currentDate,
count: 1,
schedule: undefined,
exclusive: false
exclusive: false,
retry: [],
},
},
{
Expand All @@ -70,7 +71,8 @@ function test(backend: "Redis" | "In-Memory") {
runAt: new Date(9999999999999),
count: 1,
schedule: undefined,
exclusive: true
exclusive: true,
retry: [],
},
},
{
Expand All @@ -82,6 +84,7 @@ function test(backend: "Redis" | "In-Memory") {
count: 1,
runAt: currentDate,
exclusive: false,
retry: [],
schedule: {
type: "every",
meta: "10",
Expand Down
4 changes: 4 additions & 0 deletions test/functional/job-management.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ function test(backend: "Redis" | "In-Memory") {
count: 1,
schedule: undefined,
exclusive: false,
retry: [],
});

await env.producer.enqueue({
Expand All @@ -48,6 +49,7 @@ function test(backend: "Redis" | "In-Memory") {
schedule: undefined,
count: 1,
exclusive: false,
retry: [],
},
{
queue: "producer-scan-queue",
Expand All @@ -57,6 +59,7 @@ function test(backend: "Redis" | "In-Memory") {
schedule: undefined,
count: 1,
exclusive: false,
retry: [],
},
]);

Expand Down Expand Up @@ -86,6 +89,7 @@ function test(backend: "Redis" | "In-Memory") {
schedule: undefined,
count: 1,
exclusive: false,
retry: [],
});
});

Expand Down
Loading

0 comments on commit 4036188

Please sign in to comment.