Skip to content

Commit

Permalink
Rename get_current_result_store to get_result_store
Browse files Browse the repository at this point in the history
  • Loading branch information
desertaxle committed Sep 9, 2024
1 parent 4136051 commit f72b067
Show file tree
Hide file tree
Showing 6 changed files with 23 additions and 25 deletions.
6 changes: 3 additions & 3 deletions src/prefect/flow_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
from prefect.results import (
BaseResult,
ResultStore,
get_current_result_store,
get_result_store,
should_persist_result,
)
from prefect.settings import PREFECT_DEBUG_MODE
Expand Down Expand Up @@ -207,7 +207,7 @@ def begin_run(self) -> State:
self.handle_exception(
exc,
msg=message,
result_store=get_current_result_store().update_for_flow(
result_store=get_result_store().update_for_flow(
self.flow, _sync=True
),
)
Expand Down Expand Up @@ -512,7 +512,7 @@ def setup_run_context(self, client: Optional[SyncPrefectClient] = None):
flow_run=self.flow_run,
parameters=self.parameters,
client=client,
result_store=get_current_result_store().update_for_flow(
result_store=get_result_store().update_for_flow(
self.flow, _sync=True
),
task_runner=task_runner,
Expand Down
2 changes: 1 addition & 1 deletion src/prefect/results.py
Original file line number Diff line number Diff line change
Expand Up @@ -815,7 +815,7 @@ async def read_parameters(self, identifier: UUID) -> Dict[str, Any]:
return record.result


def get_current_result_store() -> ResultStore:
def get_result_store() -> ResultStore:
"""
Get the current result store.
"""
Expand Down
18 changes: 9 additions & 9 deletions src/prefect/task_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
BaseResult,
ResultRecord,
_format_user_supplied_storage_key,
get_current_result_store,
get_result_store,
should_persist_result,
)
from prefect.settings import (
Expand Down Expand Up @@ -466,7 +466,7 @@ def handle_success(self, result: R, transaction: Transaction) -> R:
terminal_state = run_coro_as_sync(
return_value_to_state(
result,
result_store=get_current_result_store(),
result_store=get_result_store(),
key=transaction.key,
expiration=expiration,
)
Expand Down Expand Up @@ -542,7 +542,7 @@ def handle_exception(self, exc: Exception) -> None:
exception_to_failed_state(
exc,
message="Task run encountered an exception",
result_store=get_current_result_store(),
result_store=get_result_store(),
write_result=True,
)
)
Expand Down Expand Up @@ -594,7 +594,7 @@ def setup_run_context(self, client: Optional[SyncPrefectClient] = None):
log_prints=log_prints,
task_run=self.task_run,
parameters=self.parameters,
result_store=get_current_result_store().update_for_task(
result_store=get_result_store().update_for_task(
self.task, _sync=True
),
client=client,
Expand Down Expand Up @@ -727,7 +727,7 @@ def transaction_context(self) -> Generator[Transaction, None, None]:

with transaction(
key=self.compute_transaction_key(),
store=get_current_result_store(),
store=get_result_store(),
overwrite=overwrite,
logger=self.logger,
write_on_commit=should_persist_result(),
Expand Down Expand Up @@ -973,7 +973,7 @@ async def handle_success(self, result: R, transaction: Transaction) -> R:

terminal_state = await return_value_to_state(
result,
result_store=get_current_result_store(),
result_store=get_result_store(),
key=transaction.key,
expiration=expiration,
)
Expand Down Expand Up @@ -1047,7 +1047,7 @@ async def handle_exception(self, exc: Exception) -> None:
state = await exception_to_failed_state(
exc,
message="Task run encountered an exception",
result_store=get_current_result_store(),
result_store=get_result_store(),
)
self.record_terminal_state_timing(state)
await self.set_state(state)
Expand Down Expand Up @@ -1097,7 +1097,7 @@ async def setup_run_context(self, client: Optional[PrefectClient] = None):
log_prints=log_prints,
task_run=self.task_run,
parameters=self.parameters,
result_store=await get_current_result_store().update_for_task(
result_store=await get_result_store().update_for_task(
self.task, _sync=False
),
client=client,
Expand Down Expand Up @@ -1227,7 +1227,7 @@ async def transaction_context(self) -> AsyncGenerator[Transaction, None]:

with transaction(
key=self.compute_transaction_key(),
store=get_current_result_store(),
store=get_result_store(),
overwrite=overwrite,
logger=self.logger,
write_on_commit=should_persist_result(),
Expand Down
4 changes: 2 additions & 2 deletions tests/results/test_flow_results.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from prefect.filesystems import LocalFileSystem
from prefect.results import (
ResultRecord,
get_current_result_store,
get_result_store,
)
from prefect.serializers import (
CompressedSerializer,
Expand Down Expand Up @@ -80,7 +80,7 @@ async def test_flow_with_uncached_but_persisted_result(prefect_client):
@flow(persist_result=True, cache_result_in_memory=False)
def foo():
nonlocal store
store = get_current_result_store()
store = get_result_store()
return 1

state = foo(return_state=True)
Expand Down
8 changes: 4 additions & 4 deletions tests/results/test_task_results.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from prefect.filesystems import LocalFileSystem
from prefect.flows import flow
from prefect.results import get_current_result_store
from prefect.results import get_result_store
from prefect.serializers import JSONSerializer, PickleSerializer
from prefect.settings import (
PREFECT_HOME,
Expand Down Expand Up @@ -96,7 +96,7 @@ def foo():
@task(persist_result=True)
def bar():
nonlocal store
store = get_current_result_store()
store = get_result_store()
return 1

flow_state = foo(return_state=True)
Expand All @@ -115,7 +115,7 @@ def foo():
@task(persist_result=True, cache_result_in_memory=False)
def bar():
nonlocal store
store = get_current_result_store()
store = get_result_store()
return 1

flow_state = foo(return_state=True)
Expand All @@ -140,7 +140,7 @@ async def test_task_with_uncached_but_persisted_result_not_cached_during_flow(
def foo():
state = bar(return_state=True)
nonlocal store
store = get_current_result_store()
store = get_result_store()
assert state.data.metadata.storage_key not in store.cache
assert state.result() == 1
assert state.data.metadata.storage_key not in store.cache
Expand Down
10 changes: 4 additions & 6 deletions tests/test_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
use_profile,
)
from prefect.exceptions import MissingContextError
from prefect.results import ResultStore, get_current_result_store
from prefect.results import ResultStore, get_result_store
from prefect.settings import (
DEFAULT_PROFILES_PATH,
PREFECT_API_KEY,
Expand Down Expand Up @@ -181,9 +181,7 @@ def bar():
task=bar,
task_run=task_run,
client=prefect_client,
result_store=await get_current_result_store().update_for_task(
bar, _sync=False
),
result_store=await get_result_store().update_for_task(bar, _sync=False),
parameters={"foo": "bar"},
) as task_ctx:
assert get_run_context() is task_ctx, "Task context takes precedence"
Expand Down Expand Up @@ -452,7 +450,7 @@ def bar():
task=bar,
task_run=task_run,
client=prefect_client,
result_store=await get_current_result_store().update_for_task(bar),
result_store=await get_result_store().update_for_task(bar),
parameters={"foo": "bar"},
) as task_ctx:
serialized = serialize_context()
Expand Down Expand Up @@ -595,7 +593,7 @@ def bar():
task=bar,
task_run=task_run,
client=prefect_client,
result_store=await get_current_result_store().update_for_task(bar),
result_store=await get_result_store().update_for_task(bar),
parameters={"foo": "bar"},
)

Expand Down

0 comments on commit f72b067

Please sign in to comment.