Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add setting to allow turning off persistence for tasks globally (#15881) #21

Open
wants to merge 1 commit into
base: frances/test_promptless5
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions docs/3.0/develop/settings-ref.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -2216,6 +2216,18 @@ This value sets the default retry delay seconds for all tasks.
**Supported environment variables**:
`PREFECT_TASKS_DEFAULT_RETRY_DELAY_SECONDS`, `PREFECT_TASK_DEFAULT_RETRY_DELAY_SECONDS`

### `default_persist_result`
If `True`, results will be persisted by default for all tasks. Set to `False` to disable persistence by default. Note that setting to `False` will override the behavior set by a parent flow or task.

**Type**: `boolean | None`

**Default**: `None`

**TOML dotted key path**: `tasks.default_persist_result`

**Supported environment variables**:
`PREFECT_TASKS_DEFAULT_PERSIST_RESULT`

### `runner`
Settings for controlling task runner behavior

Expand Down
16 changes: 16 additions & 0 deletions schemas/settings.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -1971,6 +1971,22 @@
],
"title": "Default Retry Delay Seconds"
},
"default_persist_result": {
"anyOf": [
{
"type": "boolean"
},
{
"type": "null"
}
],
"default": null,
"description": "If `True`, results will be persisted by default for all tasks. Set to `False` to disable persistence by default. Note that setting to `False` will override the behavior set by a parent flow or task.",
"supported_environment_variables": [
"PREFECT_TASKS_DEFAULT_PERSIST_RESULT"
],
"title": "Default Persist Result"
},
"runner": {
"$ref": "#/$defs/TasksRunnerSettings",
"description": "Settings for controlling task runner behavior",
Expand Down
8 changes: 6 additions & 2 deletions src/prefect/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,11 @@
from prefect.client.schemas import FlowRun, TaskRun
from prefect.events.worker import EventsWorker
from prefect.exceptions import MissingContextError
from prefect.results import ResultStore, get_default_persist_setting
from prefect.results import (
ResultStore,
get_default_persist_setting,
get_default_persist_setting_for_tasks,
)
from prefect.settings import Profile, Settings
from prefect.settings.legacy import _get_settings_fields
from prefect.states import State
Expand Down Expand Up @@ -397,7 +401,7 @@ class TaskRunContext(RunContext):

# Result handling
result_store: ResultStore
persist_result: bool = Field(default_factory=get_default_persist_setting)
persist_result: bool = Field(default_factory=get_default_persist_setting_for_tasks)

__var__ = ContextVar("task_run")

Expand Down
44 changes: 27 additions & 17 deletions src/prefect/results.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,7 @@
from prefect.locking.protocol import LockManager
from prefect.logging import get_logger
from prefect.serializers import PickleSerializer, Serializer
from prefect.settings import (
PREFECT_DEFAULT_RESULT_STORAGE_BLOCK,
PREFECT_LOCAL_STORAGE_PATH,
PREFECT_RESULTS_DEFAULT_SERIALIZER,
PREFECT_RESULTS_PERSIST_BY_DEFAULT,
PREFECT_TASK_SCHEDULING_DEFAULT_STORAGE_BLOCK,
)
from prefect.settings.context import get_current_settings
from prefect.utilities.annotations import NotSet
from prefect.utilities.asyncutils import sync_compatible
from prefect.utilities.pydantic import get_dispatch_key, lookup_type, register_base_type
Expand Down Expand Up @@ -94,8 +88,9 @@ async def get_default_result_storage() -> WritableFileSystem:
"""
Generate a default file system for result storage.
"""
default_block = PREFECT_DEFAULT_RESULT_STORAGE_BLOCK.value()
basepath = PREFECT_LOCAL_STORAGE_PATH.value()
settings = get_current_settings()
default_block = settings.results.default_storage_block
basepath = settings.results.local_storage_path

cache_key = (str(default_block), str(basepath))

Expand Down Expand Up @@ -169,36 +164,51 @@ async def get_or_create_default_task_scheduling_storage() -> ResultStorage:
"""
Generate a default file system for background task parameter/result storage.
"""
default_block = PREFECT_TASK_SCHEDULING_DEFAULT_STORAGE_BLOCK.value()
settings = get_current_settings()
default_block = settings.tasks.scheduling.default_storage_block

if default_block is not None:
return await Block.load(default_block)

# otherwise, use the local file system
basepath = PREFECT_LOCAL_STORAGE_PATH.value()
basepath = settings.results.local_storage_path
return LocalFileSystem(basepath=basepath)


def get_default_result_serializer() -> Serializer:
"""
Return the default serializer for results.
"""
return resolve_serializer(PREFECT_RESULTS_DEFAULT_SERIALIZER.value())
settings = get_current_settings()
return resolve_serializer(settings.results.default_serializer)


def get_default_persist_setting() -> bool:
"""
Return the default option for result persistence (False).
Return the default option for result persistence.
"""
settings = get_current_settings()
return settings.results.persist_by_default


def get_default_persist_setting_for_tasks() -> bool:
"""
Return the default option for result persistence for tasks.
"""
return PREFECT_RESULTS_PERSIST_BY_DEFAULT.value()
settings = get_current_settings()
return (
settings.tasks.default_persist_result
if settings.tasks.default_persist_result is not None
else settings.results.persist_by_default
)


def should_persist_result() -> bool:
"""
Return the default option for result persistence determined by the current run context.

If there is no current run context, the default value set by
`PREFECT_RESULTS_PERSIST_BY_DEFAULT` will be returned.
If there is no current run context, the value of `results.persist_by_default` on the
current settings will be returned.
"""
from prefect.context import FlowRunContext, TaskRunContext

Expand All @@ -209,7 +219,7 @@ def should_persist_result() -> bool:
if flow_run_context is not None:
return flow_run_context.persist_result

return PREFECT_RESULTS_PERSIST_BY_DEFAULT.value()
return get_default_persist_setting()


def _format_user_supplied_storage_key(key: str) -> str:
Expand Down
6 changes: 6 additions & 0 deletions src/prefect/settings/models/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,12 @@ class TasksSettings(PrefectBaseSettings):
),
)

default_persist_result: Optional[bool] = Field(
default=None,
description="If `True`, results will be persisted by default for all tasks. Set to `False` to disable persistence by default. "
"Note that setting to `False` will override the behavior set by a parent flow or task.",
)

runner: TasksRunnerSettings = Field(
default_factory=TasksRunnerSettings,
description="Settings for controlling task runner behavior",
Expand Down
25 changes: 19 additions & 6 deletions src/prefect/task_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
PREFECT_DEBUG_MODE,
PREFECT_TASKS_REFRESH_CACHE,
)
from prefect.settings.context import get_current_settings
from prefect.states import (
AwaitingRetry,
Completed,
Expand Down Expand Up @@ -604,6 +605,8 @@ def setup_run_context(self, client: Optional[SyncPrefectClient] = None):
should_log_prints,
)

settings = get_current_settings()

if client is None:
client = self.client
if not self.task_run:
Expand All @@ -612,6 +615,12 @@ def setup_run_context(self, client: Optional[SyncPrefectClient] = None):
with ExitStack() as stack:
if log_prints := should_log_prints(self.task):
stack.enter_context(patch_print())
if self.task.persist_result is not None:
persist_result = self.task.persist_result
elif settings.tasks.default_persist_result is not None:
persist_result = settings.tasks.default_persist_result
else:
persist_result = should_persist_result()
stack.enter_context(
TaskRunContext(
task=self.task,
Expand All @@ -622,9 +631,7 @@ def setup_run_context(self, client: Optional[SyncPrefectClient] = None):
self.task, _sync=True
),
client=client,
persist_result=self.task.persist_result
if self.task.persist_result is not None
else should_persist_result(),
persist_result=persist_result,
)
)
stack.enter_context(ConcurrencyContextV1())
Expand Down Expand Up @@ -1106,6 +1113,8 @@ async def setup_run_context(self, client: Optional[PrefectClient] = None):
should_log_prints,
)

settings = get_current_settings()

if client is None:
client = self.client
if not self.task_run:
Expand All @@ -1114,6 +1123,12 @@ async def setup_run_context(self, client: Optional[PrefectClient] = None):
with ExitStack() as stack:
if log_prints := should_log_prints(self.task):
stack.enter_context(patch_print())
if self.task.persist_result is not None:
persist_result = self.task.persist_result
elif settings.tasks.default_persist_result is not None:
persist_result = settings.tasks.default_persist_result
else:
persist_result = should_persist_result()
stack.enter_context(
TaskRunContext(
task=self.task,
Expand All @@ -1124,9 +1139,7 @@ async def setup_run_context(self, client: Optional[PrefectClient] = None):
self.task, _sync=False
),
client=client,
persist_result=self.task.persist_result
if self.task.persist_result is not None
else should_persist_result(),
persist_result=persist_result,
)
)
stack.enter_context(ConcurrencyContext())
Expand Down
50 changes: 50 additions & 0 deletions tests/results/test_result_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
PREFECT_LOCAL_STORAGE_PATH,
PREFECT_RESULTS_DEFAULT_SERIALIZER,
PREFECT_RESULTS_PERSIST_BY_DEFAULT,
PREFECT_TASKS_DEFAULT_PERSIST_RESULT,
temporary_settings,
)
from prefect.testing.utilities import assert_blocks_equal
Expand Down Expand Up @@ -440,6 +441,55 @@ def bar():

assert persist_result is True

with temporary_settings({PREFECT_TASKS_DEFAULT_PERSIST_RESULT: True}):
persist_result = bar()

assert persist_result is True


async def test_task_can_opt_out_of_result_persistence_with_setting():
    """
    An explicit `persist_result=False` on a task takes precedence over
    `PREFECT_TASKS_DEFAULT_PERSIST_RESULT=True`, for both sync and async tasks.
    """
    with temporary_settings({PREFECT_TASKS_DEFAULT_PERSIST_RESULT: True}):

        @task(persist_result=False)
        def bar():
            return should_persist_result()

        persist_result = bar()
        assert persist_result is False

        # The async variant must also be a task: a bare coroutine runs outside
        # any task run context, so `should_persist_result()` would only report
        # the global default and the assertion would pass vacuously.
        @task(persist_result=False)
        async def abar():
            return should_persist_result()

        persist_result = await abar()
        assert persist_result is False


async def test_can_opt_out_of_result_persistence_with_setting_when_flow_uses_feature():
    """
    With `PREFECT_TASKS_DEFAULT_PERSIST_RESULT=False`, a task that does not set
    `persist_result` itself reports no persistence even when its parent flow
    was declared with `persist_result=True`. Checked for sync and async runs.
    """
    with temporary_settings({PREFECT_TASKS_DEFAULT_PERSIST_RESULT: False}):
        # Synchronous flow -> task.
        @task
        def bar():
            return should_persist_result()

        @flow(persist_result=True)
        def foo():
            return bar()

        assert foo() is False

        # Asynchronous flow -> task.
        @task
        async def abar():
            return should_persist_result()

        @flow(persist_result=True)
        async def afoo():
            return await abar()

        assert await afoo() is False


def test_nested_flow_custom_persist_setting():
@flow(persist_result=True)
Expand Down
1 change: 1 addition & 0 deletions tests/test_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,7 @@
"PREFECT_SILENCE_API_URL_MISCONFIGURATION": {"test_value": True},
"PREFECT_SQLALCHEMY_MAX_OVERFLOW": {"test_value": 10, "legacy": True},
"PREFECT_SQLALCHEMY_POOL_SIZE": {"test_value": 10, "legacy": True},
"PREFECT_TASKS_DEFAULT_PERSIST_RESULT": {"test_value": True},
"PREFECT_TASKS_DEFAULT_RETRIES": {"test_value": 10},
"PREFECT_TASKS_DEFAULT_RETRY_DELAY_SECONDS": {"test_value": 10},
"PREFECT_TASKS_REFRESH_CACHE": {"test_value": True},
Expand Down
Loading