diff --git a/src/prefect/tasks.py b/src/prefect/tasks.py
index 29069cd7499f..efa340c2c41e 100644
--- a/src/prefect/tasks.py
+++ b/src/prefect/tasks.py
@@ -6,7 +6,6 @@
 
 import datetime
 import inspect
-import os
 from copy import copy
 from functools import partial, update_wrapper
 from typing import (
@@ -76,6 +75,8 @@
 
 logger = get_logger("tasks")
 
+NUM_CHARS_DYNAMIC_KEY: int = 8
+
 
 def task_input_hash(
     context: "TaskRunContext", arguments: Dict[str, Any]
@@ -125,6 +126,40 @@ def retry_backoff_callable(retries: int) -> List[float]:
     return retry_backoff_callable
 
 
+def _generate_task_key(fn: Callable[..., Any]) -> str:
+    """Generate a task key based on the function name and source code.
+
+    We may eventually want some sort of top-level namespace here to
+    disambiguate tasks with the same function name in different modules,
+    in a more human-readable way, while avoiding relative import problems (see #12337).
+
+    As long as the task implementations are unique (even if named the same), we should
+    not have any collisions.
+
+    Args:
+        fn: The function to generate a task key for.
+
+    Returns:
+        The result of `to_qualified_name(type(fn))` if `fn` has no `__qualname__`;
+        otherwise `<name>-<hash>`, where `<name>` is the last dotted segment of
+        `fn.__qualname__` and `<hash>` is the first `NUM_CHARS_DYNAMIC_KEY`
+        characters of what `hash_objects` returns for the function's code object,
+        or `unknown` when there is no code object or that result is empty.
+    """
+    if not hasattr(fn, "__qualname__"):
+        return to_qualified_name(type(fn))
+
+    qualname = fn.__qualname__.split(".")[-1]
+
+    # Builtins and classes expose `__qualname__` but not `__code__`; use the
+    # "unknown" marker for them rather than raising AttributeError.
+    code = getattr(fn, "__code__", None)
+    digest = hash_objects(code) if code is not None else None
+    code_hash = digest[:NUM_CHARS_DYNAMIC_KEY] if digest else "unknown"
+
+    return f"{qualname}-{code_hash}"
+
+
 @PrefectObjectRegistry.register_instances
 class Task(Generic[P, R]):
     """
@@ -292,17 +327,7 @@ def __init__(
 
         self.tags = set(tags if tags else [])
 
-        if not hasattr(self.fn, "__qualname__"):
-            self.task_key = to_qualified_name(type(self.fn))
-        else:
-            try:
-                task_origin_hash = hash_objects(
-                    self.name, os.path.abspath(inspect.getsourcefile(self.fn))
-                )
-            except TypeError:
-                task_origin_hash = "unknown-source-file"
-
-            self.task_key = f"{self.fn.__qualname__}-{task_origin_hash}"
+        self.task_key = _generate_task_key(self.fn)
 
         self.cache_key_fn = cache_key_fn
         self.cache_expiration = cache_expiration
diff --git a/tests/test_background_tasks.py b/tests/test_background_tasks.py
index f727b7a8fb30..e1f6d8f844c1 100644
--- a/tests/test_background_tasks.py
+++ b/tests/test_background_tasks.py
@@ -1,6 +1,4 @@
 import asyncio
-import inspect
-import os
 from pathlib import Path
 from typing import AsyncGenerator, Iterable, Tuple
 from unittest import mock
@@ -26,7 +24,6 @@
 )
 from prefect.task_server import TaskServer
 from prefect.utilities.asyncutils import sync_compatible
-from prefect.utilities.hashing import hash_objects
 
 
 @sync_compatible
@@ -395,22 +392,3 @@ async def bar(x: int, mappable: Iterable) -> Tuple[int, Iterable]:
         assert await result_factory.read_parameters(
             task_run.state.state_details.task_parameters_id
         ) == {"x": i + 1, "mappable": ["some", "iterable"]}
-
-
-class TestTaskKey:
-    def test_task_key_includes_qualname_and_source_file_hash(self):
-        def some_fn():
-            pass
-
-        t = Task(fn=some_fn)
-        source_file = os.path.abspath(inspect.getsourcefile(some_fn))
-        task_origin_hash = hash_objects(t.name, source_file)
-        assert t.task_key == f"{some_fn.__qualname__}-{task_origin_hash}"
-
-    def test_task_key_handles_unknown_source_file(self, monkeypatch):
-        def some_fn():
-            pass
-
-        monkeypatch.setattr(inspect, "getsourcefile", lambda x: None)
-        t = Task(fn=some_fn)
-        assert t.task_key == f"{some_fn.__qualname__}-unknown-source-file"
diff --git a/tests/test_tasks.py b/tests/test_tasks.py
index 81d408f140f5..b475d81db11b 100644
--- a/tests/test_tasks.py
+++ b/tests/test_tasks.py
@@ -140,6 +140,26 @@ def my_task():
         assert my_task.name == "another_name"
 
 
+class TestTaskKey:
+    def test_task_key_typical_case(self):
+        @task
+        def my_task():
+            pass
+
+        assert my_task.task_key.startswith("my_task-")
+
+    def test_task_key_after_import(self):
+        from tests.generic_tasks import noop
+
+        assert noop.task_key.startswith("noop-")
+
+    def test_task_key_without_code_object(self):
+        from prefect.tasks import _generate_task_key
+
+        # Builtins have a `__qualname__` but no `__code__`.
+        assert _generate_task_key(len) == "len-unknown"
+
+
 class TestTaskRunName:
     def test_run_name_default(self):
         @task