Skip to content

Commit

Permalink
Virtual events, durable if not virtual
Browse files Browse the repository at this point in the history
  • Loading branch information
fabiancook committed Aug 26, 2023
1 parent 448a745 commit e0ce7a2
Show file tree
Hide file tree
Showing 11 changed files with 55 additions and 32 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ export interface DurableEventData extends Record<string, unknown>, DurableEventT
eventId?: string;
schedule?: DurableEventSchedule;
retain?: boolean;
virtual?: boolean;
}

export interface DurableEvent extends DurableEventData {
Expand Down
6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -208,9 +208,9 @@
"postcoverage": "mkdir -p coverage && node scripts/post-test.js",
"generate": "yarn build && node esnext/generate.js",
"test": "yarn build && yarn test:node",
"test:node": "export $(cat .env | xargs) && OTEL_SERVICE_NAME=logistics-tests node --enable-source-maps esnext/tests/index.js",
"test:node:inspect": "yarn build && export $(cat .env | xargs) && OTEL_SERVICE_NAME=logistics-tests node --enable-source-maps --inspect-brk esnext/tests/index.js",
"coverage": "export $(cat .env | xargs) && OTEL_SERVICE_NAME=logistics-tests c8 node esnext/tests/index.js && yarn postbuild",
"test:node": "export $(cat .env | xargs) && TESTING=1 OTEL_SERVICE_NAME=logistics-tests node --enable-source-maps esnext/tests/index.js",
"test:node:inspect": "yarn build && export $(cat .env | xargs) && TESTING=1 OTEL_SERVICE_NAME=logistics-tests node --enable-source-maps --inspect-brk esnext/tests/index.js",
"coverage": "export $(cat .env | xargs) && TESTING=1 OTEL_SERVICE_NAME=logistics-tests c8 node esnext/tests/index.js && yarn postbuild",
"start": "node --enable-source-maps esnext/listen/main.js",
"start:nodemon": "nodemon --enable-source-maps esnext/listen/main.js",
"quick": "yarn build && yarn start",
Expand Down
33 changes: 13 additions & 20 deletions src/background/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,39 +28,32 @@ function isQueryInput(input: BackgroundInput): input is QueryInput {

export async function background(input: BackgroundInput | QueryInput = {}) {

const backgroundId = getBackgroundIdentifier();

if (!input.quiet) {
console.log(`Running background tasks for ${backgroundId}`, input);
}

const complete = await getIdentifiedBackground(backgroundId);

try {
if (isQueryInput(input) && input.query.seed) {
if (isQueryInput(input) && input.query.seed) {
const backgroundId = getBackgroundIdentifier();
if (!input.quiet) {
console.log(`Running background tasks for ${backgroundId}`, input);
}
const complete = await getIdentifiedBackground(backgroundId);
try {
await seed();
} else {
await backgroundScheduleWithOptions(input);
} finally {
// Complete no matter what, but allow above to throw
await complete();
}

if (!input.quiet) {
console.log(`Completed background tasks for ${backgroundId}`, input);
}
} finally {
// Complete no matter what, but allow above to throw
await complete();
} else {
// Has its own locking
await backgroundScheduleWithOptions(input);
}

function getBackgroundIdentifier() {
if (isQueryInput(input)) {
if (input.query.cron) {
return `background:cron:${input.query.cron}`;
}
if (input.query.event) {
// Note this is locking per event type
// This is expected here
return `background:event:${input.query.event}`;
}
}
return BACKGROUND_STATIC;
}
Expand Down
3 changes: 2 additions & 1 deletion src/config/env.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
export const {
ALLOW_ANONYMOUS_VIEWS,
ENABLE_CACHE,
DEFAULT_TIMEZONE = "Pacific/Auckland"
DEFAULT_TIMEZONE = "Pacific/Auckland",
TESTING
} = process.env;
1 change: 1 addition & 0 deletions src/data/durable-event/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ export interface DurableEventData extends Record<string, unknown>, DurableEventT
eventId?: string;
schedule?: DurableEventSchedule;
retain?: boolean;
virtual?: boolean;
}

export interface DurableEvent extends DurableEventData {
Expand Down
1 change: 0 additions & 1 deletion src/data/storage/lock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ export function createFakeLock(): UnlockFn {
}

export function getGlobalLock(): LockFn | undefined {
if (isRedisMemory()) return undefined;
if (!isRedis()) return undefined;
const url = getRedisUrl();
const existing = GLOBAL_LOCKS.get(url);
Expand Down
3 changes: 2 additions & 1 deletion src/events/dispatch/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ export async function onDispatchEvent(event: UnknownEvent) {
const dispatching: DurableEventData = {
...event.dispatch,
// Dispatched events are virtual, no need to delete, mark as retain
retain: true
retain: true,
virtual: true,
};
// If the instance has no id, give it one
if (!dispatching.eventId) {
Expand Down
16 changes: 12 additions & 4 deletions src/events/schedule/dispatch-scheduled.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import {getScheduledFunctions, ScheduledFunctionOptions, ScheduledOptions} from "./schedule";

import {DurableEventData, getDurableEvent, listDurableEvents, deleteDurableEvent} from "../../data";
import {DurableEventData, getDurableEvent, listDurableEvents, deleteDurableEvent, lock} from "../../data";
import {limited} from "../../limited";
import {ok} from "../../is";

export interface BackgroundScheduleOptions extends ScheduledFunctionOptions {

Expand Down Expand Up @@ -36,9 +37,16 @@ export async function dispatchScheduledDurableEvents(options: BackgroundSchedule
}

async function dispatchScheduledEvent(event: DurableEventData) {
await dispatchEventToHandler(event);
if (!event.retain) {
await deleteDurableEvent(event);
ok(event.eventId, "Expected dispatching event to have an id");
const done = await lock(`dispatch:event:${event.type}:${event.eventId}`);
// TODO detect if this event tries to dispatch again
try {
await dispatchEventToHandler(event);
if (!event.retain) {
await deleteDurableEvent(event);
}
} finally {
await done();
}
}

Expand Down
15 changes: 13 additions & 2 deletions src/events/schedule/event.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,24 @@
import {addDurableEvent, deleteDurableEvent, DurableEventData} from "../../data";
import {addDurableEvent, deleteDurableEvent, DurableEventData, getDurableEvent} from "../../data";
import {deleteDispatchQStash, dispatchQStash, isQStash} from "./qstash";

const {
DURABLE_EVENTS_IMMEDIATE
} = process.env;

async function hasDurableEvent(event: DurableEventData) {
const existing = await getDurableEvent(event);
return !!existing;
}

export async function dispatchEvent(event: DurableEventData) {

const durable = event.eventId ? event : await addDurableEvent(event);
let durable = event;

if (!event.virtual) {
if (!event.eventId || !(await hasDurableEvent(event))) {
durable = await addDurableEvent(event);
}
}

if (isQStash()) {
await dispatchQStash(durable);
Expand Down
4 changes: 4 additions & 0 deletions src/events/schedule/qstash.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@ import {getOrigin} from "../../listen";
import {BackgroundQuery} from "../../background";
import {DurableEventData, getDurableEventStore} from "../../data";
import {ok} from "../../is";
import {TESTING} from "../../config";

export function isQStash() {
if (TESTING && !process.env.QSTASH_TESTING) {
return false;
}
return !!process.env.QSTASH_TOKEN;
}

Expand Down
4 changes: 4 additions & 0 deletions src/events/virtual/virtual.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ export async function *generateVirtualEvents() {
// Provide it or have it mutated...
event.eventId = v4();
}
if (!event.virtual) {
// Provide it or have it mutated...
event.virtual = true;
}
}
yield yieldingEvents;
for (const event of yieldingEvents) {
Expand Down

0 comments on commit e0ce7a2

Please sign in to comment.