Skip to content

Commit

Permalink
feat: added SerializedWorkflowHistoryStore
Browse files Browse the repository at this point in the history
  • Loading branch information
allanhvam committed May 20, 2024
1 parent 9af4e28 commit 8c41681
Show file tree
Hide file tree
Showing 9 changed files with 95 additions and 77 deletions.
40 changes: 20 additions & 20 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "simple-workflows",
"version": "0.1.0-beta14",
"version": "0.1.0-beta15",
"description": "Workflows as code in TypeScript",
"main": "lib/index.js",
"engines": {
Expand Down
3 changes: 3 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
export * from "./proxyActivities";
export { Worker as WorkflowWorker } from "./Worker";
export * from "./stores/";
export * from "./IWorker";
export * from "./WorkflowContext";
export * from "./IWorkflowContext";
2 changes: 1 addition & 1 deletion src/proxyActivities.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { isDeepStrictEqual } from "util";
import { isDeepStrictEqual } from "node:util";
import { DefaultRetryPolicy } from "./DefaultRetryPolicy";
import { deserializeError, serializeError } from "./serialize-error";
import { type WorkflowActivityInstance, type WorkflowInstance } from "./stores/IWorkflowHistoryStore";
Expand Down
55 changes: 25 additions & 30 deletions src/stores/DurableFunctionsWorkflowHistoryStore.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
import { type WorkflowActivityInstance, type IWorkflowHistoryStore, type WorkflowInstance, type WorkflowInstanceHeader } from "./IWorkflowHistoryStore";
import type { WorkflowActivityInstance, WorkflowInstance, WorkflowInstanceHeader } from "./IWorkflowHistoryStore";
import { type GetTableEntityResponse, TableClient, type TableEntity, type TableEntityResult, TableServiceClient, TableTransaction } from "@azure/data-tables";
import { deserializeError, serializeError } from "../serialize-error";
import { BlobServiceClient, type ContainerClient } from "@azure/storage-blob";
import zlib from "zlib";
import { Mutex } from "async-mutex";
import { DefaultSerializer } from "../DefaultSerializer";
import { type ISerializer } from "../ISerializer";
import { isDeepStrictEqual } from "util";
import { SerializedWorkflowHistoryStore } from "./SerializedWorkflowHistoryStore";

interface IDurableFunctionsWorkflowHistory {
Name: string
Expand Down Expand Up @@ -36,19 +35,19 @@ interface IDurableFunctionsWorkflowInstance {
CompletedTime?: Date
}

export class DurableFunctionsWorkflowHistoryStore implements IWorkflowHistoryStore {
export class DurableFunctionsWorkflowHistoryStore extends SerializedWorkflowHistoryStore {
private initialized?: boolean;
private readonly history: TableClient;
private readonly instances: TableClient;
private readonly largeMessages: ContainerClient;
private readonly mutex = new Mutex();
private readonly options: { connectionString: string, taskHubName: string, serializer: ISerializer };
private readonly options: { connectionString: string, taskHubName: string };

constructor(options: { connectionString: string, taskHubName?: string, serializer?: ISerializer }) {
super(options?.serializer);
this.options = {
connectionString: options.connectionString,
taskHubName: options.taskHubName ?? "Workflow",
serializer: options.serializer ?? new DefaultSerializer(),
};

const { connectionString, taskHubName } = this.options;
Expand All @@ -59,10 +58,6 @@ export class DurableFunctionsWorkflowHistoryStore implements IWorkflowHistorySto
this.largeMessages = blobServicesClient.getContainerClient(`${taskHubName}-largemessages`.toLowerCase());
}

public equal = (val1: any, val2: any): boolean => {
return (this.options.serializer.equal ?? isDeepStrictEqual)(val1, val2);
};

private async init(): Promise<void> {
if (this.initialized) {
return;
Expand Down Expand Up @@ -119,13 +114,13 @@ export class DurableFunctionsWorkflowHistoryStore implements IWorkflowHistorySto
await this.init();
}

public async getInstance(id: string): Promise<WorkflowInstance | undefined> {
public getInstance = async (id: string): Promise<WorkflowInstance | undefined> => {
return await this.mutex.runExclusive(async () => {
await this.init();

return await this.getInstanceInternal(id);
});
}
};

private async getInstanceInternal(id: string): Promise<WorkflowInstance | undefined> {
async function streamToBuffer(readableStream: NodeJS.ReadableStream): Promise<Buffer> {
Expand All @@ -152,7 +147,7 @@ export class DurableFunctionsWorkflowHistoryStore implements IWorkflowHistorySto
const buffer = await streamToBuffer(downloadBlockBlobResponse.readableStreamBody);

const unzipped = zlib.unzipSync(buffer).toString();
return this.options.serializer.parse(unzipped);
return this.serializer.parse(unzipped);
};

let entity: GetTableEntityResponse<TableEntityResult<IDurableFunctionsWorkflowInstance>> | undefined;
Expand All @@ -178,7 +173,7 @@ export class DurableFunctionsWorkflowHistoryStore implements IWorkflowHistorySto
if (entity.Input.indexOf("http://") === 0) {
instance.args = await getBlob(`${id}/Input.json.gz`);
} else {
instance.args = this.options.serializer.parse(entity.Input);
instance.args = this.serializer.parse(entity.Input);
}
}

Expand All @@ -193,7 +188,7 @@ export class DurableFunctionsWorkflowHistoryStore implements IWorkflowHistorySto
args = await getBlob(entity.InputBlobName);
} else {
if (entity.Input) {
args = this.options.serializer.parse(entity.Input);
args = this.serializer.parse(entity.Input);
} else {
args = [];
}
Expand All @@ -212,7 +207,7 @@ export class DurableFunctionsWorkflowHistoryStore implements IWorkflowHistorySto
if (entity.ResultBlobName) {
result = await getBlob(entity.ResultBlobName);
} else if (entity.Result) {
result = this.options.serializer.parse(entity.Result);
result = this.serializer.parse(entity.Result);
}

if (entity.EventType === "TaskCompleted") {
Expand Down Expand Up @@ -241,7 +236,7 @@ export class DurableFunctionsWorkflowHistoryStore implements IWorkflowHistorySto
if (entity.Output.indexOf("http://") === 0) {
output = await getBlob(`${id}/Output.json.gz`);
} else {
output = this.options.serializer.parse(entity.Output);
output = this.serializer.parse(entity.Output);
}

if (entity.RuntimeStatus === "Failed") {
Expand All @@ -255,7 +250,7 @@ export class DurableFunctionsWorkflowHistoryStore implements IWorkflowHistorySto
return instance;
}

public async setInstance(instance: WorkflowInstance): Promise<void> {
public setInstance = async (instance: WorkflowInstance): Promise<void> => {
await this.mutex.runExclusive(async () => {
const isLarge = (data: string | undefined): boolean => {
if (!data) {
Expand Down Expand Up @@ -284,7 +279,7 @@ export class DurableFunctionsWorkflowHistoryStore implements IWorkflowHistorySto
const task: TableEntity<IDurableFunctionsWorkflowInstance> = {
partitionKey: instance.instanceId,
rowKey: "",
Input: this.options.serializer.stringify(instance.args),
Input: this.serializer.stringify(instance.args),
CreatedTime: this.getDate(instance.start),
Name: instance.instanceId,
Version: "",
Expand All @@ -293,7 +288,7 @@ export class DurableFunctionsWorkflowHistoryStore implements IWorkflowHistorySto
TaskHubName: this.options.taskHubName,
CustomStatus: instance.status,
ExecutionId: instance.instanceId,
Output: error ? this.options.serializer.stringify(serializeError(instance.error)) : this.options.serializer.stringify(instance.result),
Output: error ? this.serializer.stringify(serializeError(instance.error)) : this.serializer.stringify(instance.result),
CompletedTime: this.getDate(instance.end),
};

Expand Down Expand Up @@ -344,7 +339,7 @@ export class DurableFunctionsWorkflowHistoryStore implements IWorkflowHistorySto
_Timestamp: this.getDate(activity.start),
EventType: "TaskScheduled",
ExecutionId: instance.instanceId,
Input: this.options.serializer.stringify(activity.args),
Input: this.serializer.stringify(activity.args),
};
if (isLargeHistory(row)) {
row.InputBlobName = `${row.partitionKey}/history-${row.rowKey}-${row.EventType}-Input.json.gz`;
Expand All @@ -370,7 +365,7 @@ export class DurableFunctionsWorkflowHistoryStore implements IWorkflowHistorySto
EventType: error ? "TaskFailed" : "TaskCompleted",
TaskScheduledId: eventId - 1,
ExecutionId: instance.instanceId,
Result: error ? this.options.serializer.stringify(serializeError(activity.error)) : this.options.serializer.stringify(activity.result),
Result: error ? this.serializer.stringify(serializeError(activity.error)) : this.serializer.stringify(activity.result),
};
if (isLargeHistory(row)) {
row.ResultBlobName = `${row.partitionKey}/history-${row.rowKey}-${row.EventType}-Result.json.gz`;
Expand All @@ -395,7 +390,7 @@ export class DurableFunctionsWorkflowHistoryStore implements IWorkflowHistorySto
_Timestamp: this.getDate(instance.end),
EventType: "ExecutionCompleted",
ExecutionId: instance.instanceId,
Result: this.options.serializer.stringify(instance.result),
Result: this.serializer.stringify(instance.result),
};

if (isLargeHistory(row)) {
Expand Down Expand Up @@ -453,9 +448,9 @@ export class DurableFunctionsWorkflowHistoryStore implements IWorkflowHistorySto
}
}
});
}
};

public async getInstances(): Promise<WorkflowInstance[]> {
public getInstances = async (): Promise<WorkflowInstance[]> => {
return await this.mutex.runExclusive(async () => {
await this.init();

Expand All @@ -477,9 +472,9 @@ export class DurableFunctionsWorkflowHistoryStore implements IWorkflowHistorySto

return workflows;
});
}
};

public async getInstanceHeaders(): Promise<Array<WorkflowInstanceHeader>> {
public getInstanceHeaders = async (): Promise<Array<WorkflowInstanceHeader>> => {
return await this.mutex.runExclusive(async () => {
await this.init();

Expand Down Expand Up @@ -515,9 +510,9 @@ export class DurableFunctionsWorkflowHistoryStore implements IWorkflowHistorySto

return headers;
});
}
};

public async removeInstance(id: string): Promise<void> {
public removeInstance = async (id: string): Promise<void> => {
return await this.mutex.runExclusive(async () => {
await this.init();

Expand Down Expand Up @@ -553,5 +548,5 @@ export class DurableFunctionsWorkflowHistoryStore implements IWorkflowHistorySto

await this.instances.deleteEntity(id, "");
});
}
};
}
Loading

0 comments on commit 8c41681

Please sign in to comment.