From e51874eb1e80f1f7fb6286fb15f53e586431dad2 Mon Sep 17 00:00:00 2001 From: prha Date: Wed, 11 Dec 2024 13:37:00 -0800 Subject: [PATCH] group --- .../decorators/asset_check_decorator.py | 2 +- .../definitions/decorators/asset_decorator.py | 18 +++++------ .../decorator_assets_definition_builder.py | 2 +- .../definitions/decorators/op_decorator.py | 12 ++++---- .../decorators/source_asset_decorator.py | 2 +- .../_core/definitions/op_definition.py | 30 +++++++++---------- .../dagster/_core/execution/plan/active.py | 8 ++--- .../dagster/_core/execution/plan/plan.py | 12 ++++---- .../dagster/_core/execution/plan/step.py | 24 +++++++-------- .../_core/snap/execution_plan_snapshot.py | 14 ++++----- .../core_tests/execution_tests/test_active.py | 6 ++-- .../engine_tests/test_op_concurrency.py | 10 +++---- 12 files changed, 70 insertions(+), 70 deletions(-) diff --git a/python_modules/dagster/dagster/_core/definitions/decorators/asset_check_decorator.py b/python_modules/dagster/dagster/_core/definitions/decorators/asset_check_decorator.py index f690a37e47bee..2e7b740c37f34 100644 --- a/python_modules/dagster/dagster/_core/definitions/decorators/asset_check_decorator.py +++ b/python_modules/dagster/dagster/_core/definitions/decorators/asset_check_decorator.py @@ -240,7 +240,7 @@ def inner(fn: AssetCheckFunction) -> AssetChecksDefinition: asset_in_map={}, asset_out_map={}, execution_type=None, - concurrency_key=None, + concurrency_group=None, ) builder = DecoratorAssetsDefinitionBuilder( diff --git a/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py b/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py index 5edb48f1972d9..f75487f40db76 100644 --- a/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py +++ b/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py @@ -106,7 +106,7 @@ def asset( check_specs: Optional[Sequence[AssetCheckSpec]] = ..., owners: Optional[Sequence[str]] = ..., kinds: Optional[AbstractSet[str]] = ..., - concurrency_key: Optional[str] = ..., + concurrency_group: Optional[str] = ..., **kwargs, ) -> Callable[[Callable[..., Any]], AssetsDefinition]: ... @@ -184,7 +184,7 @@ def asset( check_specs: Optional[Sequence[AssetCheckSpec]] = None, owners: Optional[Sequence[str]] = None, kinds: Optional[AbstractSet[str]] = None, - concurrency_key: Optional[str] = None, + concurrency_group: Optional[str] = None, **kwargs, ) -> Union[AssetsDefinition, Callable[[Callable[..., Any]], AssetsDefinition]]: """Create a definition for how to compute an asset. @@ -260,7 +260,7 @@ def asset( e.g. `team:finops`. kinds (Optional[Set[str]]): A list of strings representing the kinds of the asset. These will be made visible in the Dagster UI. - concurrency_key (Optional[str]): A string that identifies the concurrency limit group that governs + concurrency_group (Optional[str]): A string that identifies the concurrency limit group that governs this asset's execution. non_argument_deps (Optional[Union[Set[AssetKey], Set[str]]]): Deprecated, use deps instead. Set of asset keys that are upstream dependencies, but do not pass an input to the asset. @@ -322,7 +322,7 @@ def my_asset(my_upstream_asset: int) -> int: check_specs=check_specs, key=key, owners=owners, - concurrency_key=concurrency_key, + concurrency_group=concurrency_group, ) if compute_fn is not None: @@ -395,7 +395,7 @@ class AssetDecoratorArgs(NamedTuple): key: Optional[CoercibleToAssetKey] check_specs: Optional[Sequence[AssetCheckSpec]] owners: Optional[Sequence[str]] - concurrency_key: Optional[str] + concurrency_group: Optional[str] class ResourceRelatedState(NamedTuple): @@ -520,7 +520,7 @@ def create_assets_def_from_fn_and_decorator_args( can_subset=False, decorator_name="@asset", execution_type=AssetExecutionType.MATERIALIZATION, - concurrency_key=args.concurrency_key, + concurrency_group=args.concurrency_group, ) builder = DecoratorAssetsDefinitionBuilder.from_asset_outs_in_asset_centric_decorator( @@ -567,7 +567,7 @@ def multi_asset( code_version: Optional[str] = None, specs: Optional[Sequence[AssetSpec]] = None, check_specs: Optional[Sequence[AssetCheckSpec]] = None, - concurrency_key: Optional[str] = None, + concurrency_group: Optional[str] = None, **kwargs: Any, ) -> Callable[[Callable[..., Any]], AssetsDefinition]: """Create a combined definition of multiple assets that are computed using the same op and same @@ -622,7 +622,7 @@ def multi_asset( by this function. check_specs (Optional[Sequence[AssetCheckSpec]]): Specs for asset checks that execute in the decorated function after materializing the assets. - concurrency_key (Optional[str]): A string that identifies the concurrency limit group that + concurrency_group (Optional[str]): A string that identifies the concurrency limit group that governs this multi-asset's execution. non_argument_deps (Optional[Union[Set[AssetKey], Set[str]]]): Deprecated, use deps instead. Set of asset keys that are upstream dependencies, but do not pass an input to the @@ -699,7 +699,7 @@ def my_function(asset0): backfill_policy=backfill_policy, decorator_name="@multi_asset", execution_type=AssetExecutionType.MATERIALIZATION, - concurrency_key=concurrency_key, + concurrency_group=concurrency_group, ) def inner(fn: Callable[..., Any]) -> AssetsDefinition: diff --git a/python_modules/dagster/dagster/_core/definitions/decorators/decorator_assets_definition_builder.py b/python_modules/dagster/dagster/_core/definitions/decorators/decorator_assets_definition_builder.py index 74f637bb462fa..ebe39fbf7a99f 100644 --- a/python_modules/dagster/dagster/_core/definitions/decorators/decorator_assets_definition_builder.py +++ b/python_modules/dagster/dagster/_core/definitions/decorators/decorator_assets_definition_builder.py @@ -237,7 +237,7 @@ class DecoratorAssetsDefinitionBuilderArgs(NamedTuple): specs: Sequence[AssetSpec] upstream_asset_deps: Optional[Iterable[AssetDep]] execution_type: Optional[AssetExecutionType] - concurrency_key: Optional[str] + concurrency_group: Optional[str] @property def check_specs(self) -> Sequence[AssetCheckSpec]: diff --git a/python_modules/dagster/dagster/_core/definitions/decorators/op_decorator.py b/python_modules/dagster/dagster/_core/definitions/decorators/op_decorator.py index cc58d7ad6331f..7df0a57a158e9 100644 --- a/python_modules/dagster/dagster/_core/definitions/decorators/op_decorator.py +++ b/python_modules/dagster/dagster/_core/definitions/decorators/op_decorator.py @@ -52,7 +52,7 @@ def __init__( retry_policy: Optional[RetryPolicy] = None, ins: Optional[Mapping[str, In]] = None, out: Optional[Union[Out, Mapping[str, Out]]] = None, - concurrency_key: Optional[str] = None, + concurrency_group: Optional[str] = None, ): self.name = check.opt_str_param(name, "name") self.decorator_takes_context = check.bool_param( @@ -66,7 +66,7 @@ def __init__( self.tags = tags self.code_version = code_version self.retry_policy = retry_policy - self.concurrency_key = concurrency_key + self.concurrency_group = concurrency_group # config will be checked within OpDefinition self.config_schema = config_schema @@ -134,7 +134,7 @@ def __call__(self, fn: Callable[..., Any]) -> "OpDefinition": code_version=self.code_version, retry_policy=self.retry_policy, version=None, # code_version has replaced version - concurrency_key=self.concurrency_key, + concurrency_group=self.concurrency_group, ) update_wrapper(op_def, compute_fn.decorated_fn) return op_def @@ -157,7 +157,7 @@ def op( version: Optional[str] = ..., retry_policy: Optional[RetryPolicy] = ..., code_version: Optional[str] = ..., - concurrency_key: Optional[str] = None, + concurrency_group: Optional[str] = None, ) -> _Op: ... @@ -177,7 +177,7 @@ def op( version: Optional[str] = None, retry_policy: Optional[RetryPolicy] = None, code_version: Optional[str] = None, - concurrency_key: Optional[str] = None, + concurrency_group: Optional[str] = None, ) -> Union["OpDefinition", _Op]: """Create an op with the specified parameters from the decorated function. @@ -271,7 +271,7 @@ def multi_out() -> Tuple[str, int]: retry_policy=retry_policy, ins=ins, out=out, - concurrency_key=concurrency_key, + concurrency_group=concurrency_group, ) diff --git a/python_modules/dagster/dagster/_core/definitions/decorators/source_asset_decorator.py b/python_modules/dagster/dagster/_core/definitions/decorators/source_asset_decorator.py index 518878e131d8e..68aba787497d0 100644 --- a/python_modules/dagster/dagster/_core/definitions/decorators/source_asset_decorator.py +++ b/python_modules/dagster/dagster/_core/definitions/decorators/source_asset_decorator.py @@ -300,7 +300,7 @@ def my_function(): backfill_policy=None, decorator_name="@multi_observable_source_asset", execution_type=AssetExecutionType.OBSERVATION, - concurrency_key=None, + concurrency_group=None, ) def inner(fn: Callable[..., Any]) -> AssetsDefinition: diff --git a/python_modules/dagster/dagster/_core/definitions/op_definition.py b/python_modules/dagster/dagster/_core/definitions/op_definition.py index ee295d0f89110..65ba994d8b470 100644 --- a/python_modules/dagster/dagster/_core/definitions/op_definition.py +++ b/python_modules/dagster/dagster/_core/definitions/op_definition.py @@ -91,7 +91,7 @@ class OpDefinition(NodeDefinition, IHasInternalInit): code_version (Optional[str]): (Experimental) Version of the code encapsulated by the op. If set, this is used as a default code version for all outputs. retry_policy (Optional[RetryPolicy]): The retry policy for this op. - concurrency_key (Optional[str]): A string that identifies the concurrency limit group that governs + concurrency_group (Optional[str]): A string that identifies the concurrency limit group that governs this op's execution. @@ -114,7 +114,7 @@ def _add_one(_context, inputs): _required_resource_keys: AbstractSet[str] _version: Optional[str] _retry_policy: Optional[RetryPolicy] - _concurrency_key: Optional[str] + _concurrency_group: Optional[str] def __init__( self, @@ -129,7 +129,7 @@ def __init__( version: Optional[str] = None, retry_policy: Optional[RetryPolicy] = None, code_version: Optional[str] = None, - concurrency_key: Optional[str] = None, + concurrency_group: Optional[str] = None, ): from dagster._core.definitions.decorators.op_decorator import ( DecoratedOpFunction, @@ -174,7 +174,7 @@ def __init__( check.opt_set_param(required_resource_keys, "required_resource_keys", of_type=str) ) self._retry_policy = check.opt_inst_param(retry_policy, "retry_policy", RetryPolicy) - self._concurrency_key = _validate_concurrency_key(concurrency_key, tags) + self._concurrency_group = _validate_concurrency_group(concurrency_group, tags) positional_inputs = ( self._compute_fn.positional_inputs() @@ -204,7 +204,7 @@ def dagster_internal_init( version: Optional[str], retry_policy: Optional[RetryPolicy], code_version: Optional[str], - concurrency_key: Optional[str], + concurrency_group: Optional[str], ) -> "OpDefinition": return OpDefinition( compute_fn=compute_fn, @@ -218,7 +218,7 @@ def dagster_internal_init( version=version, retry_policy=retry_policy, code_version=code_version, - concurrency_key=concurrency_key, + concurrency_group=concurrency_group, ) @property @@ -305,9 +305,9 @@ def with_retry_policy(self, retry_policy: RetryPolicy) -> "PendingNodeInvocation return super(OpDefinition, self).with_retry_policy(retry_policy) @property - def concurrency_key(self) -> Optional[str]: + def concurrency_group(self) -> Optional[str]: """Optional[str]: The concurrency key for this op.""" - return self._concurrency_key + return self._concurrency_group def is_from_decorator(self) -> bool: from dagster._core.definitions.decorators.op_decorator import DecoratedOpFunction @@ -393,7 +393,7 @@ def with_replaced_properties( code_version=self._version, retry_policy=self.retry_policy, version=None, # code_version replaces version - concurrency_key=self.concurrency_key, + concurrency_group=self.concurrency_group, ) def copy_for_configured( @@ -602,19 +602,19 @@ def _is_result_object_type(ttype): return ttype in (MaterializeResult, ObserveResult, AssetCheckResult) -def _validate_concurrency_key(concurrency_key, tags): +def _validate_concurrency_group(concurrency_group, tags): from dagster._core.storage.tags import GLOBAL_CONCURRENCY_TAG - check.opt_str_param(concurrency_key, "concurrency_key") + check.opt_str_param(concurrency_group, "concurrency_group") tags = check.opt_mapping_param(tags, "tags") tag_concurrency_key = tags.get(GLOBAL_CONCURRENCY_TAG) - if concurrency_key and tag_concurrency_key and concurrency_key != tag_concurrency_key: + if concurrency_group and tag_concurrency_key and concurrency_group != tag_concurrency_key: raise DagsterInvalidDefinitionError( - f'Concurrency key "{concurrency_key}" that conflicts with the concurrency key tag "{tag_concurrency_key}".' + f'Concurrency group "{concurrency_group}" that conflicts with the concurrency key tag "{tag_concurrency_key}".' ) - if concurrency_key: - return concurrency_key + if concurrency_group: + return concurrency_group if tag_concurrency_key: return tag_concurrency_key diff --git a/python_modules/dagster/dagster/_core/execution/plan/active.py b/python_modules/dagster/dagster/_core/execution/plan/active.py index 195924f538149..f85d1aa09ef4d 100644 --- a/python_modules/dagster/dagster/_core/execution/plan/active.py +++ b/python_modules/dagster/dagster/_core/execution/plan/active.py @@ -350,14 +350,14 @@ def get_steps_to_execute( if run_scoped_concurrency_limits_counter: run_scoped_concurrency_limits_counter.update_counters_with_launched_item(step) - if step.concurrency_key and self._instance_concurrency_context: + if step.concurrency_group and self._instance_concurrency_context: try: step_priority = int(step.tags.get(PRIORITY_TAG, 0)) except ValueError: step_priority = 0 if not self._instance_concurrency_context.claim( - step.concurrency_key, step.key, step_priority + step.concurrency_group, step.key, step_priority ): continue @@ -655,9 +655,9 @@ def concurrency_event_iterator( ): step = self.get_step_by_key(step_key) step_context = plan_context.for_step(step) - step_concurrency_key = cast(str, step.concurrency_key) + concurrency_group = cast(str, step.concurrency_group) self._messaged_concurrency_slots[step_key] = time.time() is_initial_message = last_messaged_timestamp is None yield DagsterEvent.step_concurrency_blocked( - step_context, step_concurrency_key, initial=is_initial_message + step_context, concurrency_group, initial=is_initial_message ) diff --git a/python_modules/dagster/dagster/_core/execution/plan/plan.py b/python_modules/dagster/dagster/_core/execution/plan/plan.py index 6ee99beda0b1b..796e054985807 100644 --- a/python_modules/dagster/dagster/_core/execution/plan/plan.py +++ b/python_modules/dagster/dagster/_core/execution/plan/plan.py @@ -316,7 +316,7 @@ def _build_from_sorted_nodes( ), step_outputs=step_outputs, tags=node.tags, - concurrency_key=node.definition.concurrency_key, + concurrency_group=node.definition.concurrency_group, ) elif has_pending_input: new_step = UnresolvedCollectExecutionStep( @@ -327,7 +327,7 @@ def _build_from_sorted_nodes( ), step_outputs=step_outputs, tags=node.tags, - concurrency_key=node.definition.concurrency_key, + concurrency_group=node.definition.concurrency_group, ) else: new_step = ExecutionStep( @@ -336,7 +336,7 @@ def _build_from_sorted_nodes( step_inputs=cast(List[StepInput], step_inputs), step_outputs=step_outputs, tags=node.tags, - concurrency_key=node.definition.concurrency_key, + concurrency_group=node.definition.concurrency_group, ) self.add_step(new_step) @@ -1031,7 +1031,7 @@ def rebuild_from_snapshot( step_inputs, # type: ignore # (plain StepInput only) step_outputs, step_snap.tags, - step_snap.concurrency_key, + step_snap.concurrency_group, ) elif step_snap.kind == StepKind.UNRESOLVED_MAPPED: step = UnresolvedMappedExecutionStep( @@ -1043,7 +1043,7 @@ def rebuild_from_snapshot( step_inputs, # type: ignore # (StepInput or UnresolvedMappedStepInput only) step_outputs, step_snap.tags, - step_snap.concurrency_key, + step_snap.concurrency_group, ) elif step_snap.kind == StepKind.UNRESOLVED_COLLECT: step = UnresolvedCollectExecutionStep( @@ -1052,7 +1052,7 @@ def rebuild_from_snapshot( step_inputs, # type: ignore # (StepInput or UnresolvedCollectStepInput only) step_outputs, step_snap.tags, - step_snap.concurrency_key, + step_snap.concurrency_group, ) else: raise Exception(f"Unexpected step kind {step_snap.kind}") diff --git a/python_modules/dagster/dagster/_core/execution/plan/step.py b/python_modules/dagster/dagster/_core/execution/plan/step.py index c8fc2be645d5a..b1e0d36f517c8 100644 --- a/python_modules/dagster/dagster/_core/execution/plan/step.py +++ b/python_modules/dagster/dagster/_core/execution/plan/step.py @@ -90,7 +90,7 @@ def tags(self) -> Optional[Mapping[str, str]]: @property @abstractmethod - def concurrency_key(self) -> Optional[str]: + def concurrency_group(self) -> Optional[str]: pass @property @@ -137,7 +137,7 @@ class ExecutionStep( ("tags", Mapping[str, str]), ("logging_tags", Mapping[str, str]), ("key", str), - ("concurrency_key", Optional[str]), + ("concurrency_group", Optional[str]), ], ), IExecutionStep, @@ -151,7 +151,7 @@ def __new__( step_inputs: Sequence[StepInput], step_outputs: Sequence[StepOutput], tags: Optional[Mapping[str, str]], - concurrency_key: Optional[str], + concurrency_group: Optional[str], logging_tags: Optional[Mapping[str, str]] = None, key: Optional[str] = None, ): @@ -168,7 +168,7 @@ def __new__( for so in check.sequence_param(step_outputs, "step_outputs", of_type=StepOutput) }, tags=tags or {}, - concurrency_key=check.opt_str_param(concurrency_key, "concurrency_key"), + concurrency_group=check.opt_str_param(concurrency_group, "concurrency_group"), logging_tags=merge_dicts( { "step_key": handle.to_key(), @@ -239,7 +239,7 @@ class UnresolvedMappedExecutionStep( ("step_input_dict", Mapping[str, Union[StepInput, UnresolvedMappedStepInput]]), ("step_output_dict", Mapping[str, StepOutput]), ("tags", Mapping[str, str]), - ("concurrency_key", Optional[str]), + ("concurrency_group", Optional[str]), ], ), IExecutionStep, @@ -253,7 +253,7 @@ def __new__( step_inputs: Sequence[Union[StepInput, UnresolvedMappedStepInput]], step_outputs: Sequence[StepOutput], tags: Optional[Mapping[str, str]], - concurrency_key: Optional[str], + concurrency_group: Optional[str], ): return super(UnresolvedMappedExecutionStep, cls).__new__( cls, @@ -270,7 +270,7 @@ def __new__( for so in check.sequence_param(step_outputs, "step_outputs", of_type=StepOutput) }, tags=check.opt_mapping_param(tags, "tags", key_type=str), - concurrency_key=check.opt_str_param(concurrency_key, "concurrency_key"), + concurrency_group=check.opt_str_param(concurrency_group, "concurrency_group"), ) @property @@ -375,7 +375,7 @@ def resolve( step_inputs=resolved_inputs, step_outputs=self.step_outputs, tags=self.tags, - concurrency_key=self.concurrency_key, + concurrency_group=self.concurrency_group, ) ) @@ -401,7 +401,7 @@ class UnresolvedCollectExecutionStep( ("step_input_dict", Mapping[str, Union[StepInput, UnresolvedCollectStepInput]]), ("step_output_dict", Mapping[str, StepOutput]), ("tags", Mapping[str, str]), - ("concurrency_key", Optional[str]), + ("concurrency_group", Optional[str]), ], ), IExecutionStep, @@ -415,7 +415,7 @@ def __new__( step_inputs: Sequence[Union[StepInput, UnresolvedCollectStepInput]], step_outputs: Sequence[StepOutput], tags: Optional[Mapping[str, str]], - concurrency_key: Optional[str], + concurrency_group: Optional[str], ): return super(UnresolvedCollectExecutionStep, cls).__new__( cls, @@ -432,7 +432,7 @@ def __new__( for so in check.sequence_param(step_outputs, "step_outputs", of_type=StepOutput) }, tags=check.opt_mapping_param(tags, "tags", key_type=str), - concurrency_key=check.opt_str_param(concurrency_key, "concurrency_key"), + concurrency_group=check.opt_str_param(concurrency_group, "concurrency_group"), ) @property @@ -514,5 +514,5 @@ def resolve( step_inputs=resolved_inputs, step_outputs=self.step_outputs, tags=self.tags, - concurrency_key=self.concurrency_key, + concurrency_group=self.concurrency_group, ) diff --git a/python_modules/dagster/dagster/_core/snap/execution_plan_snapshot.py b/python_modules/dagster/dagster/_core/snap/execution_plan_snapshot.py index 188868da98b35..7421b93fffef0 100644 --- a/python_modules/dagster/dagster/_core/snap/execution_plan_snapshot.py +++ b/python_modules/dagster/dagster/_core/snap/execution_plan_snapshot.py @@ -149,7 +149,7 @@ class ExecutionStepSnap( ("metadata_items", Sequence["ExecutionPlanMetadataItemSnap"]), ("tags", Optional[Mapping[str, str]]), ("step_handle", Optional[StepHandleUnion]), - ("step_concurrency_key", Optional[str]), + ("concurrency_group", Optional[str]), ], ) ): @@ -163,7 +163,7 @@ def __new__( metadata_items: Sequence["ExecutionPlanMetadataItemSnap"], tags: Optional[Mapping[str, str]] = None, step_handle: Optional[StepHandleUnion] = None, - step_concurrency_key: Optional[str] = None, + concurrency_group: Optional[str] = None, ): return super(ExecutionStepSnap, cls).__new__( cls, @@ -177,18 +177,18 @@ def __new__( ), tags=check.opt_nullable_mapping_param(tags, "tags", key_type=str, value_type=str), step_handle=check.opt_inst_param(step_handle, "step_handle", StepHandleTypes), - # stores the concurrency key arg as separate from the concurrency_key property since the + # stores the concurrency group arg as separate from the concurrency_key property since the # snapshot may have been generated before concurrency_key was added as a separate # argument - step_concurrency_key=check.opt_str_param(step_concurrency_key, "step_concurrency_key"), + concurrency_group=check.opt_str_param(concurrency_group, "concurrency_group"), ) @property def concurrency_key(self): # Need to sthere in case the snapshot was created before concurrency_key was added as # a separate argument from tags - if self.step_concurrency_key: - return self.step_concurrency_key + if self.concurrency_group: + return self.concurrency_group if not self.tags: return None return self.tags.get(GLOBAL_CONCURRENCY_TAG) @@ -325,7 +325,7 @@ def _snapshot_from_execution_step(execution_step: IExecutionStep) -> ExecutionSt ), tags=execution_step.tags, step_handle=execution_step.handle, - step_concurrency_key=execution_step.concurrency_key, + concurrency_group=execution_step.concurrency_group, ) diff --git a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_active.py b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_active.py index 36a60a6fb5d8b..2f5faebb6fec1 100644 --- a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_active.py +++ b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_active.py @@ -169,11 +169,11 @@ def define_concurrency_job(use_tags): tags = None concurrency_kwarg = "foo" - @op(tags=tags, concurrency_key=concurrency_kwarg) + @op(tags=tags, concurrency_group=concurrency_kwarg) def foo_op(): pass - @op(tags=tags, concurrency_key=concurrency_kwarg) + @op(tags=tags, concurrency_group=concurrency_kwarg) def bar_op(): pass @@ -274,7 +274,7 @@ def define_concurrency_retry_job(use_tags): tags = None concurrency_kwarg = "foo" - @op(tags=tags, concurrency_key=concurrency_kwarg) + @op(tags=tags, concurrency_group=concurrency_kwarg) def foo_op(): pass diff --git a/python_modules/dagster/dagster_tests/execution_tests/engine_tests/test_op_concurrency.py b/python_modules/dagster/dagster_tests/execution_tests/engine_tests/test_op_concurrency.py index 48a6eea6e62a8..c30496e56b624 100644 --- a/python_modules/dagster/dagster_tests/execution_tests/engine_tests/test_op_concurrency.py +++ b/python_modules/dagster/dagster_tests/execution_tests/engine_tests/test_op_concurrency.py @@ -18,12 +18,12 @@ ) -@op(concurrency_key="foo") +@op(concurrency_group="foo") def should_never_execute(_x): assert False # this should never execute -@op(concurrency_key="foo") +@op(concurrency_group="foo") def throw_error(): raise Exception("bad programmer") @@ -33,14 +33,14 @@ def error_graph(): should_never_execute(throw_error()) -@op(concurrency_key="foo") +@op(concurrency_group="foo") def simple_op(context): time.sleep(0.1) foo_info = context.instance.event_log_storage.get_concurrency_info("foo") return {"active": foo_info.active_slot_count, "pending": foo_info.pending_step_count} -@op(concurrency_key="foo") +@op(concurrency_group="foo") def second_op(context, _): time.sleep(0.1) foo_info = context.instance.event_log_storage.get_concurrency_info("foo") @@ -66,7 +66,7 @@ def two_tier_graph(): second_op(simple_op()) -@op(concurrency_key="foo", retry_policy=RetryPolicy(max_retries=1)) +@op(concurrency_group="foo", retry_policy=RetryPolicy(max_retries=1)) def retry_op(): raise Failure("I fail")