Skip to content

Commit

Permalink
group
Browse files Browse the repository at this point in the history
  • Loading branch information
prha committed Dec 11, 2024
1 parent 849f8d3 commit e51874e
Show file tree
Hide file tree
Showing 12 changed files with 70 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ def inner(fn: AssetCheckFunction) -> AssetChecksDefinition:
asset_in_map={},
asset_out_map={},
execution_type=None,
concurrency_key=None,
concurrency_group=None,
)

builder = DecoratorAssetsDefinitionBuilder(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ def asset(
check_specs: Optional[Sequence[AssetCheckSpec]] = ...,
owners: Optional[Sequence[str]] = ...,
kinds: Optional[AbstractSet[str]] = ...,
concurrency_key: Optional[str] = ...,
concurrency_group: Optional[str] = ...,
**kwargs,
) -> Callable[[Callable[..., Any]], AssetsDefinition]: ...

Expand Down Expand Up @@ -184,7 +184,7 @@ def asset(
check_specs: Optional[Sequence[AssetCheckSpec]] = None,
owners: Optional[Sequence[str]] = None,
kinds: Optional[AbstractSet[str]] = None,
concurrency_key: Optional[str] = None,
concurrency_group: Optional[str] = None,
**kwargs,
) -> Union[AssetsDefinition, Callable[[Callable[..., Any]], AssetsDefinition]]:
"""Create a definition for how to compute an asset.
Expand Down Expand Up @@ -260,7 +260,7 @@ def asset(
e.g. `team:finops`.
kinds (Optional[Set[str]]): A list of strings representing the kinds of the asset. These
will be made visible in the Dagster UI.
concurrency_key (Optional[str]): A string that identifies the concurrency limit group that governs
concurrency_group (Optional[str]): A string that identifies the concurrency limit group that governs
this asset's execution.
non_argument_deps (Optional[Union[Set[AssetKey], Set[str]]]): Deprecated, use deps instead.
Set of asset keys that are upstream dependencies, but do not pass an input to the asset.
Expand Down Expand Up @@ -322,7 +322,7 @@ def my_asset(my_upstream_asset: int) -> int:
check_specs=check_specs,
key=key,
owners=owners,
concurrency_key=concurrency_key,
concurrency_group=concurrency_group,
)

if compute_fn is not None:
Expand Down Expand Up @@ -395,7 +395,7 @@ class AssetDecoratorArgs(NamedTuple):
key: Optional[CoercibleToAssetKey]
check_specs: Optional[Sequence[AssetCheckSpec]]
owners: Optional[Sequence[str]]
concurrency_key: Optional[str]
concurrency_group: Optional[str]


class ResourceRelatedState(NamedTuple):
Expand Down Expand Up @@ -520,7 +520,7 @@ def create_assets_def_from_fn_and_decorator_args(
can_subset=False,
decorator_name="@asset",
execution_type=AssetExecutionType.MATERIALIZATION,
concurrency_key=args.concurrency_key,
concurrency_group=args.concurrency_group,
)

builder = DecoratorAssetsDefinitionBuilder.from_asset_outs_in_asset_centric_decorator(
Expand Down Expand Up @@ -567,7 +567,7 @@ def multi_asset(
code_version: Optional[str] = None,
specs: Optional[Sequence[AssetSpec]] = None,
check_specs: Optional[Sequence[AssetCheckSpec]] = None,
concurrency_key: Optional[str] = None,
concurrency_group: Optional[str] = None,
**kwargs: Any,
) -> Callable[[Callable[..., Any]], AssetsDefinition]:
"""Create a combined definition of multiple assets that are computed using the same op and same
Expand Down Expand Up @@ -622,7 +622,7 @@ def multi_asset(
by this function.
check_specs (Optional[Sequence[AssetCheckSpec]]): Specs for asset checks that
execute in the decorated function after materializing the assets.
concurrency_key (Optional[str]): A string that identifies the concurrency limit group that
concurrency_group (Optional[str]): A string that identifies the concurrency limit group that
governs this multi-asset's execution.
non_argument_deps (Optional[Union[Set[AssetKey], Set[str]]]): Deprecated, use deps instead.
Set of asset keys that are upstream dependencies, but do not pass an input to the
Expand Down Expand Up @@ -699,7 +699,7 @@ def my_function(asset0):
backfill_policy=backfill_policy,
decorator_name="@multi_asset",
execution_type=AssetExecutionType.MATERIALIZATION,
concurrency_key=concurrency_key,
concurrency_group=concurrency_group,
)

def inner(fn: Callable[..., Any]) -> AssetsDefinition:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ class DecoratorAssetsDefinitionBuilderArgs(NamedTuple):
specs: Sequence[AssetSpec]
upstream_asset_deps: Optional[Iterable[AssetDep]]
execution_type: Optional[AssetExecutionType]
concurrency_key: Optional[str]
concurrency_group: Optional[str]

@property
def check_specs(self) -> Sequence[AssetCheckSpec]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def __init__(
retry_policy: Optional[RetryPolicy] = None,
ins: Optional[Mapping[str, In]] = None,
out: Optional[Union[Out, Mapping[str, Out]]] = None,
concurrency_key: Optional[str] = None,
concurrency_group: Optional[str] = None,
):
self.name = check.opt_str_param(name, "name")
self.decorator_takes_context = check.bool_param(
Expand All @@ -66,7 +66,7 @@ def __init__(
self.tags = tags
self.code_version = code_version
self.retry_policy = retry_policy
self.concurrency_key = concurrency_key
self.concurrency_group = concurrency_group

# config will be checked within OpDefinition
self.config_schema = config_schema
Expand Down Expand Up @@ -134,7 +134,7 @@ def __call__(self, fn: Callable[..., Any]) -> "OpDefinition":
code_version=self.code_version,
retry_policy=self.retry_policy,
version=None, # code_version has replaced version
concurrency_key=self.concurrency_key,
concurrency_group=self.concurrency_group,
)
update_wrapper(op_def, compute_fn.decorated_fn)
return op_def
Expand All @@ -157,7 +157,7 @@ def op(
version: Optional[str] = ...,
retry_policy: Optional[RetryPolicy] = ...,
code_version: Optional[str] = ...,
concurrency_key: Optional[str] = None,
concurrency_group: Optional[str] = None,
) -> _Op: ...


Expand All @@ -177,7 +177,7 @@ def op(
version: Optional[str] = None,
retry_policy: Optional[RetryPolicy] = None,
code_version: Optional[str] = None,
concurrency_key: Optional[str] = None,
concurrency_group: Optional[str] = None,
) -> Union["OpDefinition", _Op]:
"""Create an op with the specified parameters from the decorated function.
Expand Down Expand Up @@ -271,7 +271,7 @@ def multi_out() -> Tuple[str, int]:
retry_policy=retry_policy,
ins=ins,
out=out,
concurrency_key=concurrency_key,
concurrency_group=concurrency_group,
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ def my_function():
backfill_policy=None,
decorator_name="@multi_observable_source_asset",
execution_type=AssetExecutionType.OBSERVATION,
concurrency_key=None,
concurrency_group=None,
)

def inner(fn: Callable[..., Any]) -> AssetsDefinition:
Expand Down
30 changes: 15 additions & 15 deletions python_modules/dagster/dagster/_core/definitions/op_definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ class OpDefinition(NodeDefinition, IHasInternalInit):
code_version (Optional[str]): (Experimental) Version of the code encapsulated by the op. If set,
this is used as a default code version for all outputs.
retry_policy (Optional[RetryPolicy]): The retry policy for this op.
concurrency_key (Optional[str]): A string that identifies the concurrency limit group that governs
concurrency_group (Optional[str]): A string that identifies the concurrency limit group that governs
this op's execution.
Expand All @@ -114,7 +114,7 @@ def _add_one(_context, inputs):
_required_resource_keys: AbstractSet[str]
_version: Optional[str]
_retry_policy: Optional[RetryPolicy]
_concurrency_key: Optional[str]
_concurrency_group: Optional[str]

def __init__(
self,
Expand All @@ -129,7 +129,7 @@ def __init__(
version: Optional[str] = None,
retry_policy: Optional[RetryPolicy] = None,
code_version: Optional[str] = None,
concurrency_key: Optional[str] = None,
concurrency_group: Optional[str] = None,
):
from dagster._core.definitions.decorators.op_decorator import (
DecoratedOpFunction,
Expand Down Expand Up @@ -174,7 +174,7 @@ def __init__(
check.opt_set_param(required_resource_keys, "required_resource_keys", of_type=str)
)
self._retry_policy = check.opt_inst_param(retry_policy, "retry_policy", RetryPolicy)
self._concurrency_key = _validate_concurrency_key(concurrency_key, tags)
self._concurrency_group = _validate_concurrency_group(concurrency_group, tags)

positional_inputs = (
self._compute_fn.positional_inputs()
Expand Down Expand Up @@ -204,7 +204,7 @@ def dagster_internal_init(
version: Optional[str],
retry_policy: Optional[RetryPolicy],
code_version: Optional[str],
concurrency_key: Optional[str],
concurrency_group: Optional[str],
) -> "OpDefinition":
return OpDefinition(
compute_fn=compute_fn,
Expand All @@ -218,7 +218,7 @@ def dagster_internal_init(
version=version,
retry_policy=retry_policy,
code_version=code_version,
concurrency_key=concurrency_key,
concurrency_group=concurrency_group,
)

@property
Expand Down Expand Up @@ -305,9 +305,9 @@ def with_retry_policy(self, retry_policy: RetryPolicy) -> "PendingNodeInvocation
return super(OpDefinition, self).with_retry_policy(retry_policy)

@property
def concurrency_key(self) -> Optional[str]:
def concurrency_group(self) -> Optional[str]:
"""Optional[str]: The concurrency key for this op."""
return self._concurrency_key
return self._concurrency_group

def is_from_decorator(self) -> bool:
from dagster._core.definitions.decorators.op_decorator import DecoratedOpFunction
Expand Down Expand Up @@ -393,7 +393,7 @@ def with_replaced_properties(
code_version=self._version,
retry_policy=self.retry_policy,
version=None, # code_version replaces version
concurrency_key=self.concurrency_key,
concurrency_group=self.concurrency_group,
)

def copy_for_configured(
Expand Down Expand Up @@ -602,19 +602,19 @@ def _is_result_object_type(ttype):
return ttype in (MaterializeResult, ObserveResult, AssetCheckResult)


def _validate_concurrency_key(concurrency_key, tags):
def _validate_concurrency_group(concurrency_group, tags):
from dagster._core.storage.tags import GLOBAL_CONCURRENCY_TAG

check.opt_str_param(concurrency_key, "concurrency_key")
check.opt_str_param(concurrency_group, "concurrency_group")
tags = check.opt_mapping_param(tags, "tags")
tag_concurrency_key = tags.get(GLOBAL_CONCURRENCY_TAG)
if concurrency_key and tag_concurrency_key and concurrency_key != tag_concurrency_key:
if concurrency_group and tag_concurrency_key and concurrency_group != tag_concurrency_key:
raise DagsterInvalidDefinitionError(
f'Concurrency key "{concurrency_key}" that conflicts with the concurrency key tag "{tag_concurrency_key}".'
f'Concurrency group "{concurrency_group}" that conflicts with the concurrency key tag "{tag_concurrency_key}".'
)

if concurrency_key:
return concurrency_key
if concurrency_group:
return concurrency_group

if tag_concurrency_key:
return tag_concurrency_key
Expand Down
8 changes: 4 additions & 4 deletions python_modules/dagster/dagster/_core/execution/plan/active.py
Original file line number Diff line number Diff line change
Expand Up @@ -350,14 +350,14 @@ def get_steps_to_execute(
if run_scoped_concurrency_limits_counter:
run_scoped_concurrency_limits_counter.update_counters_with_launched_item(step)

if step.concurrency_key and self._instance_concurrency_context:
if step.concurrency_group and self._instance_concurrency_context:
try:
step_priority = int(step.tags.get(PRIORITY_TAG, 0))
except ValueError:
step_priority = 0

if not self._instance_concurrency_context.claim(
step.concurrency_key, step.key, step_priority
step.concurrency_group, step.key, step_priority
):
continue

Expand Down Expand Up @@ -655,9 +655,9 @@ def concurrency_event_iterator(
):
step = self.get_step_by_key(step_key)
step_context = plan_context.for_step(step)
step_concurrency_key = cast(str, step.concurrency_key)
concurrency_group = cast(str, step.concurrency_group)
self._messaged_concurrency_slots[step_key] = time.time()
is_initial_message = last_messaged_timestamp is None
yield DagsterEvent.step_concurrency_blocked(
step_context, step_concurrency_key, initial=is_initial_message
step_context, concurrency_group, initial=is_initial_message
)
12 changes: 6 additions & 6 deletions python_modules/dagster/dagster/_core/execution/plan/plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ def _build_from_sorted_nodes(
),
step_outputs=step_outputs,
tags=node.tags,
concurrency_key=node.definition.concurrency_key,
concurrency_group=node.definition.concurrency_group,
)
elif has_pending_input:
new_step = UnresolvedCollectExecutionStep(
Expand All @@ -327,7 +327,7 @@ def _build_from_sorted_nodes(
),
step_outputs=step_outputs,
tags=node.tags,
concurrency_key=node.definition.concurrency_key,
concurrency_group=node.definition.concurrency_group,
)
else:
new_step = ExecutionStep(
Expand All @@ -336,7 +336,7 @@ def _build_from_sorted_nodes(
step_inputs=cast(List[StepInput], step_inputs),
step_outputs=step_outputs,
tags=node.tags,
concurrency_key=node.definition.concurrency_key,
concurrency_group=node.definition.concurrency_group,
)

self.add_step(new_step)
Expand Down Expand Up @@ -1031,7 +1031,7 @@ def rebuild_from_snapshot(
step_inputs, # type: ignore # (plain StepInput only)
step_outputs,
step_snap.tags,
step_snap.concurrency_key,
step_snap.concurrency_group,
)
elif step_snap.kind == StepKind.UNRESOLVED_MAPPED:
step = UnresolvedMappedExecutionStep(
Expand All @@ -1043,7 +1043,7 @@ def rebuild_from_snapshot(
step_inputs, # type: ignore # (StepInput or UnresolvedMappedStepInput only)
step_outputs,
step_snap.tags,
step_snap.concurrency_key,
step_snap.concurrency_group,
)
elif step_snap.kind == StepKind.UNRESOLVED_COLLECT:
step = UnresolvedCollectExecutionStep(
Expand All @@ -1052,7 +1052,7 @@ def rebuild_from_snapshot(
step_inputs, # type: ignore # (StepInput or UnresolvedCollectStepInput only)
step_outputs,
step_snap.tags,
step_snap.concurrency_key,
step_snap.concurrency_group,
)
else:
raise Exception(f"Unexpected step kind {step_snap.kind}")
Expand Down
Loading

0 comments on commit e51874e

Please sign in to comment.