Skip to content

Commit

Permalink
refactor!: added initial workflows and triggers
Browse files Browse the repository at this point in the history
  • Loading branch information
allanhvam committed Sep 3, 2024
1 parent 31d0ef5 commit e088d08
Show file tree
Hide file tree
Showing 53 changed files with 342 additions and 114 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "simple-workflows",
"version": "0.1.0-beta17",
"version": "0.1.0",
"description": "Workflows as code in TypeScript",
"main": "lib/index.js",
"type": "module",
Expand Down
13 changes: 0 additions & 13 deletions src/Workflow.ts

This file was deleted.

4 changes: 2 additions & 2 deletions src/debug.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Worker } from "./Worker.js";
import { greetWorkflow } from "./tests/workflows/greet-workflow.js";
import { Worker } from "./worker/Worker.js";
import { greetWorkflow } from "./tests/workflow-functions/greet-workflow.js";
import { FileSystemWorkflowHistoryStore } from "./stores/FileSystemWorkflowHistoryStore.js";

const run = async (): Promise<void> => {
Expand Down
13 changes: 8 additions & 5 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
export * from "./proxyActivities.js";
export { Worker as WorkflowWorker } from "./Worker.js";
export * from "./proxy/proxyActivities.js";
export { Worker as WorkflowWorker } from "./worker/Worker.js";
export * from "./stores/index.js";
export * from "./IWorker.js";
export * from "./WorkflowContext.js";
export * from "./IWorkflowContext.js";
export * from "./worker/IWorker.js";
export * from "./worker/WorkflowContext.js";
export * from "./worker/IWorkflowContext.js";
export * from "./workflows/index.js";
export * from "./serialization/index.js";
export * from "./triggers/index.js";
13 changes: 4 additions & 9 deletions src/proxyActivities.ts → src/proxy/proxyActivities.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,8 @@
import { isDeepStrictEqual } from "node:util";
import { DefaultRetryPolicy } from "./DefaultRetryPolicy.js";
import type { WorkflowActivity, WorkflowInstance } from "./stores/IWorkflowHistoryStore.js";
import { Worker } from "./Worker.js";

type PromiseFuncKeys<T> = {
[K in keyof T]: T[K] extends ((...args: any[]) => Promise<any>) ? K : never;
}[keyof T];

type OnlyAsync<T> = Pick<T, PromiseFuncKeys<T>>;
import { DefaultRetryPolicy } from "../worker/DefaultRetryPolicy.js";
import type { WorkflowActivity, WorkflowInstance } from "../stores/IWorkflowHistoryStore.js";
import { Worker } from "../worker/Worker.js";
import type { OnlyAsync } from "../types/OnlyAsync.js";

export function proxyActivities<A extends object>(activities: A, options?: { retry?: number }): OnlyAsync<A> {
return new Proxy(activities, {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
import { type ISerializer } from "./ISerializer.js";

/**
* @internal
*/
export class DefaultSerializer implements ISerializer {
stringify(o: any): string {
return JSON.stringify(o, (k, v) => v === undefined ? null : v);
Expand Down
File renamed without changes.
3 changes: 3 additions & 0 deletions src/serialization/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export { serializeError, deserializeError } from "./serialize-error.js";
export { ISerializer } from "./ISerializer.js";
export { DefaultSerializer } from "./DefaultSerializer.js";
File renamed without changes.
71 changes: 55 additions & 16 deletions src/stores/DurableFunctionsWorkflowHistoryStore.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import type { GetInstancesOptions, GetInstancesResult, WorkflowActivity, WorkflowInstance, WorkflowInstanceHeader } from "./IWorkflowHistoryStore.js";
import { type GetTableEntityResponse, TableClient, type TableEntity, type TableEntityResult, TableServiceClient, TableTransaction } from "@azure/data-tables";
import { deserializeError, serializeError } from "../serialize-error/index.js";
import { BlobServiceClient, type ContainerClient } from "@azure/storage-blob";
import { deserializeError, serializeError } from "../serialization/index.js";
import { AnonymousCredential, BlobServiceClient, StorageSharedKeyCredential, type ContainerClient } from "@azure/storage-blob";
import zlib from "zlib";
import { Mutex } from "async-mutex";
import { type ISerializer } from "../ISerializer.js";
import { type ISerializer } from "../serialization/ISerializer.js";
import { SerializedWorkflowHistoryStore } from "./SerializedWorkflowHistoryStore.js";
import { TokenCredential } from "@azure/core-auth";

interface IDurableFunctionsWorkflowHistory {
Name: string
Expand Down Expand Up @@ -36,25 +37,51 @@ interface IDurableFunctionsWorkflowInstance {
}

export class DurableFunctionsWorkflowHistoryStore extends SerializedWorkflowHistoryStore {
public readonly name = "durable-functions";

private initialized?: boolean;
private readonly history: TableClient;
private readonly instances: TableClient;
private readonly largeMessages: ContainerClient;
private readonly mutex = new Mutex();
private readonly options: { connectionString: string, taskHubName: string };
public readonly options: {
connectionString?: string,

tableUrl?: string;
blobUrl?: string;
credential?: TokenCredential;

taskHubName: string,
};

constructor(options: { connectionString: string, taskHubName?: string, serializer?: ISerializer }) {
constructor(options: ({ connectionString: string } | { tableUrl: string, blobUrl: string, credential: TokenCredential }) & { taskHubName?: string, serializer?: ISerializer }) {
super(options?.serializer);
this.options = {
connectionString: options.connectionString,
taskHubName: options.taskHubName ?? "Workflow",
};

const { connectionString, taskHubName } = this.options;
this.history = TableClient.fromConnectionString(connectionString, `${taskHubName}History`);
this.instances = TableClient.fromConnectionString(connectionString, `${taskHubName}Instances`);
if ("connectionString" in options) {
this.options.connectionString = options.connectionString;

const { connectionString, taskHubName } = this.options;
this.history = TableClient.fromConnectionString(connectionString, `${taskHubName}History`);
this.instances = TableClient.fromConnectionString(connectionString, `${taskHubName}Instances`);

const blobServicesClient = BlobServiceClient.fromConnectionString(connectionString);
this.largeMessages = blobServicesClient.getContainerClient(`${taskHubName}-largemessages`.toLowerCase());

return;
}

this.options.tableUrl = options.tableUrl;
this.options.blobUrl = options.blobUrl;
this.options.credential = options.credential;

const { tableUrl, blobUrl, credential, taskHubName } = this.options;
this.history = new TableClient(tableUrl, `${taskHubName}History`, credential);
this.instances = new TableClient(tableUrl, `${taskHubName}Instances`, credential);

const blobServicesClient = BlobServiceClient.fromConnectionString(connectionString);
const blobServicesClient = new BlobServiceClient(blobUrl, credential);
this.largeMessages = blobServicesClient.getContainerClient(`${taskHubName}-largemessages`.toLowerCase());
}

Expand Down Expand Up @@ -102,13 +129,25 @@ export class DurableFunctionsWorkflowHistoryStore extends SerializedWorkflowHist
}

public async clear(): Promise<void> {
const tableServiceClient = TableServiceClient.fromConnectionString(this.options.connectionString);
await tableServiceClient.deleteTable(`${this.options.taskHubName}History`);
await tableServiceClient.deleteTable(`${this.options.taskHubName}Instances`);
if (this.options.connectionString) {
const tableServiceClient = TableServiceClient.fromConnectionString(this.options.connectionString);
await tableServiceClient.deleteTable(`${this.options.taskHubName}History`);
await tableServiceClient.deleteTable(`${this.options.taskHubName}Instances`);

const blobServicesClient = BlobServiceClient.fromConnectionString(this.options.connectionString);
const largeMessages = blobServicesClient.getContainerClient(`${this.options.taskHubName}-largemessages`.toLowerCase());
await largeMessages.deleteIfExists();
}

if (this.options.tableUrl && this.options.blobUrl && this.options.credential) {
const tableServiceClient = new TableServiceClient(this.options.tableUrl, this.options.credential);
await tableServiceClient.deleteTable(`${this.options.taskHubName}History`);
await tableServiceClient.deleteTable(`${this.options.taskHubName}Instances`);

const blobServicesClient = BlobServiceClient.fromConnectionString(this.options.connectionString);
const largeMessages = blobServicesClient.getContainerClient(`${this.options.taskHubName}-largemessages`.toLowerCase());
await largeMessages.deleteIfExists();
const blobServicesClient = new BlobServiceClient(this.options.blobUrl, this.options.credential);
const largeMessages = blobServicesClient.getContainerClient(`${this.options.taskHubName}-largemessages`.toLowerCase());
await largeMessages.deleteIfExists();
}

this.initialized = false;
await this.init();
Expand Down
6 changes: 4 additions & 2 deletions src/stores/FileSystemWorkflowHistoryStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ import type { GetInstancesOptions, GetInstancesResult, WorkflowInstance } from "
import { resolve, parse as pathParse } from "path";
import { cwd } from "process";
import * as fs from "node:fs";
import { deserializeError, serializeError } from "../serialize-error/index.js";
import { type ISerializer } from "../ISerializer.js";
import { deserializeError, serializeError } from "../serialization/index.js";
import { type ISerializer } from "../serialization/ISerializer.js";
import { SerializedWorkflowHistoryStore } from "./SerializedWorkflowHistoryStore.js";

export class FileSystemWorkflowHistoryStore extends SerializedWorkflowHistoryStore {
public readonly name = "file-system";

public workflowHistory: Array<WorkflowInstance> = [];
private readonly options: { path: string };

Expand Down
2 changes: 2 additions & 0 deletions src/stores/IWorkflowHistoryStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ export type GetInstancesResult = Promise<{
}>;

export interface IWorkflowHistoryStore {
name: string;

getInstance: (id: string) => Promise<WorkflowInstance | undefined>;
setInstance: (instance: WorkflowInstance) => Promise<void>;
removeInstance: (id: string) => Promise<void>;
Expand Down
2 changes: 2 additions & 0 deletions src/stores/MemoryWorkflowHistoryStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import type { GetInstancesOptions, GetInstancesResult, WorkflowInstance } from "
import { WorkflowHistoryStore } from "./WorkflowHistoryStore.js";

export class MemoryWorkflowHistoryStore extends WorkflowHistoryStore {
public readonly name = "memory";

public workflowHistory: Array<WorkflowInstance> = [];

public getInstance = async (id: string): Promise<WorkflowInstance | undefined> => {
Expand Down
6 changes: 3 additions & 3 deletions src/stores/SerializedWorkflowHistoryStore.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import type { WorkflowInstance, GetInstancesOptions, GetInstancesResult } from "./IWorkflowHistoryStore.js";
import type { ISerializer } from "../ISerializer.js";
import { DefaultSerializer } from "../DefaultSerializer.js";
import type { ISerializer } from "../serialization/ISerializer.js";
import { DefaultSerializer } from "../serialization/DefaultSerializer.js";
import { isDeepStrictEqual } from "node:util";
import { WorkflowHistoryStore } from "./WorkflowHistoryStore.js";

export abstract class SerializedWorkflowHistoryStore extends WorkflowHistoryStore {
protected readonly serializer: ISerializer;
public readonly serializer: ISerializer;

public constructor(serializer?: ISerializer) {
super();
Expand Down
2 changes: 2 additions & 0 deletions src/stores/WorkflowHistoryStore.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import type { IWorkflowHistoryStore, WorkflowInstance, GetInstancesOptions, GetInstancesResult, WorkflowInstanceHeader } from "./IWorkflowHistoryStore.js";

export abstract class WorkflowHistoryStore implements IWorkflowHistoryStore {
abstract name: string;

protected getWorkflowInstanceHeaders = async (instances: Array<WorkflowInstance>, options?: GetInstancesOptions): GetInstancesResult => {
let headers = instances.map(this.getWorkflowInstanceHeader);

Expand Down
2 changes: 1 addition & 1 deletion src/tests/activities/get-workflow-id.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { WorkflowContext } from "../../WorkflowContext.js";
import { WorkflowContext } from "../../worker/WorkflowContext.js";

export async function getWorkflowId(): Promise<string | undefined> {
return WorkflowContext.current()?.workflowId;
Expand Down
5 changes: 5 additions & 0 deletions src/tests/services/MathService.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import { add } from "../activities/add.js";

export class MathService {
public add = add;
}
8 changes: 4 additions & 4 deletions src/tests/store.test.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { test } from "node:test";
import assert from "node:assert";
import { Worker } from "../Worker.js";
import { testWorkflow } from "./workflows/test-workflow.js";
import { Worker } from "../worker/Worker.js";
import { testWorkflow } from "./workflow-functions/test-workflow.js";
import { DurableFunctionsWorkflowHistoryStore, MemoryWorkflowHistoryStore, type WorkflowInstanceHeader } from "../stores/index.js";
import { sleep } from "../sleep.js";
import { throwErrorWorkflow } from "./workflows/throw-error-workflow.js";
import { throwErrorWorkflow } from "./workflow-functions/throw-error-workflow.js";

test.before(async () => {
const worker = Worker.getInstance();
Expand All @@ -22,7 +22,7 @@ test.before(async () => {
if (isStorageEmulatorRunning) {
const store = new DurableFunctionsWorkflowHistoryStore({
connectionString: "UseDevelopmentStorage=true",
taskHubName: "StoreTestWorkflow",
taskHubName: "Store",
});
await store.clear();
worker.store = store;
Expand Down
43 changes: 22 additions & 21 deletions src/tests/workflow.test.ts → src/tests/workflow-functions.test.ts
Original file line number Diff line number Diff line change
@@ -1,30 +1,30 @@
import { test } from "node:test";
import assert from "node:assert";
import { Worker } from "../Worker.js";
import { greetWorkflow } from "./workflows/greet-workflow.js";
import { incrementCounterWorkflow } from "./workflows/increment-counter-workflow.js";
import { testWorkflow } from "./workflows/test-workflow.js";
import { addWorkflow } from "./workflows/add-workflow.js";
import { voidWorkflow } from "./workflows/void-workflow.js";
import { Worker } from "../worker/Worker.js";
import { greetWorkflow } from "./workflow-functions/greet-workflow.js";
import { incrementCounterWorkflow } from "./workflow-functions/increment-counter-workflow.js";
import { testWorkflow } from "./workflow-functions/test-workflow.js";
import { addWorkflow } from "./workflow-functions/add-workflow.js";
import { voidWorkflow } from "./workflow-functions/void-workflow.js";
import { Counters } from "./activities/Counters.js";
import { timeoutWorkflow } from "./workflows/timeout-workflow.js";
import { distanceWorkflow } from "./workflows/distance-workflow.js";
import { moveWorkflow } from "./workflows/move-workflow.js";
import { throwErrorWorkflow } from "./workflows/throw-error-workflow.js";
import { callTwiceWorkflow } from "./workflows/call-twice-workflow.js";
import { noStore } from "./workflows/no-store.js";
import { nestedWorkflow } from "./workflows/nested-workflow.js";
import { longWorkflow } from "./workflows/long-workflow.js";
import { largeWorkflow } from "./workflows/large-workflow.js";
import { concurrentWorkflow } from "./workflows/concurrent-workflow.js";
import { noTimeoutWorkflow } from "./workflows/no-timeout-workflow.js";
import { nowWorkflow } from "./workflows/now-workflow.js";
import { timeoutWorkflow } from "./workflow-functions/timeout-workflow.js";
import { distanceWorkflow } from "./workflow-functions/distance-workflow.js";
import { moveWorkflow } from "./workflow-functions/move-workflow.js";
import { throwErrorWorkflow } from "./workflow-functions/throw-error-workflow.js";
import { callTwiceWorkflow } from "./workflow-functions/call-twice-workflow.js";
import { noStore } from "./workflow-functions/no-store.js";
import { nestedWorkflow } from "./workflow-functions/nested-workflow.js";
import { longWorkflow } from "./workflow-functions/long-workflow.js";
import { largeWorkflow } from "./workflow-functions/large-workflow.js";
import { concurrentWorkflow } from "./workflow-functions/concurrent-workflow.js";
import { noTimeoutWorkflow } from "./workflow-functions/no-timeout-workflow.js";
import { nowWorkflow } from "./workflow-functions/now-workflow.js";
import { DurableFunctionsWorkflowHistoryStore } from "../stores/index.js";
import { sleep } from "../sleep.js";
import { throwWorkflow } from "./workflows/throw-workflow.js";
import { throwWorkflow } from "./workflow-functions/throw-workflow.js";
import superjson from "superjson";
import { greetServiceWorkflow } from "./workflows/greet-service-workflow.js";
import { stateServiceWorkflow } from "./workflows/state-service-workflow.js";
import { greetServiceWorkflow } from "./workflow-functions/greet-service-workflow.js";
import { stateServiceWorkflow } from "./workflow-functions/state-service-workflow.js";

let isStorageEmulatorRunning = false;

Expand All @@ -43,6 +43,7 @@ test.before(async () => {
if (isStorageEmulatorRunning) {
const store = new DurableFunctionsWorkflowHistoryStore({
connectionString: "UseDevelopmentStorage=true",
taskHubName: "Functions",
});
await store.clear();
worker.store = store;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import * as activities from "../activities/index.js";
import { proxyActivities } from "../../proxyActivities.js";
import { proxyActivities } from "../../proxy/proxyActivities.js";

const { add } = proxyActivities(activities, {});

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import * as activities from "../activities/index.js";
import { proxyActivities } from "../../proxyActivities.js";
import { proxyActivities } from "../../proxy/proxyActivities.js";

const { callTwice } = proxyActivities(activities, { retry: 5 });

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import * as activities from "../activities/index.js";
import { proxyActivities } from "../../proxyActivities.js";
import { proxyActivities } from "../../proxy/proxyActivities.js";

const { sleep } = proxyActivities(activities, {});

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import * as activities from "../activities/index.js";
import { proxyActivities } from "../../proxyActivities.js";
import { proxyActivities } from "../../proxy/proxyActivities.js";

const { getDistance } = proxyActivities(activities, {});

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { proxyActivities } from "../../proxyActivities.js";
import { proxyActivities } from "../../proxy/proxyActivities.js";
import { GreetService } from "../services/GreetService.js";

const greetService = proxyActivities(new GreetService());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import * as activities from "../activities/index.js";
import { proxyActivities } from "../../proxyActivities.js";
import { proxyActivities } from "../../proxy/proxyActivities.js";

const { greet } = proxyActivities(activities, {});

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import * as activities from "../activities/index.js";
import { proxyActivities } from "../../proxyActivities.js";
import { proxyActivities } from "../../proxy/proxyActivities.js";

const { incrementCounter, getCounter } = proxyActivities(activities, {});

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import * as activities from "../activities/index.js";
import { proxyActivities } from "../../proxyActivities.js";
import { proxyActivities } from "../../proxy/proxyActivities.js";

const { greet } = proxyActivities(activities, {});

Expand Down
Loading

0 comments on commit e088d08

Please sign in to comment.