diff --git a/src/prefect/flow_engine.py b/src/prefect/flow_engine.py index e72233d1a9af..3a04180f7ae0 100644 --- a/src/prefect/flow_engine.py +++ b/src/prefect/flow_engine.py @@ -50,7 +50,7 @@ from prefect.results import ( BaseResult, ResultStore, - get_current_result_store, + get_result_store, should_persist_result, ) from prefect.settings import PREFECT_DEBUG_MODE @@ -207,7 +207,7 @@ def begin_run(self) -> State: self.handle_exception( exc, msg=message, - result_store=get_current_result_store().update_for_flow( + result_store=get_result_store().update_for_flow( self.flow, _sync=True ), ) @@ -512,7 +512,7 @@ def setup_run_context(self, client: Optional[SyncPrefectClient] = None): flow_run=self.flow_run, parameters=self.parameters, client=client, - result_store=get_current_result_store().update_for_flow( + result_store=get_result_store().update_for_flow( self.flow, _sync=True ), task_runner=task_runner, diff --git a/src/prefect/results.py b/src/prefect/results.py index f29a09b5452a..d3e646ef921c 100644 --- a/src/prefect/results.py +++ b/src/prefect/results.py @@ -815,7 +815,7 @@ async def read_parameters(self, identifier: UUID) -> Dict[str, Any]: return record.result -def get_current_result_store() -> ResultStore: +def get_result_store() -> ResultStore: """ Get the current result store. """ diff --git a/src/prefect/task_engine.py b/src/prefect/task_engine.py index 0ffc6acec2b1..775007c2827d 100644 --- a/src/prefect/task_engine.py +++ b/src/prefect/task_engine.py @@ -59,7 +59,7 @@ BaseResult, ResultRecord, _format_user_supplied_storage_key, - get_current_result_store, + get_result_store, should_persist_result, ) from prefect.settings import ( @@ -466,7 +466,7 @@ def handle_success(self, result: R, transaction: Transaction) -> R: terminal_state = run_coro_as_sync( return_value_to_state( result, - result_store=get_current_result_store(), + result_store=get_result_store(), key=transaction.key, expiration=expiration, ) @@ -542,7 +542,7 @@ def handle_exception(self, exc: Exception) -> None: exception_to_failed_state( exc, message="Task run encountered an exception", - result_store=get_current_result_store(), + result_store=get_result_store(), write_result=True, ) ) @@ -594,7 +594,7 @@ def setup_run_context(self, client: Optional[SyncPrefectClient] = None): log_prints=log_prints, task_run=self.task_run, parameters=self.parameters, - result_store=get_current_result_store().update_for_task( + result_store=get_result_store().update_for_task( self.task, _sync=True ), client=client, @@ -727,7 +727,7 @@ def transaction_context(self) -> Generator[Transaction, None, None]: with transaction( key=self.compute_transaction_key(), - store=get_current_result_store(), + store=get_result_store(), overwrite=overwrite, logger=self.logger, write_on_commit=should_persist_result(), @@ -973,7 +973,7 @@ async def handle_success(self, result: R, transaction: Transaction) -> R: terminal_state = await return_value_to_state( result, - result_store=get_current_result_store(), + result_store=get_result_store(), key=transaction.key, expiration=expiration, ) @@ -1047,7 +1047,7 @@ async def handle_exception(self, exc: Exception) -> None: state = await exception_to_failed_state( exc, message="Task run encountered an exception", - result_store=get_current_result_store(), + result_store=get_result_store(), ) self.record_terminal_state_timing(state) await self.set_state(state) @@ -1097,7 +1097,7 @@ async def setup_run_context(self, client: Optional[PrefectClient] = None): log_prints=log_prints, task_run=self.task_run, parameters=self.parameters, - result_store=await get_current_result_store().update_for_task( + result_store=await get_result_store().update_for_task( self.task, _sync=False ), client=client, @@ -1227,7 +1227,7 @@ async def transaction_context(self) -> AsyncGenerator[Transaction, None]: with transaction( key=self.compute_transaction_key(), - store=get_current_result_store(), + store=get_result_store(), overwrite=overwrite, logger=self.logger, write_on_commit=should_persist_result(), diff --git a/tests/results/test_flow_results.py b/tests/results/test_flow_results.py index 627333ac1b7e..89f708ca24bf 100644 --- a/tests/results/test_flow_results.py +++ b/tests/results/test_flow_results.py @@ -9,7 +9,7 @@ from prefect.filesystems import LocalFileSystem from prefect.results import ( ResultRecord, - get_current_result_store, + get_result_store, ) from prefect.serializers import ( CompressedSerializer, @@ -80,7 +80,7 @@ async def test_flow_with_uncached_but_persisted_result(prefect_client): @flow(persist_result=True, cache_result_in_memory=False) def foo(): nonlocal store - store = get_current_result_store() + store = get_result_store() return 1 state = foo(return_state=True) diff --git a/tests/results/test_task_results.py b/tests/results/test_task_results.py index 4b67d3585c80..4307d0bcc20c 100644 --- a/tests/results/test_task_results.py +++ b/tests/results/test_task_results.py @@ -4,7 +4,7 @@ from prefect.filesystems import LocalFileSystem from prefect.flows import flow -from prefect.results import get_current_result_store +from prefect.results import get_result_store from prefect.serializers import JSONSerializer, PickleSerializer from prefect.settings import ( PREFECT_HOME, @@ -96,7 +96,7 @@ def foo(): @task(persist_result=True) def bar(): nonlocal store - store = get_current_result_store() + store = get_result_store() return 1 flow_state = foo(return_state=True) @@ -115,7 +115,7 @@ def foo(): @task(persist_result=True, cache_result_in_memory=False) def bar(): nonlocal store - store = get_current_result_store() + store = get_result_store() return 1 flow_state = foo(return_state=True) @@ -140,7 +140,7 @@ async def test_task_with_uncached_but_persisted_result_not_cached_during_flow( def foo(): state = bar(return_state=True) nonlocal store - store = get_current_result_store() + store = get_result_store() assert state.data.metadata.storage_key not in store.cache assert state.result() == 1 assert state.data.metadata.storage_key not in store.cache diff --git a/tests/test_context.py b/tests/test_context.py index 0ee9a5daffb8..5db3c2a42c2f 100644 --- a/tests/test_context.py +++ b/tests/test_context.py @@ -24,7 +24,7 @@ use_profile, ) from prefect.exceptions import MissingContextError -from prefect.results import ResultStore, get_current_result_store +from prefect.results import ResultStore, get_result_store from prefect.settings import ( DEFAULT_PROFILES_PATH, PREFECT_API_KEY, @@ -181,9 +181,7 @@ def bar(): task=bar, task_run=task_run, client=prefect_client, - result_store=await get_current_result_store().update_for_task( - bar, _sync=False - ), + result_store=await get_result_store().update_for_task(bar, _sync=False), parameters={"foo": "bar"}, ) as task_ctx: assert get_run_context() is task_ctx, "Task context takes precedence" @@ -452,7 +450,7 @@ def bar(): task=bar, task_run=task_run, client=prefect_client, - result_store=await get_current_result_store().update_for_task(bar), + result_store=await get_result_store().update_for_task(bar), parameters={"foo": "bar"}, ) as task_ctx: serialized = serialize_context() @@ -595,7 +593,7 @@ def bar(): task=bar, task_run=task_run, client=prefect_client, - result_store=await get_current_result_store().update_for_task(bar), + result_store=await get_result_store().update_for_task(bar), parameters={"foo": "bar"}, )