Skip to content

Commit

Permalink
Delete dispatch
Browse files Browse the repository at this point in the history
  • Loading branch information
fabiancook committed Aug 26, 2023
1 parent 29b4f99 commit 448a745
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 21 deletions.
1 change: 1 addition & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ [email protected]
MAIL_SENDER=
MAIL_REPLY_TO=[email protected]
QSTASH_URL=https://qstash.upstash.io/v1/publish/
QSTASH_MESSAGES_URL=https://qstash.upstash.io/v1/messages/
QSTASH_EVENT_URL=/api/event
QSTASH_TOKEN=
QSTASH_CURRENT_SIGNING_KEY=
Expand Down
20 changes: 10 additions & 10 deletions src/events/schedule/event.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import {addDurableEvent, DurableEventData} from "../../data";
import {dispatchQStash, isQStash} from "./qstash";
import {addDurableEvent, deleteDurableEvent, DurableEventData} from "../../data";
import {deleteDispatchQStash, dispatchQStash, isQStash} from "./qstash";

const {
DURABLE_EVENTS_IMMEDIATE
Expand All @@ -10,14 +10,7 @@ export async function dispatchEvent(event: DurableEventData) {
const durable = event.eventId ? event : await addDurableEvent(event);

if (isQStash()) {
await dispatchQStash({
background: {
event: durable.type,
eventId: durable.eventId,
eventTimeStamp: durable.timeStamp
},
schedule: durable.schedule
})
await dispatchQStash(durable);
} else if (DURABLE_EVENTS_IMMEDIATE || durable.schedule?.immediate) {
const { background } = await import("../../background");
// Note that background is locking, so if an event is already running
Expand All @@ -33,4 +26,11 @@ export async function dispatchEvent(event: DurableEventData) {
}

return durable;
}

export async function deleteDispatchEvent(event: DurableEventData) {
if (isQStash()) {
await deleteDispatchQStash(event);
}
await deleteDurableEvent(event);
}
75 changes: 64 additions & 11 deletions src/events/schedule/qstash.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,34 @@
import {getOrigin} from "../../listen";
import {BackgroundQuery} from "../../background";
import {DurableEventSchedule} from "../../data";
import {DurableEventData, getDurableEventStore} from "../../data";
import {ok} from "../../is";

export interface DispatchQStashOptions {
background: BackgroundQuery;
schedule?: DurableEventSchedule;
}

export function isQStash() {
return !!process.env.QSTASH_TOKEN;
}

export async function dispatchQStash({ schedule, background }: DispatchQStashOptions) {
const SCHEDULE_KEY = "schedule";

interface ScheduleMeta {
messageId: string;
scheduleId?: string;
createdAt: string;
}


function getMetaStore(event: DurableEventData) {
ok(event.eventId, "Expected eventId");
return getDurableEventStore(event).meta<ScheduleMeta>(event.eventId);
}

export async function dispatchQStash(event: DurableEventData) {
const store = getMetaStore(event);

if (await store.has(SCHEDULE_KEY)) {
await deleteDispatchQStash(event);
}

const { schedule } = event;
const {
QSTASH_URL,
QSTASH_EVENT_URL,
Expand All @@ -28,7 +44,7 @@ export async function dispatchQStash({ schedule, background }: DispatchQStashOpt
);
const baseUrl = new URL(QSTASH_URL || "https://qstash.upstash.io/v1/publish/");
if (!baseUrl.pathname.endsWith("/")) {
baseUrl.pathname = "/";
baseUrl.pathname = `${baseUrl.pathname}/`;
}
const url = new URL(
`${baseUrl.pathname}${targetUrl.toString()}`,
Expand Down Expand Up @@ -62,6 +78,11 @@ export async function dispatchQStash({ schedule, background }: DispatchQStashOpt
// Allows for a default delay
headers.set("Upstash-Delay", QSTASH_DELAY);
}
const background: BackgroundQuery = {
event: event.type,
eventId: event.eventId,
eventTimeStamp: event.timeStamp
};
const response = await fetch(
url.toString(),
{
Expand All @@ -71,7 +92,39 @@ export async function dispatchQStash({ schedule, background }: DispatchQStashOpt
}
);
ok(response.ok, "Could not dispatch QStash message");
const { messageId }: { messageId: string } = await response.json();
console.log(`Dispatched QStash ${messageId}`)
return { messageId };
const { messageId, scheduleId }: { messageId: string, scheduleId?: string } = await response.json();
console.log(`Dispatched QStash ${messageId} ${scheduleId || "No Schedule"}`)
return { messageId, scheduleId };
}

export async function deleteDispatchQStash(event: DurableEventData) {
const store = getMetaStore(event);
const schedule = await store.get(SCHEDULE_KEY);
if (!schedule) {
return;
}
const {
QSTASH_URL,
QSTASH_MESSAGES_URL,
QSTASH_TOKEN
} = process.env;
ok(QSTASH_TOKEN, "Expected QSTASH_TOKEN");
const baseUrl = new URL(QSTASH_MESSAGES_URL || "/v1/messages/", QSTASH_URL || "https://qstash.upstash.io");
if (!baseUrl.pathname.endsWith("/")) {
baseUrl.pathname = `${baseUrl.pathname}/`;
}
const url = new URL(
`${baseUrl.pathname}${schedule.messageId}`,
baseUrl
);
const response = await fetch(
url.toString(),
{
method: "DELETE",
headers: {
Authorization: `Bearer ${QSTASH_TOKEN}`
}
}
)
ok(response.ok, "Could not delete dispatch QStash message");
}

0 comments on commit 448a745

Please sign in to comment.