Skip to content

Commit 571ead4

Browse files
committed
group
1 parent 849f8d3 commit 571ead4

File tree

14 files changed

+81
-83
lines changed

14 files changed

+81
-83
lines changed

python_modules/dagster/dagster/_core/definitions/decorators/asset_check_decorator.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,7 @@ def inner(fn: AssetCheckFunction) -> AssetChecksDefinition:
240240
asset_in_map={},
241241
asset_out_map={},
242242
execution_type=None,
243-
concurrency_key=None,
243+
concurrency_group=None,
244244
)
245245

246246
builder = DecoratorAssetsDefinitionBuilder(

python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py

+9-9
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ def asset(
106106
check_specs: Optional[Sequence[AssetCheckSpec]] = ...,
107107
owners: Optional[Sequence[str]] = ...,
108108
kinds: Optional[AbstractSet[str]] = ...,
109-
concurrency_key: Optional[str] = ...,
109+
concurrency_group: Optional[str] = ...,
110110
**kwargs,
111111
) -> Callable[[Callable[..., Any]], AssetsDefinition]: ...
112112

@@ -184,7 +184,7 @@ def asset(
184184
check_specs: Optional[Sequence[AssetCheckSpec]] = None,
185185
owners: Optional[Sequence[str]] = None,
186186
kinds: Optional[AbstractSet[str]] = None,
187-
concurrency_key: Optional[str] = None,
187+
concurrency_group: Optional[str] = None,
188188
**kwargs,
189189
) -> Union[AssetsDefinition, Callable[[Callable[..., Any]], AssetsDefinition]]:
190190
"""Create a definition for how to compute an asset.
@@ -260,7 +260,7 @@ def asset(
260260
e.g. `team:finops`.
261261
kinds (Optional[Set[str]]): A list of strings representing the kinds of the asset. These
262262
will be made visible in the Dagster UI.
263-
concurrency_key (Optional[str]): A string that identifies the concurrency limit group that governs
263+
concurrency_group (Optional[str]): A string that identifies the concurrency limit group that governs
264264
this asset's execution.
265265
non_argument_deps (Optional[Union[Set[AssetKey], Set[str]]]): Deprecated, use deps instead.
266266
Set of asset keys that are upstream dependencies, but do not pass an input to the asset.
@@ -322,7 +322,7 @@ def my_asset(my_upstream_asset: int) -> int:
322322
check_specs=check_specs,
323323
key=key,
324324
owners=owners,
325-
concurrency_key=concurrency_key,
325+
concurrency_group=concurrency_group,
326326
)
327327

328328
if compute_fn is not None:
@@ -395,7 +395,7 @@ class AssetDecoratorArgs(NamedTuple):
395395
key: Optional[CoercibleToAssetKey]
396396
check_specs: Optional[Sequence[AssetCheckSpec]]
397397
owners: Optional[Sequence[str]]
398-
concurrency_key: Optional[str]
398+
concurrency_group: Optional[str]
399399

400400

401401
class ResourceRelatedState(NamedTuple):
@@ -520,7 +520,7 @@ def create_assets_def_from_fn_and_decorator_args(
520520
can_subset=False,
521521
decorator_name="@asset",
522522
execution_type=AssetExecutionType.MATERIALIZATION,
523-
concurrency_key=args.concurrency_key,
523+
concurrency_group=args.concurrency_group,
524524
)
525525

526526
builder = DecoratorAssetsDefinitionBuilder.from_asset_outs_in_asset_centric_decorator(
@@ -567,7 +567,7 @@ def multi_asset(
567567
code_version: Optional[str] = None,
568568
specs: Optional[Sequence[AssetSpec]] = None,
569569
check_specs: Optional[Sequence[AssetCheckSpec]] = None,
570-
concurrency_key: Optional[str] = None,
570+
concurrency_group: Optional[str] = None,
571571
**kwargs: Any,
572572
) -> Callable[[Callable[..., Any]], AssetsDefinition]:
573573
"""Create a combined definition of multiple assets that are computed using the same op and same
@@ -622,7 +622,7 @@ def multi_asset(
622622
by this function.
623623
check_specs (Optional[Sequence[AssetCheckSpec]]): Specs for asset checks that
624624
execute in the decorated function after materializing the assets.
625-
concurrency_key (Optional[str]): A string that identifies the concurrency limit group that
625+
concurrency_group (Optional[str]): A string that identifies the concurrency limit group that
626626
governs this multi-asset's execution.
627627
non_argument_deps (Optional[Union[Set[AssetKey], Set[str]]]): Deprecated, use deps instead.
628628
Set of asset keys that are upstream dependencies, but do not pass an input to the
@@ -699,7 +699,7 @@ def my_function(asset0):
699699
backfill_policy=backfill_policy,
700700
decorator_name="@multi_asset",
701701
execution_type=AssetExecutionType.MATERIALIZATION,
702-
concurrency_key=concurrency_key,
702+
concurrency_group=concurrency_group,
703703
)
704704

705705
def inner(fn: Callable[..., Any]) -> AssetsDefinition:

python_modules/dagster/dagster/_core/definitions/decorators/decorator_assets_definition_builder.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,7 @@ class DecoratorAssetsDefinitionBuilderArgs(NamedTuple):
237237
specs: Sequence[AssetSpec]
238238
upstream_asset_deps: Optional[Iterable[AssetDep]]
239239
execution_type: Optional[AssetExecutionType]
240-
concurrency_key: Optional[str]
240+
concurrency_group: Optional[str]
241241

242242
@property
243243
def check_specs(self) -> Sequence[AssetCheckSpec]:

python_modules/dagster/dagster/_core/definitions/decorators/op_decorator.py

+6-6
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ def __init__(
5252
retry_policy: Optional[RetryPolicy] = None,
5353
ins: Optional[Mapping[str, In]] = None,
5454
out: Optional[Union[Out, Mapping[str, Out]]] = None,
55-
concurrency_key: Optional[str] = None,
55+
concurrency_group: Optional[str] = None,
5656
):
5757
self.name = check.opt_str_param(name, "name")
5858
self.decorator_takes_context = check.bool_param(
@@ -66,7 +66,7 @@ def __init__(
6666
self.tags = tags
6767
self.code_version = code_version
6868
self.retry_policy = retry_policy
69-
self.concurrency_key = concurrency_key
69+
self.concurrency_group = concurrency_group
7070

7171
# config will be checked within OpDefinition
7272
self.config_schema = config_schema
@@ -134,7 +134,7 @@ def __call__(self, fn: Callable[..., Any]) -> "OpDefinition":
134134
code_version=self.code_version,
135135
retry_policy=self.retry_policy,
136136
version=None, # code_version has replaced version
137-
concurrency_key=self.concurrency_key,
137+
concurrency_group=self.concurrency_group,
138138
)
139139
update_wrapper(op_def, compute_fn.decorated_fn)
140140
return op_def
@@ -157,7 +157,7 @@ def op(
157157
version: Optional[str] = ...,
158158
retry_policy: Optional[RetryPolicy] = ...,
159159
code_version: Optional[str] = ...,
160-
concurrency_key: Optional[str] = None,
160+
concurrency_group: Optional[str] = None,
161161
) -> _Op: ...
162162

163163

@@ -177,7 +177,7 @@ def op(
177177
version: Optional[str] = None,
178178
retry_policy: Optional[RetryPolicy] = None,
179179
code_version: Optional[str] = None,
180-
concurrency_key: Optional[str] = None,
180+
concurrency_group: Optional[str] = None,
181181
) -> Union["OpDefinition", _Op]:
182182
"""Create an op with the specified parameters from the decorated function.
183183
@@ -271,7 +271,7 @@ def multi_out() -> Tuple[str, int]:
271271
retry_policy=retry_policy,
272272
ins=ins,
273273
out=out,
274-
concurrency_key=concurrency_key,
274+
concurrency_group=concurrency_group,
275275
)
276276

277277

python_modules/dagster/dagster/_core/definitions/decorators/source_asset_decorator.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,7 @@ def my_function():
300300
backfill_policy=None,
301301
decorator_name="@multi_observable_source_asset",
302302
execution_type=AssetExecutionType.OBSERVATION,
303-
concurrency_key=None,
303+
concurrency_group=None,
304304
)
305305

306306
def inner(fn: Callable[..., Any]) -> AssetsDefinition:

python_modules/dagster/dagster/_core/definitions/op_definition.py

+15-15
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ class OpDefinition(NodeDefinition, IHasInternalInit):
9191
code_version (Optional[str]): (Experimental) Version of the code encapsulated by the op. If set,
9292
this is used as a default code version for all outputs.
9393
retry_policy (Optional[RetryPolicy]): The retry policy for this op.
94-
concurrency_key (Optional[str]): A string that identifies the concurrency limit group that governs
94+
concurrency_group (Optional[str]): A string that identifies the concurrency limit group that governs
9595
this op's execution.
9696
9797
@@ -114,7 +114,7 @@ def _add_one(_context, inputs):
114114
_required_resource_keys: AbstractSet[str]
115115
_version: Optional[str]
116116
_retry_policy: Optional[RetryPolicy]
117-
_concurrency_key: Optional[str]
117+
_concurrency_group: Optional[str]
118118

119119
def __init__(
120120
self,
@@ -129,7 +129,7 @@ def __init__(
129129
version: Optional[str] = None,
130130
retry_policy: Optional[RetryPolicy] = None,
131131
code_version: Optional[str] = None,
132-
concurrency_key: Optional[str] = None,
132+
concurrency_group: Optional[str] = None,
133133
):
134134
from dagster._core.definitions.decorators.op_decorator import (
135135
DecoratedOpFunction,
@@ -174,7 +174,7 @@ def __init__(
174174
check.opt_set_param(required_resource_keys, "required_resource_keys", of_type=str)
175175
)
176176
self._retry_policy = check.opt_inst_param(retry_policy, "retry_policy", RetryPolicy)
177-
self._concurrency_key = _validate_concurrency_key(concurrency_key, tags)
177+
self._concurrency_group = _validate_concurrency_group(concurrency_group, tags)
178178

179179
positional_inputs = (
180180
self._compute_fn.positional_inputs()
@@ -204,7 +204,7 @@ def dagster_internal_init(
204204
version: Optional[str],
205205
retry_policy: Optional[RetryPolicy],
206206
code_version: Optional[str],
207-
concurrency_key: Optional[str],
207+
concurrency_group: Optional[str],
208208
) -> "OpDefinition":
209209
return OpDefinition(
210210
compute_fn=compute_fn,
@@ -218,7 +218,7 @@ def dagster_internal_init(
218218
version=version,
219219
retry_policy=retry_policy,
220220
code_version=code_version,
221-
concurrency_key=concurrency_key,
221+
concurrency_group=concurrency_group,
222222
)
223223

224224
@property
@@ -305,9 +305,9 @@ def with_retry_policy(self, retry_policy: RetryPolicy) -> "PendingNodeInvocation
305305
return super(OpDefinition, self).with_retry_policy(retry_policy)
306306

307307
@property
308-
def concurrency_key(self) -> Optional[str]:
308+
def concurrency_group(self) -> Optional[str]:
309309
"""Optional[str]: The concurrency key for this op."""
310-
return self._concurrency_key
310+
return self._concurrency_group
311311

312312
def is_from_decorator(self) -> bool:
313313
from dagster._core.definitions.decorators.op_decorator import DecoratedOpFunction
@@ -393,7 +393,7 @@ def with_replaced_properties(
393393
code_version=self._version,
394394
retry_policy=self.retry_policy,
395395
version=None, # code_version replaces version
396-
concurrency_key=self.concurrency_key,
396+
concurrency_group=self.concurrency_group,
397397
)
398398

399399
def copy_for_configured(
@@ -602,19 +602,19 @@ def _is_result_object_type(ttype):
602602
return ttype in (MaterializeResult, ObserveResult, AssetCheckResult)
603603

604604

605-
def _validate_concurrency_key(concurrency_key, tags):
605+
def _validate_concurrency_group(concurrency_group, tags):
606606
from dagster._core.storage.tags import GLOBAL_CONCURRENCY_TAG
607607

608-
check.opt_str_param(concurrency_key, "concurrency_key")
608+
check.opt_str_param(concurrency_group, "concurrency_group")
609609
tags = check.opt_mapping_param(tags, "tags")
610610
tag_concurrency_key = tags.get(GLOBAL_CONCURRENCY_TAG)
611-
if concurrency_key and tag_concurrency_key and concurrency_key != tag_concurrency_key:
611+
if concurrency_group and tag_concurrency_key and concurrency_group != tag_concurrency_key:
612612
raise DagsterInvalidDefinitionError(
613-
f'Concurrency key "{concurrency_key}" that conflicts with the concurrency key tag "{tag_concurrency_key}".'
613+
f'Concurrency group "{concurrency_group}" that conflicts with the concurrency key tag "{tag_concurrency_key}".'
614614
)
615615

616-
if concurrency_key:
617-
return concurrency_key
616+
if concurrency_group:
617+
return concurrency_group
618618

619619
if tag_concurrency_key:
620620
return tag_concurrency_key

python_modules/dagster/dagster/_core/execution/plan/active.py

+4-4
Original file line numberDiff line numberDiff line change
@@ -350,14 +350,14 @@ def get_steps_to_execute(
350350
if run_scoped_concurrency_limits_counter:
351351
run_scoped_concurrency_limits_counter.update_counters_with_launched_item(step)
352352

353-
if step.concurrency_key and self._instance_concurrency_context:
353+
if step.concurrency_group and self._instance_concurrency_context:
354354
try:
355355
step_priority = int(step.tags.get(PRIORITY_TAG, 0))
356356
except ValueError:
357357
step_priority = 0
358358

359359
if not self._instance_concurrency_context.claim(
360-
step.concurrency_key, step.key, step_priority
360+
step.concurrency_group, step.key, step_priority
361361
):
362362
continue
363363

@@ -655,9 +655,9 @@ def concurrency_event_iterator(
655655
):
656656
step = self.get_step_by_key(step_key)
657657
step_context = plan_context.for_step(step)
658-
step_concurrency_key = cast(str, step.concurrency_key)
658+
concurrency_group = cast(str, step.concurrency_group)
659659
self._messaged_concurrency_slots[step_key] = time.time()
660660
is_initial_message = last_messaged_timestamp is None
661661
yield DagsterEvent.step_concurrency_blocked(
662-
step_context, step_concurrency_key, initial=is_initial_message
662+
step_context, concurrency_group, initial=is_initial_message
663663
)

python_modules/dagster/dagster/_core/execution/plan/plan.py

+6-6
Original file line numberDiff line numberDiff line change
@@ -316,7 +316,7 @@ def _build_from_sorted_nodes(
316316
),
317317
step_outputs=step_outputs,
318318
tags=node.tags,
319-
concurrency_key=node.definition.concurrency_key,
319+
concurrency_group=node.definition.concurrency_group,
320320
)
321321
elif has_pending_input:
322322
new_step = UnresolvedCollectExecutionStep(
@@ -327,7 +327,7 @@ def _build_from_sorted_nodes(
327327
),
328328
step_outputs=step_outputs,
329329
tags=node.tags,
330-
concurrency_key=node.definition.concurrency_key,
330+
concurrency_group=node.definition.concurrency_group,
331331
)
332332
else:
333333
new_step = ExecutionStep(
@@ -336,7 +336,7 @@ def _build_from_sorted_nodes(
336336
step_inputs=cast(List[StepInput], step_inputs),
337337
step_outputs=step_outputs,
338338
tags=node.tags,
339-
concurrency_key=node.definition.concurrency_key,
339+
concurrency_group=node.definition.concurrency_group,
340340
)
341341

342342
self.add_step(new_step)
@@ -1031,7 +1031,7 @@ def rebuild_from_snapshot(
10311031
step_inputs, # type: ignore # (plain StepInput only)
10321032
step_outputs,
10331033
step_snap.tags,
1034-
step_snap.concurrency_key,
1034+
step_snap.concurrency_group,
10351035
)
10361036
elif step_snap.kind == StepKind.UNRESOLVED_MAPPED:
10371037
step = UnresolvedMappedExecutionStep(
@@ -1043,7 +1043,7 @@ def rebuild_from_snapshot(
10431043
step_inputs, # type: ignore # (StepInput or UnresolvedMappedStepInput only)
10441044
step_outputs,
10451045
step_snap.tags,
1046-
step_snap.concurrency_key,
1046+
step_snap.concurrency_group,
10471047
)
10481048
elif step_snap.kind == StepKind.UNRESOLVED_COLLECT:
10491049
step = UnresolvedCollectExecutionStep(
@@ -1052,7 +1052,7 @@ def rebuild_from_snapshot(
10521052
step_inputs, # type: ignore # (StepInput or UnresolvedCollectStepInput only)
10531053
step_outputs,
10541054
step_snap.tags,
1055-
step_snap.concurrency_key,
1055+
step_snap.concurrency_group,
10561056
)
10571057
else:
10581058
raise Exception(f"Unexpected step kind {step_snap.kind}")

0 commit comments

Comments
 (0)