From aeebc9660c55d52ba26b9485b45c5378e11fb952 Mon Sep 17 00:00:00 2001 From: Alexander Streed Date: Thu, 31 Oct 2024 13:44:44 -0500 Subject: [PATCH] Add setting to allow turning of persistence for tasks globally (#15881) --- docs/3.0/develop/settings-ref.mdx | 12 +++++++ schemas/settings.schema.json | 16 +++++++++ src/prefect/context.py | 8 +++-- src/prefect/results.py | 44 ++++++++++++++---------- src/prefect/settings/models/tasks.py | 6 ++++ src/prefect/task_engine.py | 25 ++++++++++---- tests/results/test_result_store.py | 50 ++++++++++++++++++++++++++++ tests/test_settings.py | 1 + 8 files changed, 137 insertions(+), 25 deletions(-) diff --git a/docs/3.0/develop/settings-ref.mdx b/docs/3.0/develop/settings-ref.mdx index 1fe191f4d97a..f6e23ed22282 100644 --- a/docs/3.0/develop/settings-ref.mdx +++ b/docs/3.0/develop/settings-ref.mdx @@ -2216,6 +2216,18 @@ This value sets the default retry delay seconds for all tasks. **Supported environment variables**: `PREFECT_TASKS_DEFAULT_RETRY_DELAY_SECONDS`, `PREFECT_TASK_DEFAULT_RETRY_DELAY_SECONDS` +### `default_persist_result` +If `True`, results will be persisted by default for all tasks. Set to `False` to disable persistence by default. Note that setting to `False` will override the behavior set by a parent flow or task. + +**Type**: `boolean | None` + +**Default**: `None` + +**TOML dotted key path**: `tasks.default_persist_result` + +**Supported environment variables**: +`PREFECT_TASKS_DEFAULT_PERSIST_RESULT` + ### `runner` Settings for controlling task runner behavior diff --git a/schemas/settings.schema.json b/schemas/settings.schema.json index 109fbdc210e2..d7982a4b32ce 100644 --- a/schemas/settings.schema.json +++ b/schemas/settings.schema.json @@ -1971,6 +1971,22 @@ ], "title": "Default Retry Delay Seconds" }, + "default_persist_result": { + "anyOf": [ + { + "type": "boolean" + }, + { + "type": "null" + } + ], + "default": null, + "description": "If `True`, results will be persisted by default for all tasks. Set to `False` to disable persistence by default. Note that setting to `False` will override the behavior set by a parent flow or task.", + "supported_environment_variables": [ + "PREFECT_TASKS_DEFAULT_PERSIST_RESULT" + ], + "title": "Default Persist Result" + }, "runner": { "$ref": "#/$defs/TasksRunnerSettings", "description": "Settings for controlling task runner behavior", diff --git a/src/prefect/context.py b/src/prefect/context.py index f9eb82db01c1..f4ae26606196 100644 --- a/src/prefect/context.py +++ b/src/prefect/context.py @@ -38,7 +38,11 @@ from prefect.client.schemas import FlowRun, TaskRun from prefect.events.worker import EventsWorker from prefect.exceptions import MissingContextError -from prefect.results import ResultStore, get_default_persist_setting +from prefect.results import ( + ResultStore, + get_default_persist_setting, + get_default_persist_setting_for_tasks, +) from prefect.settings import Profile, Settings from prefect.settings.legacy import _get_settings_fields from prefect.states import State @@ -397,7 +401,7 @@ class TaskRunContext(RunContext): # Result handling result_store: ResultStore - persist_result: bool = Field(default_factory=get_default_persist_setting) + persist_result: bool = Field(default_factory=get_default_persist_setting_for_tasks) __var__ = ContextVar("task_run") diff --git a/src/prefect/results.py b/src/prefect/results.py index d0a5cdc1ea71..dd17f614953d 100644 --- a/src/prefect/results.py +++ b/src/prefect/results.py @@ -56,13 +56,7 @@ from prefect.locking.protocol import LockManager from prefect.logging import get_logger from prefect.serializers import PickleSerializer, Serializer -from prefect.settings import ( - PREFECT_DEFAULT_RESULT_STORAGE_BLOCK, - PREFECT_LOCAL_STORAGE_PATH, - PREFECT_RESULTS_DEFAULT_SERIALIZER, - PREFECT_RESULTS_PERSIST_BY_DEFAULT, - PREFECT_TASK_SCHEDULING_DEFAULT_STORAGE_BLOCK, -) +from prefect.settings.context import get_current_settings from prefect.utilities.annotations import NotSet from prefect.utilities.asyncutils import sync_compatible from prefect.utilities.pydantic import get_dispatch_key, lookup_type, register_base_type @@ -94,8 +88,9 @@ async def get_default_result_storage() -> WritableFileSystem: """ Generate a default file system for result storage. """ - default_block = PREFECT_DEFAULT_RESULT_STORAGE_BLOCK.value() - basepath = PREFECT_LOCAL_STORAGE_PATH.value() + settings = get_current_settings() + default_block = settings.results.default_storage_block + basepath = settings.results.local_storage_path cache_key = (str(default_block), str(basepath)) @@ -169,13 +164,14 @@ async def get_or_create_default_task_scheduling_storage() -> ResultStorage: """ Generate a default file system for background task parameter/result storage. """ - default_block = PREFECT_TASK_SCHEDULING_DEFAULT_STORAGE_BLOCK.value() + settings = get_current_settings() + default_block = settings.tasks.scheduling.default_storage_block if default_block is not None: return await Block.load(default_block) # otherwise, use the local file system - basepath = PREFECT_LOCAL_STORAGE_PATH.value() + basepath = settings.results.local_storage_path return LocalFileSystem(basepath=basepath) @@ -183,22 +179,36 @@ def get_default_result_serializer() -> Serializer: """ Generate a default file system for result storage. """ - return resolve_serializer(PREFECT_RESULTS_DEFAULT_SERIALIZER.value()) + settings = get_current_settings() + return resolve_serializer(settings.results.default_serializer) def get_default_persist_setting() -> bool: """ - Return the default option for result persistence (False). + Return the default option for result persistence. + """ + settings = get_current_settings() + return settings.results.persist_by_default + + +def get_default_persist_setting_for_tasks() -> bool: + """ + Return the default option for result persistence for tasks. """ - return PREFECT_RESULTS_PERSIST_BY_DEFAULT.value() + settings = get_current_settings() + return ( + settings.tasks.default_persist_result + if settings.tasks.default_persist_result is not None + else settings.results.persist_by_default + ) def should_persist_result() -> bool: """ Return the default option for result persistence determined by the current run context. - If there is no current run context, the default value set by - `PREFECT_RESULTS_PERSIST_BY_DEFAULT` will be returned. + If there is no current run context, the value of `results.persist_by_default` on the + current settings will be returned. """ from prefect.context import FlowRunContext, TaskRunContext @@ -209,7 +219,7 @@ def should_persist_result() -> bool: if flow_run_context is not None: return flow_run_context.persist_result - return PREFECT_RESULTS_PERSIST_BY_DEFAULT.value() + return get_default_persist_setting() def _format_user_supplied_storage_key(key: str) -> str: diff --git a/src/prefect/settings/models/tasks.py b/src/prefect/settings/models/tasks.py index 48e1666ff29e..86b8e20e08ba 100644 --- a/src/prefect/settings/models/tasks.py +++ b/src/prefect/settings/models/tasks.py @@ -73,6 +73,12 @@ class TasksSettings(PrefectBaseSettings): ), ) + default_persist_result: Optional[bool] = Field( + default=None, + description="If `True`, results will be persisted by default for all tasks. Set to `False` to disable persistence by default. " + "Note that setting to `False` will override the behavior set by a parent flow or task.", + ) + runner: TasksRunnerSettings = Field( default_factory=TasksRunnerSettings, description="Settings for controlling task runner behavior", diff --git a/src/prefect/task_engine.py b/src/prefect/task_engine.py index ad984b2bfa2d..43238d1cbc6c 100644 --- a/src/prefect/task_engine.py +++ b/src/prefect/task_engine.py @@ -67,6 +67,7 @@ PREFECT_DEBUG_MODE, PREFECT_TASKS_REFRESH_CACHE, ) +from prefect.settings.context import get_current_settings from prefect.states import ( AwaitingRetry, Completed, @@ -604,6 +605,8 @@ def setup_run_context(self, client: Optional[SyncPrefectClient] = None): should_log_prints, ) + settings = get_current_settings() + if client is None: client = self.client if not self.task_run: @@ -612,6 +615,12 @@ def setup_run_context(self, client: Optional[SyncPrefectClient] = None): with ExitStack() as stack: if log_prints := should_log_prints(self.task): stack.enter_context(patch_print()) + if self.task.persist_result is not None: + persist_result = self.task.persist_result + elif settings.tasks.default_persist_result is not None: + persist_result = settings.tasks.default_persist_result + else: + persist_result = should_persist_result() stack.enter_context( TaskRunContext( task=self.task, @@ -622,9 +631,7 @@ def setup_run_context(self, client: Optional[SyncPrefectClient] = None): self.task, _sync=True ), client=client, - persist_result=self.task.persist_result - if self.task.persist_result is not None - else should_persist_result(), + persist_result=persist_result, ) ) stack.enter_context(ConcurrencyContextV1()) @@ -1106,6 +1113,8 @@ async def setup_run_context(self, client: Optional[PrefectClient] = None): should_log_prints, ) + settings = get_current_settings() + if client is None: client = self.client if not self.task_run: @@ -1114,6 +1123,12 @@ async def setup_run_context(self, client: Optional[PrefectClient] = None): with ExitStack() as stack: if log_prints := should_log_prints(self.task): stack.enter_context(patch_print()) + if self.task.persist_result is not None: + persist_result = self.task.persist_result + elif settings.tasks.default_persist_result is not None: + persist_result = settings.tasks.default_persist_result + else: + persist_result = should_persist_result() stack.enter_context( TaskRunContext( task=self.task, @@ -1124,9 +1139,7 @@ async def setup_run_context(self, client: Optional[PrefectClient] = None): self.task, _sync=False ), client=client, - persist_result=self.task.persist_result - if self.task.persist_result is not None - else should_persist_result(), + persist_result=persist_result, ) ) stack.enter_context(ConcurrencyContext()) diff --git a/tests/results/test_result_store.py b/tests/results/test_result_store.py index 513d186410a8..f516af249c9d 100644 --- a/tests/results/test_result_store.py +++ b/tests/results/test_result_store.py @@ -16,6 +16,7 @@ PREFECT_LOCAL_STORAGE_PATH, PREFECT_RESULTS_DEFAULT_SERIALIZER, PREFECT_RESULTS_PERSIST_BY_DEFAULT, + PREFECT_TASKS_DEFAULT_PERSIST_RESULT, temporary_settings, ) from prefect.testing.utilities import assert_blocks_equal @@ -440,6 +441,55 @@ def bar(): assert persist_result is True + with temporary_settings({PREFECT_TASKS_DEFAULT_PERSIST_RESULT: True}): + persist_result = bar() + + assert persist_result is True + + +async def test_task_can_opt_out_of_result_persistence_with_setting(): + with temporary_settings({PREFECT_TASKS_DEFAULT_PERSIST_RESULT: True}): + + @task(persist_result=False) + def bar(): + return should_persist_result() + + persist_result = bar() + assert persist_result is False + + async def abar(): + return should_persist_result() + + persist_result = await abar() + assert persist_result is False + + +async def test_can_opt_out_of_result_persistence_with_setting_when_flow_uses_feature(): + with temporary_settings({PREFECT_TASKS_DEFAULT_PERSIST_RESULT: False}): + + @flow(persist_result=True) + def foo(): + return bar() + + @task + def bar(): + return should_persist_result() + + persist_result = foo() + + assert persist_result is False + + @flow(persist_result=True) + async def afoo(): + return await abar() + + @task + async def abar(): + return should_persist_result() + + persist_result = await afoo() + assert persist_result is False + def test_nested_flow_custom_persist_setting(): @flow(persist_result=True) diff --git a/tests/test_settings.py b/tests/test_settings.py index 46dab2935fcb..40a64b82edf9 100644 --- a/tests/test_settings.py +++ b/tests/test_settings.py @@ -389,6 +389,7 @@ "PREFECT_SILENCE_API_URL_MISCONFIGURATION": {"test_value": True}, "PREFECT_SQLALCHEMY_MAX_OVERFLOW": {"test_value": 10, "legacy": True}, "PREFECT_SQLALCHEMY_POOL_SIZE": {"test_value": 10, "legacy": True}, + "PREFECT_TASKS_DEFAULT_PERSIST_RESULT": {"test_value": True}, "PREFECT_TASKS_DEFAULT_RETRIES": {"test_value": 10}, "PREFECT_TASKS_DEFAULT_RETRY_DELAY_SECONDS": {"test_value": 10}, "PREFECT_TASKS_REFRESH_CACHE": {"test_value": True},