diff --git a/daprdocs/content/en/concepts/building-blocks-concept.md b/daprdocs/content/en/concepts/building-blocks-concept.md index 4719626f3c6..1841dd58468 100644 --- a/daprdocs/content/en/concepts/building-blocks-concept.md +++ b/daprdocs/content/en/concepts/building-blocks-concept.md @@ -28,5 +28,5 @@ Dapr provides the following building blocks: | [**Secrets**]({{< ref "secrets-overview.md" >}}) | `/v1.0/secrets` | Dapr provides a secrets building block API and integrates with secret stores such as public cloud stores, local stores and Kubernetes to store the secrets. Services can call the secrets API to retrieve secrets, for example to get a connection string to a database. | [**Configuration**]({{< ref "configuration-api-overview.md" >}}) | `/v1.0/configuration` | The Configuration API enables you to retrieve and subscribe to application configuration items for supported configuration stores. This enables an application to retrieve specific configuration information, for example, at start up or when configuration changes are made in the store. | [**Distributed lock**]({{< ref "distributed-lock-api-overview.md" >}}) | `/v1.0-alpha1/lock` | The distributed lock API enables you to take a lock on a resource so that multiple instances of an application can access the resource without conflicts and provide consistency guarantees. -| [**Workflows**]({{< ref "workflow-overview.md" >}}) | `/v1.0-alpha1/workflow` | The Workflow API enables you to define long running, persistent processes or data flows that span multiple microservices using Dapr workflows or workflow components. The Workflow API can be combined with other Dapr API building blocks. For example, a workflow can call another service with service invocation or retrieve secrets, providing flexibility and portability. +| [**Workflows**]({{< ref "workflow-overview.md" >}}) | `/v1.0-beta1/workflow` | The Workflow API enables you to define long running, persistent processes or data flows that span multiple microservices using Dapr workflows or workflow components. The Workflow API can be combined with other Dapr API building blocks. For example, a workflow can call another service with service invocation or retrieve secrets, providing flexibility and portability. | [**Cryptography**]({{< ref "cryptography-overview.md" >}}) | `/v1.0-alpha1/crypto` | The Cryptography API enables you to perform cryptographic operations, such as encrypting and decrypting messages, without exposing keys to your application. \ No newline at end of file diff --git a/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-author-workflow.md b/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-author-workflow.md index 6caa7f3ffc7..ada6335ec44 100644 --- a/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-author-workflow.md +++ b/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-author-workflow.md @@ -6,6 +6,10 @@ weight: 5000 description: "Learn how to develop and author workflows" --- +{{% alert title="Note" color="primary" %}} +Dapr Workflow is currently in beta. [See known limitations for {{% dapr-latest-version cli="true" %}}]({{< ref "workflow-overview.md#limitations" >}}). +{{% /alert %}} + This article provides a high-level overview of how to author workflows that are executed by the Dapr Workflow engine. {{% alert title="Note" color="primary" %}} @@ -30,7 +34,25 @@ The Dapr sidecar doesn’t load any workflow definitions. Rather, the sidecar si [Workflow activities]({{< ref "workflow-features-concepts.md#workflow-activites" >}}) are the basic unit of work in a workflow and are the tasks that get orchestrated in the business process. -{{< tabs ".NET" Python >}} +{{< tabs Python ".NET" Java >}} + +{{% codetab %}} + + + +Define the workflow activities you'd like your workflow to perform. Activities are a function definition and can take inputs and outputs. The following example creates a counter (activity) called `hello_act` that notifies users of the current counter value. `hello_act` is a function derived from a class called `WorkflowActivityContext`. + +```python +def hello_act(ctx: WorkflowActivityContext, input): + global counter + counter += input + print(f'New counter value is: {counter}!', flush=True) +``` + +[See the `hello_act` workflow activity in context.](https://github.com/dapr/python-sdk/blob/master/examples/demo_workflow/app.py#LL40C1-L43C59) + + +{{% /codetab %}} {{% codetab %}} @@ -102,29 +124,76 @@ public class ProcessPaymentActivity : WorkflowActivity {{% codetab %}} - + -Define the workflow activities you'd like your workflow to perform. Activities are a function definition and can take inputs and outputs. The following example creates a counter (activity) called `hello_act` that notifies users of the current counter value. `hello_act` is a function derived from a class called `WorkflowActivityContext`. +Define the workflow activities you'd like your workflow to perform. Activities are wrapped in the public `DemoWorkflowActivity` class, which implements the workflow activities. -```python -def hello_act(ctx: WorkflowActivityContext, input): - global counter - counter += input - print(f'New counter value is: {counter}!', flush=True) -``` +```java +@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.ANY) +public class DemoWorkflowActivity implements WorkflowActivity { -[See the `hello_act` workflow activity in context.](https://github.com/dapr/python-sdk/blob/master/examples/demo_workflow/app.py#LL40C1-L43C59) + @Override + public DemoActivityOutput run(WorkflowActivityContext ctx) { + Logger logger = LoggerFactory.getLogger(DemoWorkflowActivity.class); + logger.info("Starting Activity: " + ctx.getName()); + var message = ctx.getInput(DemoActivityInput.class).getMessage(); + var newMessage = message + " World!, from Activity"; + logger.info("Message Received from input: " + message); + logger.info("Sending message to output: " + newMessage); + + logger.info("Sleeping for 5 seconds to simulate long running operation..."); + + try { + TimeUnit.SECONDS.sleep(5); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + + logger.info("Activity finished"); + + var output = new DemoActivityOutput(message, newMessage); + logger.info("Activity returned: " + output); + + return output; + } +} +``` + +[See the Java SDK workflow activity example in context.](https://github.com/dapr/java-sdk/blob/master/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflowActivity.java) {{% /codetab %}} + {{< /tabs >}} ## Write the workflow Next, register and call the activites in a workflow. -{{< tabs ".NET" Python >}} +{{< tabs Python ".NET" Java >}} + +{{% codetab %}} + + + +The `hello_world_wf` function is derived from a class called `DaprWorkflowContext` with input and output parameter types. It also includes a `yield` statement that does the heavy lifting of the workflow and calls the workflow activities. + +```python +def hello_world_wf(ctx: DaprWorkflowContext, input): + print(f'{input}') + yield ctx.call_activity(hello_act, input=1) + yield ctx.call_activity(hello_act, input=10) + yield ctx.wait_for_external_event("event1") + yield ctx.call_activity(hello_act, input=100) + yield ctx.call_activity(hello_act, input=1000) +``` + +[See the `hello_world_wf` workflow in context.](https://github.com/dapr/python-sdk/blob/master/examples/demo_workflow/app.py#LL32C1-L38C51) + + +{{% /codetab %}} {{% codetab %}} @@ -171,103 +240,42 @@ The `OrderProcessingWorkflow` class is derived from a base class called `Workflo {{% codetab %}} - - -The `hello_world_wf` function is derived from a class called `DaprWorkflowContext` with input and output parameter types. It also includes a `yield` statement that does the heavy lifting of the workflow and calls the workflow activities. - -```python -def hello_world_wf(ctx: DaprWorkflowContext, input): - print(f'{input}') - yield ctx.call_activity(hello_act, input=1) - yield ctx.call_activity(hello_act, input=10) - yield ctx.wait_for_external_event("event1") - yield ctx.call_activity(hello_act, input=100) - yield ctx.call_activity(hello_act, input=1000) -``` - -[See the `hello_world_wf` workflow in context.](https://github.com/dapr/python-sdk/blob/master/examples/demo_workflow/app.py#LL32C1-L38C51) - - -{{% /codetab %}} + -{{< /tabs >}} +Next, register the workflow with the `WorkflowRuntimeBuilder` and start the workflow runtime. -## Write the application +```java +public class DemoWorkflowWorker { -Finally, compose the application using the workflow. + public static void main(String[] args) throws Exception { -{{< tabs ".NET" Python >}} + // Register the Workflow with the builder. + WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder().registerWorkflow(DemoWorkflow.class); + builder.registerActivity(DemoWorkflowActivity.class); -{{% codetab %}} - - - -[In the following `Program.cs` example](https://github.com/dapr/dotnet-sdk/blob/master/examples/Workflow/WorkflowConsoleApp/Program.cs), for a basic ASP.NET order processing application using the .NET SDK, your project code would include: - -- A NuGet package called `Dapr.Workflow` to receive the .NET SDK capabilities -- A builder with an extension method called `AddDaprWorkflow` - - This will allow you to register workflows and workflow activities (tasks that workflows can schedule) -- HTTP API calls - - One for submitting a new order - - One for checking the status of an existing order - -```csharp -using Dapr.Workflow; -//... + // Build and then start the workflow runtime pulling and executing tasks + try (WorkflowRuntime runtime = builder.build()) { + System.out.println("Start workflow runtime"); + runtime.start(); + } -// Dapr Workflows are registered as part of the service configuration -builder.Services.AddDaprWorkflow(options => -{ - // Note that it's also possible to register a lambda function as the workflow - // or activity implementation instead of a class. - options.RegisterWorkflow(); + System.exit(0); + } +} +``` - // These are the activities that get invoked by the workflow(s). - options.RegisterActivity(); - options.RegisterActivity(); - options.RegisterActivity(); -}); +[See the Java SDK workflow in context.](https://github.com/dapr/java-sdk/blob/master/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflowWorker.java) -WebApplication app = builder.Build(); -// POST starts new order workflow instance -app.MapPost("/orders", async (WorkflowEngineClient client, [FromBody] OrderPayload orderInfo) => -{ - if (orderInfo?.Name == null) - { - return Results.BadRequest(new - { - message = "Order data was missing from the request", - example = new OrderPayload("Paperclips", 99.95), - }); - } - -//... -}); - -// GET fetches state for order workflow to report status -app.MapGet("/orders/{orderId}", async (string orderId, WorkflowEngineClient client) => -{ - WorkflowState state = await client.GetWorkflowStateAsync(orderId, true); - if (!state.Exists) - { - return Results.NotFound($"No order with ID = '{orderId}' was found."); - } +{{% /codetab %}} - var httpResponsePayload = new - { - details = state.ReadInputAs(), - status = state.RuntimeStatus.ToString(), - result = state.ReadOutputAs(), - }; +{{< /tabs >}} -//... -}).WithName("GetOrderInfoEndpoint"); +## Write the application -app.Run(); -``` +Finally, compose the application using the workflow. -{{% /codetab %}} +{{< tabs Python ".NET" Java >}} {{% codetab %}} @@ -356,6 +364,124 @@ if __name__ == '__main__': ``` +{{% /codetab %}} + +{{% codetab %}} + + + +[In the following `Program.cs` example](https://github.com/dapr/dotnet-sdk/blob/master/examples/Workflow/WorkflowConsoleApp/Program.cs), for a basic ASP.NET order processing application using the .NET SDK, your project code would include: + +- A NuGet package called `Dapr.Workflow` to receive the .NET SDK capabilities +- A builder with an extension method called `AddDaprWorkflow` + - This will allow you to register workflows and workflow activities (tasks that workflows can schedule) +- HTTP API calls + - One for submitting a new order + - One for checking the status of an existing order + +```csharp +using Dapr.Workflow; +//... + +// Dapr Workflows are registered as part of the service configuration +builder.Services.AddDaprWorkflow(options => +{ + // Note that it's also possible to register a lambda function as the workflow + // or activity implementation instead of a class. + options.RegisterWorkflow(); + + // These are the activities that get invoked by the workflow(s). + options.RegisterActivity(); + options.RegisterActivity(); + options.RegisterActivity(); +}); + +WebApplication app = builder.Build(); + +// POST starts new order workflow instance +app.MapPost("/orders", async (WorkflowEngineClient client, [FromBody] OrderPayload orderInfo) => +{ + if (orderInfo?.Name == null) + { + return Results.BadRequest(new + { + message = "Order data was missing from the request", + example = new OrderPayload("Paperclips", 99.95), + }); + } + +//... +}); + +// GET fetches state for order workflow to report status +app.MapGet("/orders/{orderId}", async (string orderId, WorkflowEngineClient client) => +{ + WorkflowState state = await client.GetWorkflowStateAsync(orderId, true); + if (!state.Exists) + { + return Results.NotFound($"No order with ID = '{orderId}' was found."); + } + + var httpResponsePayload = new + { + details = state.ReadInputAs(), + status = state.RuntimeStatus.ToString(), + result = state.ReadOutputAs(), + }; + +//... +}).WithName("GetOrderInfoEndpoint"); + +app.Run(); +``` + +{{% /codetab %}} + +{{% codetab %}} + + + +[As in the following example](https://github.com/dapr/java-sdk/blob/master/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflow.java), a hello-world application using the Java SDK and Dapr Workflow would include: + +- A Java package called `io.dapr.workflows.client` to receive the Java SDK client capabilities. +- An import of `io.dapr.workflows.Workflow` +- The `DemoWorkflow` class which extends `Workflow` +- Creating the workflow with input and output. +- API calls. In the example below, these calls start and call the workflow activities. + +```java +package io.dapr.examples.workflows; + +import com.microsoft.durabletask.CompositeTaskFailedException; +import com.microsoft.durabletask.Task; +import com.microsoft.durabletask.TaskCanceledException; +import io.dapr.workflows.Workflow; +import io.dapr.workflows.WorkflowStub; + +import java.time.Duration; +import java.util.Arrays; +import java.util.List; + +/** + * Implementation of the DemoWorkflow for the server side. + */ +public class DemoWorkflow extends Workflow { + @Override + public WorkflowStub create() { + return ctx -> { + ctx.getLogger().info("Starting Workflow: " + ctx.getName()); + // ... + ctx.getLogger().info("Calling Activity..."); + var input = new DemoActivityInput("Hello Activity!"); + var output = ctx.callActivity(DemoWorkflowActivity.class.getName(), input, DemoActivityOutput.class).await(); + // ... + }; + } +} +``` + +[See the full Java SDK workflow example in context.](https://github.com/dapr/java-sdk/blob/master/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflow.java) + {{% /codetab %}} @@ -377,5 +503,6 @@ Now that you've authored a workflow, learn how to manage it. - [Workflow overview]({{< ref workflow-overview.md >}}) - [Workflow API reference]({{< ref workflow_api.md >}}) - Try out the full SDK examples: - - [.NET example](https://github.com/dapr/dotnet-sdk/tree/master/examples/Workflow) - [Python example](https://github.com/dapr/python-sdk/tree/master/examples/demo_workflow) + - [.NET example](https://github.com/dapr/dotnet-sdk/tree/master/examples/Workflow) + - [Java example](https://github.com/dapr/java-sdk/tree/master/examples/src/main/java/io/dapr/examples/workflows) diff --git a/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-manage-workflow.md b/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-manage-workflow.md index 99cb87c9b58..fb7ad9d57c5 100644 --- a/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-manage-workflow.md +++ b/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-manage-workflow.md @@ -6,45 +6,13 @@ weight: 6000 description: Manage and run workflows --- -Now that you've [authored the workflow and its activities in your application]({{< ref howto-author-workflow.md >}}), you can start, terminate, and get information about the workflow using HTTP API calls. For more information, read the [workflow API reference]({{< ref workflow_api.md >}}). - -{{< tabs ".NET" Python HTTP >}} - - -{{% codetab %}} - -Manage your workflow within your code. In the `OrderProcessingWorkflow` example from the [Author a workflow]({{< ref "howto-author-workflow.md#write-the-application" >}}) guide, the workflow is registered in the code. You can now start, terminate, and get information about a running workflow: - -```csharp -string orderId = "exampleOrderId"; -string workflowComponent = "dapr"; -string workflowName = "OrderProcessingWorkflow"; -OrderPayload input = new OrderPayload("Paperclips", 99.95); -Dictionary workflowOptions; // This is an optional parameter - -// Start the workflow. This returns back a "StartWorkflowResponse" which contains the instance ID for the particular workflow instance. -StartWorkflowResponse startResponse = await daprClient.StartWorkflowAsync(orderId, workflowComponent, workflowName, input, workflowOptions); - -// Get information on the workflow. This response contains information such as the status of the workflow, when it started, and more! -GetWorkflowResponse getResponse = await daprClient.GetWorkflowAsync(orderId, workflowComponent, workflowName); - -// Terminate the workflow -await daprClient.TerminateWorkflowAsync(orderId, workflowComponent); - -// Raise an event (an incoming purchase order) that your workflow will wait for. This returns the item waiting to be purchased. -await daprClient.RaiseWorkflowEventAsync(orderId, workflowComponent, workflowName, input); - -// Pause -await daprClient.PauseWorkflowAsync(orderId, workflowComponent); - -// Resume -await daprClient.ResumeWorkflowAsync(orderId, workflowComponent); +{{% alert title="Note" color="primary" %}} +Dapr Workflow is currently in beta. [See known limitations for {{% dapr-latest-version cli="true" %}}]({{< ref "workflow-overview.md#limitations" >}}). +{{% /alert %}} -// Purge -await daprClient.PurgeWorkflowAsync(orderId, workflowComponent); -``` +Now that you've [authored the workflow and its activities in your application]({{< ref howto-author-workflow.md >}}), you can start, terminate, and get information about the workflow using HTTP API calls. For more information, read the [workflow API reference]({{< ref workflow_api.md >}}). -{{% /codetab %}} +{{< tabs Python ".NET" Java HTTP >}} {{% codetab %}} @@ -95,6 +63,107 @@ d.terminate_workflow(instance_id=instanceId, workflow_component=workflowComponen {{% /codetab %}} + +{{% codetab %}} + +Manage your workflow within your code. In the `OrderProcessingWorkflow` example from the [Author a workflow]({{< ref "howto-author-workflow.md#write-the-application" >}}) guide, the workflow is registered in the code. You can now start, terminate, and get information about a running workflow: + +```csharp +string orderId = "exampleOrderId"; +string workflowComponent = "dapr"; +string workflowName = "OrderProcessingWorkflow"; +OrderPayload input = new OrderPayload("Paperclips", 99.95); +Dictionary workflowOptions; // This is an optional parameter + +// Start the workflow. This returns back a "StartWorkflowResponse" which contains the instance ID for the particular workflow instance. +StartWorkflowResponse startResponse = await daprClient.StartWorkflowAsync(orderId, workflowComponent, workflowName, input, workflowOptions); + +// Get information on the workflow. This response contains information such as the status of the workflow, when it started, and more! +GetWorkflowResponse getResponse = await daprClient.GetWorkflowAsync(orderId, workflowComponent, eventName); + +// Terminate the workflow +await daprClient.TerminateWorkflowAsync(orderId, workflowComponent); + +// Raise an event (an incoming purchase order) that your workflow will wait for. This returns the item waiting to be purchased. +await daprClient.RaiseWorkflowEventAsync(orderId, workflowComponent, workflowName, input); + +// Pause +await daprClient.PauseWorkflowAsync(orderId, workflowComponent); + +// Resume +await daprClient.ResumeWorkflowAsync(orderId, workflowComponent); + +// Purge the workflow, removing all inbox and history information from associated instance +await daprClient.PurgeWorkflowAsync(orderId, workflowComponent); +``` + +{{% /codetab %}} + + +{{% codetab %}} + +Manage your workflow within your code. [In the workflow example from the Java SDK](https://github.com/dapr/java-sdk/blob/master/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflowClient.java), the workflow is registered in the code using the following APIs: + +- **scheduleNewWorkflow**: Starts a new workflow instance +- **getInstanceState**: Get information on the status of the workflow +- **waitForInstanceStart**: Pauses or suspends a workflow instance that can later be resumed +- **raiseEvent**: Raises events/tasks for the running workflow instance +- **waitForInstanceCompletion**: Waits for the workflow to complete its tasks +- **purgeInstance**: Removes all metadata related to a specific workflow instance +- **terminateWorkflow**: Terminates the workflow +- **purgeInstance**: Removes all metadata related to a specific workflow + +```java +package io.dapr.examples.workflows; + +import io.dapr.workflows.client.DaprWorkflowClient; +import io.dapr.workflows.client.WorkflowInstanceStatus; + +// ... +public class DemoWorkflowClient { + + // ... + public static void main(String[] args) throws InterruptedException { + DaprWorkflowClient client = new DaprWorkflowClient(); + + try (client) { + // Start a workflow + String instanceId = client.scheduleNewWorkflow(DemoWorkflow.class, "input data"); + + // Get status information on the workflow + WorkflowInstanceStatus workflowMetadata = client.getInstanceState(instanceId, true); + + // Wait or pause for the workflow instance start + try { + WorkflowInstanceStatus waitForInstanceStartResult = + client.waitForInstanceStart(instanceId, Duration.ofSeconds(60), true); + } + + // Raise an event for the workflow; you can raise several events in parallel + client.raiseEvent(instanceId, "TestEvent", "TestEventPayload"); + client.raiseEvent(instanceId, "event1", "TestEvent 1 Payload"); + client.raiseEvent(instanceId, "event2", "TestEvent 2 Payload"); + client.raiseEvent(instanceId, "event3", "TestEvent 3 Payload"); + + // Wait for workflow to complete running through tasks + try { + WorkflowInstanceStatus waitForInstanceCompletionResult = + client.waitForInstanceCompletion(instanceId, Duration.ofSeconds(60), true); + } + + // Purge the workflow instance, removing all metadata associated with it + boolean purgeResult = client.purgeInstance(instanceId); + + // Terminate the workflow instance + client.terminateWorkflow(instanceToTerminateId, null); + + System.exit(0); + } +} +``` + +{{% /codetab %}} + {{% codetab %}} @@ -106,7 +175,7 @@ Manage your workflow using HTTP calls. The example below plugs in the properties To start your workflow with an ID `12345678`, run: ```http -POST http://localhost:3500/v1.0-alpha1/workflows/dapr/OrderProcessingWorkflow/start?instanceID=12345678 +POST http://localhost:3500/v1.0-beta1/workflows/dapr/OrderProcessingWorkflow/start?instanceID=12345678 ``` Note that workflow instance IDs can only contain alphanumeric characters, underscores, and dashes. @@ -116,7 +185,7 @@ Note that workflow instance IDs can only contain alphanumeric characters, unders To terminate your workflow with an ID `12345678`, run: ```http -POST http://localhost:3500/v1.0-alpha1/workflows/dapr/12345678/terminate +POST http://localhost:3500/v1.0-beta1/workflows/dapr/12345678/terminate ``` ### Raise an event @@ -124,7 +193,7 @@ POST http://localhost:3500/v1.0-alpha1/workflows/dapr/12345678/terminate For workflow components that support subscribing to external events, such as the Dapr Workflow engine, you can use the following "raise event" API to deliver a named event to a specific workflow instance. ```http -POST http://localhost:3500/v1.0-alpha1/workflows///raiseEvent/ +POST http://localhost:3500/v1.0-beta1/workflows///raiseEvent/ ``` > An `eventName` can be any function. @@ -134,13 +203,13 @@ POST http://localhost:3500/v1.0-alpha1/workflows//}}). @@ -172,6 +241,8 @@ Learn more about these HTTP calls in the [workflow API reference guide]({{< ref ## Next steps - [Try out the Workflow quickstart]({{< ref workflow-quickstart.md >}}) - Try out the full SDK examples: - - [.NET example](https://github.com/dapr/dotnet-sdk/tree/master/examples/Workflow) - [Python example](https://github.com/dapr/python-sdk/blob/master/examples/demo_workflow/app.py) + - [.NET example](https://github.com/dapr/dotnet-sdk/tree/master/examples/Workflow) + - [Java example](https://github.com/dapr/java-sdk/tree/master/examples/src/main/java/io/dapr/examples/workflows) + - [Workflow API reference]({{< ref workflow_api.md >}}) diff --git a/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-architecture.md b/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-architecture.md index da8bf0a44e1..18ec9110b30 100644 --- a/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-architecture.md +++ b/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-architecture.md @@ -6,6 +6,10 @@ weight: 4000 description: "The Dapr Workflow engine architecture" --- +{{% alert title="Note" color="primary" %}} +Dapr Workflow is currently in beta. [See known limitations for {{% dapr-latest-version cli="true" %}}]({{< ref "workflow-overview.md#limitations" >}}). +{{% /alert %}} + [Dapr Workflows]({{< ref "workflow-overview.md" >}}) allow developers to define workflows using ordinary code in a variety of programming languages. The workflow engine runs inside of the Dapr sidecar and orchestrates workflow code deployed as part of your application. This article describes: - The architecture of the Dapr Workflow engine @@ -189,4 +193,7 @@ See the [Reminder usage and execution guarantees section]({{< ref "workflow-arch - [Workflow overview]({{< ref workflow-overview.md >}}) - [Workflow API reference]({{< ref workflow_api.md >}}) - [Try out the Workflow quickstart]({{< ref workflow-quickstart.md >}}) -- [Try out the .NET example](https://github.com/dapr/dotnet-sdk/tree/master/examples/Workflow) +- Try out the following examples: + - [Python](https://github.com/dapr/python-sdk/tree/master/examples/demo_workflow) + - [.NET](https://github.com/dapr/dotnet-sdk/tree/master/examples/Workflow) + - [Java](https://github.com/dapr/java-sdk/tree/master/examples/src/main/java/io/dapr/examples/workflows) \ No newline at end of file diff --git a/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-features-concepts.md b/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-features-concepts.md index c8e3de13187..e957ade3d56 100644 --- a/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-features-concepts.md +++ b/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-features-concepts.md @@ -6,6 +6,10 @@ weight: 2000 description: "Learn more about the Dapr Workflow features and concepts" --- +{{% alert title="Note" color="primary" %}} +Dapr Workflow is currently in beta. [See known limitations for {{% dapr-latest-version cli="true" %}}]({{< ref "workflow-overview.md#limitations" >}}). +{{% /alert %}} + Now that you've learned about the [workflow building block]({{< ref workflow-overview.md >}}) at a high level, let's deep dive into the features and concepts included with the Dapr Workflow engine and SDKs. Dapr Workflow exposes several core features and concepts which are common across all supported languages. {{% alert title="Note" color="primary" %}} diff --git a/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-overview.md b/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-overview.md index 9f70500c01f..923bc37947e 100644 --- a/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-overview.md +++ b/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-overview.md @@ -7,7 +7,7 @@ description: "Overview of Dapr Workflow" --- {{% alert title="Note" color="primary" %}} -Dapr Workflow is currently in alpha. +Dapr Workflow is currently in beta. [See known limitations for {{% dapr-latest-version cli="true" %}}]({{< ref "#limitations" >}}). {{% /alert %}} Dapr workflow makes it easy for developers to write business logic and integrations in a reliable way. Since Dapr workflows are stateful, they support long-running and fault-tolerant applications, ideal for orchestrating microservices. Dapr workflow works seamlessly with other Dapr building blocks, such as service invocation, pub/sub, state management, and bindings. @@ -83,9 +83,9 @@ You can use the following SDKs to author a workflow. | Language stack | Package | | - | - | -| .NET | [Dapr.Workflow](https://www.nuget.org/profiles/dapr.io) | | Python | [dapr-ext-workflow](https://github.com/dapr/python-sdk/tree/master/ext/dapr-ext-workflow) | - +| .NET | [Dapr.Workflow](https://www.nuget.org/profiles/dapr.io) | +| Java | [io.dapr.workflows](https://dapr.github.io/java-sdk/io/dapr/workflows/package-summary.html) | ## Try out workflows @@ -95,15 +95,23 @@ Want to put workflows to the test? Walk through the following quickstart and tut | Quickstart/tutorial | Description | | ------------------- | ----------- | -| [Workflow quickstart]({{< ref workflow-quickstart.md >}}) | Run a .NET workflow application with four workflow activities to see Dapr Workflow in action | -| [Workflow .NET SDK example](https://github.com/dapr/dotnet-sdk/tree/master/examples/Workflow) | Learn how to create a Dapr Workflow and invoke it using ASP.NET Core web APIs. | +| [Workflow quickstart]({{< ref workflow-quickstart.md >}}) | Run a workflow application with four workflow activities to see Dapr Workflow in action | | [Workflow Python SDK example](https://github.com/dapr/python-sdk/tree/master/examples/demo_workflow) | Learn how to create a Dapr Workflow and invoke it using the Python `DaprClient` package. | +| [Workflow .NET SDK example](https://github.com/dapr/dotnet-sdk/tree/master/examples/Workflow) | Learn how to create a Dapr Workflow and invoke it using ASP.NET Core web APIs. | +| [Workflow Java SDK example](https://github.com/dapr/java-sdk/tree/master/examples/src/main/java/io/dapr/examples/workflows) | Learn how to create a Dapr Workflow and invoke it using the Java `io.dapr.workflows` package. | ### Start using workflows directly in your app Want to skip the quickstarts? Not a problem. You can try out the workflow building block directly in your application. After [Dapr is installed]({{< ref install-dapr-cli.md >}}), you can begin using workflows, starting with [how to author a workflow]({{< ref howto-author-workflow.md >}}). +## Limitations + +With Dapr Workflow in beta stage comes the following limitation(s): + +- **State stores:** For the {{% dapr-latest-version cli="true" %}} beta release of Dapr Workflow, you're not able to use NoSQL databases. Only SQL databases are supported in the latest release. +- **Application instances:** For the {{% dapr-latest-version cli="true" %}} beta release of Dapr Workflow, only a maximum of 2 application instances is supported. + ## Watch the demo Watch [this video for an overview on Dapr Workflow](https://youtu.be/s1p9MNl4VGo?t=131): @@ -120,3 +128,4 @@ Watch [this video for an overview on Dapr Workflow](https://youtu.be/s1p9MNl4VGo - Try out the full SDK examples: - [.NET example](https://github.com/dapr/dotnet-sdk/tree/master/examples/Workflow) - [Python example](https://github.com/dapr/python-sdk/tree/master/examples/demo_workflow) + - [Java example](https://github.com/dapr/java-sdk/tree/master/examples/src/main/java/io/dapr/examples/workflows) diff --git a/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-patterns.md b/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-patterns.md index 4ff10782be4..49b5fc2db01 100644 --- a/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-patterns.md +++ b/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-patterns.md @@ -25,42 +25,7 @@ While the pattern is simple, there are many complexities hidden in the implement Dapr Workflow solves these complexities by allowing you to implement the task chaining pattern concisely as a simple function in the programming language of your choice, as shown in the following example. -{{< tabs ".NET" Python >}} - -{{% codetab %}} - - -```csharp -// Expotential backoff retry policy that survives long outages -var retryOptions = new WorkflowTaskOptions -{ - RetryPolicy = new WorkflowRetryPolicy( - firstRetryInterval: TimeSpan.FromMinutes(1), - backoffCoefficient: 2.0, - maxRetryInterval: TimeSpan.FromHours(1), - maxNumberOfAttempts: 10), -}; - -try -{ - var result1 = await context.CallActivityAsync("Step1", wfInput, retryOptions); - var result2 = await context.CallActivityAsync("Step2", result1, retryOptions); - var result3 = await context.CallActivityAsync("Step3", result2, retryOptions); - return string.Join(", ", result4); -} -catch (TaskFailedException) // Task failures are surfaced as TaskFailedException -{ - // Retries expired - apply custom compensation logic - await context.CallActivityAsync("MyCompensation", options: retryOptions); - throw; -} -``` - -{{% alert title="Note" color="primary" %}} -In the example above, `"Step1"`, `"Step2"`, `"Step3"`, and `"MyCompensation"` represent workflow activities, which are functions in your code that actually implement the steps of the workflow. For brevity, these activity implementations are left out of this example. -{{% /alert %}} - -{{% /codetab %}} +{{< tabs Python ".NET" Java >}} {{% codetab %}} @@ -103,9 +68,63 @@ def error_handler(ctx, error): # Do some compensating work ``` -{{% alert title="Note" color="primary" %}} -Workflow retry policies will be available in a future version of the Python SDK. -{{% /alert %}} +> **Note** Workflow retry policies will be available in a future version of the Python SDK. + +{{% /codetab %}} + +{{% codetab %}} + + +```csharp +// Expotential backoff retry policy that survives long outages +var retryOptions = new WorkflowTaskOptions +{ + RetryPolicy = new WorkflowRetryPolicy( + firstRetryInterval: TimeSpan.FromMinutes(1), + backoffCoefficient: 2.0, + maxRetryInterval: TimeSpan.FromHours(1), + maxNumberOfAttempts: 10), +}; + +try +{ + var result1 = await context.CallActivityAsync("Step1", wfInput, retryOptions); + var result2 = await context.CallActivityAsync("Step2", result1, retryOptions); + var result3 = await context.CallActivityAsync("Step3", result2, retryOptions); + return string.Join(", ", result4); +} +catch (TaskFailedException) // Task failures are surfaced as TaskFailedException +{ + // Retries expired - apply custom compensation logic + await context.CallActivityAsync("MyCompensation", options: retryOptions); + throw; +} +``` + +> **Note** In the example above, `"Step1"`, `"Step2"`, `"Step3"`, and `"MyCompensation"` represent workflow activities, which are functions in your code that actually implement the steps of the workflow. For brevity, these activity implementations are left out of this example. + +{{% /codetab %}} + +{{% codetab %}} + + +```java +public static void main(String[] args) throws InterruptedException { + DaprWorkflowClient client = new DaprWorkflowClient(); + + try (client) { + client.raiseEvent(instanceId, "TestEvent", "TestEventPayload"); + + System.out.println(separatorStr); + System.out.println("** Registering parallel Events to be captured by allOf(t1,t2,t3) **"); + client.raiseEvent(instanceId, "event1", "TestEvent 1 Payload"); + client.raiseEvent(instanceId, "event2", "TestEvent 2 Payload"); + client.raiseEvent(instanceId, "event3", "TestEvent 3 Payload"); + System.out.printf("Events raised for workflow with instanceId: %s\n", instanceId); + + } +} +``` {{% /codetab %}} @@ -135,32 +154,7 @@ In addition to the challenges mentioned in [the previous pattern]({{< ref "workf Dapr Workflows provides a way to express the fan-out/fan-in pattern as a simple function, as shown in the following example: -{{< tabs ".NET" Python >}} - -{{% codetab %}} - - -```csharp -// Get a list of N work items to process in parallel. -object[] workBatch = await context.CallActivityAsync("GetWorkBatch", null); - -// Schedule the parallel tasks, but don't wait for them to complete yet. -var parallelTasks = new List>(workBatch.Length); -for (int i = 0; i < workBatch.Length; i++) -{ - Task task = context.CallActivityAsync("ProcessWorkItem", workBatch[i]); - parallelTasks.Add(task); -} - -// Everything is scheduled. Wait here until all parallel tasks have completed. -await Task.WhenAll(parallelTasks); - -// Aggregate all N outputs and publish the result. -int sum = parallelTasks.Sum(t => t.Result); -await context.CallActivityAsync("PostResults", sum); -``` - -{{% /codetab %}} +{{< tabs Python ".NET" Java >}} {{% codetab %}} @@ -202,6 +196,80 @@ def process_results(ctx, final_result: int): {{% /codetab %}} +{{% codetab %}} + + +```csharp +// Get a list of N work items to process in parallel. +object[] workBatch = await context.CallActivityAsync("GetWorkBatch", null); + +// Schedule the parallel tasks, but don't wait for them to complete yet. +var parallelTasks = new List>(workBatch.Length); +for (int i = 0; i < workBatch.Length; i++) +{ + Task task = context.CallActivityAsync("ProcessWorkItem", workBatch[i]); + parallelTasks.Add(task); +} + +// Everything is scheduled. Wait here until all parallel tasks have completed. +await Task.WhenAll(parallelTasks); + +// Aggregate all N outputs and publish the result. +int sum = parallelTasks.Sum(t => t.Result); +await context.CallActivityAsync("PostResults", sum); +``` + +{{% /codetab %}} + +{{% codetab %}} + + +```java +public static void main(String[] args) throws InterruptedException { + DaprWorkflowClient client = new DaprWorkflowClient(); + + try (client) { + + System.out.println(separatorStr); + System.out.println("**SendExternalMessage**"); + client.raiseEvent(instanceId, "TestEvent", "TestEventPayload"); + + // Get events to process in parallel + System.out.println(separatorStr); + System.out.println("** Registering parallel Events to be captured by allOf(t1,t2,t3) **"); + client.raiseEvent(instanceId, "event1", "TestEvent 1 Payload"); + client.raiseEvent(instanceId, "event2", "TestEvent 2 Payload"); + client.raiseEvent(instanceId, "event3", "TestEvent 3 Payload"); + System.out.printf("Events raised for workflow with instanceId: %s\n", instanceId); + + // Register the raised events to be captured + System.out.println(separatorStr); + System.out.println("** Registering Event to be captured by anyOf(t1,t2,t3) **"); + client.raiseEvent(instanceId, "e2", "event 2 Payload"); + System.out.printf("Event raised for workflow with instanceId: %s\n", instanceId); + + // Wait for all tasks to complete and aggregate results + System.out.println(separatorStr); + System.out.println("**WaitForInstanceCompletion**"); + try { + WorkflowInstanceStatus waitForInstanceCompletionResult = + client.waitForInstanceCompletion(instanceId, Duration.ofSeconds(60), true); + System.out.printf("Result: %s%n", waitForInstanceCompletionResult); + } catch (TimeoutException ex) { + System.out.printf("waitForInstanceCompletion has an exception:%s%n", ex); + } + + System.out.println(separatorStr); + System.out.println("**purgeInstance**"); + boolean purgeResult = client.purgeInstance(instanceId); + System.out.printf("purgeResult: %s%n", purgeResult); + + } +} +``` + +{{% /codetab %}} + {{< /tabs >}} The key takeaways from this example are: @@ -233,7 +301,7 @@ The Dapr workflow HTTP API supports the asynchronous request-reply pattern out-o The following `curl` commands illustrate how the workflow APIs support this pattern. ```bash -curl -X POST http://localhost:3500/v1.0-alpha1/workflows/dapr/OrderProcessingWorkflow/start?instanceID=12345678 -d '{"Name":"Paperclips","Quantity":1,"TotalCost":9.95}' +curl -X POST http://localhost:3500/v1.0-beta1/workflows/dapr/OrderProcessingWorkflow/start?instanceID=12345678 -d '{"Name":"Paperclips","Quantity":1,"TotalCost":9.95}' ``` The previous command will result in the following response JSON: @@ -245,7 +313,7 @@ The previous command will result in the following response JSON: The HTTP client can then construct the status query URL using the workflow instance ID and poll it repeatedly until it sees the "COMPLETE", "FAILURE", or "TERMINATED" status in the payload. ```bash -curl http://localhost:3500/v1.0-alpha1/workflows/dapr/12345678 +curl http://localhost:3500/v1.0-beta1/workflows/dapr/12345678 ``` The following is an example of what an in-progress workflow status might look like. @@ -302,48 +370,7 @@ Depending on the business needs, there may be a single monitor or there may be m Dapr Workflow supports this pattern natively by allowing you to implement _eternal workflows_. Rather than writing infinite while-loops ([which is an anti-pattern]({{< ref "workflow-features-concepts.md#infinite-loops-and-eternal-workflows" >}})), Dapr Workflow exposes a _continue-as-new_ API that workflow authors can use to restart a workflow function from the beginning with a new input. -{{< tabs ".NET" Python >}} - -{{% codetab %}} - - -```csharp -public override async Task RunAsync(WorkflowContext context, MyEntityState myEntityState) -{ - TimeSpan nextSleepInterval; - - var status = await context.CallActivityAsync("GetStatus"); - if (status == "healthy") - { - myEntityState.IsHealthy = true; - - // Check less frequently when in a healthy state - nextSleepInterval = TimeSpan.FromMinutes(60); - } - else - { - if (myEntityState.IsHealthy) - { - myEntityState.IsHealthy = false; - await context.CallActivityAsync("SendAlert", myEntityState); - } - - // Check more frequently when in an unhealthy state - nextSleepInterval = TimeSpan.FromMinutes(5); - } - - // Put the workflow to sleep until the determined time - await context.CreateTimer(nextSleepInterval); - - // Restart from the beginning with the updated state - context.ContinueAsNew(myEntityState); - return null; -} -``` - -> This example assumes you have a predefined `MyEntityState` class with a boolean `IsHealthy` property. - -{{% /codetab %}} +{{< tabs Python ".NET" Java >}} {{% codetab %}} @@ -392,6 +419,56 @@ def send_alert(ctx, message: str): {{% /codetab %}} +{{% codetab %}} + + +```csharp +public override async Task RunAsync(WorkflowContext context, MyEntityState myEntityState) +{ + TimeSpan nextSleepInterval; + + var status = await context.CallActivityAsync("GetStatus"); + if (status == "healthy") + { + myEntityState.IsHealthy = true; + + // Check less frequently when in a healthy state + nextSleepInterval = TimeSpan.FromMinutes(60); + } + else + { + if (myEntityState.IsHealthy) + { + myEntityState.IsHealthy = false; + await context.CallActivityAsync("SendAlert", myEntityState); + } + + // Check more frequently when in an unhealthy state + nextSleepInterval = TimeSpan.FromMinutes(5); + } + + // Put the workflow to sleep until the determined time + await context.CreateTimer(nextSleepInterval); + + // Restart from the beginning with the updated state + context.ContinueAsNew(myEntityState); + return null; +} +``` + +> This example assumes you have a predefined `MyEntityState` class with a boolean `IsHealthy` property. + +{{% /codetab %}} + +{{% codetab %}} + + +```java +todo +``` + +{{% /codetab %}} + {{< /tabs >}} A workflow implementing the monitor pattern can loop forever or it can terminate itself gracefully by not calling _continue-as-new_. @@ -420,53 +497,7 @@ The following diagram illustrates this flow. The following example code shows how this pattern can be implemented using Dapr Workflow. -{{< tabs ".NET" Python >}} - -{{% codetab %}} - - -```csharp -public override async Task RunAsync(WorkflowContext context, OrderPayload order) -{ - // ...(other steps)... - - // Require orders over a certain threshold to be approved - if (order.TotalCost > OrderApprovalThreshold) - { - try - { - // Request human approval for this order - await context.CallActivityAsync(nameof(RequestApprovalActivity), order); - - // Pause and wait for a human to approve the order - ApprovalResult approvalResult = await context.WaitForExternalEventAsync( - eventName: "ManagerApproval", - timeout: TimeSpan.FromDays(3)); - if (approvalResult == ApprovalResult.Rejected) - { - // The order was rejected, end the workflow here - return new OrderResult(Processed: false); - } - } - catch (TaskCanceledException) - { - // An approval timeout results in automatic order cancellation - return new OrderResult(Processed: false); - } - } - - // ...(other steps)... - - // End the workflow with a success result - return new OrderResult(Processed: true); -} -``` - -{{% alert title="Note" color="primary" %}} -In the example above, `RequestApprovalActivity` is the name of a workflow activity to invoke and `ApprovalResult` is an enumeration defined by the workflow app. For brevity, these definitions were left out of the example code. -{{% /alert %}} - -{{% /codetab %}} +{{< tabs Python ".NET" Java >}} {{% codetab %}} @@ -527,26 +558,101 @@ def place_order(_, order: Order) -> None: {{% /codetab %}} -{{< /tabs >}} +{{% codetab %}} + -The code that delivers the event to resume the workflow execution is external to the workflow. Workflow events can be delivered to a waiting workflow instance using the [raise event]({{< ref "howto-manage-workflow.md#raise-an-event" >}}) workflow management API, as shown in the following example: +```csharp +public override async Task RunAsync(WorkflowContext context, OrderPayload order) +{ + // ...(other steps)... + + // Require orders over a certain threshold to be approved + if (order.TotalCost > OrderApprovalThreshold) + { + try + { + // Request human approval for this order + await context.CallActivityAsync(nameof(RequestApprovalActivity), order); + + // Pause and wait for a human to approve the order + ApprovalResult approvalResult = await context.WaitForExternalEventAsync( + eventName: "ManagerApproval", + timeout: TimeSpan.FromDays(3)); + if (approvalResult == ApprovalResult.Rejected) + { + // The order was rejected, end the workflow here + return new OrderResult(Processed: false); + } + } + catch (TaskCanceledException) + { + // An approval timeout results in automatic order cancellation + return new OrderResult(Processed: false); + } + } -{{< tabs ".NET" Python >}} + // ...(other steps)... + + // End the workflow with a success result + return new OrderResult(Processed: true); +} +``` + +> **Note** In the example above, `RequestApprovalActivity` is the name of a workflow activity to invoke and `ApprovalResult` is an enumeration defined by the workflow app. For brevity, these definitions were left out of the example code. + +{{% /codetab %}} {{% codetab %}} - + + +```java +public static void main(String[] args) throws InterruptedException { + DaprWorkflowClient client = new DaprWorkflowClient(); + + try (client) { + String eventInstanceId = client.scheduleNewWorkflow(DemoWorkflow.class); + System.out.printf("Started new workflow instance with random ID: %s%n", eventInstanceId); + client.raiseEvent(eventInstanceId, "TestException", null); + System.out.printf("Event raised for workflow with instanceId: %s\n", eventInstanceId); + + System.out.println(separatorStr); + String instanceToTerminateId = "terminateMe"; + client.scheduleNewWorkflow(DemoWorkflow.class, null, instanceToTerminateId); + System.out.printf("Started new workflow instance with specified ID: %s%n", instanceToTerminateId); + + TimeUnit.SECONDS.sleep(5); + System.out.println("Terminate this workflow instance manually before the timeout is reached"); + client.terminateWorkflow(instanceToTerminateId, null); + System.out.println(separatorStr); + + String restartingInstanceId = "restarting"; + client.scheduleNewWorkflow(DemoWorkflow.class, null, restartingInstanceId); + System.out.printf("Started new workflow instance with ID: %s%n", restartingInstanceId); + System.out.println("Sleeping 30 seconds to restart the workflow"); + TimeUnit.SECONDS.sleep(30); + + System.out.println("**SendExternalMessage: RestartEvent**"); + client.raiseEvent(restartingInstanceId, "RestartEvent", "RestartEventPayload"); + + System.out.println("Sleeping 30 seconds to terminate the eternal workflow"); + TimeUnit.SECONDS.sleep(30); + client.terminateWorkflow(restartingInstanceId, null); + } -```csharp -// Raise the workflow event to the waiting workflow -await daprClient.RaiseWorkflowEventAsync( - instanceId: orderId, - workflowComponent: "dapr", - eventName: "ManagerApproval", - eventData: ApprovalResult.Approved); + System.out.println("Exiting DemoWorkflowClient."); + System.exit(0); + +} ``` {{% /codetab %}} +{{< /tabs >}} + +The code that delivers the event to resume the workflow execution is external to the workflow. Workflow events can be delivered to a waiting workflow instance using the [raise event]({{< ref "howto-manage-workflow.md#raise-an-event" >}}) workflow management API, as shown in the following example: + +{{< tabs Python ".NET" Java >}} + {{% codetab %}} @@ -564,6 +670,30 @@ with DaprClient() as d: {{% /codetab %}} +{{% codetab %}} + + +```csharp +// Raise the workflow event to the waiting workflow +await daprClient.RaiseWorkflowEventAsync( + instanceId: orderId, + workflowComponent: "dapr", + eventName: "ManagerApproval", + eventData: ApprovalResult.Approved); +``` + +{{% /codetab %}} + +{{% codetab %}} + + +```java +System.out.println("**SendExternalMessage: RestartEvent**"); +client.raiseEvent(restartingInstanceId, "RestartEvent", "RestartEventPayload"); +``` + +{{% /codetab %}} + {{< /tabs >}} External events don't have to be directly triggered by humans. They can also be triggered by other systems. For example, a workflow may need to pause and wait for a payment to be received. In this case, a payment system might publish an event to a pub/sub topic on receipt of a payment, and a listener on that topic can raise an event to the workflow using the raise event workflow API. @@ -577,4 +707,7 @@ External events don't have to be directly triggered by humans. They can also be - [Try out Dapr Workflows using the quickstart]({{< ref workflow-quickstart.md >}}) - [Workflow overview]({{< ref workflow-overview.md >}}) - [Workflow API reference]({{< ref workflow_api.md >}}) -- [Try out the .NET example](https://github.com/dapr/dotnet-sdk/tree/master/examples/Workflow) +- Try out the following examples: + - [Python](https://github.com/dapr/python-sdk/tree/master/examples/demo_workflow) + - [.NET](https://github.com/dapr/dotnet-sdk/tree/master/examples/Workflow) + - [Java](https://github.com/dapr/java-sdk/tree/master/examples/src/main/java/io/dapr/examples/workflows) \ No newline at end of file diff --git a/daprdocs/content/en/getting-started/quickstarts/workflow-quickstart.md b/daprdocs/content/en/getting-started/quickstarts/workflow-quickstart.md index d139561cb04..2ac77f77146 100644 --- a/daprdocs/content/en/getting-started/quickstarts/workflow-quickstart.md +++ b/daprdocs/content/en/getting-started/quickstarts/workflow-quickstart.md @@ -7,7 +7,7 @@ description: Get started with the Dapr Workflow building block --- {{% alert title="Note" color="primary" %}} -The workflow building block is currently in **alpha**. +Dapr Workflow is currently in beta. [See known limitations for {{% dapr-latest-version cli="true" %}}]({{< ref "workflow-overview.md#limitations" >}}). {{% /alert %}} Let's take a look at the Dapr [Workflow building block]({{< ref workflow >}}). In this Quickstart, you'll create a simple console application to demonstrate Dapr's workflow programming model and the workflow management APIs. @@ -21,7 +21,251 @@ In this guide, you'll: -{{< tabs ".NET" "Python" >}} +{{< tabs "Python" ".NET" "Java" >}} + + +{{% codetab %}} + +The `order-processor` console app starts and manages the `order_processing_workflow`, which simulates purchasing items from a store. The workflow consists of five unique workflow activities, or tasks: + +- `notify_activity`: Utilizes a logger to print out messages throughout the workflow. These messages notify you when: + - You have insufficient inventory + - Your payment couldn't be processed, etc. +- `process_payment_activity`: Processes and authorizes the payment. +- `verify_inventory_activity`: Checks the state store to ensure there is enough inventory present for purchase. +- `update_inventory_activity`: Removes the requested items from the state store and updates the store with the new remaining inventory value. +- `request_approval_activity`: Seeks approval from the manager if payment is greater than 50,000 USD. + +### Step 1: Pre-requisites + +For this example, you will need: + +- [Dapr CLI and initialized environment](https://docs.dapr.io/getting-started). +- [Python 3.7+ installed](https://www.python.org/downloads/). + +- [Docker Desktop](https://www.docker.com/products/docker-desktop) + + +### Step 2: Set up the environment + +Clone the [sample provided in the Quickstarts repo](https://github.com/dapr/quickstarts/tree/master/workflows). + +```bash +git clone https://github.com/dapr/quickstarts.git +``` + +In a new terminal window, navigate to the `order-processor` directory: + +```bash +cd workflows/python/sdk/order-processor +``` + +Install the Dapr Python SDK package: + +```bash +pip3 install -r requirements.txt +``` + +### Step 3: Run the order processor app + +In the terminal, start the order processor app alongside a Dapr sidecar: + +```bash +dapr run --app-id order-processor --resources-path ../../../components/ -- python3 app.py +``` + +> **Note:** Since Python3.exe is not defined in Windows, you may need to use `python app.py` instead of `python3 app.py`. + +This starts the `order-processor` app with unique workflow ID and runs the workflow activities. + +Expected output: + +```bash +== APP == Starting order workflow, purchasing 10 of cars +== APP == 2023-06-06 09:35:52.945 durabletask-worker INFO: Successfully connected to 127.0.0.1:65406. Waiting for work items... +== APP == INFO:NotifyActivity:Received order f4e1926e-3721-478d-be8a-f5bebd1995da for 10 cars at $150000 ! +== APP == INFO:VerifyInventoryActivity:Verifying inventory for order f4e1926e-3721-478d-be8a-f5bebd1995da of 10 cars +== APP == INFO:VerifyInventoryActivity:There are 100 Cars available for purchase +== APP == INFO:RequestApprovalActivity:Requesting approval for payment of 165000 USD for 10 cars +== APP == 2023-06-06 09:36:05.969 durabletask-worker INFO: f4e1926e-3721-478d-be8a-f5bebd1995da Event raised: manager_approval +== APP == INFO:NotifyActivity:Payment for order f4e1926e-3721-478d-be8a-f5bebd1995da has been approved! +== APP == INFO:ProcessPaymentActivity:Processing payment: f4e1926e-3721-478d-be8a-f5bebd1995da for 10 cars at 150000 USD +== APP == INFO:ProcessPaymentActivity:Payment for request ID f4e1926e-3721-478d-be8a-f5bebd1995da processed successfully +== APP == INFO:UpdateInventoryActivity:Checking inventory for order f4e1926e-3721-478d-be8a-f5bebd1995da for 10 cars +== APP == INFO:UpdateInventoryActivity:There are now 90 cars left in stock +== APP == INFO:NotifyActivity:Order f4e1926e-3721-478d-be8a-f5bebd1995da has completed! +== APP == 2023-06-06 09:36:06.106 durabletask-worker INFO: f4e1926e-3721-478d-be8a-f5bebd1995da: Orchestration completed with status: COMPLETED +== APP == Workflow completed! Result: Completed +== APP == Purchase of item is Completed +``` + +### (Optional) Step 4: View in Zipkin + +Running `dapr init` launches the [openzipkin/zipkin](https://hub.docker.com/r/openzipkin/zipkin/) Docker container. If the container has stopped running, launch the Zipkin Docker container with the following command: + +``` +docker run -d -p 9411:9411 openzipkin/zipkin +``` + +View the workflow trace spans in the Zipkin web UI (typically at `http://localhost:9411/zipkin/`). + + + +### What happened? + +When you ran `dapr run --app-id order-processor --resources-path ../../../components/ -- python3 app.py`: + +1. A unique order ID for the workflow is generated (in the above example, `f4e1926e-3721-478d-be8a-f5bebd1995da`) and the workflow is scheduled. +1. The `NotifyActivity` workflow activity sends a notification saying an order for 10 cars has been received. +1. The `ReserveInventoryActivity` workflow activity checks the inventory data, determines if you can supply the ordered item, and responds with the number of cars in stock. +1. Your workflow starts and notifies you of its status. +1. The `ProcessPaymentActivity` workflow activity begins processing payment for order `f4e1926e-3721-478d-be8a-f5bebd1995da` and confirms if successful. +1. The `UpdateInventoryActivity` workflow activity updates the inventory with the current available cars after the order has been processed. +1. The `NotifyActivity` workflow activity sends a notification saying that order `f4e1926e-3721-478d-be8a-f5bebd1995da` has completed. +1. The workflow terminates as completed. + +#### `order-processor/app.py` + +In the application's program file: +- The unique workflow order ID is generated +- The workflow is scheduled +- The workflow status is retrieved +- The workflow and the workflow activities it invokes are registered + +```python +class WorkflowConsoleApp: + def main(self): + # Register workflow and activities + workflowRuntime = WorkflowRuntime(settings.DAPR_RUNTIME_HOST, settings.DAPR_GRPC_PORT) + workflowRuntime.register_workflow(order_processing_workflow) + workflowRuntime.register_activity(notify_activity) + workflowRuntime.register_activity(requst_approval_activity) + workflowRuntime.register_activity(verify_inventory_activity) + workflowRuntime.register_activity(process_payment_activity) + workflowRuntime.register_activity(update_inventory_activity) + workflowRuntime.start() + + print("==========Begin the purchase of item:==========", flush=True) + item_name = default_item_name + order_quantity = 10 + + total_cost = int(order_quantity) * baseInventory[item_name].per_item_cost + order = OrderPayload(item_name=item_name, quantity=int(order_quantity), total_cost=total_cost) + + # Start Workflow + print(f'Starting order workflow, purchasing {order_quantity} of {item_name}', flush=True) + start_resp = daprClient.start_workflow(workflow_component=workflow_component, + workflow_name=workflow_name, + input=order) + _id = start_resp.instance_id + + def prompt_for_approval(daprClient: DaprClient): + daprClient.raise_workflow_event(instance_id=_id, workflow_component=workflow_component, + event_name="manager_approval", event_data={'approval': True}) + + approval_seeked = False + start_time = datetime.now() + while True: + time_delta = datetime.now() - start_time + state = daprClient.get_workflow(instance_id=_id, workflow_component=workflow_component) + if not state: + print("Workflow not found!") # not expected + elif state.runtime_status == "Completed" or\ + state.runtime_status == "Failed" or\ + state.runtime_status == "Terminated": + print(f'Workflow completed! Result: {state.runtime_status}', flush=True) + break + if time_delta.total_seconds() >= 10: + state = daprClient.get_workflow(instance_id=_id, workflow_component=workflow_component) + if total_cost > 50000 and ( + state.runtime_status != "Completed" or + state.runtime_status != "Failed" or + state.runtime_status != "Terminated" + ) and not approval_seeked: + approval_seeked = True + threading.Thread(target=prompt_for_approval(daprClient), daemon=True).start() + + print("Purchase of item is ", state.runtime_status, flush=True) + + def restock_inventory(self, daprClient: DaprClient, baseInventory): + for key, item in baseInventory.items(): + print(f'item: {item}') + item_str = f'{{"name": "{item.item_name}", "quantity": {item.quantity},\ + "per_item_cost": {item.per_item_cost}}}' + daprClient.save_state("statestore-actors", key, item_str) + +if __name__ == '__main__': + app = WorkflowConsoleApp() + app.main() +``` + +#### `order-processor/workflow.py` + +In `workflow.py`, the workflow is defined as a class with all of its associated tasks (determined by workflow activities). + +```python + def order_processing_workflow(ctx: DaprWorkflowContext, order_payload_str: OrderPayload): + """Defines the order processing workflow. + When the order is received, the inventory is checked to see if there is enough inventory to + fulfill the order. If there is enough inventory, the payment is processed and the inventory is + updated. If there is not enough inventory, the order is rejected. + If the total order is greater than $50,000, the order is sent to a manager for approval. + """ + order_id = ctx.instance_id + order_payload=json.loads(order_payload_str) + yield ctx.call_activity(notify_activity, + input=Notification(message=('Received order ' +order_id+ ' for ' + +f'{order_payload["quantity"]}' +' ' +f'{order_payload["item_name"]}' + +' at $'+f'{order_payload["total_cost"]}' +' !'))) + result = yield ctx.call_activity(verify_inventory_activity, + input=InventoryRequest(request_id=order_id, + item_name=order_payload["item_name"], + quantity=order_payload["quantity"])) + if not result.success: + yield ctx.call_activity(notify_activity, + input=Notification(message='Insufficient inventory for ' + +f'{order_payload["item_name"]}'+'!')) + return OrderResult(processed=False) + + if order_payload["total_cost"] > 50000: + yield ctx.call_activity(requst_approval_activity, input=order_payload) + approval_task = ctx.wait_for_external_event("manager_approval") + timeout_event = ctx.create_timer(timedelta(seconds=200)) + winner = yield when_any([approval_task, timeout_event]) + if winner == timeout_event: + yield ctx.call_activity(notify_activity, + input=Notification(message='Payment for order '+order_id + +' has been cancelled due to timeout!')) + return OrderResult(processed=False) + approval_result = yield approval_task + if approval_result["approval"]: + yield ctx.call_activity(notify_activity, input=Notification( + message=f'Payment for order {order_id} has been approved!')) + else: + yield ctx.call_activity(notify_activity, input=Notification( + message=f'Payment for order {order_id} has been rejected!')) + return OrderResult(processed=False) + + yield ctx.call_activity(process_payment_activity, input=PaymentRequest( + request_id=order_id, item_being_purchased=order_payload["item_name"], + amount=order_payload["total_cost"], quantity=order_payload["quantity"])) + + try: + yield ctx.call_activity(update_inventory_activity, + input=PaymentRequest(request_id=order_id, + item_being_purchased=order_payload["item_name"], + amount=order_payload["total_cost"], + quantity=order_payload["quantity"])) + except Exception: + yield ctx.call_activity(notify_activity, + input=Notification(message=f'Order {order_id} Failed!')) + return OrderResult(processed=False) + + yield ctx.call_activity(notify_activity, input=Notification( + message=f'Order {order_id} has completed!')) + return OrderResult(processed=True) +``` +{{% /codetab %}} {{% codetab %}} @@ -99,7 +343,13 @@ Expected output: ### (Optional) Step 4: View in Zipkin -If you have Zipkin configured for Dapr locally on your machine, you can view the workflow trace spans in the Zipkin web UI (typically at `http://localhost:9411/zipkin/`). +Running `dapr init` launches the [openzipkin/zipkin](https://hub.docker.com/r/openzipkin/zipkin/) Docker container. If the container has stopped running, launch the Zipkin Docker container with the following command: + +``` +docker run -d -p 9411:9411 openzipkin/zipkin +``` + +View the workflow trace spans in the Zipkin web UI (typically at `http://localhost:9411/zipkin/`). @@ -254,27 +504,35 @@ The `Activities` directory holds the four workflow activities used by the workfl - `ProcessPaymentActivity.cs` - `UpdateInventoryActivity.cs` +## Watch the demo + +Watch [this video to walk through the Dapr Workflow .NET demo](https://youtu.be/BxiKpEmchgQ?t=2564): + + + {{% /codetab %}} - + {{% codetab %}} -The `order-processor` console app starts and manages the `order_processing_workflow`, which simulates purchasing items from a store. The workflow consists of five unique workflow activities, or tasks: +The `order-processor` console app starts and manages the lifecycle of an order processing workflow that stores and retrieves data in a state store. The workflow consists of four workflow activities, or tasks: +- `NotifyActivity`: Utilizes a logger to print out messages throughout the workflow +- `RequestApprovalActivity`: Requests approval for processing payment +- `ReserveInventoryActivity`: Checks the state store to ensure that there is enough inventory for the purchase +- `ProcessPaymentActivity`: Processes and authorizes the payment +- `UpdateInventoryActivity`: Removes the requested items from the state store and updates the store with the new remaining inventory value -- `notify_activity`: Utilizes a logger to print out messages throughout the workflow. These messages notify you when: - - You have insufficient inventory - - Your payment couldn't be processed, etc. -- `process_payment_activity`: Processes and authorizes the payment. -- `verify_inventory_activity`: Checks the state store to ensure there is enough inventory present for purchase. -- `update_inventory_activity`: Removes the requested items from the state store and updates the store with the new remaining inventory value. -- `request_approval_activity`: Seeks approval from the manager if payment is greater than 50,000 USD. ### Step 1: Pre-requisites For this example, you will need: - [Dapr CLI and initialized environment](https://docs.dapr.io/getting-started). -- [Python 3.7+ installed](https://www.python.org/downloads/). +- Java JDK 11 (or greater): + - [Microsoft JDK 11](https://docs.microsoft.com/java/openjdk/download#openjdk-11) + - [Oracle JDK 11](https://www.oracle.com/technetwork/java/javase/downloads/index.html#JDK11) + - [OpenJDK 11](https://jdk.java.net/11/) +- [Apache Maven](https://maven.apache.org/install.html) version 3.x. - [Docker Desktop](https://www.docker.com/products/docker-desktop) @@ -287,16 +545,16 @@ Clone the [sample provided in the Quickstarts repo](https://github.com/dapr/quic git clone https://github.com/dapr/quickstarts.git ``` -In a new terminal window, navigate to the `order-processor` directory: +Navigate to the `order-processor` directory: ```bash -cd workflows/python/sdk/order-processor +cd workflows/java/sdk/order-processor ``` -Install the Dapr Python SDK package: +Install the dependencies: ```bash -pip3 install -r requirements.txt +mvn clean install ``` ### Step 3: Run the order processor app @@ -304,54 +562,70 @@ pip3 install -r requirements.txt In the terminal, start the order processor app alongside a Dapr sidecar: ```bash -dapr run --app-id order-processor --resources-path ../../../components/ -- python3 app.py +dapr run --app-id WorkflowConsoleApp --resources-path ../../../components/ --dapr-grpc-port 50001 -- java -jar target/OrderProcessingService-0.0.1-SNAPSHOT.jar io.dapr.quickstarts.workflows.WorkflowConsoleApp ``` -> **Note:** Since Python3.exe is not defined in Windows, you may need to use `python app.py` instead of `python3 app.py`. - This starts the `order-processor` app with unique workflow ID and runs the workflow activities. Expected output: -```bash +``` +== APP == *** Welcome to the Dapr Workflow console app sample! +== APP == *** Using this app, you can place orders that start workflows. +== APP == Start workflow runtime +== APP == Sep 20, 2023 3:23:05 PM com.microsoft.durabletask.DurableTaskGrpcWorker startAndBlock +== APP == INFO: Durable Task worker is connecting to sidecar at 127.0.0.1:50001. + +== APP == ==========Begin the purchase of item:========== == APP == Starting order workflow, purchasing 10 of cars -== APP == 2023-06-06 09:35:52.945 durabletask-worker INFO: Successfully connected to 127.0.0.1:65406. Waiting for work items... -== APP == INFO:NotifyActivity:Received order f4e1926e-3721-478d-be8a-f5bebd1995da for 10 cars at $150000 ! -== APP == INFO:VerifyInventoryActivity:Verifying inventory for order f4e1926e-3721-478d-be8a-f5bebd1995da of 10 cars -== APP == INFO:VerifyInventoryActivity:There are 100 Cars available for purchase -== APP == INFO:RequestApprovalActivity:Requesting approval for payment of 165000 USD for 10 cars -== APP == 2023-06-06 09:36:05.969 durabletask-worker INFO: f4e1926e-3721-478d-be8a-f5bebd1995da Event raised: manager_approval -== APP == INFO:NotifyActivity:Payment for order f4e1926e-3721-478d-be8a-f5bebd1995da has been approved! -== APP == INFO:ProcessPaymentActivity:Processing payment: f4e1926e-3721-478d-be8a-f5bebd1995da for 10 cars at 150000 USD -== APP == INFO:ProcessPaymentActivity:Payment for request ID f4e1926e-3721-478d-be8a-f5bebd1995da processed successfully -== APP == INFO:UpdateInventoryActivity:Checking inventory for order f4e1926e-3721-478d-be8a-f5bebd1995da for 10 cars -== APP == INFO:UpdateInventoryActivity:There are now 90 cars left in stock -== APP == INFO:NotifyActivity:Order f4e1926e-3721-478d-be8a-f5bebd1995da has completed! -== APP == 2023-06-06 09:36:06.106 durabletask-worker INFO: f4e1926e-3721-478d-be8a-f5bebd1995da: Orchestration completed with status: COMPLETED -== APP == Workflow completed! Result: Completed -== APP == Purchase of item is Completed + +== APP == scheduled new workflow instance of OrderProcessingWorkflow with instance ID: edceba90-9c45-4be8-ad40-60d16e060797 +== APP == [Thread-0] INFO io.dapr.workflows.WorkflowContext - Starting Workflow: io.dapr.quickstarts.workflows.OrderProcessingWorkflow +== APP == [Thread-0] INFO io.dapr.workflows.WorkflowContext - Instance ID(order ID): edceba90-9c45-4be8-ad40-60d16e060797 +== APP == [Thread-0] INFO io.dapr.workflows.WorkflowContext - Current Orchestration Time: 2023-09-20T19:23:09.755Z +== APP == [Thread-0] INFO io.dapr.workflows.WorkflowContext - Received Order: OrderPayload [itemName=cars, totalCost=150000, quantity=10] +== APP == [Thread-0] INFO io.dapr.quickstarts.workflows.activities.NotifyActivity - Received Order: OrderPayload [itemName=cars, totalCost=150000, quantity=10] +== APP == workflow instance edceba90-9c45-4be8-ad40-60d16e060797 started +== APP == [Thread-0] INFO io.dapr.quickstarts.workflows.activities.ReserveInventoryActivity - Reserving inventory for order 'edceba90-9c45-4be8-ad40-60d16e060797' of 10 cars +== APP == [Thread-0] INFO io.dapr.quickstarts.workflows.activities.ReserveInventoryActivity - There are 100 cars available for purchase +== APP == [Thread-0] INFO io.dapr.quickstarts.workflows.activities.ReserveInventoryActivity - Reserved inventory for order 'edceba90-9c45-4be8-ad40-60d16e060797' of 10 cars +== APP == [Thread-0] INFO io.dapr.quickstarts.workflows.activities.RequestApprovalActivity - Requesting approval for order: OrderPayload [itemName=cars, totalCost=150000, quantity=10] +== APP == [Thread-0] INFO io.dapr.quickstarts.workflows.activities.RequestApprovalActivity - Approved requesting approval for order: OrderPayload [itemName=cars, totalCost=150000, quantity=10] +== APP == [Thread-0] INFO io.dapr.quickstarts.workflows.activities.ProcessPaymentActivity - Processing payment: edceba90-9c45-4be8-ad40-60d16e060797 for 10 cars at $150000 +== APP == [Thread-0] INFO io.dapr.quickstarts.workflows.activities.ProcessPaymentActivity - Payment for request ID 'edceba90-9c45-4be8-ad40-60d16e060797' processed successfully +== APP == [Thread-0] INFO io.dapr.quickstarts.workflows.activities.UpdateInventoryActivity - Updating inventory for order 'edceba90-9c45-4be8-ad40-60d16e060797' of 10 cars +== APP == [Thread-0] INFO io.dapr.quickstarts.workflows.activities.UpdateInventoryActivity - Updated inventory for order 'edceba90-9c45-4be8-ad40-60d16e060797': there are now 90 cars left in stock +== APP == [Thread-0] INFO io.dapr.quickstarts.workflows.activities.NotifyActivity - Order completed! : edceba90-9c45-4be8-ad40-60d16e060797 + +== APP == workflow instance edceba90-9c45-4be8-ad40-60d16e060797 completed, out is: {"processed":true} ``` ### (Optional) Step 4: View in Zipkin -If you have Zipkin configured for Dapr locally on your machine, you can view the workflow trace spans in the Zipkin web UI (typically at `http://localhost:9411/zipkin/`). +Running `dapr init` launches the [openzipkin/zipkin](https://hub.docker.com/r/openzipkin/zipkin/) Docker container. If the container has stopped running, launch the Zipkin Docker container with the following command: - +``` +docker run -d -p 9411:9411 openzipkin/zipkin +``` + +View the workflow trace spans in the Zipkin web UI (typically at `http://localhost:9411/zipkin/`). + + ### What happened? -When you ran `dapr run --app-id order-processor --resources-path ../../../components/ -- python3 app.py`: +When you ran `dapr run`: -1. A unique order ID for the workflow is generated (in the above example, `f4e1926e-3721-478d-be8a-f5bebd1995da`) and the workflow is scheduled. +1. A unique order ID for the workflow is generated (in the above example, `edceba90-9c45-4be8-ad40-60d16e060797`) and the workflow is scheduled. 1. The `NotifyActivity` workflow activity sends a notification saying an order for 10 cars has been received. 1. The `ReserveInventoryActivity` workflow activity checks the inventory data, determines if you can supply the ordered item, and responds with the number of cars in stock. -1. Your workflow starts and notifies you of its status. -1. The `ProcessPaymentActivity` workflow activity begins processing payment for order `f4e1926e-3721-478d-be8a-f5bebd1995da` and confirms if successful. +1. Once approved, your workflow starts and notifies you of its status. +1. The `ProcessPaymentActivity` workflow activity begins processing payment for order `edceba90-9c45-4be8-ad40-60d16e060797` and confirms if successful. 1. The `UpdateInventoryActivity` workflow activity updates the inventory with the current available cars after the order has been processed. -1. The `NotifyActivity` workflow activity sends a notification saying that order `f4e1926e-3721-478d-be8a-f5bebd1995da` has completed. +1. The `NotifyActivity` workflow activity sends a notification saying that order `edceba90-9c45-4be8-ad40-60d16e060797` has completed. 1. The workflow terminates as completed. -#### `order-processor/app.py` +#### `order-processor/WorkflowConsoleApp.java` In the application's program file: - The unique workflow order ID is generated @@ -359,151 +633,227 @@ In the application's program file: - The workflow status is retrieved - The workflow and the workflow activities it invokes are registered -```python -class WorkflowConsoleApp: - def main(self): - # Register workflow and activities - workflowRuntime = WorkflowRuntime(settings.DAPR_RUNTIME_HOST, settings.DAPR_GRPC_PORT) - workflowRuntime.register_workflow(order_processing_workflow) - workflowRuntime.register_activity(notify_activity) - workflowRuntime.register_activity(requst_approval_activity) - workflowRuntime.register_activity(verify_inventory_activity) - workflowRuntime.register_activity(process_payment_activity) - workflowRuntime.register_activity(update_inventory_activity) - workflowRuntime.start() - - print("==========Begin the purchase of item:==========", flush=True) - item_name = default_item_name - order_quantity = 10 +```java +package io.dapr.quickstarts.workflows; +import io.dapr.client.DaprClient; +import io.dapr.client.DaprClientBuilder; +import io.dapr.workflows.client.DaprWorkflowClient; + +public class WorkflowConsoleApp { + + private static final String STATE_STORE_NAME = "statestore-actors"; + + // ... + public static void main(String[] args) throws Exception { + System.out.println("*** Welcome to the Dapr Workflow console app sample!"); + System.out.println("*** Using this app, you can place orders that start workflows."); + // Wait for the sidecar to become available + Thread.sleep(5 * 1000); + + // Register the OrderProcessingWorkflow and its activities with the builder. + WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder().registerWorkflow(OrderProcessingWorkflow.class); + builder.registerActivity(NotifyActivity.class); + builder.registerActivity(ProcessPaymentActivity.class); + builder.registerActivity(RequestApprovalActivity.class); + builder.registerActivity(ReserveInventoryActivity.class); + builder.registerActivity(UpdateInventoryActivity.class); + + // Build the workflow runtime + try (WorkflowRuntime runtime = builder.build()) { + System.out.println("Start workflow runtime"); + runtime.start(false); + } - total_cost = int(order_quantity) * baseInventory[item_name].per_item_cost - order = OrderPayload(item_name=item_name, quantity=int(order_quantity), total_cost=total_cost) + InventoryItem inventory = prepareInventoryAndOrder(); - # Start Workflow - print(f'Starting order workflow, purchasing {order_quantity} of {item_name}', flush=True) - start_resp = daprClient.start_workflow(workflow_component=workflow_component, - workflow_name=workflow_name, - input=order) - _id = start_resp.instance_id - - def prompt_for_approval(daprClient: DaprClient): - daprClient.raise_workflow_event(instance_id=_id, workflow_component=workflow_component, - event_name="manager_approval", event_data={'approval': True}) + DaprWorkflowClient workflowClient = new DaprWorkflowClient(); + try (workflowClient) { + executeWorkflow(workflowClient, inventory); + } - approval_seeked = False - start_time = datetime.now() - while True: - time_delta = datetime.now() - start_time - state = daprClient.get_workflow(instance_id=_id, workflow_component=workflow_component) - if not state: - print("Workflow not found!") # not expected - elif state.runtime_status == "Completed" or\ - state.runtime_status == "Failed" or\ - state.runtime_status == "Terminated": - print(f'Workflow completed! Result: {state.runtime_status}', flush=True) - break - if time_delta.total_seconds() >= 10: - state = daprClient.get_workflow(instance_id=_id, workflow_component=workflow_component) - if total_cost > 50000 and ( - state.runtime_status != "Completed" or - state.runtime_status != "Failed" or - state.runtime_status != "Terminated" - ) and not approval_seeked: - approval_seeked = True - threading.Thread(target=prompt_for_approval(daprClient), daemon=True).start() - - print("Purchase of item is ", state.runtime_status, flush=True) + } + + // Start the workflow runtime, pulling and executing tasks + private static void executeWorkflow(DaprWorkflowClient workflowClient, InventoryItem inventory) { + System.out.println("==========Begin the purchase of item:=========="); + String itemName = inventory.getName(); + int orderQuantity = inventory.getQuantity(); + int totalcost = orderQuantity * inventory.getPerItemCost(); + OrderPayload order = new OrderPayload(); + order.setItemName(itemName); + order.setQuantity(orderQuantity); + order.setTotalCost(totalcost); + System.out.println("Starting order workflow, purchasing " + orderQuantity + " of " + itemName); + + String instanceId = workflowClient.scheduleNewWorkflow(OrderProcessingWorkflow.class, order); + System.out.printf("scheduled new workflow instance of OrderProcessingWorkflow with instance ID: %s%n", + instanceId); + + // Check workflow instance start status + try { + workflowClient.waitForInstanceStart(instanceId, Duration.ofSeconds(10), false); + System.out.printf("workflow instance %s started%n", instanceId); + } catch (TimeoutException e) { + System.out.printf("workflow instance %s did not start within 10 seconds%n", instanceId); + return; + } - def restock_inventory(self, daprClient: DaprClient, baseInventory): - for key, item in baseInventory.items(): - print(f'item: {item}') - item_str = f'{{"name": "{item.item_name}", "quantity": {item.quantity},\ - "per_item_cost": {item.per_item_cost}}}' - daprClient.save_state("statestore-actors", key, item_str) + // Check workflow instance complete status + try { + WorkflowInstanceStatus workflowStatus = workflowClient.waitForInstanceCompletion(instanceId, + Duration.ofSeconds(30), + true); + if (workflowStatus != null) { + System.out.printf("workflow instance %s completed, out is: %s %n", instanceId, + workflowStatus.getSerializedOutput()); + } else { + System.out.printf("workflow instance %s not found%n", instanceId); + } + } catch (TimeoutException e) { + System.out.printf("workflow instance %s did not complete within 30 seconds%n", instanceId); + } -if __name__ == '__main__': - app = WorkflowConsoleApp() - app.main() + } + + private static InventoryItem prepareInventoryAndOrder() { + // prepare 100 cars in inventory + InventoryItem inventory = new InventoryItem(); + inventory.setName("cars"); + inventory.setPerItemCost(15000); + inventory.setQuantity(100); + DaprClient daprClient = new DaprClientBuilder().build(); + restockInventory(daprClient, inventory); + + // prepare order for 10 cars + InventoryItem order = new InventoryItem(); + order.setName("cars"); + order.setPerItemCost(15000); + order.setQuantity(10); + return order; + } + + private static void restockInventory(DaprClient daprClient, InventoryItem inventory) { + String key = inventory.getName(); + daprClient.saveState(STATE_STORE_NAME, key, inventory).block(); + } +} ``` -#### `order-processor/workflow.py` - -In `workflow.py`, the workflow is defined as a class with all of its associated tasks (determined by workflow activities). +#### `OrderProcessingWorkflow.java` + +In `OrderProcessingWorkflow.java`, the workflow is defined as a class with all of its associated tasks (determined by workflow activities). + +```java +package io.dapr.quickstarts.workflows; +import io.dapr.workflows.Workflow; + +public class OrderProcessingWorkflow extends Workflow { + + @Override + public WorkflowStub create() { + return ctx -> { + Logger logger = ctx.getLogger(); + String orderId = ctx.getInstanceId(); + logger.info("Starting Workflow: " + ctx.getName()); + logger.info("Instance ID(order ID): " + orderId); + logger.info("Current Orchestration Time: " + ctx.getCurrentInstant()); + + OrderPayload order = ctx.getInput(OrderPayload.class); + logger.info("Received Order: " + order.toString()); + OrderResult orderResult = new OrderResult(); + orderResult.setProcessed(false); + + // Notify the user that an order has come through + Notification notification = new Notification(); + notification.setMessage("Received Order: " + order.toString()); + ctx.callActivity(NotifyActivity.class.getName(), notification).await(); + + // Determine if there is enough of the item available for purchase by checking + // the inventory + InventoryRequest inventoryRequest = new InventoryRequest(); + inventoryRequest.setRequestId(orderId); + inventoryRequest.setItemName(order.getItemName()); + inventoryRequest.setQuantity(order.getQuantity()); + InventoryResult inventoryResult = ctx.callActivity(ReserveInventoryActivity.class.getName(), + inventoryRequest, InventoryResult.class).await(); + + // If there is insufficient inventory, fail and let the user know + if (!inventoryResult.isSuccess()) { + notification.setMessage("Insufficient inventory for order : " + order.getItemName()); + ctx.callActivity(NotifyActivity.class.getName(), notification).await(); + ctx.complete(orderResult); + return; + } + + // Require orders over a certain threshold to be approved + if (order.getTotalCost() > 5000) { + ApprovalResult approvalResult = ctx.callActivity(RequestApprovalActivity.class.getName(), + order, ApprovalResult.class).await(); + if (approvalResult != ApprovalResult.Approved) { + notification.setMessage("Order " + order.getItemName() + " was not approved."); + ctx.callActivity(NotifyActivity.class.getName(), notification).await(); + ctx.complete(orderResult); + return; + } + } + + // There is enough inventory available so the user can purchase the item(s). + // Process their payment + PaymentRequest paymentRequest = new PaymentRequest(); + paymentRequest.setRequestId(orderId); + paymentRequest.setItemBeingPurchased(order.getItemName()); + paymentRequest.setQuantity(order.getQuantity()); + paymentRequest.setAmount(order.getTotalCost()); + boolean isOK = ctx.callActivity(ProcessPaymentActivity.class.getName(), + paymentRequest, boolean.class).await(); + if (!isOK) { + notification.setMessage("Payment failed for order : " + orderId); + ctx.callActivity(NotifyActivity.class.getName(), notification).await(); + ctx.complete(orderResult); + return; + } + + inventoryResult = ctx.callActivity(UpdateInventoryActivity.class.getName(), + inventoryRequest, InventoryResult.class).await(); + if (!inventoryResult.isSuccess()) { + // If there is an error updating the inventory, refund the user + // paymentRequest.setAmount(-1 * paymentRequest.getAmount()); + // ctx.callActivity(ProcessPaymentActivity.class.getName(), + // paymentRequest).await(); + + // Let users know their payment processing failed + notification.setMessage("Order failed to update inventory! : " + orderId); + ctx.callActivity(NotifyActivity.class.getName(), notification).await(); + ctx.complete(orderResult); + return; + } + + // Let user know their order was processed + notification.setMessage("Order completed! : " + orderId); + ctx.callActivity(NotifyActivity.class.getName(), notification).await(); + + // Complete the workflow with order result is processed + orderResult.setProcessed(true); + ctx.complete(orderResult); + }; + } + +} +``` -```python - def order_processing_workflow(ctx: DaprWorkflowContext, order_payload_str: OrderPayload): - """Defines the order processing workflow. - When the order is received, the inventory is checked to see if there is enough inventory to - fulfill the order. If there is enough inventory, the payment is processed and the inventory is - updated. If there is not enough inventory, the order is rejected. - If the total order is greater than $50,000, the order is sent to a manager for approval. - """ - order_id = ctx.instance_id - order_payload=json.loads(order_payload_str) - yield ctx.call_activity(notify_activity, - input=Notification(message=('Received order ' +order_id+ ' for ' - +f'{order_payload["quantity"]}' +' ' +f'{order_payload["item_name"]}' - +' at $'+f'{order_payload["total_cost"]}' +' !'))) - result = yield ctx.call_activity(verify_inventory_activity, - input=InventoryRequest(request_id=order_id, - item_name=order_payload["item_name"], - quantity=order_payload["quantity"])) - if not result.success: - yield ctx.call_activity(notify_activity, - input=Notification(message='Insufficient inventory for ' - +f'{order_payload["item_name"]}'+'!')) - return OrderResult(processed=False) - - if order_payload["total_cost"] > 50000: - yield ctx.call_activity(requst_approval_activity, input=order_payload) - approval_task = ctx.wait_for_external_event("manager_approval") - timeout_event = ctx.create_timer(timedelta(seconds=200)) - winner = yield when_any([approval_task, timeout_event]) - if winner == timeout_event: - yield ctx.call_activity(notify_activity, - input=Notification(message='Payment for order '+order_id - +' has been cancelled due to timeout!')) - return OrderResult(processed=False) - approval_result = yield approval_task - if approval_result["approval"]: - yield ctx.call_activity(notify_activity, input=Notification( - message=f'Payment for order {order_id} has been approved!')) - else: - yield ctx.call_activity(notify_activity, input=Notification( - message=f'Payment for order {order_id} has been rejected!')) - return OrderResult(processed=False) - - yield ctx.call_activity(process_payment_activity, input=PaymentRequest( - request_id=order_id, item_being_purchased=order_payload["item_name"], - amount=order_payload["total_cost"], quantity=order_payload["quantity"])) +#### `activities` directory - try: - yield ctx.call_activity(update_inventory_activity, - input=PaymentRequest(request_id=order_id, - item_being_purchased=order_payload["item_name"], - amount=order_payload["total_cost"], - quantity=order_payload["quantity"])) - except Exception: - yield ctx.call_activity(notify_activity, - input=Notification(message=f'Order {order_id} Failed!')) - return OrderResult(processed=False) +The `Activities` directory holds the four workflow activities used by the workflow, defined in the following files: +- [`NotifyActivity.java`](https://github.com/dapr/quickstarts/tree/master/workflows/java/sdk/order-processor/src/main/java/io/dapr/quickstarts/workflows/activities/NotifyActivity.java) +- [`RequestApprovalActivity`](https://github.com/dapr/quickstarts/tree/master/workflows/java/sdk/order-processor/src/main/java/io/dapr/quickstarts/workflows/activities/RequestApprovalActivity.java) +- [`ReserveInventoryActivity`](https://github.com/dapr/quickstarts/tree/master/workflows/java/sdk/order-processor/src/main/java/io/dapr/quickstarts/workflows/activities/ReserveInventoryActivity.java) +- [`ProcessPaymentActivity`](https://github.com/dapr/quickstarts/tree/master/workflows/java/sdk/order-processor/src/main/java/io/dapr/quickstarts/workflows/activities/ProcessPaymentActivity.java) +- [`UpdateInventoryActivity`](https://github.com/dapr/quickstarts/tree/master/workflows/java/sdk/order-processor/src/main/java/io/dapr/quickstarts/workflows/activities/UpdateInventoryActivity.java) - yield ctx.call_activity(notify_activity, input=Notification( - message=f'Order {order_id} has completed!')) - return OrderResult(processed=True) -``` {{% /codetab %}} - {{< /tabs >}} -## Watch the demo - -Watch [this video to walk through the Dapr Workflow .NET demo](https://youtu.be/BxiKpEmchgQ?t=2564): - - - - ## Tell us what you think! We're continuously working to improve our Quickstart examples and value your feedback. Did you find this Quickstart helpful? Do you have suggestions for improvement? @@ -515,4 +865,4 @@ Join the discussion in our [discord channel](https://discord.com/channels/778680 - Walk through a more in-depth [.NET SDK example workflow](https://github.com/dapr/dotnet-sdk/tree/master/examples/Workflow) - Learn more about [Workflow as a Dapr building block]({{< ref workflow-overview >}}) -{{< button text="Explore Dapr tutorials >>" page="getting-started/tutorials/_index.md" >}} +{{< button text="Explore Dapr tutorials >>" page="getting-started/tutorials/_index.md" >}} \ No newline at end of file diff --git a/daprdocs/content/en/operations/support/alpha-apis.md b/daprdocs/content/en/operations/support/alpha-beta-apis.md similarity index 76% rename from daprdocs/content/en/operations/support/alpha-apis.md rename to daprdocs/content/en/operations/support/alpha-beta-apis.md index 3e2e26354ee..c35d4fc4240 100644 --- a/daprdocs/content/en/operations/support/alpha-apis.md +++ b/daprdocs/content/en/operations/support/alpha-beta-apis.md @@ -1,16 +1,27 @@ --- type: docs -title: "Alpha APIs" -linkTitle: "Alpha APIs" +title: "Alpha and Beta APIs" +linkTitle: "Alpha & Beta APIs" weight: 5000 -description: "List of current alpha APIs" +description: "List of current alpha and beta APIs" --- +## Alpha APIs + | Building block/API | gRPC | HTTP | Description | Documentation | Version introduced | | ------------------ | ---- | ---- | ----------- | ------------- | ------------------ | | Query State | [Query State proto](https://github.com/dapr/dapr/blob/5aba3c9aa4ea9b3f388df125f9c66495b43c5c9e/dapr/proto/runtime/v1/dapr.proto#L44) | `v1.0-alpha1/state/statestore/query` | The state query API enables you to retrieve, filter, and sort the key/value data stored in state store components. | [Query State API]({{< ref "howto-state-query-api.md" >}}) | v1.5 | | Distributed Lock | [Lock proto](https://github.com/dapr/dapr/blob/5aba3c9aa4ea9b3f388df125f9c66495b43c5c9e/dapr/proto/runtime/v1/dapr.proto#L112) | `/v1.0-alpha1/lock` | The distributed lock API enables you to take a lock on a resource. | [Distributed Lock API]({{< ref "distributed-lock-api-overview.md" >}}) | v1.8 | -| Workflow | [Workflow proto](https://github.com/dapr/dapr/blob/5aba3c9aa4ea9b3f388df125f9c66495b43c5c9e/dapr/proto/runtime/v1/dapr.proto#L151) | `/v1.0-alpha1/workflow` | The workflow API enables you to define long running, persistent processes or data flows. | [Workflow API]({{< ref "workflow-overview.md" >}}) | v1.10 | | Bulk Publish | [Bulk publish proto](https://github.com/dapr/dapr/blob/5aba3c9aa4ea9b3f388df125f9c66495b43c5c9e/dapr/proto/runtime/v1/dapr.proto#L59) | `v1.0-alpha1/publish/bulk` | The bulk publish API allows you to publish multiple messages to a topic in a single request. | [Bulk Publish and Subscribe API]({{< ref "pubsub-bulk.md" >}}) | v1.10 | | Bulk Subscribe | [Bulk subscribe proto](https://github.com/dapr/dapr/blob/5aba3c9aa4ea9b3f388df125f9c66495b43c5c9e/dapr/proto/runtime/v1/appcallback.proto#L57) | N/A | The bulk subscribe application callback receives multiple messages from a topic in a single call. | [Bulk Publish and Subscribe API]({{< ref "pubsub-bulk.md" >}}) | v1.10 | | Cryptography | [Crypto proto](https://github.com/dapr/dapr/blob/5aba3c9aa4ea9b3f388df125f9c66495b43c5c9e/dapr/proto/runtime/v1/dapr.proto#L118) | `v1.0-alpha1/crypto` | The cryptography API enables you to perform **high level** cryptography operations for encrypting and decrypting messages. | [Cryptography API]({{< ref "cryptography-overview.md" >}}) | v1.11 | + +## Beta APIs + +| Building block/API | gRPC | HTTP | Description | Documentation | Version introduced | +| ------------------ | ---- | ---- | ----------- | ------------- | ------------------ | +| Workflow | [Workflow proto](https://github.com/dapr/dapr/blob/5aba3c9aa4ea9b3f388df125f9c66495b43c5c9e/dapr/proto/runtime/v1/dapr.proto#L151) | `/v1.0-beta1/workflow` | The workflow API enables you to define long running, persistent processes or data flows. | [Workflow API]({{< ref "workflow-overview.md" >}}) | v1.10 | + +## Related links + +[Learn more about the Alpha, Beta, and Stable lifecycle stages.]({{< ref "certification-lifecycle.md#certification-levels" >}}) \ No newline at end of file diff --git a/daprdocs/content/en/reference/api/workflow_api.md b/daprdocs/content/en/reference/api/workflow_api.md index 80814852e0d..9f9c34de81a 100644 --- a/daprdocs/content/en/reference/api/workflow_api.md +++ b/daprdocs/content/en/reference/api/workflow_api.md @@ -6,6 +6,10 @@ description: "Detailed documentation on the workflow API" weight: 900 --- +{{% alert title="Note" color="primary" %}} +Dapr Workflow is currently in beta. [See known limitations for {{% dapr-latest-version cli="true" %}}]({{< ref "workflow-overview.md#limitations" >}}). +{{% /alert %}} + Dapr provides users with the ability to interact with workflows and comes with a built-in `dapr` component. ## Start workflow request @@ -13,7 +17,7 @@ Dapr provides users with the ability to interact with workflows and comes with a Start a workflow instance with the given name and optionally, an instance ID. ``` -POST http://localhost:3500/v1.0-alpha1/workflows///start[?instanceID=] +POST http://localhost:3500/v1.0-beta1/workflows///start[?instanceID=] ``` Note that workflow instance IDs can only contain alphanumeric characters, underscores, and dashes. @@ -53,7 +57,7 @@ The API call will provide a response similar to this: Terminate a running workflow instance with the given name and instance ID. ``` -POST http://localhost:3500/v1.0-alpha1/workflows///terminate +POST http://localhost:3500/v1.0-beta1/workflows///terminate ``` ### URL parameters @@ -80,7 +84,7 @@ This API does not return any content. For workflow components that support subscribing to external events, such as the Dapr Workflow engine, you can use the following "raise event" API to deliver a named event to a specific workflow instance. ``` -POST http://localhost:3500/v1.0-alpha1/workflows///raiseEvent/ +POST http://localhost:3500/v1.0-beta1/workflows///raiseEvent/ ``` {{% alert title="Note" color="primary" %}} @@ -113,7 +117,7 @@ None. Pause a running workflow instance. ``` -POST http://localhost:3500/v1.0-alpha1/workflows///pause +POST http://localhost:3500/v1.0-beta1/workflows///pause ``` ### URL parameters @@ -140,7 +144,7 @@ None. Resume a paused workflow instance. ``` -POST http://localhost:3500/v1.0-alpha1/workflows///resume +POST http://localhost:3500/v1.0-beta1/workflows///resume ``` ### URL parameters @@ -167,7 +171,7 @@ None. Purge the workflow state from your state store with the workflow's instance ID. ``` -POST http://localhost:3500/v1.0-alpha1/workflows///purge +POST http://localhost:3500/v1.0-beta1/workflows///purge ``` ### URL parameters @@ -194,7 +198,7 @@ None. Get information about a given workflow instance. ``` -GET http://localhost:3500/v1.0-alpha1/workflows// +GET http://localhost:3500/v1.0-beta1/workflows// ``` ### URL parameters