diff --git a/package-lock.json b/package-lock.json index d50d0ce..ba5f839 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "simple-workflows", - "version": "0.1.0-beta14", + "version": "0.1.0-beta15", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "simple-workflows", - "version": "0.1.0-beta14", + "version": "0.1.0-beta15", "license": "MIT", "dependencies": { "@azure/data-tables": "^13.0.1", @@ -234,9 +234,9 @@ } }, "node_modules/@azure/storage-blob": { - "version": "12.17.0", - "resolved": "https://registry.npmjs.org/@azure/storage-blob/-/storage-blob-12.17.0.tgz", - "integrity": "sha512-sM4vpsCpcCApagRW5UIjQNlNylo02my2opgp0Emi8x888hZUvJ3dN69Oq20cEGXkMUWnoCrBaB0zyS3yeB87sQ==", + "version": "12.18.0", + "resolved": "https://registry.npmjs.org/@azure/storage-blob/-/storage-blob-12.18.0.tgz", + "integrity": "sha512-BzBZJobMoDyjJsPRMLNHvqHycTGrT8R/dtcTx9qUFcqwSRfGVK9A/cZ7Nx38UQydT9usZGbaDCN75QRNjezSAA==", "dependencies": { "@azure/abort-controller": "^1.0.0", "@azure/core-http": "^3.0.0", @@ -410,9 +410,9 @@ "dev": true }, "node_modules/@types/node": { - "version": "18.19.31", - "resolved": "https://registry.npmjs.org/@types/node/-/node-18.19.31.tgz", - "integrity": "sha512-ArgCD39YpyyrtFKIqMDvjz79jto5fcI/SVUs2HwB+f0dAzq68yqOdyaSivLiLugSziTpNXLQrVb7RZFmdZzbhA==", + "version": "18.19.33", + "resolved": "https://registry.npmjs.org/@types/node/-/node-18.19.33.tgz", + "integrity": "sha512-NR9+KrpSajr2qBVp/Yt5TU/rp+b5Mayi3+OlMlcg2cVCfRmcG5PWZ7S4+MG9PZ5gWBoc9Pd0BKSRViuBCRPu0A==", "dependencies": { "undici-types": "~5.26.4" } @@ -3518,9 +3518,9 @@ } }, "node_modules/typescript": { - "version": "5.3.3", - "resolved": "https://registry.npmjs.org/typescript/-/typescript-5.3.3.tgz", - "integrity": "sha512-pXWcraxM0uxAS+tN0AG/BF2TyqmHO014Z070UsJ+pFvYuRSq8KH8DmWpnbXe0pEPDHXZV3FcAbJkijJ5oNEnWw==", + "version": "5.4.5", + "resolved": "https://registry.npmjs.org/typescript/-/typescript-5.4.5.tgz", + "integrity": "sha512-vcI4UpRgg81oIRUFwR0WSIHKt11nJ7SAVlYNIu+QpqeyXP+gpQJy/Z4+F0aGxSE4MqwjyXvW/TzgkLAx2AGHwQ==", "dev": true, "bin": { "tsc": "bin/tsc", @@ -3838,9 +3838,9 @@ } }, "@azure/storage-blob": { - "version": "12.17.0", - "resolved": "https://registry.npmjs.org/@azure/storage-blob/-/storage-blob-12.17.0.tgz", - "integrity": "sha512-sM4vpsCpcCApagRW5UIjQNlNylo02my2opgp0Emi8x888hZUvJ3dN69Oq20cEGXkMUWnoCrBaB0zyS3yeB87sQ==", + "version": "12.18.0", + "resolved": "https://registry.npmjs.org/@azure/storage-blob/-/storage-blob-12.18.0.tgz", + "integrity": "sha512-BzBZJobMoDyjJsPRMLNHvqHycTGrT8R/dtcTx9qUFcqwSRfGVK9A/cZ7Nx38UQydT9usZGbaDCN75QRNjezSAA==", "requires": { "@azure/abort-controller": "^1.0.0", "@azure/core-http": "^3.0.0", @@ -3968,9 +3968,9 @@ "dev": true }, "@types/node": { - "version": "18.19.31", - "resolved": "https://registry.npmjs.org/@types/node/-/node-18.19.31.tgz", - "integrity": "sha512-ArgCD39YpyyrtFKIqMDvjz79jto5fcI/SVUs2HwB+f0dAzq68yqOdyaSivLiLugSziTpNXLQrVb7RZFmdZzbhA==", + "version": "18.19.33", + "resolved": "https://registry.npmjs.org/@types/node/-/node-18.19.33.tgz", + "integrity": "sha512-NR9+KrpSajr2qBVp/Yt5TU/rp+b5Mayi3+OlMlcg2cVCfRmcG5PWZ7S4+MG9PZ5gWBoc9Pd0BKSRViuBCRPu0A==", "requires": { "undici-types": "~5.26.4" } @@ -6133,9 +6133,9 @@ } }, "typescript": { - "version": "5.3.3", - "resolved": "https://registry.npmjs.org/typescript/-/typescript-5.3.3.tgz", - "integrity": "sha512-pXWcraxM0uxAS+tN0AG/BF2TyqmHO014Z070UsJ+pFvYuRSq8KH8DmWpnbXe0pEPDHXZV3FcAbJkijJ5oNEnWw==", + "version": "5.4.5", + "resolved": "https://registry.npmjs.org/typescript/-/typescript-5.4.5.tgz", + "integrity": "sha512-vcI4UpRgg81oIRUFwR0WSIHKt11nJ7SAVlYNIu+QpqeyXP+gpQJy/Z4+F0aGxSE4MqwjyXvW/TzgkLAx2AGHwQ==", "dev": true }, "unbox-primitive": { diff --git a/package.json b/package.json index bce59fd..f36a853 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "simple-workflows", - "version": "0.1.0-beta14", + "version": "0.1.0-beta15", "description": "Workflows as code in TypeScript", "main": "lib/index.js", "engines": { diff --git a/src/index.ts b/src/index.ts index 2d72072..cb850d5 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,3 +1,6 @@ export * from "./proxyActivities"; export { Worker as WorkflowWorker } from "./Worker"; export * from "./stores/"; +export * from "./IWorker"; +export * from "./WorkflowContext"; +export * from "./IWorkflowContext"; diff --git a/src/proxyActivities.ts b/src/proxyActivities.ts index 179fe95..708637d 100644 --- a/src/proxyActivities.ts +++ b/src/proxyActivities.ts @@ -1,4 +1,4 @@ -import { isDeepStrictEqual } from "util"; +import { isDeepStrictEqual } from "node:util"; import { DefaultRetryPolicy } from "./DefaultRetryPolicy"; import { deserializeError, serializeError } from "./serialize-error"; import { type WorkflowActivityInstance, type WorkflowInstance } from "./stores/IWorkflowHistoryStore"; diff --git a/src/stores/DurableFunctionsWorkflowHistoryStore.ts b/src/stores/DurableFunctionsWorkflowHistoryStore.ts index a08cacd..fa005a1 100644 --- a/src/stores/DurableFunctionsWorkflowHistoryStore.ts +++ b/src/stores/DurableFunctionsWorkflowHistoryStore.ts @@ -1,12 +1,11 @@ -import { type WorkflowActivityInstance, type IWorkflowHistoryStore, type WorkflowInstance, type WorkflowInstanceHeader } from "./IWorkflowHistoryStore"; +import type { WorkflowActivityInstance, WorkflowInstance, WorkflowInstanceHeader } from "./IWorkflowHistoryStore"; import { type GetTableEntityResponse, TableClient, type TableEntity, type TableEntityResult, TableServiceClient, TableTransaction } from "@azure/data-tables"; import { deserializeError, serializeError } from "../serialize-error"; import { BlobServiceClient, type ContainerClient } from "@azure/storage-blob"; import zlib from "zlib"; import { Mutex } from "async-mutex"; -import { DefaultSerializer } from "../DefaultSerializer"; import { type ISerializer } from "../ISerializer"; -import { isDeepStrictEqual } from "util"; +import { SerializedWorkflowHistoryStore } from "./SerializedWorkflowHistoryStore"; interface IDurableFunctionsWorkflowHistory { Name: string @@ -36,19 +35,19 @@ interface IDurableFunctionsWorkflowInstance { CompletedTime?: Date } -export class DurableFunctionsWorkflowHistoryStore implements IWorkflowHistoryStore { +export class DurableFunctionsWorkflowHistoryStore extends SerializedWorkflowHistoryStore { private initialized?: boolean; private readonly history: TableClient; private readonly instances: TableClient; private readonly largeMessages: ContainerClient; private readonly mutex = new Mutex(); - private readonly options: { connectionString: string, taskHubName: string, serializer: ISerializer }; + private readonly options: { connectionString: string, taskHubName: string }; constructor(options: { connectionString: string, taskHubName?: string, serializer?: ISerializer }) { + super(options?.serializer); this.options = { connectionString: options.connectionString, taskHubName: options.taskHubName ?? "Workflow", - serializer: options.serializer ?? new DefaultSerializer(), }; const { connectionString, taskHubName } = this.options; @@ -59,10 +58,6 @@ export class DurableFunctionsWorkflowHistoryStore implements IWorkflowHistorySto this.largeMessages = blobServicesClient.getContainerClient(`${taskHubName}-largemessages`.toLowerCase()); } - public equal = (val1: any, val2: any): boolean => { - return (this.options.serializer.equal ?? isDeepStrictEqual)(val1, val2); - }; - private async init(): Promise { if (this.initialized) { return; @@ -119,13 +114,13 @@ export class DurableFunctionsWorkflowHistoryStore implements IWorkflowHistorySto await this.init(); } - public async getInstance(id: string): Promise { + public getInstance = async (id: string): Promise => { return await this.mutex.runExclusive(async () => { await this.init(); return await this.getInstanceInternal(id); }); - } + }; private async getInstanceInternal(id: string): Promise { async function streamToBuffer(readableStream: NodeJS.ReadableStream): Promise { @@ -152,7 +147,7 @@ export class DurableFunctionsWorkflowHistoryStore implements IWorkflowHistorySto const buffer = await streamToBuffer(downloadBlockBlobResponse.readableStreamBody); const unzipped = zlib.unzipSync(buffer).toString(); - return this.options.serializer.parse(unzipped); + return this.serializer.parse(unzipped); }; let entity: GetTableEntityResponse> | undefined; @@ -178,7 +173,7 @@ export class DurableFunctionsWorkflowHistoryStore implements IWorkflowHistorySto if (entity.Input.indexOf("http://") === 0) { instance.args = await getBlob(`${id}/Input.json.gz`); } else { - instance.args = this.options.serializer.parse(entity.Input); + instance.args = this.serializer.parse(entity.Input); } } @@ -193,7 +188,7 @@ export class DurableFunctionsWorkflowHistoryStore implements IWorkflowHistorySto args = await getBlob(entity.InputBlobName); } else { if (entity.Input) { - args = this.options.serializer.parse(entity.Input); + args = this.serializer.parse(entity.Input); } else { args = []; } @@ -212,7 +207,7 @@ export class DurableFunctionsWorkflowHistoryStore implements IWorkflowHistorySto if (entity.ResultBlobName) { result = await getBlob(entity.ResultBlobName); } else if (entity.Result) { - result = this.options.serializer.parse(entity.Result); + result = this.serializer.parse(entity.Result); } if (entity.EventType === "TaskCompleted") { @@ -241,7 +236,7 @@ export class DurableFunctionsWorkflowHistoryStore implements IWorkflowHistorySto if (entity.Output.indexOf("http://") === 0) { output = await getBlob(`${id}/Output.json.gz`); } else { - output = this.options.serializer.parse(entity.Output); + output = this.serializer.parse(entity.Output); } if (entity.RuntimeStatus === "Failed") { @@ -255,7 +250,7 @@ export class DurableFunctionsWorkflowHistoryStore implements IWorkflowHistorySto return instance; } - public async setInstance(instance: WorkflowInstance): Promise { + public setInstance = async (instance: WorkflowInstance): Promise => { await this.mutex.runExclusive(async () => { const isLarge = (data: string | undefined): boolean => { if (!data) { @@ -284,7 +279,7 @@ export class DurableFunctionsWorkflowHistoryStore implements IWorkflowHistorySto const task: TableEntity = { partitionKey: instance.instanceId, rowKey: "", - Input: this.options.serializer.stringify(instance.args), + Input: this.serializer.stringify(instance.args), CreatedTime: this.getDate(instance.start), Name: instance.instanceId, Version: "", @@ -293,7 +288,7 @@ export class DurableFunctionsWorkflowHistoryStore implements IWorkflowHistorySto TaskHubName: this.options.taskHubName, CustomStatus: instance.status, ExecutionId: instance.instanceId, - Output: error ? this.options.serializer.stringify(serializeError(instance.error)) : this.options.serializer.stringify(instance.result), + Output: error ? this.serializer.stringify(serializeError(instance.error)) : this.serializer.stringify(instance.result), CompletedTime: this.getDate(instance.end), }; @@ -344,7 +339,7 @@ export class DurableFunctionsWorkflowHistoryStore implements IWorkflowHistorySto _Timestamp: this.getDate(activity.start), EventType: "TaskScheduled", ExecutionId: instance.instanceId, - Input: this.options.serializer.stringify(activity.args), + Input: this.serializer.stringify(activity.args), }; if (isLargeHistory(row)) { row.InputBlobName = `${row.partitionKey}/history-${row.rowKey}-${row.EventType}-Input.json.gz`; @@ -370,7 +365,7 @@ export class DurableFunctionsWorkflowHistoryStore implements IWorkflowHistorySto EventType: error ? "TaskFailed" : "TaskCompleted", TaskScheduledId: eventId - 1, ExecutionId: instance.instanceId, - Result: error ? this.options.serializer.stringify(serializeError(activity.error)) : this.options.serializer.stringify(activity.result), + Result: error ? this.serializer.stringify(serializeError(activity.error)) : this.serializer.stringify(activity.result), }; if (isLargeHistory(row)) { row.ResultBlobName = `${row.partitionKey}/history-${row.rowKey}-${row.EventType}-Result.json.gz`; @@ -395,7 +390,7 @@ export class DurableFunctionsWorkflowHistoryStore implements IWorkflowHistorySto _Timestamp: this.getDate(instance.end), EventType: "ExecutionCompleted", ExecutionId: instance.instanceId, - Result: this.options.serializer.stringify(instance.result), + Result: this.serializer.stringify(instance.result), }; if (isLargeHistory(row)) { @@ -453,9 +448,9 @@ export class DurableFunctionsWorkflowHistoryStore implements IWorkflowHistorySto } } }); - } + }; - public async getInstances(): Promise { + public getInstances = async (): Promise => { return await this.mutex.runExclusive(async () => { await this.init(); @@ -477,9 +472,9 @@ export class DurableFunctionsWorkflowHistoryStore implements IWorkflowHistorySto return workflows; }); - } + }; - public async getInstanceHeaders(): Promise> { + public getInstanceHeaders = async (): Promise> => { return await this.mutex.runExclusive(async () => { await this.init(); @@ -515,9 +510,9 @@ export class DurableFunctionsWorkflowHistoryStore implements IWorkflowHistorySto return headers; }); - } + }; - public async removeInstance(id: string): Promise { + public removeInstance = async (id: string): Promise => { return await this.mutex.runExclusive(async () => { await this.init(); @@ -553,5 +548,5 @@ export class DurableFunctionsWorkflowHistoryStore implements IWorkflowHistorySto await this.instances.deleteEntity(id, ""); }); - } + }; } diff --git a/src/stores/FileSystemWorkflowHistoryStore.ts b/src/stores/FileSystemWorkflowHistoryStore.ts index c886d99..17d8030 100644 --- a/src/stores/FileSystemWorkflowHistoryStore.ts +++ b/src/stores/FileSystemWorkflowHistoryStore.ts @@ -1,20 +1,20 @@ -import { type IWorkflowHistoryStore, type WorkflowInstance, type WorkflowInstanceHeader } from "./IWorkflowHistoryStore"; +import type { WorkflowInstance, WorkflowInstanceHeader } from "./IWorkflowHistoryStore"; import { resolve, parse as pathParse } from "path"; import { cwd } from "process"; -import * as fs from "fs"; +import * as fs from "node:fs"; import { deserializeError, serializeError } from "../serialize-error"; import { type ISerializer } from "../ISerializer"; -import { DefaultSerializer } from "../DefaultSerializer"; -import { isDeepStrictEqual } from "util"; +import { SerializedWorkflowHistoryStore } from "./SerializedWorkflowHistoryStore"; -export class FileSystemWorkflowHistoryStore implements IWorkflowHistoryStore { +export class FileSystemWorkflowHistoryStore extends SerializedWorkflowHistoryStore { public workflowHistory: Array = []; - private readonly options: { path: string, serializer: ISerializer }; + private readonly options: { path: string }; public constructor(options?: { path?: string, serializer?: ISerializer }) { + super(options?.serializer); + this.options = { path: options?.path ?? resolve(cwd(), "./workflow-history/"), - serializer: options?.serializer ?? new DefaultSerializer(), }; if (!fs.existsSync(this.options.path)) { @@ -22,18 +22,14 @@ export class FileSystemWorkflowHistoryStore implements IWorkflowHistoryStore { } } - public equal = (val1: any, val2: any): boolean => { - return (this.options.serializer.equal ?? isDeepStrictEqual)(val1, val2); - }; - - public async getInstance(id: string): Promise { + public getInstance = async (id: string): Promise => { const filePath = resolve(this.options.path, `${id}.json`); if (!fs.existsSync(filePath)) { return await Promise.resolve(undefined); } const contents = fs.readFileSync(filePath, { encoding: "utf-8" }); - const instance: WorkflowInstance = this.options.serializer.parse(contents); + const instance: WorkflowInstance = this.serializer.parse(contents); // Deserialize dates and errors if (instance.start) { instance.start = new Date(instance.start); @@ -59,9 +55,9 @@ export class FileSystemWorkflowHistoryStore implements IWorkflowHistoryStore { } return instance; - } + }; - public async setInstance(instance: WorkflowInstance): Promise { + public setInstance = async (instance: WorkflowInstance): Promise => { const current = await this.getInstance(instance.instanceId); const filePath = resolve(this.options.path, `${instance.instanceId}.json`); @@ -78,13 +74,13 @@ export class FileSystemWorkflowHistoryStore implements IWorkflowHistoryStore { } if (!current) { - fs.writeFileSync(filePath, this.options.serializer.stringify(instance), { encoding: "utf-8" }); + fs.writeFileSync(filePath, this.serializer.stringify(instance), { encoding: "utf-8" }); } else { Object.assign(current, instance); - fs.writeFileSync(filePath, this.options.serializer.stringify(current), { encoding: "utf-8" }); + fs.writeFileSync(filePath, this.serializer.stringify(current), { encoding: "utf-8" }); } return await Promise.resolve(); - } + }; public async clear(): Promise { const path = this.options.path; @@ -96,7 +92,7 @@ export class FileSystemWorkflowHistoryStore implements IWorkflowHistoryStore { }); } - public async getInstances(): Promise> { + public getInstances = async (): Promise> => { let files = fs.readdirSync(this.options.path); files = files.filter(file => fs.lstatSync(file).isFile()); @@ -111,9 +107,9 @@ export class FileSystemWorkflowHistoryStore implements IWorkflowHistoryStore { } } return instances; - } + }; - public async getInstanceHeaders(): Promise> { + public getInstanceHeaders = async (): Promise> => { const instances = await this.getInstances(); return await Promise.resolve(instances.map(instance => { return { @@ -124,13 +120,13 @@ export class FileSystemWorkflowHistoryStore implements IWorkflowHistoryStore { error: !!instance.error, }; })); - } + }; - public async removeInstance(id: string): Promise { + public removeInstance = async (id: string): Promise => { const filePath = resolve(this.options.path, `${id}.json`); if (fs.existsSync(filePath)) { fs.unlinkSync(filePath); } - } + }; } diff --git a/src/stores/MemoryWorkflowHistoryStore.ts b/src/stores/MemoryWorkflowHistoryStore.ts index 99a402d..38d58d4 100644 --- a/src/stores/MemoryWorkflowHistoryStore.ts +++ b/src/stores/MemoryWorkflowHistoryStore.ts @@ -1,4 +1,4 @@ -import { isDeepStrictEqual } from "util"; +import { isDeepStrictEqual } from "node:util"; import { type IWorkflowHistoryStore, type WorkflowInstance, type WorkflowInstanceHeader } from "./IWorkflowHistoryStore"; export class MemoryWorkflowHistoryStore implements IWorkflowHistoryStore { diff --git a/src/stores/SerializedWorkflowHistoryStore.ts b/src/stores/SerializedWorkflowHistoryStore.ts new file mode 100644 index 0000000..df7e88a --- /dev/null +++ b/src/stores/SerializedWorkflowHistoryStore.ts @@ -0,0 +1,22 @@ +import { type IWorkflowHistoryStore, type WorkflowInstance, type WorkflowInstanceHeader } from "./IWorkflowHistoryStore"; +import { type ISerializer } from "../ISerializer"; +import { DefaultSerializer } from "../DefaultSerializer"; +import { isDeepStrictEqual } from "node:util"; + +export abstract class SerializedWorkflowHistoryStore implements IWorkflowHistoryStore { + protected readonly serializer: ISerializer; + + public constructor(serializer?: ISerializer) { + this.serializer = serializer ?? new DefaultSerializer(); + } + + public equal = (val1: any, val2: any): boolean => { + return (this.serializer.equal ?? isDeepStrictEqual)(val1, val2); + }; + + abstract getInstance: (id: string) => Promise; + abstract setInstance: (instance: WorkflowInstance) => Promise; + abstract getInstances: () => Promise; + abstract getInstanceHeaders: () => Promise; + abstract removeInstance: (id: string) => Promise; +} diff --git a/src/stores/index.ts b/src/stores/index.ts index 9e30ae5..dabbe3d 100644 --- a/src/stores/index.ts +++ b/src/stores/index.ts @@ -1,3 +1,5 @@ +export * from "./SerializedWorkflowHistoryStore"; export * from "./DurableFunctionsWorkflowHistoryStore"; export * from "./FileSystemWorkflowHistoryStore"; export * from "./MemoryWorkflowHistoryStore"; +export * from "./IWorkflowHistoryStore";