Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Samples for message passing docs #133

Merged
merged 7 commits into from
Sep 16, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
File renamed without changes.
18 changes: 18 additions & 0 deletions message_passing/introduction/README.md
dandavison marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Introduction to message-passing

This sample provides an introduction to using Query, Signal, and Update.

See https://docs.temporal.io/develop/python/message-passing.

To run, first see the main [README.md](../../README.md) for prerequisites.

Then create two terminals and `cd` to this directory.

Run the worker in one terminal:

poetry run python worker.py

And execute the workflow in the other terminal:

poetry run python starter.py

53 changes: 53 additions & 0 deletions message_passing/introduction/starter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import asyncio
from typing import Optional

from temporalio.client import Client, WorkflowUpdateStage

from message_passing.introduction.workflows import (
ApproveInput,
GetLanguagesInput,
GreetingWorkflow,
Language,
)

TASK_QUEUE = "message-passing-introduction-task-queue"


async def main(client: Optional[Client] = None):
client = client or await Client.connect("localhost:7233")
wf_handle = await client.start_workflow(
GreetingWorkflow.run,
id="greeting-workflow-1234",
task_queue=TASK_QUEUE,
)

# 👉 Send a Query
supported_languages = await wf_handle.query(
GreetingWorkflow.get_languages, GetLanguagesInput(include_unsupported=False)
)
print(f"supported languages: {supported_languages}")

# 👉 Execute an Update
previous_language = await wf_handle.execute_update(
GreetingWorkflow.set_language, Language.CHINESE
)
current_language = await wf_handle.query(GreetingWorkflow.get_language)
print(f"language changed: {previous_language.name} -> {current_language.name}")

# 👉 Start an Update and then wait for it to complete
update_handle = await wf_handle.start_update(
GreetingWorkflow.set_language_using_activity,
Language.ARABIC,
wait_for_stage=WorkflowUpdateStage.ACCEPTED,
)
previous_language = await update_handle.result()
current_language = await wf_handle.query(GreetingWorkflow.get_language)
print(f"language changed: {previous_language.name} -> {current_language.name}")

# 👉 Send a Signal
await wf_handle.signal(GreetingWorkflow.approve, ApproveInput(name=""))
print(await wf_handle.result())


if __name__ == "__main__":
asyncio.run(main())
37 changes: 37 additions & 0 deletions message_passing/introduction/worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import asyncio
import logging

from temporalio.client import Client
from temporalio.worker import Worker

from message_passing.introduction.workflows import (
GreetingWorkflow,
call_greeting_service,
)

interrupt_event = asyncio.Event()


async def main():
logging.basicConfig(level=logging.INFO)

client = await Client.connect("localhost:7233")

async with Worker(
client,
task_queue="message-passing-introduction-task-queue",
workflows=[GreetingWorkflow],
activities=[call_greeting_service],
):
logging.info("Worker started, ctrl+c to exit")
await interrupt_event.wait()
logging.info("Shutting down")


if __name__ == "__main__":
loop = asyncio.new_event_loop()
try:
loop.run_until_complete(main())
except KeyboardInterrupt:
interrupt_event.set()
loop.run_until_complete(loop.shutdown_asyncgens())
141 changes: 141 additions & 0 deletions message_passing/introduction/workflows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
import asyncio
from dataclasses import dataclass
from datetime import timedelta
from enum import IntEnum
from typing import List, Optional

from temporalio import activity, workflow
from temporalio.exceptions import ApplicationError


class Language(IntEnum):
ARABIC = 1
CHINESE = 2
ENGLISH = 3
FRENCH = 4
HINDI = 5
PORTUGUESE = 6
SPANISH = 7


@dataclass
class GetLanguagesInput:
include_unsupported: bool


@dataclass
class ApproveInput:
name: str


@workflow.defn
class GreetingWorkflow:
"""
A workflow that that returns a greeting in one of two languages.

It supports a Query to obtain the current language, an Update to change the
current language and receive the previous language in response, and a Signal
to approve the Workflow so that it is allowed to return its result.
"""

# 👉 This Workflow does not use any async handlers and so cannot use any
# Activities. It only supports two languages, whose greetings are hardcoded
# in the Workflow definition. See GreetingWorkflowWithAsyncHandler below for
# a Workflow that uses an async Update handler to call an Activity.

def __init__(self) -> None:
self.approved_for_release = False
self.approver_name: Optional[str] = None
self.greetings = {
Language.CHINESE: "你好,世界",
Language.ENGLISH: "Hello, world",
}
self.language = Language.ENGLISH
self.lock = asyncio.Lock() # used by the async handler below

@workflow.run
async def run(self) -> str:
# 👉 In addition to waiting for the `approve` Signal, we also wait for
# all handlers to finish. Otherwise, the Workflow might return its
# result while an async set_language_using_activity Update is in
# progress.
await workflow.wait_condition(
lambda: self.approved_for_release and workflow.all_handlers_finished()
)
return self.greetings[self.language]

@workflow.query
def get_languages(self, input: GetLanguagesInput) -> List[Language]:
# 👉 A Query handler returns a value: it can inspect but must not mutate the Workflow state.
if input.include_unsupported:
return sorted(Language)
else:
return sorted(self.greetings)

@workflow.signal
def approve(self, input: ApproveInput) -> None:
# 👉 A Signal handler mutates the Workflow state but cannot return a value.
self.approved_for_release = True
self.approver_name = input.name

@workflow.update
def set_language(self, language: Language) -> Language:
# 👉 An Update handler can mutate the Workflow state and return a value.
previous_language, self.language = self.language, language
return previous_language

@set_language.validator
def validate_language(self, language: Language) -> None:
if language not in self.greetings:
# 👉 In an Update validator you raise any exception to reject the Update.
raise ValueError(f"{language.name} is not supported")

@workflow.update
async def set_language_using_activity(self, language: Language) -> Language:
# 👉 This update handler is async, so it can execute an activity.
if language not in self.greetings:
# 👉 We use a lock so that, if this handler is executed multiple
# times, each execution can schedule the activity only when the
# previously scheduled activity has completed. This ensures that
# multiple calls to set_language are processed in order.
async with self.lock:
greeting = await workflow.execute_activity(
call_greeting_service,
language,
start_to_close_timeout=timedelta(seconds=10),
)
if greeting is None:
# 👉 An update validator cannot be async, so cannot be used
# to check that the remote call_greeting_service supports
# the requested language. Raising ApplicationError will fail
# the Update, but the WorkflowExecutionUpdateAccepted event
# will still be added to history.
raise ApplicationError(
f"Greeting service does not support {language.name}"
)
self.greetings[language] = greeting
previous_language, self.language = self.language, language
return previous_language

@workflow.query
def get_language(self) -> Language:
return self.language


@activity.defn
async def call_greeting_service(to_language: Language) -> Optional[str]:
dandavison marked this conversation as resolved.
Show resolved Hide resolved
"""
An Activity that simulates a call to a remote greeting service.
The remote greeting service supports the full range of languages.
"""
greetings = {
Language.ARABIC: "مرحبا بالعالم",
Language.CHINESE: "你好,世界",
Language.ENGLISH: "Hello, world",
Language.FRENCH: "Bonjour, monde",
Language.HINDI: "नमस्ते दुनिया",
Language.PORTUGUESE: "Olá mundo",
Language.SPANISH: "¡Hola mundo",
}
await asyncio.sleep(0.2) # Simulate a network call
return greetings.get(to_language)
dandavison marked this conversation as resolved.
Show resolved Hide resolved
File renamed without changes.
Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from temporalio import common
from temporalio.client import Client, WorkflowHandle

from updates_and_signals.safe_message_handlers.workflow import (
from message_passing.safe_message_handlers.workflow import (
ClusterManagerAssignNodesToJobInput,
ClusterManagerDeleteJobInput,
ClusterManagerInput,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from temporalio.client import Client
from temporalio.worker import Worker

from updates_and_signals.safe_message_handlers.workflow import (
from message_passing.safe_message_handlers.workflow import (
ClusterManagerWorkflow,
assign_nodes_to_job,
find_bad_nodes,
Expand All @@ -15,7 +15,6 @@


async def main():
# Connect client
client = await Client.connect("localhost:7233")

async with Worker(
Expand All @@ -24,7 +23,6 @@ async def main():
workflows=[ClusterManagerWorkflow],
activities=[assign_nodes_to_job, unassign_nodes_for_job, find_bad_nodes],
):
# Wait until interrupted
logging.info("ClusterManagerWorkflow worker started, ctrl+c to exit")
await interrupt_event.wait()
logging.info("Shutting down")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from temporalio.common import RetryPolicy
from temporalio.exceptions import ApplicationError

from updates_and_signals.safe_message_handlers.activities import (
from message_passing.safe_message_handlers.activities import (
AssignNodesToJobInput,
FindBadNodesInput,
UnassignNodesForJobInput,
Expand Down
Loading