Skip to content

Commit

Permalink
Default schedule
Browse files Browse the repository at this point in the history
  • Loading branch information
fabiancook committed Oct 1, 2023
1 parent 097681e commit 6ef3d8f
Show file tree
Hide file tree
Showing 9 changed files with 171 additions and 7 deletions.
4 changes: 3 additions & 1 deletion src/config/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import type {ContentIndexConfig} from "../content-index";
import type {DispatchEventConfig} from "../events";
import type {FastifyConfig} from "../listen";
import type {WorkerPoolConfig} from "../worker/pool";
import type {PeriodicSyncScheduleConfig} from "../periodic-sync/schedule";

export interface LogisticsConfig {

Expand Down Expand Up @@ -44,7 +45,8 @@ export interface Config extends
HappeningTreeConfig,
DispatchEventConfig,
FastifyConfig,
WorkerPoolConfig {
WorkerPoolConfig,
PeriodicSyncScheduleConfig {
name: string;
version: string;
root: string;
Expand Down
7 changes: 7 additions & 0 deletions src/data/durable-event/list-durable-event-ids.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import {DurableEventTypeData} from "./types";
import {getDurableEventStore} from "./store";

export function listDurableEventIds(event: DurableEventTypeData) {
const store = getDurableEventStore(event);
return store.keys();
}
3 changes: 2 additions & 1 deletion src/data/expiring-kv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ import { Expiring } from "./expiring";
export const EXPIRING_KEYS = new Set<`${string}|${string}`>();

export const MINUTE_MS = 60 * 1000;
export const DAY_MS = 24 * 60 * MINUTE_MS;
export const HOUR_MS = MINUTE_MS * 60;
export const DAY_MS = 24 * HOUR_MS;
export const MONTH_MS = 31 * DAY_MS;
export const DEFAULT_EXPIRES_IN_MS = 7 * DAY_MS;

Expand Down
1 change: 1 addition & 0 deletions src/events/schedule/qstash.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,4 +131,5 @@ export async function deleteDispatchQStash(event: DurableEventData) {
}
)
ok(response.ok, "Could not delete dispatch QStash message");
await store.delete(SCHEDULE_KEY);
}
16 changes: 16 additions & 0 deletions src/is.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,20 @@ export function isSignalled(event: unknown): event is Signalled {
event.signal &&
typeof event.signal.aborted === "boolean"
)
}

export function isMatchingObjects(a?: unknown, b?: unknown): boolean {
if (a === b) return true;
if (!a || !b) return false;
if (typeof a !== "object" || typeof b !== "object") return false;
const aEntries = Object.entries(a);
const bMap = new Map(Object.entries(b));
if (aEntries.length !== bMap.size) return false;
return aEntries.every(([key, value]) => {
const otherValue = bMap.get(key);
if (typeof value === "object") {
return isMatchingObjects(value, otherValue);
}
return value === otherValue;
})
}
4 changes: 3 additions & 1 deletion src/periodic-sync/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
export * from "./dispatch";
export * from "./manager";
export * from "./manager";
export * from "./schedule";
export * from "./virtual";
12 changes: 8 additions & 4 deletions src/periodic-sync/manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,26 @@ function getPeriodicSyncTagStore() {
})
}

export async function getPeriodicSyncTagRegistrationState(tag: string) {
export async function getPeriodicSyncTagRegistration(tag: string) {
const store = getPeriodicSyncTagStore();
const existing = await store.get(tag);
ok(existing, "Expected to find registered sync tag");
return existing;
}

export async function getPeriodicSyncTagRegistrationState(tag: string) {
const existing = await getPeriodicSyncTagRegistration(tag);
return existing.registrationState;
}

export async function setPeriodicSyncTagRegistrationState(tag: string, registrationState: SyncTagRegistrationState) {
const store = getPeriodicSyncTagStore();
const existing = await store.get(tag);
ok(existing, "Expected to find registered sync tag");
const existing = await getPeriodicSyncTagRegistration(tag);
const next: SyncTag = {
...existing,
registrationState,
registrationStateAt: new Date().toISOString()
};
const store = getPeriodicSyncTagStore();
await store.set(tag, next);
return next;
}
Expand Down
81 changes: 81 additions & 0 deletions src/periodic-sync/schedule.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import type {DurableEventSchedule} from "../data";
import type {DurablePeriodicSyncManager} from "./manager";
import {getConfig} from "../config";
import {getPeriodicSyncTagRegistration} from "./manager";
import {DAY_MS, HOUR_MS, MINUTE_MS, MONTH_MS} from "../data";

export interface PeriodicSyncSchedule {
tag: string;
schedule: DurableEventSchedule
}

export interface GetPeriodicSyncScheduleFn {
(manager: DurablePeriodicSyncManager): Promise<PeriodicSyncSchedule[]>
}

export interface PeriodicSyncScheduleConfig {
getPeriodicSyncSchedule?: GetPeriodicSyncScheduleFn;
}

export async function getPeriodicSyncSchedule(manager?: DurablePeriodicSyncManager) {
if (!manager) {
const { periodicSync } = await import("./manager");
return getPeriodicSyncSchedule(periodicSync);
}
const config = getConfig();
const fn: GetPeriodicSyncScheduleFn = config.getPeriodicSyncSchedule ?? getDefaultPeriodicSyncSchedule;
return fn(manager);
}

function getCronExpressionFromInterval(interval: number) {
if (interval <= MINUTE_MS) {
return "0 * * * *";
}
if (interval <= HOUR_MS) {
return "0 0 * * *";
}
if (interval <= DAY_MS) {
return "0 0 0 * *";
}
return undefined;
}

export async function getDefaultDurableEventScheduleForPeriodicSyncTag(tag: string): Promise<DurableEventSchedule> {
const { minInterval, createdAt } = await getPeriodicSyncTagRegistration(tag);
if (!minInterval) {
return {
immediate: true
};
}
if (minInterval <= MONTH_MS) {
const cron = getCronExpressionFromInterval(minInterval);
if (cron) {
return {
cron
}
}
}
const createdAtTime = new Date(createdAt).getTime();
const timeSince = Date.now() - createdAtTime;
const intervalsSince = Math.floor(timeSince / minInterval);
const nextInterval = intervalsSince + 1;
const nextIntervalTime = createdAtTime + (nextInterval * minInterval);
const nextIntervalAt = new Date(nextIntervalTime).toISOString();
return {
after: nextIntervalAt
};
}

export async function getDefaultPeriodicSyncSchedule(manager: DurablePeriodicSyncManager): Promise<PeriodicSyncSchedule[]> {
const tags = await manager.getTags();
return await Promise.all(
tags.map(
async (tag): Promise<PeriodicSyncSchedule> => {
return {
tag,
schedule: await getDefaultDurableEventScheduleForPeriodicSyncTag(tag)
}
}
)
)
}
50 changes: 50 additions & 0 deletions src/periodic-sync/virtual.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import {virtual} from "../events/virtual/virtual";
import {listDurableEventIds} from "../data/durable-event/list-durable-event-ids";
import {DurableEventSchedule, getDurableEvent} from "../data";
import {isMatchingObjects} from "../is";
import {getPeriodicSyncSchedule} from "./schedule";

export const removePeriodicSyncVirtualFunction = virtual(async function * () {
const schedules = await getPeriodicSyncSchedule();
const type = "periodicsync"
const existingTags = await listDurableEventIds({
type
});
const tags = schedules.map(({ tag }) => tag);
const notMatching = existingTags.filter(tag => !tags.includes(tag));
const { deleteDispatchEvent } = await import("../events");
for (const tag of notMatching) {
const existing = await getDurableEvent({
type,
durableEventId: tag
});
if (existing) {
await deleteDispatchEvent(existing);
}
}
for (const { tag, schedule } of schedules) {
const dispatch = {
durableEventId: tag,
type,
tag,
schedule
};
const existing = await getDurableEvent(dispatch)
if (existing) {
if (isMatchingDurableEventSchedule(existing.schedule, dispatch.schedule)) {
continue;
}
// Ensure we delete the old schedule before defining a new one
// This shouldn't happen often if periodicSync is staying the same
await deleteDispatchEvent(existing);
}
yield {
type: "dispatch",
dispatch
};
}
})

export function isMatchingDurableEventSchedule(a: DurableEventSchedule, b: DurableEventSchedule) {
return isMatchingObjects(a, b);
}

0 comments on commit 6ef3d8f

Please sign in to comment.