diff --git a/docs/3.0rc/develop/state-hooks.mdx b/docs/3.0rc/develop/state-hooks.mdx index 3f28cc037281..533c5aa1227f 100644 --- a/docs/3.0rc/develop/state-hooks.mdx +++ b/docs/3.0rc/develop/state-hooks.mdx @@ -165,36 +165,24 @@ Here's how to create a hook that deletes a Cloud Run job if the flow run crashes ```python import os from prefect import flow, task -from prefect.variables import Variable -from prefect.client.orchestration import get_client -import prefect.runtime -async def delete_cloud_run_job(flow, flow_run, state): - """Flow run state change hook that deletes a Cloud Run Job if - the flow run crashes.""" +def delete_cloud_run_job(flow, flow_run, state): + """hook that deletes the Cloud Run Job associated with the flow run.""" - # retrieve Cloud Run job name - cloud_run_job_name = await Variable.get( - name="crashing-flow_cloud_run_job" - ) - - # delete Cloud Run job - delete_job_command = f"yes | gcloud beta run jobs delete - {cloud_run_job_name.value} --region us-central1" - os.system(delete_job_command) + cloud_run_job_name = flow_run.name + delete_cloud_run_job_command = ( + "yes | gcloud beta run jobs delete" + f"{cloud_run_job_name} --region us-central1" + ) + os.system(delete_cloud_run_job_command) @task def my_task_that_crashes(): - raise SystemExit("Crashing on purpose!") + raise SystemExit("Crashing out! 💥") @flow(on_crashed=[delete_cloud_run_job]) def crashing_flow(): - """Save the flow run name (i.e. Cloud Run job name) as a - Variable. It then executes a task that ends up crashing.""" - flow_run_name = prefect.runtime.flow_run.name - cloud_run_job_name = Variable.set(name="crashing_flow_cloud_run_job", value=flow_run_name, overwrite=True) - my_task_that_crashes() if __name__ == "__main__":