From 2a95a35412a841f9eccc65462a8c44a6f86e9a2f Mon Sep 17 00:00:00 2001 From: "promptless[bot]" <179508745+promptless[bot]@users.noreply.github.com> Date: Tue, 5 Nov 2024 20:53:39 +0000 Subject: [PATCH] Docs update (c396645) --- docs/3.0/develop/write-flows.mdx | 162 +++++++++--------- docs/3.0/resources/upgrade-to-prefect-3.mdx | 63 ++----- .../integrations/prefect-dask/usage_guide.mdx | 65 ++++--- 3 files changed, 122 insertions(+), 168 deletions(-) diff --git a/docs/3.0/develop/write-flows.mdx b/docs/3.0/develop/write-flows.mdx index d61574683d8b..360f2cbaaa20 100644 --- a/docs/3.0/develop/write-flows.mdx +++ b/docs/3.0/develop/write-flows.mdx @@ -16,10 +16,10 @@ from prefect import flow @flow def my_flow() -> str: - return "Hello, world!" +return "Hello, world!" if __name__ == "__main__": - print(my_flow()) +print(my_flow()) ``` When a function becomes a flow, it gains the following capabilities: @@ -40,7 +40,7 @@ from prefect import flow @flow(name="My Flow") def my_flow() -> str: - return "Hello, world!" +return "Hello, world!" ``` If you don't provide a name, Prefect uses the flow function name. @@ -69,17 +69,17 @@ from prefect import flow @flow def get_repo_info(): - url = "https://api.github.com/repos/PrefectHQ/prefect" - response = httpx.get(url) - response.raise_for_status() - repo = response.json() - print("PrefectHQ/prefect repository statistics 🤓:") - print(f"Stars 🌠 : {repo['stargazers_count']}") - print(f"Forks 🍴 : {repo['forks_count']}") +url = "https://api.github.com/repos/PrefectHQ/prefect" +response = httpx.get(url) +response.raise_for_status() +repo = response.json() +print("PrefectHQ/prefect repository statistics 🤓:") +print(f"Stars 🌠 : {repo['stargazers_count']}") +print(f"Forks 🍴 : {repo['forks_count']}") if __name__ == "__main__": - get_repo_info() +get_repo_info() ``` Running this script results in the following output: @@ -106,8 +106,8 @@ For example, to automatically convert an argument to a datetime object: ```python from datetime import ( - datetime, - timezone, +datetime, +timezone, ) from typing import Optional @@ -116,13 +116,13 @@ from prefect import flow @flow def what_day_is_it(date: Optional[datetime] = None): - if date is None: - date = datetime.now(timezone.utc) - print(f"It was {date.strftime('%A')} on {date.isoformat()}") +if date is None: +date = datetime.now(timezone.utc) +print(f"It was {date.strftime('%A')} on {date.isoformat()}") if __name__ == "__main__": - what_day_is_it("2021-01-01T02:00:19.180906") +what_day_is_it("2021-01-01T02:00:19.180906") ``` When you run this flow, you'll see the following output: @@ -140,22 +140,22 @@ from pydantic import BaseModel class Model(BaseModel): - a: int - b: str +a: int +b: str @flow def flow_that_validates_parameters(model: Model): ... if __name__ == "__main__": - flow_that_validates_parameters( - model={"a": "WRONG", "b": "fine"} - ) +flow_that_validates_parameters( +model={"a": "WRONG", "b": "fine"} +) ``` This flow run will fail with the following error: ``` Flow run received invalid parameters: - - model.a: Input should be a valid integer, unable to parse string as an integer +- model.a: Input should be a valid integer, unable to parse string as an integer ``` Note that you can provide parameter values to a flow through the API using a [deployment](/3.0/deploy/). @@ -185,12 +185,12 @@ from prefect import flow, task @task def print_hello(name): - print(f"Hello {name}!") +print(f"Hello {name}!") @flow(name="Hello Flow") def hello_world(name="world"): - print_hello(name) +print_hello(name) ``` A single flow function can contain all of your workflow's code. @@ -240,7 +240,7 @@ from prefect import flow, task @flow(name="Nestedflow") def my_nested_flow(msg): - print(f"Nestedflow says: {msg}") +print(f"Nestedflow says: {msg}") ``` Here's a parent flow that imports and uses `my_nested_flow` as a nested flow: @@ -252,19 +252,19 @@ from myproject.flows import my_nested_flow @task(name="Print Hello") def print_hello(name): - msg = f"Hello {name}!" - print(msg) - return msg +msg = f"Hello {name}!" +print(msg) +return msg @flow(name="Hello Flow") def hello_world(name="world"): - message = print_hello(name) - my_nested_flow(message) +message = print_hello(name) +my_nested_flow(message) if __name__=="__main__": - hello_world("Marvin") +hello_world("Marvin") ``` Running the `hello_world()` flow creates a flow run like this: @@ -282,6 +282,7 @@ Nestedflow says: Hello Marvin! 08:24:08.296 | INFO | Flow run 'sage-mongoose' - Finished in state Completed() ``` +Here are some scenarios where you might want to define a nested flow rather than call tasks individually: Here are some scenarios where you might want to define a nested flow rather than call tasks individually: - Observability: Nested flows, like any other flow run, have first-class observability within the Prefect UI and Prefect Cloud. You'll @@ -294,7 +295,6 @@ cases by simply passing different parameters to the nested flow in which they ru - Task runners: Nested flows enable you to specify the task runner used for tasks within the flow. For example, to optimize parallel execution of certain tasks with Dask, group them in a nested flow that uses the Dask task runner. You can use a different task runner for each nested flow. - ## Supported functions Almost any standard Python function can be turned into a Prefect flow by adding the `@flow` decorator. @@ -351,13 +351,11 @@ class MyClass: def my_instance_method(self): pass - @classmethod @flow def my_class_method(cls): pass - @staticmethod @flow def my_static_method(): @@ -441,8 +439,6 @@ generator = next(f()) list(generator()) # prints 'Generator consumed!' ``` - - ## Flow runs A _flow run_ is a single execution of a flow. @@ -475,13 +471,13 @@ from prefect import flow @flow( - name="My Flow", description="My flow with a name and description", log_prints=True) +name="My Flow", description="My flow with a name and description", log_prints=True) def my_flow(): - print("Hello, I'm a flow") +print("Hello, I'm a flow") if __name__ == "__main__": - my_flow() +my_flow() ``` If no description is provided, a flow function's docstring is used as the description. @@ -497,12 +493,12 @@ from prefect import flow @flow(flow_run_name="{name}-on-{date:%A}") def my_flow(name: str, date: datetime.datetime): - pass +pass # creates a flow run called 'marvin-on-Thursday' if __name__ == "__main__": - my_flow(name="marvin", date=datetime.datetime.now(datetime.timezone.utc)) +my_flow(name="marvin", date=datetime.datetime.now(datetime.timezone.utc)) ``` This setting also accepts a function that returns a string for the flow run name: @@ -513,18 +509,18 @@ from prefect import flow def generate_flow_run_name(): - date = datetime.datetime.now(datetime.timezone.utc) - return f"{date:%A}-is-a-nice-day" +date = datetime.datetime.now(datetime.timezone.utc) +return f"{date:%A}-is-a-nice-day" @flow(flow_run_name=generate_flow_run_name) def my_flow(name: str): - pass +pass # creates a flow run named 'Thursday-is-a-nice-day' if __name__ == "__main__": - my_flow(name="marvin") +my_flow(name="marvin") ``` If you need access to information about the flow, use the `prefect.runtime` module. For example: @@ -535,23 +531,23 @@ from prefect.runtime import flow_run def generate_flow_run_name(): - flow_name = flow_run.flow_name +flow_name = flow_run.flow_name - parameters = flow_run.parameters - name = parameters["name"] - limit = parameters["limit"] +parameters = flow_run.parameters +name = parameters["name"] +limit = parameters["limit"] - return f"{flow_name}-with-{name}-and-{limit}" +return f"{flow_name}-with-{name}-and-{limit}" @flow(flow_run_name=generate_flow_run_name) def my_flow(name: str, limit: int = 100): - pass +pass # creates a flow run named 'my-flow-with-marvin-and-100' if __name__ == "__main__": - my_flow(name="marvin") +my_flow(name="marvin") ``` Note that `validate_parameters` checks that input values conform to the annotated types on the function. @@ -578,24 +574,24 @@ from prefect import flow, task @task def always_fails_task(): - raise ValueError("I fail successfully") +raise ValueError("I fail successfully") @task def always_succeeds_task(): - print("I'm fail safe!") - return "success" +print("I'm fail safe!") +return "success" @flow def always_succeeds_flow(): - x = always_fails_task.submit().result(raise_on_failure=False) - y = always_succeeds_task.submit(wait_for=[x]) - return y +x = always_fails_task.submit().result(raise_on_failure=False) +y = always_succeeds_task.submit(wait_for=[x]) +return y if __name__ == "__main__": - always_succeeds_flow() +always_succeeds_flow() ``` This flow run finishes in a **Completed** final state because the flow returns the future of the task that succeeds: @@ -606,7 +602,7 @@ This flow run finishes in a **Completed** final state because the flow returns t 18:35:25.205 | INFO | Flow run 'whispering-guan' - Submitted task run 'always_fails_task-96e4be14-0' for execution. 18:35:25.232 | ERROR | Task run 'always_fails_task-96e4be14-0' - Encountered exception during execution: Traceback (most recent call last): - ... +... ValueError: I fail successfully 18:35:25.265 | ERROR | Task run 'always_fails_task-96e4be14-0' - Finished in state Failed('Task run encountered an exception.') 18:35:25.289 | INFO | Flow run 'whispering-guan' - Created task run 'always_succeeds_task-9c27db32-0' for task 'always_succeeds_task' @@ -627,25 +623,25 @@ from prefect import task, flow @task def always_fails_task(): - raise ValueError("I am bad task") +raise ValueError("I am bad task") @task def always_succeeds_task(): - return "foo" +return "foo" @flow def always_succeeds_flow(): - return "bar" +return "bar" @flow def always_fails_flow(): - x = always_fails_task() - y = always_succeeds_task() - z = always_succeeds_flow() - return x, y, z +x = always_fails_task() +y = always_succeeds_task() +z = always_succeeds_flow() +return x, y, z ``` Running `always_fails_flow` fails because one of the three returned futures fails. @@ -671,27 +667,27 @@ from prefect.states import Completed, Failed @task def always_fails_task(): - raise ValueError("I fail successfully") +raise ValueError("I fail successfully") @task def always_succeeds_task(): - print("I'm fail safe!") - return "success" +print("I'm fail safe!") +return "success" @flow def always_succeeds_flow(): - x = always_fails_task.submit() - y = always_succeeds_task.submit() - if y.result() == "success": - return Completed(message="I am happy with this result") - else: - return Failed(message="How did this happen!?") +x = always_fails_task.submit() +y = always_succeeds_task.submit() +if y.result() == "success": +return Completed(message="I am happy with this result") +else: +return Failed(message="How did this happen!?") if __name__ == "__main__": - always_succeeds_flow() +always_succeeds_flow() ``` Running this flow produces the following result: @@ -720,14 +716,14 @@ from prefect.states import Completed @flow def my_flow(work_to_do: bool): - if not work_to_do: - return Completed(message="No work to do 💤", name="Skipped") - else: - return Completed(message="Work was done 💪") +if not work_to_do: +return Completed(message="No work to do 💤", name="Skipped") +else: +return Completed(message="Work was done 💪") if __name__ == "__main__": - my_flow(work_to_do=False) +my_flow(work_to_do=False) ``` Running this flow produces the following result: diff --git a/docs/3.0/resources/upgrade-to-prefect-3.mdx b/docs/3.0/resources/upgrade-to-prefect-3.mdx index 9db6d2b92d87..ae3282d940e5 100644 --- a/docs/3.0/resources/upgrade-to-prefect-3.mdx +++ b/docs/3.0/resources/upgrade-to-prefect-3.mdx @@ -152,9 +152,6 @@ print(my_flow()) # Output: Failed(message='Flow failed due to task failure') Choose the strategy that best fits your specific use case and error handling requirements. - ------ - ### Futures interface @@ -193,24 +190,24 @@ For example, `Block`'s `load` method is asynchronous in an async context: from prefect.blocks.system import Secret async def my_async_function(): - my_secret = Secret.load("my-secret") - print(my_secret.get()) # AttributeError: 'coroutine' object has no attribute 'get' +my_secret = Secret.load("my-secret") +print(my_secret.get()) # AttributeError: 'coroutine' object has no attribute 'get' ``` ```python Correct from prefect.blocks.system import Secret async def my_async_function(): - my_secret = await Secret.load("my-secret") - print(my_secret.get()) # This will work +my_secret = await Secret.load("my-secret") +print(my_secret.get()) # This will work ``` ```python Also Correct from prefect.blocks.system import Secret async def my_async_function(): - my_secret = Secret.load("my-secret", _sync=True) - print(my_secret.get()) # This will work +my_secret = Secret.load("my-secret", _sync=True) +print(my_secret.get()) # This will work ``` @@ -239,11 +236,11 @@ from prefect import flow, task @task async def my_task(): - pass +pass @flow async def my_flow(): - future = await my_task.submit() # TypeError: object PrefectConcurrentFuture can't be used in 'await' expression +future = await my_task.submit() # TypeError: object PrefectConcurrentFuture can't be used in 'await' expression ``` ```python Correct @@ -251,50 +248,12 @@ from prefect import flow, task @task async def my_task(): - pass +pass @flow async def my_flow(): - future = my_task.submit() # This will work +future = my_task.submit() # This will work ``` -See the [Futures interface section](#futures-interface) for more information on this particular gotcha. - -#### `TypeError: Flow.deploy() got an unexpected keyword argument 'schedule'` - -In Prefect 3.0, the `schedule` argument has been removed in favor of the `schedules` argument. - -This applies to both the `Flow.serve` and `Flow.deploy` methods. - - -```python Prefect 2.0 {11} -from datetime import timedelta -from prefect import flow -from prefect.client.schemas.schedules import IntervalSchedule - -@flow -def my_flow(): - pass - -my_flow.serve( - name="my-flow", - schedule=IntervalSchedule(interval=timedelta(minutes=1)) -) -``` - -```python Prefect 3.0 {11} -from datetime import timedelta -from prefect import flow -from prefect.client.schemas.schedules import IntervalSchedule - -@flow -def my_flow(): - pass - -my_flow.serve( - name="my-flow", - schedules=[IntervalSchedule(interval=timedelta(minutes=1))] -) -``` - +See the [Futures interface section](#futures-interface) for more information on this particular gotcha. \ No newline at end of file diff --git a/docs/integrations/prefect-dask/usage_guide.mdx b/docs/integrations/prefect-dask/usage_guide.mdx index d26fa594afd5..35e4bf78589a 100644 --- a/docs/integrations/prefect-dask/usage_guide.mdx +++ b/docs/integrations/prefect-dask/usage_guide.mdx @@ -20,16 +20,16 @@ from prefect_dask import DaskTaskRunner @task def shout(number): - time.sleep(0.5) - print(f"#{number}") +time.sleep(0.5) +print(f"#{number}") @flow(task_runner=DaskTaskRunner) def count_to(highest_number): - for number in range(highest_number): - shout.submit(number) +for number in range(highest_number): +shout.submit(number) if __name__ == "__main__": - count_to(10) +count_to(10) # outputs #3 @@ -60,7 +60,7 @@ from prefect_dask.task_runners import DaskTaskRunner @flow(task_runner=DaskTaskRunner(address="http://my-dask-cluster")) def my_flow(): - ... +... ``` `DaskTaskRunner` accepts the following optional parameters: @@ -88,7 +88,7 @@ from prefect_dask import DaskTaskRunner # Use 4 worker processes, each with 2 threads DaskTaskRunner( - cluster_kwargs={"n_workers": 4, "threads_per_worker": 2} +cluster_kwargs={"n_workers": 4, "threads_per_worker": 2} ) ``` @@ -106,7 +106,7 @@ def compute_task(): with get_dask_client() as client: df = dask.datasets.timeseries("2000", "2001", partition_freq="4w") summary_df = df.describe().compute() - return summary_df + return summary_df @flow(task_runner=DaskTaskRunner()) def dask_flow(): @@ -155,7 +155,7 @@ async def compute_task(): async with get_async_dask_client() as client: df = dask.datasets.timeseries("2000", "2001", partition_freq="4w") summary_df = await client.compute(df.describe()) - return summary_df + return summary_df @flow(task_runner=DaskTaskRunner()) async def dask_flow(): @@ -174,7 +174,6 @@ However, you must `await client.compute(dask_collection)` before exiting the con To invoke `compute` from the Dask collection, set `sync=False` and call `result()` before exiting out of the context manager: `await dask_collection.compute(sync=False)`. - ## Use a temporary cluster The `DaskTaskRunner` is capable of creating a temporary cluster using any of [Dask's cluster-manager options](https://docs.dask.org/en/latest/setup.html). This can be useful when you want each flow run to have its own Dask cluster, allowing for per-flow adaptive scaling. @@ -191,8 +190,8 @@ For example, to configure a flow to use a temporary `dask_cloudprovider.aws.Farg ```python DaskTaskRunner( - cluster_class="dask_cloudprovider.aws.FargateCluster", - cluster_kwargs={"n_workers": 4, "image": "my-prefect-image"}, +cluster_class="dask_cloudprovider.aws.FargateCluster", +cluster_kwargs={"n_workers": 4, "image": "my-prefect-image"}, ) ``` @@ -225,8 +224,8 @@ For example, here we configure a flow to run on a `FargateCluster` scaling up to ```python DaskTaskRunner( - cluster_class="dask_cloudprovider.aws.FargateCluster", - adapt_kwargs={"maximum": 10} +cluster_class="dask_cloudprovider.aws.FargateCluster", +adapt_kwargs={"maximum": 10} ) ``` @@ -243,16 +242,16 @@ from prefect_dask.task_runners import DaskTaskRunner @task def show(x): - print(x) +print(x) @flow(task_runner=DaskTaskRunner()) def my_flow(): - with dask.annotate(priority=-10): - future = show(1) # low priority task +with dask.annotate(priority=-10): +future = show(1) # low priority task - with dask.annotate(priority=10): - future = show(2) # high priority task +with dask.annotate(priority=10): +future = show(2) # high priority task ``` Another common use case is [resource](http://distributed.dask.org/en/stable/resources.html) annotations: @@ -264,25 +263,25 @@ from prefect_dask.task_runners import DaskTaskRunner @task def show(x): - print(x) +print(x) # Create a `LocalCluster` with some resource annotations # Annotations are abstract in dask and not inferred from your system. # Here, we claim that our system has 1 GPU and 1 process available per worker @flow( - task_runner=DaskTaskRunner( - cluster_kwargs={"n_workers": 1, "resources": {"GPU": 1, "process": 1}} - ) +task_runner=DaskTaskRunner( +cluster_kwargs={"n_workers": 1, "resources": {"GPU": 1, "process": 1}} +) ) def my_flow(): - with dask.annotate(resources={'GPU': 1}): - future = show(0) # this task requires 1 GPU resource on a worker - - with dask.annotate(resources={'process': 1}): - # These tasks each require 1 process on a worker; because we've - # specified that our cluster has 1 process per worker and 1 worker, - # these tasks will run sequentially - future = show(1) - future = show(2) - future = show(3) +with dask.annotate(resources={'GPU': 1}): +future = show(0) # this task requires 1 GPU resource on a worker + +with dask.annotate(resources={'process': 1}): +# These tasks each require 1 process on a worker; because we've +# specified that our cluster has 1 process per worker and 1 worker, +# these tasks will run sequentially +future = show(1) +future = show(2) +future = show(3) ```