diff --git a/python_modules/dagster/dagster/_core/instance/config.py b/python_modules/dagster/dagster/_core/instance/config.py
index ec5069910dd16..062439b998e6f 100644
--- a/python_modules/dagster/dagster/_core/instance/config.py
+++ b/python_modules/dagster/dagster/_core/instance/config.py
@@ -161,6 +161,13 @@ def validate_concurrency_config(dagster_config_dict: Mapping[str, Any]):
                 [],
                 None,
             )
+        granularity = concurrency_config.get("pools", {}).get("granularity")
+        if granularity and granularity not in ["run", "op"]:
+            raise DagsterInvalidConfigError(
+                f"Found value `{granularity}` for `granularity`. Expected value 'run' or 'op'.",
+                [],
+                None,
+            )
 
     if "run_queue" in dagster_config_dict:
         verify_config_match(
diff --git a/python_modules/dagster/dagster/_core/op_concurrency_limits_counter.py b/python_modules/dagster/dagster/_core/op_concurrency_limits_counter.py
index e897d838f4dbf..b71f1c7f75e30 100644
--- a/python_modules/dagster/dagster/_core/op_concurrency_limits_counter.py
+++ b/python_modules/dagster/dagster/_core/op_concurrency_limits_counter.py
@@ -1,11 +1,13 @@
 import os
 from collections import defaultdict
 from collections.abc import Mapping, Sequence
-from typing import Optional
+from typing import TYPE_CHECKING, Optional
 
 from dagster._core.instance import DagsterInstance
+from dagster._core.run_coordinator.queued_run_coordinator import PoolGranularity
 from dagster._core.snap.execution_plan_snapshot import ExecutionPlanSnapshot
 from dagster._core.storage.dagster_run import (
+    IN_PROGRESS_RUN_STATUSES,
     DagsterRun,
     DagsterRunStatus,
     RunOpConcurrency,
@@ -13,6 +15,9 @@
 )
 from dagster._time import get_current_timestamp
 
+if TYPE_CHECKING:
+    from dagster._utils.concurrency import ConcurrencyKeyInfo
+
 
 def compute_run_op_concurrency_info_for_snapshot(
     plan_snapshot: ExecutionPlanSnapshot,
@@ -23,21 +28,26 @@ def compute_run_op_concurrency_info_for_snapshot(
     root_step_keys = set(
         [step_key for step_key, deps in plan_snapshot.step_deps.items() if not deps]
     )
-    pool_counts: Mapping[str, int] = defaultdict(int)
+    root_pool_counts: Mapping[str, int] = defaultdict(int)
+    all_pools: set[str] = set()
     has_unconstrained_root_nodes = False
     for step in plan_snapshot.steps:
-        if step.key not in root_step_keys:
-            continue
-        if step.pool is None:
+        if step.pool is None and step.key in root_step_keys:
             has_unconstrained_root_nodes = True
+        elif step.pool is None:
+            continue
+        elif step.key in root_step_keys:
+            root_pool_counts[step.pool] += 1
+            all_pools.add(step.pool)
         else:
-            pool_counts[step.pool] += 1
+            all_pools.add(step.pool)
 
-    if len(pool_counts) == 0:
+    if len(all_pools) == 0:
         return None
 
     return RunOpConcurrency(
-        root_key_counts=dict(pool_counts),
+        all_pools=all_pools,
+        root_key_counts=dict(root_pool_counts),
         has_unconstrained_root_nodes=has_unconstrained_root_nodes,
     )
 
@@ -49,12 +59,14 @@ def __init__(
         runs: Sequence[DagsterRun],
         in_progress_run_records: Sequence[RunRecord],
         slot_count_offset: int = 0,
+        pool_granularity: PoolGranularity = PoolGranularity.OP,
     ):
         self._root_pools_by_run = {}
-        self._concurrency_info_by_pool = {}
+        self._concurrency_info_by_key: dict[str, ConcurrencyKeyInfo] = {}
         self._launched_pool_counts = defaultdict(int)
         self._in_progress_pool_counts = defaultdict(int)
         self._slot_count_offset = slot_count_offset
+        self._pool_granularity = pool_granularity
         self._in_progress_run_ids: set[str] = set(
             [record.dagster_run.run_id for record in in_progress_run_records]
         )
@@ -66,77 +78,145 @@ def __init__(
         # priority order
         self._fetch_concurrency_info(instance, runs)
 
-        # fetch all the outstanding concurrency keys for in-progress runs
+        # fetch all the outstanding pools for in-progress runs
        self._process_in_progress_runs(in_progress_run_records)
 
     def _fetch_concurrency_info(self, instance: DagsterInstance, queued_runs: Sequence[DagsterRun]):
-        # fetch all the concurrency slot information for the root concurrency keys of all the queued
-        # runs
-        all_run_pools = set()
+        # fetch all the concurrency slot information for all the queued runs
+        all_pools = set()
 
         configured_pools = instance.event_log_storage.get_concurrency_keys()
         for run in queued_runs:
             if run.run_op_concurrency:
-                all_run_pools.update(run.run_op_concurrency.root_key_counts.keys())
-
-        for key in all_run_pools:
-            if key is None:
+                # if using run granularity, consider all the concurrency keys required by the run
+                # if using op granularity, consider only the root keys
+                run_pools = (
+                    run.run_op_concurrency.root_key_counts.keys()
+                    if self._pool_granularity == PoolGranularity.OP
+                    else run.run_op_concurrency.all_pools or []
+                )
+                all_pools.update(run_pools)
+
+        for pool in all_pools:
+            if pool is None:
                 continue
 
-            if key not in configured_pools:
-                instance.event_log_storage.initialize_concurrency_limit_to_default(key)
+            if pool not in configured_pools:
+                instance.event_log_storage.initialize_concurrency_limit_to_default(pool)
 
-            self._concurrency_info_by_pool[key] = instance.event_log_storage.get_concurrency_info(
-                key
+            self._concurrency_info_by_key[pool] = instance.event_log_storage.get_concurrency_info(
+                pool
             )
 
-    def _should_allocate_slots_for_root_pools(self, record: RunRecord):
+    def _should_allocate_slots_for_in_progress_run(self, record: RunRecord):
+        if not record.dagster_run.run_op_concurrency:
+            return False
+
         status = record.dagster_run.status
+        if status not in IN_PROGRESS_RUN_STATUSES:
+            return False
+
+        if self._pool_granularity == PoolGranularity.RUN:
+            return True
+
         if status == DagsterRunStatus.STARTING:
             return True
+
         if status != DagsterRunStatus.STARTED or not record.start_time:
             return False
+
         time_elapsed = get_current_timestamp() - record.start_time
         if time_elapsed < self._started_run_pools_allotted_seconds:
             return True
 
+    def _slot_counts_for_run(self, run: DagsterRun) -> Mapping[str, int]:
+        if not run.run_op_concurrency:
+            return {}
+
+        if self._pool_granularity == PoolGranularity.OP:
+            return {**run.run_op_concurrency.root_key_counts}
+
+        else:
+            assert self._pool_granularity == PoolGranularity.RUN
+            return {pool: 1 for pool in run.run_op_concurrency.all_pools or []}
+
     def _process_in_progress_runs(self, in_progress_records: Sequence[RunRecord]):
         for record in in_progress_records:
-            if (
-                self._should_allocate_slots_for_root_pools(record)
-                and record.dagster_run.run_op_concurrency
-            ):
-                for (
-                    pool,
-                    count,
-                ) in record.dagster_run.run_op_concurrency.root_key_counts.items():
-                    self._in_progress_pool_counts[pool] += count
+            if not self._should_allocate_slots_for_in_progress_run(record):
+                continue
+
+            for pool, count in self._slot_counts_for_run(record.dagster_run).items():
+                self._in_progress_pool_counts[pool] += count
 
     def is_blocked(self, run: DagsterRun) -> bool:
         # if any of the ops in the run can make progress (not blocked by concurrency keys), we
         # should dequeue
-        if not run.run_op_concurrency or run.run_op_concurrency.has_unconstrained_root_nodes:
-            # if there exists a root node that is not concurrency blocked, we should dequeue.
+        if not run.run_op_concurrency:
             return False
 
-        for pool in run.run_op_concurrency.root_key_counts.keys():
-            if pool not in self._concurrency_info_by_pool:
-                # there is no concurrency limit set for this key, we should dequeue
-                return False
-
-            key_info = self._concurrency_info_by_pool[pool]
-            available_count = (
-                key_info.slot_count
-                - len(key_info.pending_steps)
-                - self._launched_pool_counts[pool]
-                - self._in_progress_pool_counts[pool]
-            )
-            if available_count > -1 * self._slot_count_offset:
-                # there exists a root concurrency key that is not blocked, we should dequeue
-                return False
+        if (
+            self._pool_granularity == PoolGranularity.OP
+            and run.run_op_concurrency.has_unconstrained_root_nodes
+        ):
+            # if the granularity is at the op level and there exists a root node that is not
+            # concurrency blocked, we should dequeue.
+            return False
 
-        # if we reached here, then every root concurrency key is blocked, so we should not dequeue
-        return True
+        if self._pool_granularity == PoolGranularity.OP:
+            # we just need to check all of the root concurrency keys, instead of all the concurrency keys
+            # in the run
+            for pool in run.run_op_concurrency.root_key_counts.keys():
+                if pool not in self._concurrency_info_by_key:
+                    # there is no concurrency limit set for this key, we should dequeue
+                    return False
+
+                key_info = self._concurrency_info_by_key[pool]
+                unaccounted_occupied_slots = [
+                    pending_step
+                    for pending_step in key_info.pending_steps
+                    if pending_step.run_id not in self._in_progress_run_ids
+                ]
+                available_count = (
+                    key_info.slot_count
+                    - len(unaccounted_occupied_slots)
+                    - self._launched_pool_counts[pool]
+                    - self._in_progress_pool_counts[pool]
+                )
+                if available_count + self._slot_count_offset > 0:
+                    # there exists a root concurrency key that is not blocked, we should dequeue
+                    return False
+
+            # if we reached here, then every root concurrency key is blocked, so we should not dequeue
+            return True
+
+        else:
+            assert self._pool_granularity == PoolGranularity.RUN
+
+            # if the granularity is at the run level, we should check if any of the concurrency
+            # keys are blocked
+            for pool in run.run_op_concurrency.all_pools or []:
+                if pool not in self._concurrency_info_by_key:
+                    # there is no concurrency limit set for this key
+                    continue
+
+                key_info = self._concurrency_info_by_key[pool]
+                unaccounted_occupied_slots = [
+                    pending_step
+                    for pending_step in key_info.pending_steps
+                    if pending_step.run_id not in self._in_progress_run_ids
+                ]
+                available_count = (
+                    key_info.slot_count
+                    - len(unaccounted_occupied_slots)
+                    - self._launched_pool_counts[pool]
+                    - self._in_progress_pool_counts[pool]
+                )
+                if available_count + self._slot_count_offset <= 0:
+                    return True
+
+            # if we reached here then there is at least one available slot for every single concurrency key
+            # required by this run, so we should dequeue
+            return False
 
     def get_blocked_run_debug_info(self, run: DagsterRun) -> Mapping:
         if not run.run_op_concurrency:
@@ -144,11 +224,12 @@ def get_blocked_run_debug_info(self, run: DagsterRun) -> Mapping:
 
         log_info = {}
         for pool in run.run_op_concurrency.root_key_counts.keys():
-            concurrency_info = self._concurrency_info_by_pool.get(pool)
+            concurrency_info = self._concurrency_info_by_key.get(pool)
             if not concurrency_info:
                 continue
 
             log_info[pool] = {
+                "granularity": self._pool_granularity.value,
                 "slot_count": concurrency_info.slot_count,
                 "pending_step_count": len(concurrency_info.pending_steps),
                 "pending_step_run_ids": list(
@@ -160,8 +241,5 @@ def get_blocked_run_debug_info(self, run: DagsterRun) -> Mapping:
         return log_info
 
     def update_counters_with_launched_item(self, run: DagsterRun):
-        if not run.run_op_concurrency:
-            return
-        for pool, count in run.run_op_concurrency.root_key_counts.items():
-            if pool:
-                self._launched_pool_counts[pool] += count
+        for pool, count in self._slot_counts_for_run(run).items():
+            self._launched_pool_counts[pool] += count
diff --git a/python_modules/dagster/dagster/_core/run_coordinator/queued_run_coordinator.py b/python_modules/dagster/dagster/_core/run_coordinator/queued_run_coordinator.py
index 90ba66764e696..0e30bc678cc5c 100644
--- a/python_modules/dagster/dagster/_core/run_coordinator/queued_run_coordinator.py
+++ b/python_modules/dagster/dagster/_core/run_coordinator/queued_run_coordinator.py
@@ -1,5 +1,6 @@
 import logging
 from collections.abc import Mapping, Sequence
+from enum import Enum
 from typing import Any, NamedTuple, Optional
 
 from typing_extensions import Self
@@ -20,6 +21,11 @@
 from dagster._serdes import ConfigurableClass, ConfigurableClassData
 
 
+class PoolGranularity(Enum):
+    OP = "op"
+    RUN = "run"
+
+
 class RunQueueConfig(
     NamedTuple(
         "_RunQueueConfig",
@@ -30,6 +36,7 @@ class RunQueueConfig(
             ("user_code_failure_retry_delay", int),
             ("should_block_op_concurrency_limited_runs", bool),
             ("op_concurrency_slot_buffer", int),
+            ("pool_granularity", Optional[PoolGranularity]),
         ],
     )
 ):
@@ -41,6 +48,7 @@ def __new__(
         user_code_failure_retry_delay: int = 60,
         should_block_op_concurrency_limited_runs: bool = False,
         op_concurrency_slot_buffer: int = 0,
+        pool_granularity: Optional[PoolGranularity] = None,
     ):
         return super().__new__(
             cls,
@@ -52,6 +60,7 @@ def __new__(
                 should_block_op_concurrency_limited_runs, "should_block_op_concurrency_limited_runs"
             ),
             check.int_param(op_concurrency_slot_buffer, "op_concurrency_slot_buffer"),
+            check.opt_inst_param(pool_granularity, "pool_granularity", PoolGranularity),
         )
 
     def with_concurrency_settings(
@@ -59,6 +68,11 @@ def with_concurrency_settings(
     ) -> "RunQueueConfig":
         run_settings = concurrency_settings.get("runs", {})
         pool_settings = concurrency_settings.get("pools", {})
+        pool_granularity = (
+            PoolGranularity(pool_settings.get("granularity"))
+            if pool_settings.get("granularity")
+            else self.pool_granularity
+        )
         return RunQueueConfig(
             max_concurrent_runs=run_settings.get("max_concurrent_runs", self.max_concurrent_runs),
             tag_concurrency_limits=run_settings.get(
@@ -66,10 +80,12 @@ def with_concurrency_settings(
             ),
             max_user_code_failure_retries=self.max_user_code_failure_retries,
             user_code_failure_retry_delay=self.user_code_failure_retry_delay,
-            should_block_op_concurrency_limited_runs=self.should_block_op_concurrency_limited_runs,
+            should_block_op_concurrency_limited_runs=bool(pool_settings)
+            or self.should_block_op_concurrency_limited_runs,
             op_concurrency_slot_buffer=pool_settings.get(
                 "op_run_buffer", self.op_concurrency_slot_buffer
             ),
+            pool_granularity=pool_granularity,
         )
@@ -134,7 +150,17 @@ def __init__(
                 "op_concurrency_slot_buffer can only be set if block_op_concurrency_limited_runs "
                 "is enabled",
             )
-
+        self._pool_granularity: Optional[PoolGranularity] = (
+            PoolGranularity(block_op_concurrency_limited_runs.get("pool_granularity", "op"))
+            if block_op_concurrency_limited_runs
+            else None
+        )
+        if self._pool_granularity:
+            check.invariant(
+                self._should_block_op_concurrency_limited_runs,
+                "pool_granularity can only be set if block_op_concurrency_limited_runs "
+                "is enabled",
+            )
         self._logger = logging.getLogger("dagster.run_coordinator.queued_run_coordinator")
         super().__init__()
@@ -150,6 +176,7 @@ def get_run_queue_config(self) -> RunQueueConfig:
             user_code_failure_retry_delay=self._user_code_failure_retry_delay,
             should_block_op_concurrency_limited_runs=self._should_block_op_concurrency_limited_runs,
             op_concurrency_slot_buffer=self._op_concurrency_slot_buffer,
+            pool_granularity=self._pool_granularity,
         )
 
     @property
@@ -268,6 +295,16 @@ def config_type(cls) -> UserConfigSchema:
                             "free."
                         ),
                     ),
+                    "pool_granularity": Field(
+                        str,
+                        is_required=False,
+                        description=(
+                            "Determines the granularity at which concurrency limits are applied. If set to "
+                            "'op', dequeues runs as long as any op in the run can make progress. If set to "
+                            "'run', dequeues runs as long as all of the concurrency groups assigned to the run "
+                            "have free slots available."
+                        ),
+                    ),
                 }
             ),
         }
@@ -307,6 +344,7 @@ def submit_run(self, context: SubmitRunContext) -> DagsterRun:
             run = self._instance.get_run_by_id(dagster_run.run_id)
             if run is None:
                 check.failed(f"Failed to reload run {dagster_run.run_id}")
+            assert run
             return run
 
     def cancel_run(self, run_id: str) -> bool:
diff --git a/python_modules/dagster/dagster/_core/storage/dagster_run.py b/python_modules/dagster/dagster/_core/storage/dagster_run.py
index 19b2e7728418c..42cec16cb6348 100644
--- a/python_modules/dagster/dagster/_core/storage/dagster_run.py
+++ b/python_modules/dagster/dagster/_core/storage/dagster_run.py
@@ -162,6 +162,7 @@ class RunOpConcurrency(
         [
             ("root_key_counts", Mapping[str, int]),
             ("has_unconstrained_root_nodes", bool),
+            ("all_pools", Optional[set[str]]),
         ],
     )
 ):
@@ -173,6 +174,7 @@ def __new__(
         cls,
         root_key_counts: Mapping[str, int],
         has_unconstrained_root_nodes: bool,
+        all_pools: Optional[set[str]] = None,
     ):
         return super().__new__(
             cls,
@@ -182,6 +184,7 @@ def __new__(
             has_unconstrained_root_nodes=check.bool_param(
                 has_unconstrained_root_nodes, "has_unconstrained_root_nodes"
             ),
+            all_pools=check.opt_set_param(all_pools, "all_pools", of_type=str),
         )
diff --git a/python_modules/dagster/dagster/_daemon/run_coordinator/queued_run_coordinator_daemon.py b/python_modules/dagster/dagster/_daemon/run_coordinator/queued_run_coordinator_daemon.py
index 3d90c60f42aee..1e274a6933bd3 100644
--- a/python_modules/dagster/dagster/_daemon/run_coordinator/queued_run_coordinator_daemon.py
+++ b/python_modules/dagster/dagster/_daemon/run_coordinator/queued_run_coordinator_daemon.py
@@ -272,6 +272,7 @@ def _get_runs_to_dequeue(
                     batch,
                     in_progress_run_records,
                     run_queue_config.op_concurrency_slot_buffer,
+                    run_queue_config.pool_granularity,
                 )
             except:
                 self._logger.exception("Failed to initialize op concurrency counter")
diff --git a/python_modules/dagster/dagster_tests/core_tests/instance_tests/test_config/merged_run_coordinator_concurrency.yaml b/python_modules/dagster/dagster_tests/core_tests/instance_tests/test_config/merged_run_coordinator_concurrency.yaml
index bc7cd583fa308..be1eb40462d4a 100644
--- a/python_modules/dagster/dagster_tests/core_tests/instance_tests/test_config/merged_run_coordinator_concurrency.yaml
+++ b/python_modules/dagster/dagster_tests/core_tests/instance_tests/test_config/merged_run_coordinator_concurrency.yaml
@@ -1,6 +1,6 @@
 concurrency:
   pools:
-    granularity: op
+    granularity: run
     op_run_buffer: 1
   runs:
     max_concurrent_runs: 5
@@ -20,3 +20,4 @@ run_coordinator:
     block_op_concurrency_limited_runs:
       enabled: true
       op_concurrency_slot_buffer: 1
+      pool_granularity: run
diff --git a/python_modules/dagster/dagster_tests/core_tests/instance_tests/test_config/merged_run_queue_concurrency.yaml b/python_modules/dagster/dagster_tests/core_tests/instance_tests/test_config/merged_run_queue_concurrency.yaml
index d61e0b8fd0854..ed1acebef4045 100644
--- a/python_modules/dagster/dagster_tests/core_tests/instance_tests/test_config/merged_run_queue_concurrency.yaml
+++ b/python_modules/dagster/dagster_tests/core_tests/instance_tests/test_config/merged_run_queue_concurrency.yaml
@@ -1,6 +1,6 @@
 concurrency:
   pools:
-    granularity: op
+    granularity: run
     op_run_buffer: 1
   runs:
     max_concurrent_runs: 5
@@ -17,3 +17,4 @@ run_queue:
     block_op_concurrency_limited_runs:
       enabled: true
       op_concurrency_slot_buffer: 1
+      pool_granularity: run
diff --git a/python_modules/dagster/dagster_tests/core_tests/instance_tests/test_instance_config.py b/python_modules/dagster/dagster_tests/core_tests/instance_tests/test_instance_config.py
index 5028b2f1f1b0d..01b226c4f6a72 100644
--- a/python_modules/dagster/dagster_tests/core_tests/instance_tests/test_instance_config.py
+++ b/python_modules/dagster/dagster_tests/core_tests/instance_tests/test_instance_config.py
@@ -2,6 +2,7 @@
 from dagster import file_relative_path
 from dagster._core.errors import DagsterInvalidConfigError
 from dagster._core.instance.config import dagster_instance_config
+from dagster._core.run_coordinator.queued_run_coordinator import PoolGranularity
 from dagster._core.test_utils import environ, instance_for_test
@@ -38,6 +39,7 @@ def test_concurrency_config(config_filename, caplog):
         assert run_queue_config.user_code_failure_retry_delay == 10
         assert run_queue_config.should_block_op_concurrency_limited_runs
         assert run_queue_config.op_concurrency_slot_buffer == 1
+        assert run_queue_config.pool_granularity == PoolGranularity.RUN
 
 
 @pytest.mark.parametrize(
diff --git a/python_modules/dagster/dagster_tests/daemon_tests/test_locations/concurrency_limited_workspace.py b/python_modules/dagster/dagster_tests/daemon_tests/test_locations/concurrency_limited_workspace.py
index c9f4b8693228c..7e05fc640d924 100644
--- a/python_modules/dagster/dagster_tests/daemon_tests/test_locations/concurrency_limited_workspace.py
+++ b/python_modules/dagster/dagster_tests/daemon_tests/test_locations/concurrency_limited_workspace.py
@@ -3,18 +3,18 @@
 from dagster._core.storage.tags import GLOBAL_CONCURRENCY_TAG
 
 
-@asset(op_tags={GLOBAL_CONCURRENCY_TAG: "foo"}, key_prefix=["prefix"])
+@asset(pool="foo", key_prefix=["prefix"])
 def foo_limited_asset():
     return 1
 
 
-@asset(op_tags={GLOBAL_CONCURRENCY_TAG: "bar"}, key_prefix=["prefix"])
+@asset(pool="bar", key_prefix=["prefix"])
 def bar_limited_asset():
     return 1
 
 
 @asset(
-    op_tags={GLOBAL_CONCURRENCY_TAG: "baz"},
+    pool="baz",
     key_prefix=["prefix"],
     ins={"foo_limited_asset": AssetIn(key_prefix="prefix")},
 )
@@ -22,12 +22,17 @@ def baz_limited_asset_depends_on_foo(foo_limited_asset):
     return 1
 
 
+@asset(pool="baz", key_prefix=["prefix"])
+def baz_limited_asset():
+    return 1
+
+
 concurrency_limited_asset_job = define_asset_job(
     "concurrency_limited_asset_job",
-    [foo_limited_asset, bar_limited_asset, baz_limited_asset_depends_on_foo],
+    [foo_limited_asset, bar_limited_asset, baz_limited_asset, baz_limited_asset_depends_on_foo],
 ).resolve(
     asset_graph=AssetGraph.from_assets(
-        [foo_limited_asset, bar_limited_asset, baz_limited_asset_depends_on_foo]
+        [foo_limited_asset, bar_limited_asset, baz_limited_asset_depends_on_foo, baz_limited_asset]
     )
 )
diff --git a/python_modules/dagster/dagster_tests/daemon_tests/test_queued_run_coordinator_daemon.py b/python_modules/dagster/dagster_tests/daemon_tests/test_queued_run_coordinator_daemon.py
index e6a5f44baf9e3..b50700ec8d7d7 100644
--- a/python_modules/dagster/dagster_tests/daemon_tests/test_queued_run_coordinator_daemon.py
+++ b/python_modules/dagster/dagster_tests/daemon_tests/test_queued_run_coordinator_daemon.py
@@ -1180,6 +1180,60 @@ def test_concurrency_buffer_with_default_slot(
         assert set(self.get_run_ids(instance.run_launcher.queue())) == {run_id_1}
         caplog.text.count(f"Run {run_id_2} is blocked by global concurrency limits") == 1  # pyright: ignore[reportUnusedExpression]
 
+    @pytest.mark.parametrize(
+        "run_coordinator_config",
+        [
+            {
+                "block_op_concurrency_limited_runs": {
+                    "enabled": True,
+                    "pool_granularity": "run",
+                },
+            },
+        ],
+    )
+    def test_concurrency_run_granularity(
+        self,
+        concurrency_limited_workspace_context,
+        daemon,
+        instance,
+    ):
+        run_id_1, run_id_2 = [make_new_run_id() for _ in range(2)]
+        workspace = concurrency_limited_workspace_context.create_request_context()
+        # concurrency_limited_asset_job
+        remote_job = self.get_concurrency_job(workspace)
+        foo_key = AssetKey(["prefix", "foo_limited_asset"])
+        bar_key = AssetKey(["prefix", "bar_limited_asset"])
+        baz_key = AssetKey(["prefix", "baz_limited_asset"])
+        downstream_baz_key = AssetKey(["prefix", "baz_limited_asset_depends_on_foo"])
+
+        # first submit a run that occupies the baz slot
+        self.submit_run(
+            instance, remote_job, workspace, run_id=run_id_1, asset_selection=set([baz_key])
+        )
+        list(daemon.run_iteration(concurrency_limited_workspace_context))
+        assert set(self.get_run_ids(instance.run_launcher.queue())) == set([run_id_1])
+
+        # submit a run that has 2 root nodes, respectively with foo, bar concurrency groups
+        # also with a downstream extra baz concurrency group asset
+        self.submit_run(
+            instance,
+            remote_job,
+            workspace,
+            run_id=run_id_2,
+            asset_selection=set([foo_key, bar_key, downstream_baz_key]),
+        )
+
+        # even though the root nodes have slots available, the second run is not launched
+        list(daemon.run_iteration(concurrency_limited_workspace_context))
+        assert set(self.get_run_ids(instance.run_launcher.queue())) == set([run_id_1])
+
+        instance.event_log_storage.set_concurrency_slots("baz", 2)
+
+        # all group slots available, run launched, even though there is only one slot open for baz
+        # and the run has two baz-grouped nodes
+        list(daemon.run_iteration(concurrency_limited_workspace_context))
+        assert set(self.get_run_ids(instance.run_launcher.queue())) == set([run_id_1, run_id_2])
+
 
 class TestQueuedRunCoordinatorDaemon(QueuedRunCoordinatorDaemonTests):
     @pytest.fixture
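For reference, a minimal dagster.yaml sketch that would exercise the run-level pool granularity introduced in this diff. It mirrors the merged test fixtures above; the specific limit values shown are illustrative, not defaults.

# dagster.yaml (sketch) -- enables run-level pool granularity.
# With `granularity: run`, a queued run is only dequeued once every pool used
# anywhere in the run has a free slot; with `granularity: op` (the default and
# the prior behavior), only the pools of the run's root ops are checked.
concurrency:
  pools:
    granularity: run # validated to be either 'run' or 'op' in instance/config.py
    op_run_buffer: 1 # maps to op_concurrency_slot_buffer on RunQueueConfig
  runs:
    max_concurrent_runs: 5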