From 93ff5629bab6347cbf34afcc5fdb2f98a78822fb Mon Sep 17 00:00:00 2001 From: Thomas Bonnin <233326+TBonnin@users.noreply.github.com> Date: Thu, 20 Jun 2024 15:36:47 -0400 Subject: [PATCH] perf: use last_scheduled_task_id for dueSchedules query (#2383) scanning tasks table to determine if a schedule has a running task or had a recently run task doesn't scale we are now using the last_scheduled_task_id column in schedules to avoid heavy subqueries ## Checklist before requesting a review (skip if just adding/editing APIs & templates) - [ ] I added tests, otherwise the reason is: - [ ] I added observability, otherwise the reason is: - [ ] I added analytics, otherwise the reason is: --- .../lib/workers/scheduling/scheduling.ts | 54 +++++++------------ 1 file changed, 20 insertions(+), 34 deletions(-) diff --git a/packages/scheduler/lib/workers/scheduling/scheduling.ts b/packages/scheduler/lib/workers/scheduling/scheduling.ts index c27d0502b6..84ac909edd 100644 --- a/packages/scheduler/lib/workers/scheduling/scheduling.ts +++ b/packages/scheduler/lib/workers/scheduling/scheduling.ts @@ -7,40 +7,26 @@ import { TASKS_TABLE } from '../../models/tasks.js'; export async function dueSchedules(db: knex.Knex): Promise> { try { - const query = db - .with( - 'due_dates', - // calculate the most recent due date for each schedule that is started/not deleted - db - .select( - 's.id', - db.raw(` - s.starts_at + (FLOOR(EXTRACT(EPOCH FROM (NOW() - s.starts_at)) / EXTRACT(EPOCH FROM s.frequency)) * s.frequency) AS dueAt - `) - ) - .from({ s: SCHEDULES_TABLE }) - .where({ state: 'STARTED' }) - .whereRaw('s.starts_at <= NOW()') - // Locking schedules to prevent any concurrent update or concurrent scheduling of tasks - .forUpdate() - .skipLocked() - ) - .select('*') - .from({ s: SCHEDULES_TABLE }) - .joinRaw('JOIN due_dates lrt ON s.id = lrt.id') - // filter out schedules that have a running task - .whereNotExists( - db - .select('id') - .from({ t: TASKS_TABLE }) - .whereRaw('t.schedule_id = s.id') - .where(function () { - this.where({ state: 'CREATED' }).orWhere({ state: 'STARTED' }); - }) - ) - // filter out schedules that have tasks started after the due date - .whereNotExists(db.select('id').from({ t: TASKS_TABLE }).whereRaw('t.schedule_id = s.id').andWhere('t.starts_after', '>=', db.raw('lrt.dueAt'))); - const schedules = await query; + const schedules: DbSchedule[] = await db + .select('s.*') + .from({ s: SCHEDULES_TABLE }) + .leftJoin(`${TASKS_TABLE} AS t`, 's.last_scheduled_task_id', 't.id') + .where('s.state', 'STARTED') + .where('s.starts_at', '<=', db.fn.now()) + .where(function () { + // schedule has never been run + this.where('s.last_scheduled_task_id', 'IS', null) + // schedule with last task not running and was started before the last due time + .orWhere(function () { + this.whereNotIn('t.state', ['CREATED', 'STARTED']).andWhere( + 't.starts_after', + '<', + db.raw(`s.starts_at + (floor(extract(EPOCH FROM (now() - s.starts_at)) / extract(EPOCH FROM s.frequency)) * s.frequency)`) + ); + }); + }) + .forUpdate('s') + .skipLocked(); return Ok(schedules.map(DbSchedule.from)); } catch (err: unknown) { console.log(err);