Skip to content

Commit

Permalink
perf: use last_scheduled_task_id for dueSchedules query (NangoHQ#2383)
Browse files Browse the repository at this point in the history
scanning tasks table to determine if a schedule has a running task or
had a recently run task doesn't scale
we are now using the last_scheduled_task_id column in schedules to avoid
heavy subqueries

## Checklist before requesting a review (skip if just adding/editing
APIs & templates)
- [ ] I added tests, otherwise the reason is: 
- [ ] I added observability, otherwise the reason is:
- [ ] I added analytics, otherwise the reason is:
  • Loading branch information
TBonnin authored Jun 20, 2024
1 parent 0a00097 commit 93ff562
Showing 1 changed file with 20 additions and 34 deletions.
54 changes: 20 additions & 34 deletions packages/scheduler/lib/workers/scheduling/scheduling.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,40 +7,26 @@ import { TASKS_TABLE } from '../../models/tasks.js';

export async function dueSchedules(db: knex.Knex): Promise<Result<Schedule[]>> {
try {
const query = db
.with(
'due_dates',
// calculate the most recent due date for each schedule that is started/not deleted
db
.select(
's.id',
db.raw(`
s.starts_at + (FLOOR(EXTRACT(EPOCH FROM (NOW() - s.starts_at)) / EXTRACT(EPOCH FROM s.frequency)) * s.frequency) AS dueAt
`)
)
.from({ s: SCHEDULES_TABLE })
.where({ state: 'STARTED' })
.whereRaw('s.starts_at <= NOW()')
// Locking schedules to prevent any concurrent update or concurrent scheduling of tasks
.forUpdate()
.skipLocked()
)
.select('*')
.from<DbSchedule>({ s: SCHEDULES_TABLE })
.joinRaw('JOIN due_dates lrt ON s.id = lrt.id')
// filter out schedules that have a running task
.whereNotExists(
db
.select('id')
.from({ t: TASKS_TABLE })
.whereRaw('t.schedule_id = s.id')
.where(function () {
this.where({ state: 'CREATED' }).orWhere({ state: 'STARTED' });
})
)
// filter out schedules that have tasks started after the due date
.whereNotExists(db.select('id').from({ t: TASKS_TABLE }).whereRaw('t.schedule_id = s.id').andWhere('t.starts_after', '>=', db.raw('lrt.dueAt')));
const schedules = await query;
const schedules: DbSchedule[] = await db
.select('s.*')
.from({ s: SCHEDULES_TABLE })
.leftJoin(`${TASKS_TABLE} AS t`, 's.last_scheduled_task_id', 't.id')
.where('s.state', 'STARTED')
.where('s.starts_at', '<=', db.fn.now())
.where(function () {
// schedule has never been run
this.where('s.last_scheduled_task_id', 'IS', null)
// schedule with last task not running and was started before the last due time
.orWhere(function () {
this.whereNotIn('t.state', ['CREATED', 'STARTED']).andWhere(
't.starts_after',
'<',
db.raw(`s.starts_at + (floor(extract(EPOCH FROM (now() - s.starts_at)) / extract(EPOCH FROM s.frequency)) * s.frequency)`)
);
});
})
.forUpdate('s')
.skipLocked();
return Ok(schedules.map(DbSchedule.from));
} catch (err: unknown) {
console.log(err);
Expand Down

0 comments on commit 93ff562

Please sign in to comment.