Skip to content

Commit

Permalink
Merge pull request #11 from allanhvam/dev
Browse files Browse the repository at this point in the history
feat: get workflow instance headers
  • Loading branch information
allanhvam authored Nov 30, 2023
2 parents d682a03 + 90b9ebc commit d2bda6c
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 9 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "simple-workflows",
"version": "0.1.0-beta11",
"version": "0.1.0-beta12",
"description": "Workflows as code in TypeScript",
"main": "lib/index.js",
"engines": {
Expand Down
43 changes: 41 additions & 2 deletions src/stores/DurableFunctionsWorkflowHistoryStore.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { WorkflowActivityInstance, IWorkflowHistoryStore, WorkflowInstance } from "./IWorkflowHistoryStore";
import { WorkflowActivityInstance, IWorkflowHistoryStore, WorkflowInstance, WorkflowInstanceHeader } from "./IWorkflowHistoryStore";
import { GetTableEntityResponse, TableClient, TableEntity, TableEntityResult, TableServiceClient, TableTransaction } from "@azure/data-tables";
import { deserializeError, serializeError } from "../serialize-error";
import { BlobServiceClient, ContainerClient } from "@azure/storage-blob";
Expand Down Expand Up @@ -430,7 +430,7 @@ export class DurableFunctionsWorkflowHistoryStore implements IWorkflowHistorySto
public async getInstances(): Promise<WorkflowInstance[]> {
return await this.mutex.runExclusive(async () => {
await this.init();

let instancesIterator = this.instances.listEntities<IDurableFunctionsWorkflowHistory>().byPage({ maxPageSize: 50 });

const workflows = new Array<WorkflowInstance>();
Expand All @@ -444,6 +444,45 @@ export class DurableFunctionsWorkflowHistoryStore implements IWorkflowHistorySto
});
}

public async getInstanceHeaders(): Promise<Array<WorkflowInstanceHeader>> {
return await this.mutex.runExclusive(async () => {
await this.init();

let instancesIterator = this.instances.listEntities<IDurableFunctionsWorkflowHistory>().byPage({ maxPageSize: 50 });

const headers = new Array<WorkflowInstanceHeader>();
for await (const page of instancesIterator) {
for await (const entity of page) {
const id = entity.Name;
try {
const entity = await this.instances.getEntity<IDurableFunctionsWorkflowInstance>(id, "");

let header: WorkflowInstanceHeader = {
instanceId: id,
status: entity.CustomStatus as any,
start: entity.CreatedTime,
end: entity.CompletedTime,
};

if (header.end && entity.Output && entity.RuntimeStatus === "Failed") {
header.error = true;
}

headers.push(header);
} catch (e) {
if (e.statusCode === 404) {
continue;
}
throw e;
}
}
}

return headers;
});
}


public async removeInstance(id: string): Promise<void> {
return await this.mutex.runExclusive(async () => {
await this.init();
Expand Down
15 changes: 14 additions & 1 deletion src/stores/FileSystemWorkflowHistoryStore.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { IWorkflowHistoryStore, WorkflowInstance } from "./IWorkflowHistoryStore";
import { IWorkflowHistoryStore, WorkflowInstance, WorkflowInstanceHeader } from "./IWorkflowHistoryStore";
import { resolve } from "path";
import { cwd } from "process";
import * as fs from "fs";
Expand Down Expand Up @@ -115,6 +115,19 @@ export class FileSystemWorkflowHistoryStore implements IWorkflowHistoryStore {
return instances;
}

public async getInstanceHeaders(): Promise<Array<WorkflowInstanceHeader>> {
const instances = await this.getInstances();
return Promise.resolve(instances.map(instance => {
return {
instanceId: instance.instanceId,
status: instance.status,
start: instance.start,
end: instance.end,
error: instance.error ? true : false,
};
}));
}

public async removeInstance(id: string): Promise<void> {
let filePath = resolve(this.options.path, `${id}.json`);

Expand Down
9 changes: 7 additions & 2 deletions src/stores/IWorkflowHistoryStore.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
export type WorkflowInstance = {
export type WorkflowInstanceHeader = {
instanceId: string;
status?: "timeout";
args: Array<unknown>;
start: Date;
end?: Date;
error?: boolean;
}

export type WorkflowInstance = Omit<WorkflowInstanceHeader, "error"> & {
args: Array<unknown>;
result?: unknown;
error?: unknown;

Expand All @@ -23,6 +27,7 @@ export interface IWorkflowHistoryStore {
getInstance: (id: string) => Promise<WorkflowInstance>;
setInstance: (instance: WorkflowInstance) => Promise<void>;
getInstances: () => Promise<Array<WorkflowInstance>>;
getInstanceHeaders: () => Promise<Array<WorkflowInstanceHeader>>;
removeInstance: (id: string) => Promise<void>;
equal(val1: any, val2: any): boolean;
}
14 changes: 13 additions & 1 deletion src/stores/MemoryWorkflowHistoryStore.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { isDeepStrictEqual } from "util";
import { IWorkflowHistoryStore, WorkflowInstance } from "./IWorkflowHistoryStore";
import { IWorkflowHistoryStore, WorkflowInstance, WorkflowInstanceHeader } from "./IWorkflowHistoryStore";

export class MemoryWorkflowHistoryStore implements IWorkflowHistoryStore {
public workflowHistory: Array<WorkflowInstance> = [];
Expand All @@ -24,6 +24,18 @@ export class MemoryWorkflowHistoryStore implements IWorkflowHistoryStore {
return Promise.resolve(this.workflowHistory);
}

public async getInstanceHeaders(): Promise<Array<WorkflowInstanceHeader>> {
return Promise.resolve(this.workflowHistory.map(instance => {
return {
instanceId: instance.instanceId,
status: instance.status,
start: instance.start,
end: instance.end,
error: instance.error ? true : false,
};
}));
}

public async removeInstance(id: string): Promise<void> {
const index = this.workflowHistory.findIndex(i => i.instanceId === id);
if (index > -1) {
Expand Down
4 changes: 2 additions & 2 deletions src/tests/store-tests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ test("Workflow store, removeInstance", async (t) => {
await handle.result();

// Act
let workflowInstances = await store.getInstances();
let workflowInstances = await store.getInstanceHeaders();

// Assert
let workflow = workflowInstances.find(wi => wi.instanceId === workflowId);
t.truthy(workflow);
await store.removeInstance(workflow.instanceId);

workflowInstances = await store.getInstances();
workflowInstances = await store.getInstanceHeaders();
workflow = workflowInstances.find(wi => wi.instanceId === workflowId);
t.falsy(workflow);
});

0 comments on commit d2bda6c

Please sign in to comment.