diff --git a/src/prefect/_internal/concurrency/services.py b/src/prefect/_internal/concurrency/services.py index 1f992763ef83..eabee601948a 100644 --- a/src/prefect/_internal/concurrency/services.py +++ b/src/prefect/_internal/concurrency/services.py @@ -32,7 +32,7 @@ def __init__(self, *args: Hashable) -> None: self._task: Optional[asyncio.Task[None]] = None self._stopped: bool = False self._started: bool = False - self._key = hash(args) + self._key = hash((self.__class__, *args)) self._lock = threading.Lock() self._queue_get_thread = WorkerThread( # TODO: This thread should not need to be a daemon but when it is not, it @@ -256,7 +256,7 @@ def instance(cls, *args: Hashable) -> Self: If an instance already exists with the given arguments, it will be returned. """ with cls._instance_lock: - key = hash(args) + key = hash((cls, *args)) if key not in cls._instances: cls._instances[key] = cls._new_instance(*args) diff --git a/tests/_internal/concurrency/test_services.py b/tests/_internal/concurrency/test_services.py index fe41e17820b3..15cea7996acc 100644 --- a/tests/_internal/concurrency/test_services.py +++ b/tests/_internal/concurrency/test_services.py @@ -97,6 +97,14 @@ def test_instance_returns_new_instance_with_unique_key(): assert isinstance(new_instance, MockService) +def test_different_subclasses_have_unique_instances(): + instance = MockService.instance() + assert isinstance(instance, MockService) + new_instance = MockBatchedService.instance() + assert new_instance is not instance + assert isinstance(new_instance, MockBatchedService) + + def test_instance_returns_same_instance_after_error(): event = threading.Event()