Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Samples for message passing docs #133

Merged
merged 7 commits into from
Sep 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,13 @@ Some examples require extra dependencies. See each sample's directory for specif
* [encryption](encryption) - Apply end-to-end encryption for all input/output.
* [gevent_async](gevent_async) - Combine gevent and Temporal.
* [langchain](langchain) - Orchestrate workflows for LangChain.
* [message-passing introduction](message_passing/introduction/) - Introduction to queries, signals, and updates.
* [open_telemetry](open_telemetry) - Trace workflows with OpenTelemetry.
* [patching](patching) - Alter workflows safely with `patch` and `deprecate_patch`.
* [polling](polling) - Recommended implementation of an activity that needs to periodically poll an external resource waiting its successful completion.
* [prometheus](prometheus) - Configure Prometheus metrics on clients/workers.
* [pydantic_converter](pydantic_converter) - Data converter for using Pydantic models.
* [safe_message_handlers](updates_and_signals/safe_message_handlers/) - Safely handling updates and signals.
* [safe_message_handlers](message_passing/safe_message_handlers/) - Safely handling updates and signals.
* [schedules](schedules) - Demonstrates a Workflow Execution that occurs according to a schedule.
* [sentry](sentry) - Report errors to Sentry.
* [worker_specific_task_queues](worker_specific_task_queues) - Use unique task queues to ensure activities run on specific workers.
Expand Down
File renamed without changes.
18 changes: 18 additions & 0 deletions message_passing/introduction/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Introduction to message-passing

This sample provides an introduction to using Query, Signal, and Update.

See https://docs.temporal.io/develop/python/message-passing.

To run, first see the main [README.md](../../README.md) for prerequisites.

Then create two terminals and `cd` to this directory.

Run the worker in one terminal:

poetry run python worker.py

And execute the workflow in the other terminal:

poetry run python starter.py

13 changes: 13 additions & 0 deletions message_passing/introduction/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from enum import IntEnum

TASK_QUEUE = "message-passing-introduction-task-queue"


class Language(IntEnum):
ARABIC = 1
CHINESE = 2
ENGLISH = 3
FRENCH = 4
HINDI = 5
PORTUGUESE = 6
SPANISH = 7
25 changes: 25 additions & 0 deletions message_passing/introduction/activities.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import asyncio
from typing import Optional

from temporalio import activity

from message_passing.introduction import Language


@activity.defn
async def call_greeting_service(to_language: Language) -> Optional[str]:
"""
An Activity that simulates a call to a remote greeting service.
The remote greeting service supports the full range of languages.
"""
greetings = {
Language.ARABIC: "مرحبا بالعالم",
Language.CHINESE: "你好,世界",
Language.ENGLISH: "Hello, world",
Language.FRENCH: "Bonjour, monde",
Language.HINDI: "नमस्ते दुनिया",
Language.PORTUGUESE: "Olá mundo",
Language.SPANISH: "¡Hola mundo",
}
await asyncio.sleep(0.2) # Simulate a network call
return greetings.get(to_language)
52 changes: 52 additions & 0 deletions message_passing/introduction/starter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import asyncio
from typing import Optional

from temporalio.client import Client, WorkflowUpdateStage

from message_passing.introduction import TASK_QUEUE
from message_passing.introduction.workflows import (
ApproveInput,
GetLanguagesInput,
GreetingWorkflow,
Language,
)


async def main(client: Optional[Client] = None):
client = client or await Client.connect("localhost:7233")
wf_handle = await client.start_workflow(
GreetingWorkflow.run,
id="greeting-workflow-1234",
task_queue=TASK_QUEUE,
)

# 👉 Send a Query
supported_languages = await wf_handle.query(
GreetingWorkflow.get_languages, GetLanguagesInput(include_unsupported=False)
)
print(f"supported languages: {supported_languages}")

# 👉 Execute an Update
previous_language = await wf_handle.execute_update(
GreetingWorkflow.set_language, Language.CHINESE
)
current_language = await wf_handle.query(GreetingWorkflow.get_language)
print(f"language changed: {previous_language.name} -> {current_language.name}")

# 👉 Start an Update and then wait for it to complete
update_handle = await wf_handle.start_update(
GreetingWorkflow.set_language_using_activity,
Language.ARABIC,
wait_for_stage=WorkflowUpdateStage.ACCEPTED,
)
previous_language = await update_handle.result()
current_language = await wf_handle.query(GreetingWorkflow.get_language)
print(f"language changed: {previous_language.name} -> {current_language.name}")

# 👉 Send a Signal
await wf_handle.signal(GreetingWorkflow.approve, ApproveInput(name=""))
print(await wf_handle.result())


if __name__ == "__main__":
asyncio.run(main())
36 changes: 36 additions & 0 deletions message_passing/introduction/worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import asyncio
import logging

from temporalio.client import Client
from temporalio.worker import Worker

from message_passing.introduction import TASK_QUEUE
from message_passing.introduction.activities import call_greeting_service
from message_passing.introduction.workflows import GreetingWorkflow

interrupt_event = asyncio.Event()


async def main():
logging.basicConfig(level=logging.INFO)

client = await Client.connect("localhost:7233")

async with Worker(
client,
task_queue=TASK_QUEUE,
workflows=[GreetingWorkflow],
activities=[call_greeting_service],
):
logging.info("Worker started, ctrl+c to exit")
await interrupt_event.wait()
logging.info("Shutting down")


if __name__ == "__main__":
loop = asyncio.new_event_loop()
try:
loop.run_until_complete(main())
except KeyboardInterrupt:
interrupt_event.set()
loop.run_until_complete(loop.shutdown_asyncgens())
115 changes: 115 additions & 0 deletions message_passing/introduction/workflows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
import asyncio
from dataclasses import dataclass
from datetime import timedelta
from typing import List, Optional

from temporalio import workflow
from temporalio.exceptions import ApplicationError

with workflow.unsafe.imports_passed_through():
from message_passing.introduction import Language
from message_passing.introduction.activities import call_greeting_service


@dataclass
class GetLanguagesInput:
include_unsupported: bool


@dataclass
class ApproveInput:
name: str


@workflow.defn
class GreetingWorkflow:
"""
A workflow that that returns a greeting in one of two languages.

It supports a Query to obtain the current language, an Update to change the
current language and receive the previous language in response, and a Signal
to approve the Workflow so that it is allowed to return its result.
"""

# 👉 This Workflow does not use any async handlers and so cannot use any
# Activities. It only supports two languages, whose greetings are hardcoded
# in the Workflow definition. See GreetingWorkflowWithAsyncHandler below for
# a Workflow that uses an async Update handler to call an Activity.

def __init__(self) -> None:
self.approved_for_release = False
self.approver_name: Optional[str] = None
self.greetings = {
Language.CHINESE: "你好,世界",
Language.ENGLISH: "Hello, world",
}
self.language = Language.ENGLISH
self.lock = asyncio.Lock() # used by the async handler below

@workflow.run
async def run(self) -> str:
# 👉 In addition to waiting for the `approve` Signal, we also wait for
# all handlers to finish. Otherwise, the Workflow might return its
# result while an async set_language_using_activity Update is in
# progress.
await workflow.wait_condition(
lambda: self.approved_for_release and workflow.all_handlers_finished()
)
return self.greetings[self.language]

@workflow.query
def get_languages(self, input: GetLanguagesInput) -> List[Language]:
# 👉 A Query handler returns a value: it can inspect but must not mutate the Workflow state.
if input.include_unsupported:
return sorted(Language)
else:
return sorted(self.greetings)

@workflow.signal
def approve(self, input: ApproveInput) -> None:
# 👉 A Signal handler mutates the Workflow state but cannot return a value.
self.approved_for_release = True
self.approver_name = input.name

@workflow.update
def set_language(self, language: Language) -> Language:
# 👉 An Update handler can mutate the Workflow state and return a value.
previous_language, self.language = self.language, language
return previous_language

@set_language.validator
def validate_language(self, language: Language) -> None:
if language not in self.greetings:
# 👉 In an Update validator you raise any exception to reject the Update.
raise ValueError(f"{language.name} is not supported")

@workflow.update
async def set_language_using_activity(self, language: Language) -> Language:
# 👉 This update handler is async, so it can execute an activity.
if language not in self.greetings:
# 👉 We use a lock so that, if this handler is executed multiple
# times, each execution can schedule the activity only when the
# previously scheduled activity has completed. This ensures that
# multiple calls to set_language are processed in order.
async with self.lock:
greeting = await workflow.execute_activity(
call_greeting_service,
language,
start_to_close_timeout=timedelta(seconds=10),
)
if greeting is None:
# 👉 An update validator cannot be async, so cannot be used
# to check that the remote call_greeting_service supports
# the requested language. Raising ApplicationError will fail
# the Update, but the WorkflowExecutionUpdateAccepted event
# will still be added to history.
raise ApplicationError(
f"Greeting service does not support {language.name}"
)
self.greetings[language] = greeting
previous_language, self.language = self.language, language
return previous_language

@workflow.query
def get_language(self) -> Language:
return self.language
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from temporalio import common
from temporalio.client import Client, WorkflowHandle

from updates_and_signals.safe_message_handlers.workflow import (
from message_passing.safe_message_handlers.workflow import (
ClusterManagerAssignNodesToJobInput,
ClusterManagerDeleteJobInput,
ClusterManagerInput,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from temporalio.client import Client
from temporalio.worker import Worker

from updates_and_signals.safe_message_handlers.workflow import (
from message_passing.safe_message_handlers.workflow import (
ClusterManagerWorkflow,
assign_nodes_to_job,
find_bad_nodes,
Expand All @@ -15,7 +15,6 @@


async def main():
# Connect client
client = await Client.connect("localhost:7233")

async with Worker(
Expand All @@ -24,7 +23,6 @@ async def main():
workflows=[ClusterManagerWorkflow],
activities=[assign_nodes_to_job, unassign_nodes_for_job, find_bad_nodes],
):
# Wait until interrupted
logging.info("ClusterManagerWorkflow worker started, ctrl+c to exit")
await interrupt_event.wait()
logging.info("Shutting down")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from temporalio.common import RetryPolicy
from temporalio.exceptions import ApplicationError

from updates_and_signals.safe_message_handlers.activities import (
from message_passing.safe_message_handlers.activities import (
AssignNodesToJobInput,
FindBadNodesInput,
UnassignNodesForJobInput,
Expand Down
Loading