Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cascade terminate purge #662

Merged
merged 10 commits into from
Jan 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions .github/workflows/validate_examples.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ jobs:
validate:
runs-on: ubuntu-latest
env:
GOVER: 1.20
GOVER: 1.21.4
GOOS: linux
GOARCH: amd64
GOPROXY: https://proxy.golang.org
DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/master/install/install.sh
DAPR_CLI_REF: ${{ github.event.inputs.daprcli_commit }}
DAPR_REF: ${{ github.event.inputs.daprdapr_commit }}
DAPR_CLI_REF: ''
berndverst marked this conversation as resolved.
Show resolved Hide resolved
DAPR_REF: master
berndverst marked this conversation as resolved.
Show resolved Hide resolved
CHECKOUT_REPO: ${{ github.repository }}
CHECKOUT_REF: ${{ github.ref }}

Expand All @@ -56,7 +56,15 @@ jobs:
echo "CHECKOUT_REF=${{ github.event.client_payload.pull_head_ref }}" >> $GITHUB_ENV
echo "DAPR_REF=master" >> $GITHUB_ENV
fi

- name: Parse workflow dispatch payload
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah you refactored it to here. That's fine - hope it works :)

if: github.event_name == 'workflow_dispatch'
run: |
if [ ${{ github.event.inputs.daprcli_commit }} != '' ]; then
echo "DAPR_CLI_REF=${{ github.event.inputs.daprcli_commit }}" >> $GITHUB_ENV
fi
if [ ${{ github.event.inputs.daprdapr_commit }} != '' ]; then
echo "DAPR_REF=${{ github.event.inputs.daprdapr_commit }}" >> $GITHUB_ENV
fi
- name: Check out code onto GOPATH
uses: actions/checkout@v4
with:
Expand Down Expand Up @@ -88,6 +96,8 @@ jobs:
uses: actions/setup-go@v5
with:
go-version: ${{ env.GOVER }}
env:
GOPATH: ${{ github.workspace }}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this really needed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah it was failing without it with error GoPath not set

- name: Checkout Dapr CLI repo to override dapr command.
uses: actions/checkout@v4
if: env.DAPR_CLI_REF != ''
Expand Down
31 changes: 21 additions & 10 deletions dapr/aio/clients/grpc/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1285,15 +1285,18 @@ async def get_workflow(self, instance_id: str, workflow_component: str) -> GetWo
except grpc.aio.AioRpcError as err:
raise DaprInternalError(err.details())

async def terminate_workflow(self, instance_id: str, workflow_component: str) -> DaprResponse:
async def terminate_workflow(
self, instance_id: str, workflow_component: str, non_recursive: bool = False
) -> DaprResponse:
"""Terminates a workflow.

Args:
instance_id (str): the ID of the workflow instance, e.g.
`order_processing_workflow-103784`.
workflow_component (str): the name of the workflow component
that will run the workflow. e.g. `dapr`.

non_recursive (bool): if true, child workflows will not be terminated,
defaults to false.
Returns:
:class:`DaprResponse` gRPC metadata returned from callee

Expand All @@ -1307,7 +1310,9 @@ async def terminate_workflow(self, instance_id: str, workflow_component: str) ->
validateNotBlankString(instance_id=instance_id, workflow_component=workflow_component)
# Actual terminate workflow invocation
req = api_v1.TerminateWorkflowRequest(
instance_id=instance_id, workflow_component=workflow_component
instance_id=instance_id,
workflow_component=workflow_component,
non_recursive=non_recursive,
)

try:
Expand Down Expand Up @@ -1449,14 +1454,18 @@ async def resume_workflow(self, instance_id: str, workflow_component: str) -> Da
except grpc.aio.AioRpcError as err:
raise DaprInternalError(err.details())

async def purge_workflow(self, instance_id: str, workflow_component: str) -> DaprResponse:
async def purge_workflow(
self, instance_id: str, workflow_component: str, non_recursive: bool = False
) -> DaprResponse:
"""Purges a workflow.

Args:
instance_id (str): the ID of the workflow instance,
e.g. `order_processing_workflow-103784`.
workflow_component (str): the name of the workflow component
that will run the workflow. e.g. `dapr`.
Args:
instance_id (str): the ID of the workflow instance,
e.g. `order_processing_workflow-103784`.
workflow_component (str): the name of the workflow component
that will run the workflow. e.g. `dapr`.
non_recursive (bool): if true, child workflows will not be purged,
defaults to false.

Returns:
:class:`DaprResponse` gRPC metadata returned from callee
Expand All @@ -1470,7 +1479,9 @@ async def purge_workflow(self, instance_id: str, workflow_component: str) -> Dap
validateNotBlankString(instance_id=instance_id, workflow_component=workflow_component)
# Actual purge workflow invocation
req = api_v1.PurgeWorkflowRequest(
instance_id=instance_id, workflow_component=workflow_component
instance_id=instance_id,
workflow_component=workflow_component,
non_recursive=non_recursive,
)

try:
Expand Down
40 changes: 26 additions & 14 deletions dapr/clients/grpc/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1273,14 +1273,18 @@ def get_workflow(self, instance_id: str, workflow_component: str) -> GetWorkflow
except RpcError as err:
raise DaprInternalError(err.details())

def terminate_workflow(self, instance_id: str, workflow_component: str) -> DaprResponse:
def terminate_workflow(
self, instance_id: str, workflow_component: str, non_recursive: bool = False
) -> DaprResponse:
"""Terminates a workflow.

Args:
instance_id (str): the ID of the workflow instance, e.g.
`order_processing_workflow-103784`.
workflow_component (str): the name of the workflow component
that will run the workflow. e.g. `dapr`.
Args:
instance_id (str): the ID of the workflow instance, e.g.
`order_processing_workflow-103784`.
workflow_component (str): the name of the workflow component
that will run the workflow. e.g. `dapr`.
non_recursive (bool): if true, child workflows will not be terminated,
defaults to false.

Returns:
:class:`DaprResponse` gRPC metadata returned from callee
Expand All @@ -1295,7 +1299,9 @@ def terminate_workflow(self, instance_id: str, workflow_component: str) -> DaprR
validateNotBlankString(instance_id=instance_id, workflow_component=workflow_component)
# Actual terminate workflow invocation
req = api_v1.TerminateWorkflowRequest(
instance_id=instance_id, workflow_component=workflow_component
instance_id=instance_id,
workflow_component=workflow_component,
non_recursive=non_recursive,
)

try:
Expand Down Expand Up @@ -1438,14 +1444,18 @@ def resume_workflow(self, instance_id: str, workflow_component: str) -> DaprResp
except RpcError as err:
raise DaprInternalError(err.details())

def purge_workflow(self, instance_id: str, workflow_component: str) -> DaprResponse:
def purge_workflow(
self, instance_id: str, workflow_component: str, non_recursive: bool = False
) -> DaprResponse:
"""Purges a workflow.

Args:
instance_id (str): the ID of the workflow instance,
e.g. `order_processing_workflow-103784`.
workflow_component (str): the name of the workflow component
that will run the workflow. e.g. `dapr`.
Args:
instance_id (str): the ID of the workflow instance,
e.g. `order_processing_workflow-103784`.
workflow_component (str): the name of the workflow component
that will run the workflow. e.g. `dapr`.
non_recursive (bool): if true, child workflows will not be purged,
defaults to false.

Returns:
:class:`DaprResponse` gRPC metadata returned from callee
Expand All @@ -1459,7 +1469,9 @@ def purge_workflow(self, instance_id: str, workflow_component: str) -> DaprRespo
validateNotBlankString(instance_id=instance_id, workflow_component=workflow_component)
# Actual purge workflow invocation
req = api_v1.PurgeWorkflowRequest(
instance_id=instance_id, workflow_component=workflow_component
instance_id=instance_id,
workflow_component=workflow_component,
non_recursive=non_recursive,
)

try:
Expand Down
6 changes: 3 additions & 3 deletions dapr/proto/common/v1/common_pb2.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class HTTPExtension(google.protobuf.message.Message):
ValueType = typing.NewType("ValueType", builtins.int)
V: typing_extensions.TypeAlias = ValueType

class _VerbEnumTypeWrapper(google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[HTTPExtension._Verb.ValueType], builtins.type): # noqa: F821
class _VerbEnumTypeWrapper(google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[HTTPExtension._Verb.ValueType], builtins.type):
DESCRIPTOR: google.protobuf.descriptor.EnumDescriptor
NONE: HTTPExtension._Verb.ValueType # 0
GET: HTTPExtension._Verb.ValueType # 1
Expand Down Expand Up @@ -279,7 +279,7 @@ class StateOptions(google.protobuf.message.Message):
ValueType = typing.NewType("ValueType", builtins.int)
V: typing_extensions.TypeAlias = ValueType

class _StateConcurrencyEnumTypeWrapper(google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[StateOptions._StateConcurrency.ValueType], builtins.type): # noqa: F821
class _StateConcurrencyEnumTypeWrapper(google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[StateOptions._StateConcurrency.ValueType], builtins.type):
DESCRIPTOR: google.protobuf.descriptor.EnumDescriptor
CONCURRENCY_UNSPECIFIED: StateOptions._StateConcurrency.ValueType # 0
CONCURRENCY_FIRST_WRITE: StateOptions._StateConcurrency.ValueType # 1
Expand All @@ -296,7 +296,7 @@ class StateOptions(google.protobuf.message.Message):
ValueType = typing.NewType("ValueType", builtins.int)
V: typing_extensions.TypeAlias = ValueType

class _StateConsistencyEnumTypeWrapper(google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[StateOptions._StateConsistency.ValueType], builtins.type): # noqa: F821
class _StateConsistencyEnumTypeWrapper(google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[StateOptions._StateConsistency.ValueType], builtins.type):
DESCRIPTOR: google.protobuf.descriptor.EnumDescriptor
CONSISTENCY_UNSPECIFIED: StateOptions._StateConsistency.ValueType # 0
CONSISTENCY_EVENTUAL: StateOptions._StateConsistency.ValueType # 1
Expand Down
4 changes: 2 additions & 2 deletions dapr/proto/runtime/v1/appcallback_pb2.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ class TopicEventResponse(google.protobuf.message.Message):
ValueType = typing.NewType("ValueType", builtins.int)
V: typing_extensions.TypeAlias = ValueType

class _TopicEventResponseStatusEnumTypeWrapper(google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[TopicEventResponse._TopicEventResponseStatus.ValueType], builtins.type): # noqa: F821
class _TopicEventResponseStatusEnumTypeWrapper(google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[TopicEventResponse._TopicEventResponseStatus.ValueType], builtins.type):
DESCRIPTOR: google.protobuf.descriptor.EnumDescriptor
SUCCESS: TopicEventResponse._TopicEventResponseStatus.ValueType # 0
"""SUCCESS is the default behavior: message is acknowledged and not retried or logged."""
Expand Down Expand Up @@ -396,7 +396,7 @@ class BindingEventResponse(google.protobuf.message.Message):
ValueType = typing.NewType("ValueType", builtins.int)
V: typing_extensions.TypeAlias = ValueType

class _BindingEventConcurrencyEnumTypeWrapper(google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[BindingEventResponse._BindingEventConcurrency.ValueType], builtins.type): # noqa: F821
class _BindingEventConcurrencyEnumTypeWrapper(google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[BindingEventResponse._BindingEventConcurrency.ValueType], builtins.type):
DESCRIPTOR: google.protobuf.descriptor.EnumDescriptor
SEQUENTIAL: BindingEventResponse._BindingEventConcurrency.ValueType # 0
"""SEQUENTIAL sends data to output bindings specified in "to" sequentially."""
Expand Down
30 changes: 15 additions & 15 deletions dapr/proto/runtime/v1/dapr_pb2.py

Large diffs are not rendered by default.

18 changes: 13 additions & 5 deletions dapr/proto/runtime/v1/dapr_pb2.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -1444,7 +1444,7 @@ class ActorRuntime(google.protobuf.message.Message):
ValueType = typing.NewType("ValueType", builtins.int)
V: typing_extensions.TypeAlias = ValueType

class _ActorRuntimeStatusEnumTypeWrapper(google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[ActorRuntime._ActorRuntimeStatus.ValueType], builtins.type): # noqa: F821
class _ActorRuntimeStatusEnumTypeWrapper(google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[ActorRuntime._ActorRuntimeStatus.ValueType], builtins.type):
DESCRIPTOR: google.protobuf.descriptor.EnumDescriptor
INITIALIZING: ActorRuntime._ActorRuntimeStatus.ValueType # 0
"""Indicates that the actor runtime is still being initialized."""
Expand Down Expand Up @@ -2000,7 +2000,7 @@ class UnlockResponse(google.protobuf.message.Message):
ValueType = typing.NewType("ValueType", builtins.int)
V: typing_extensions.TypeAlias = ValueType

class _StatusEnumTypeWrapper(google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[UnlockResponse._Status.ValueType], builtins.type): # noqa: F821
class _StatusEnumTypeWrapper(google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[UnlockResponse._Status.ValueType], builtins.type):
DESCRIPTOR: google.protobuf.descriptor.EnumDescriptor
SUCCESS: UnlockResponse._Status.ValueType # 0
LOCK_DOES_NOT_EXIST: UnlockResponse._Status.ValueType # 1
Expand Down Expand Up @@ -2034,7 +2034,7 @@ class SubtleGetKeyRequest(google.protobuf.message.Message):
ValueType = typing.NewType("ValueType", builtins.int)
V: typing_extensions.TypeAlias = ValueType

class _KeyFormatEnumTypeWrapper(google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[SubtleGetKeyRequest._KeyFormat.ValueType], builtins.type): # noqa: F821
class _KeyFormatEnumTypeWrapper(google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[SubtleGetKeyRequest._KeyFormat.ValueType], builtins.type):
DESCRIPTOR: google.protobuf.descriptor.EnumDescriptor
PEM: SubtleGetKeyRequest._KeyFormat.ValueType # 0
"""PEM (PKIX) (default)"""
Expand Down Expand Up @@ -2766,17 +2766,21 @@ class TerminateWorkflowRequest(google.protobuf.message.Message):

INSTANCE_ID_FIELD_NUMBER: builtins.int
WORKFLOW_COMPONENT_FIELD_NUMBER: builtins.int
NON_RECURSIVE_FIELD_NUMBER: builtins.int
instance_id: builtins.str
"""ID of the workflow instance to terminate."""
workflow_component: builtins.str
"""Name of the workflow component."""
non_recursive: builtins.bool
"""Indicates whether this is a non_recursive terminate request"""
def __init__(
self,
*,
instance_id: builtins.str = ...,
workflow_component: builtins.str = ...,
non_recursive: builtins.bool = ...,
) -> None: ...
def ClearField(self, field_name: typing_extensions.Literal["instance_id", b"instance_id", "workflow_component", b"workflow_component"]) -> None: ...
def ClearField(self, field_name: typing_extensions.Literal["instance_id", b"instance_id", "non_recursive", b"non_recursive", "workflow_component", b"workflow_component"]) -> None: ...

global___TerminateWorkflowRequest = TerminateWorkflowRequest

Expand Down Expand Up @@ -2862,17 +2866,21 @@ class PurgeWorkflowRequest(google.protobuf.message.Message):

INSTANCE_ID_FIELD_NUMBER: builtins.int
WORKFLOW_COMPONENT_FIELD_NUMBER: builtins.int
NON_RECURSIVE_FIELD_NUMBER: builtins.int
instance_id: builtins.str
"""ID of the workflow instance to purge."""
workflow_component: builtins.str
"""Name of the workflow component."""
non_recursive: builtins.bool
"""Indicates whether this is a non_recursive purge request"""
def __init__(
self,
*,
instance_id: builtins.str = ...,
workflow_component: builtins.str = ...,
non_recursive: builtins.bool = ...,
) -> None: ...
def ClearField(self, field_name: typing_extensions.Literal["instance_id", b"instance_id", "workflow_component", b"workflow_component"]) -> None: ...
def ClearField(self, field_name: typing_extensions.Literal["instance_id", b"instance_id", "non_recursive", b"non_recursive", "workflow_component", b"workflow_component"]) -> None: ...

global___PurgeWorkflowRequest = PurgeWorkflowRequest

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ dapr run --app-id orderapp --app-protocol grpc --dapr-grpc-port 50001 --resource
== APP == New counter value is: 1122!

== APP == Get response from hello_world_wf after terminate call: Terminated
== APP == Get response from child_wf after terminate call: Terminated
== APP == Instance Successfully Purged
```

Expand Down
4 changes: 3 additions & 1 deletion dev-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,6 @@ pyOpenSSL>=23.2.0
# needed for type checking
Flask>=1.1
# needed for auto fix
ruff>=0.1.11
ruff>=0.1.11
# needed for dapr-ext-workflow
durabletask>=0.1.1a1
2 changes: 2 additions & 0 deletions examples/demo_workflow/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ expected_stdout_lines:
- "== APP == New counter value is: 1111!"
- "== APP == Instance Successfully Purged"
- "== APP == Get response from hello_world_wf after terminate call: Terminated"
- "== APP == Get response from child_wf after terminate call: Terminated"
background: true
timeout_seconds: 30
sleep: 15
Expand Down Expand Up @@ -84,4 +85,5 @@ You should be able to see the following output:
== APP == New counter value is: 1111!
== APP == Instance Successfully Purged
== APP == Get response from hello_world_wf after terminate call: Terminated
== APP == Get response from child_wf after terminate call: Terminated
```
Loading
Loading