Commit add1323

group => pool

1 parent ddea8f4 · commit add1323

File tree: 5 files changed (+28 -28 lines)

python_modules/dagster/dagster/_core/op_concurrency_limits_counter.py (+11 -11)

@@ -3,7 +3,7 @@
 from typing import TYPE_CHECKING, Dict, Mapping, Optional, Sequence, Set
 
 from dagster._core.instance import DagsterInstance
-from dagster._core.run_coordinator.queued_run_coordinator import ConcurrencyGranularity
+from dagster._core.run_coordinator.queued_run_coordinator import PoolGranularity
 from dagster._core.snap.execution_plan_snapshot import ExecutionPlanSnapshot
 from dagster._core.storage.dagster_run import (
     IN_PROGRESS_RUN_STATUSES,
@@ -58,14 +58,14 @@ def __init__(
         runs: Sequence[DagsterRun],
         in_progress_run_records: Sequence[RunRecord],
         slot_count_offset: int = 0,
-        concurrency_group_granularity: ConcurrencyGranularity = ConcurrencyGranularity.OP,
+        pool_granularity: PoolGranularity = PoolGranularity.OP,
     ):
         self._root_concurrency_keys_by_run = {}
         self._concurrency_info_by_key: Dict[str, "ConcurrencyKeyInfo"] = {}
         self._launched_concurrency_key_counts = defaultdict(int)
         self._in_progress_concurrency_key_counts = defaultdict(int)
         self._slot_count_offset = slot_count_offset
-        self._concurrency_group_granularity = concurrency_group_granularity
+        self._pool_granularity = pool_granularity
         self._in_progress_run_ids: Set[str] = set(
             [record.dagster_run.run_id for record in in_progress_run_records]
         )
@@ -91,7 +91,7 @@ def _fetch_concurrency_info(self, instance: DagsterInstance, queued_runs: Sequen
             # if using op granularity, consider only the root keys
             run_concurrency_keys = (
                 run.run_op_concurrency.root_key_counts.keys()
-                if self._concurrency_group_granularity == ConcurrencyGranularity.OP
+                if self._pool_granularity == PoolGranularity.OP
                 else run.run_op_concurrency.all_keys or []
             )
             all_concurrency_keys.update(run_concurrency_keys)
@@ -115,7 +115,7 @@ def _should_allocate_slots_for_in_progress_run(self, record: RunRecord):
         if status not in IN_PROGRESS_RUN_STATUSES:
             return False
 
-        if self._concurrency_group_granularity == ConcurrencyGranularity.RUN:
+        if self._pool_granularity == PoolGranularity.RUN:
             return True
 
         if status == DagsterRunStatus.STARTING:
@@ -132,11 +132,11 @@ def _slot_counts_for_run(self, run: DagsterRun) -> Mapping[str, int]:
         if not run.run_op_concurrency:
             return {}
 
-        if self._concurrency_group_granularity == ConcurrencyGranularity.OP:
+        if self._pool_granularity == PoolGranularity.OP:
             return {**run.run_op_concurrency.root_key_counts}
 
         else:
-            assert self._concurrency_group_granularity == ConcurrencyGranularity.RUN
+            assert self._pool_granularity == PoolGranularity.RUN
             return {concurrency_key: 1 for concurrency_key in run.run_op_concurrency.all_keys or []}
 
     def _process_in_progress_runs(self, in_progress_records: Sequence[RunRecord]):
@@ -154,14 +154,14 @@ def is_blocked(self, run: DagsterRun) -> bool:
             return False
 
         if (
-            self._concurrency_group_granularity == ConcurrencyGranularity.OP
+            self._pool_granularity == PoolGranularity.OP
            and run.run_op_concurrency.has_unconstrained_root_nodes
         ):
            # if the granularity is at the op level and there exists a root node that is not
            # concurrency blocked, we should dequeue.
            return False
 
-        if self._concurrency_group_granularity == ConcurrencyGranularity.OP:
+        if self._pool_granularity == PoolGranularity.OP:
            # we just need to check all of the root concurrency keys, instead of all the concurrency keys
            # in the run
            for concurrency_key in run.run_op_concurrency.root_key_counts.keys():
@@ -189,7 +189,7 @@ def is_blocked(self, run: DagsterRun) -> bool:
                    return True
 
         else:
-            assert self._concurrency_group_granularity == ConcurrencyGranularity.RUN
+            assert self._pool_granularity == PoolGranularity.RUN
 
            # if the granularity is at the run level, we should check if any of the concurrency
            # keys are blocked
@@ -228,7 +228,7 @@ def get_blocked_run_debug_info(self, run: DagsterRun) -> Mapping:
                continue
 
            log_info[concurrency_key] = {
-                "granularity": self._concurrency_group_granularity.value,
+                "granularity": self._pool_granularity.value,
                "slot_count": concurrency_info.slot_count,
                "pending_step_count": len(concurrency_info.pending_steps),
                "pending_step_run_ids": list(

python_modules/dagster/dagster/_core/run_coordinator/queued_run_coordinator.py (+11 -11)

@@ -20,7 +20,7 @@
 from dagster._serdes import ConfigurableClass, ConfigurableClassData
 
 
-class ConcurrencyGranularity(Enum):
+class PoolGranularity(Enum):
     OP = "op"
     RUN = "run"
 
@@ -35,7 +35,7 @@ class RunQueueConfig(
             ("user_code_failure_retry_delay", int),
             ("should_block_op_concurrency_limited_runs", bool),
             ("op_concurrency_slot_buffer", int),
-            ("concurrency_group_granularity", ConcurrencyGranularity),
+            ("pool_granularity", PoolGranularity),
         ],
     )
 ):
@@ -47,7 +47,7 @@ def __new__(
         user_code_failure_retry_delay: int = 60,
         should_block_op_concurrency_limited_runs: bool = False,
         op_concurrency_slot_buffer: int = 0,
-        concurrency_group_granularity: ConcurrencyGranularity = ConcurrencyGranularity.OP,
+        pool_granularity: PoolGranularity = PoolGranularity.OP,
     ):
         return super(RunQueueConfig, cls).__new__(
             cls,
@@ -60,9 +60,9 @@ def __new__(
             ),
             check.int_param(op_concurrency_slot_buffer, "op_concurrency_slot_buffer"),
             check.inst_param(
-                concurrency_group_granularity,
-                "concurrency_group_granularity",
-                ConcurrencyGranularity,
+                pool_granularity,
+                "pool_granularity",
+                PoolGranularity,
             ),
         )
 
@@ -82,7 +82,7 @@ def __init__(
         max_user_code_failure_retries: Optional[int] = None,
         user_code_failure_retry_delay: Optional[int] = None,
         block_op_concurrency_limited_runs: Optional[Mapping[str, Any]] = None,
-        concurrency_group_granularity: str = "op",
+        pool_granularity: str = "op",
         inst_data: Optional[ConfigurableClassData] = None,
     ):
         self._inst_data: Optional[ConfigurableClassData] = check.opt_inst_param(
@@ -129,7 +129,7 @@ def __init__(
                "op_concurrency_slot_buffer can only be set if block_op_concurrency_limited_runs "
                "is enabled",
            )
-        self._concurrency_group_granularity = ConcurrencyGranularity(concurrency_group_granularity)
+        self._pool_granularity = PoolGranularity(pool_granularity)
         self._logger = logging.getLogger("dagster.run_coordinator.queued_run_coordinator")
         super().__init__()
 
@@ -145,7 +145,7 @@ def get_run_queue_config(self) -> RunQueueConfig:
            user_code_failure_retry_delay=self._user_code_failure_retry_delay,
            should_block_op_concurrency_limited_runs=self._should_block_op_concurrency_limited_runs,
            op_concurrency_slot_buffer=self._op_concurrency_slot_buffer,
-            concurrency_group_granularity=self._concurrency_group_granularity,
+            pool_granularity=self._pool_granularity,
        )
 
     @property
@@ -266,7 +266,7 @@ def config_type(cls) -> UserConfigSchema:
                    ),
                }
            ),
-            "concurrency_group_granularity": Field(
+            "pool_granularity": Field(
                str,
                is_required=False,
                default_value="op",
@@ -293,7 +293,7 @@ def from_config_value(
            max_user_code_failure_retries=config_value.get("max_user_code_failure_retries"),
            user_code_failure_retry_delay=config_value.get("user_code_failure_retry_delay"),
            block_op_concurrency_limited_runs=config_value.get("block_op_concurrency_limited_runs"),
-            concurrency_group_granularity=config_value.get("concurrency_group_granularity", "op"),
+            pool_granularity=config_value.get("pool_granularity", "op"),
        )
 
     def submit_run(self, context: SubmitRunContext) -> DagsterRun:
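
A hedged usage sketch of the renamed setting on the coordinator itself, assuming the constructor arguments omitted here keep the optional defaults visible in the hunks above. The string is coerced to PoolGranularity in __init__ and surfaced on RunQueueConfig by get_run_queue_config():

from dagster._core.run_coordinator.queued_run_coordinator import (
    PoolGranularity,
    QueuedRunCoordinator,
)

coordinator = QueuedRunCoordinator(
    # enable blocking of concurrency-limited runs, as in the test config further below
    block_op_concurrency_limited_runs={"enabled": True},
    # was `concurrency_group_granularity` before this commit
    pool_granularity="run",
)
assert coordinator.get_run_queue_config().pool_granularity == PoolGranularity.RUN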

python_modules/dagster/dagster/_daemon/run_coordinator/queued_run_coordinator_daemon.py (+1 -1)

@@ -269,7 +269,7 @@ def _get_runs_to_dequeue(
                    batch,
                    in_progress_run_records,
                    run_queue_config.op_concurrency_slot_buffer,
-                    run_queue_config.concurrency_group_granularity,
+                    run_queue_config.pool_granularity,
                )
            except:
                self._logger.exception("Failed to initialize op concurrency counter")

python_modules/dagster/dagster_tests/daemon_tests/test_locations/concurrency_limited_workspace.py (+4 -4)

@@ -3,26 +3,26 @@
 from dagster._core.storage.tags import GLOBAL_CONCURRENCY_TAG
 
 
-@asset(concurrency_group="foo", key_prefix=["prefix"])
+@asset(pool="foo", key_prefix=["prefix"])
 def foo_limited_asset():
     return 1
 
 
-@asset(concurrency_group="bar", key_prefix=["prefix"])
+@asset(pool="bar", key_prefix=["prefix"])
 def bar_limited_asset():
     return 1
 
 
 @asset(
-    concurrency_group="baz",
+    pool="baz",
     key_prefix=["prefix"],
     ins={"foo_limited_asset": AssetIn(key_prefix="prefix")},
 )
 def baz_limited_asset_depends_on_foo(foo_limited_asset):
     return 1
 
 
-@asset(concurrency_group="baz", key_prefix=["prefix"])
+@asset(pool="baz", key_prefix=["prefix"])
 def baz_limited_asset():
     return 1

python_modules/dagster/dagster_tests/daemon_tests/test_queued_run_coordinator_daemon.py (+1 -1)

@@ -1186,7 +1186,7 @@ def test_concurrency_buffer_with_default_slot(
            "block_op_concurrency_limited_runs": {
                "enabled": True,
            },
-            "concurrency_group_granularity": "run",
+            "pool_granularity": "run",
        },
    ],
 )
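
The same key carries over to the coordinator's config value. A hedged sketch of feeding a dict shaped like the test parametrization above through from_config_value (the inst_data-first signature is the usual ConfigurableClass one and is assumed here; unspecified keys fall back to their defaults):

from dagster._core.run_coordinator.queued_run_coordinator import (
    PoolGranularity,
    QueuedRunCoordinator,
)

config_value = {
    "block_op_concurrency_limited_runs": {"enabled": True},
    "pool_granularity": "run",  # default remains "op" per config_type()
}
coordinator = QueuedRunCoordinator.from_config_value(None, config_value)
assert coordinator.get_run_queue_config().pool_granularity == PoolGranularity.RUN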
