
Commit 1816e2f

test run blocking op concurrency

1 parent: 2e3847a

3 files changed (+79, -6 lines)

python_modules/dagster/dagster/_core/run_coordinator/queued_run_coordinator.py (+15, -1)

@@ -82,6 +82,7 @@ def __init__(
         max_user_code_failure_retries: Optional[int] = None,
         user_code_failure_retry_delay: Optional[int] = None,
         block_op_concurrency_limited_runs: Optional[Mapping[str, Any]] = None,
+        concurrency_group_granularity: str = "op",
         inst_data: Optional[ConfigurableClassData] = None,
     ):
         self._inst_data: Optional[ConfigurableClassData] = check.opt_inst_param(
@@ -128,7 +129,7 @@ def __init__(
                 "op_concurrency_slot_buffer can only be set if block_op_concurrency_limited_runs "
                 "is enabled",
             )
-
+        self._concurrency_group_granularity = ConcurrencyGranularity(concurrency_group_granularity)
         self._logger = logging.getLogger("dagster.run_coordinator.queued_run_coordinator")
         super().__init__()

@@ -144,6 +145,7 @@ def get_run_queue_config(self) -> RunQueueConfig:
             user_code_failure_retry_delay=self._user_code_failure_retry_delay,
             should_block_op_concurrency_limited_runs=self._should_block_op_concurrency_limited_runs,
             op_concurrency_slot_buffer=self._op_concurrency_slot_buffer,
+            concurrency_group_granularity=self._concurrency_group_granularity,
         )

     @property
@@ -264,6 +266,17 @@ def config_type(cls) -> UserConfigSchema:
                 ),
             }
         ),
+        "concurrency_group_granularity": Field(
+            str,
+            is_required=False,
+            default_value="op",
+            description=(
+                "Determines the granularity at which concurrency limits are applied. If set to "
+                "'op', dequeues runs as long as any op in the run can make progress. If set to "
+                "'run', dequeues runs as long as all of the concurrency groups assigned to the run "
+                "have free slots available."
+            ),
+        ),
     }

     @classmethod
@@ -280,6 +293,7 @@ def from_config_value(
             max_user_code_failure_retries=config_value.get("max_user_code_failure_retries"),
             user_code_failure_retry_delay=config_value.get("user_code_failure_retry_delay"),
             block_op_concurrency_limited_runs=config_value.get("block_op_concurrency_limited_runs"),
+            concurrency_group_granularity=config_value.get("concurrency_group_granularity", "op"),
         )

     def submit_run(self, context: SubmitRunContext) -> DagsterRun:
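
To make the new setting's semantics concrete, here is a minimal, illustrative sketch (not code from this commit) of how "op" versus "run" granularity changes the dequeue decision. can_dequeue_sketch and its arguments are hypothetical names; in the coordinator itself the choice is carried by the ConcurrencyGranularity value threaded through RunQueueConfig above.

from typing import Mapping, Set


def can_dequeue_sketch(
    granularity: str,  # "op" or "run", matching the config field's accepted values
    run_concurrency_groups: Set[str],
    free_slots_by_group: Mapping[str, int],
) -> bool:
    """Hypothetical helper illustrating the field description, not the coordinator's code."""
    has_free = [free_slots_by_group.get(group, 0) > 0 for group in run_concurrency_groups]
    if granularity == "op":
        # "op" granularity: dequeue as long as any op in the run can make progress.
        return any(has_free)
    # "run" granularity: dequeue only when every concurrency group assigned to the run
    # has at least one free slot.
    return all(has_free)


# Mirrors the scenario in the daemon test below: the "baz" group is fully occupied by another run.
slots = {"foo": 1, "bar": 1, "baz": 0}
assert can_dequeue_sketch("op", {"foo", "bar", "baz"}, slots) is True
assert can_dequeue_sketch("run", {"foo", "bar", "baz"}, slots) is False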

python_modules/dagster/dagster_tests/daemon_tests/test_locations/concurrency_limited_workspace.py (+10, -5)

@@ -3,31 +3,36 @@
 from dagster._core.storage.tags import GLOBAL_CONCURRENCY_TAG


-@asset(op_tags={GLOBAL_CONCURRENCY_TAG: "foo"}, key_prefix=["prefix"])
+@asset(concurrency_group="foo", key_prefix=["prefix"])
 def foo_limited_asset():
     return 1


-@asset(op_tags={GLOBAL_CONCURRENCY_TAG: "bar"}, key_prefix=["prefix"])
+@asset(concurrency_group="bar", key_prefix=["prefix"])
 def bar_limited_asset():
     return 1


 @asset(
-    op_tags={GLOBAL_CONCURRENCY_TAG: "baz"},
+    concurrency_group="baz",
     key_prefix=["prefix"],
     ins={"foo_limited_asset": AssetIn(key_prefix="prefix")},
 )
 def baz_limited_asset_depends_on_foo(foo_limited_asset):
     return 1


+@asset(concurrency_group="baz", key_prefix=["prefix"])
+def baz_limited_asset():
+    return 1
+
+
 concurrency_limited_asset_job = define_asset_job(
     "concurrency_limited_asset_job",
-    [foo_limited_asset, bar_limited_asset, baz_limited_asset_depends_on_foo],
+    [foo_limited_asset, bar_limited_asset, baz_limited_asset, baz_limited_asset_depends_on_foo],
 ).resolve(
     asset_graph=AssetGraph.from_assets(
-        [foo_limited_asset, bar_limited_asset, baz_limited_asset_depends_on_foo]
+        [foo_limited_asset, bar_limited_asset, baz_limited_asset_depends_on_foo, baz_limited_asset]
     )
 )
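
For reference, the key_prefix on these assets is what produces the prefixed asset keys selected in the daemon test below. A small sketch, assuming this module is importable under the dotted path implied by its file location:

from dagster import AssetKey

from dagster_tests.daemon_tests.test_locations.concurrency_limited_workspace import (  # assumed import path
    baz_limited_asset,
    foo_limited_asset,
)

# key_prefix=["prefix"] namespaces each asset, so the daemon test selects
# AssetKey(["prefix", ...]) rather than the bare asset name.
assert foo_limited_asset.key == AssetKey(["prefix", "foo_limited_asset"])
assert baz_limited_asset.key == AssetKey(["prefix", "baz_limited_asset"])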

python_modules/dagster/dagster_tests/daemon_tests/test_queued_run_coordinator_daemon.py (+54)

@@ -1179,6 +1179,60 @@ def test_concurrency_buffer_with_default_slot(
         assert set(self.get_run_ids(instance.run_launcher.queue())) == {run_id_1}
         caplog.text.count(f"Run {run_id_2} is blocked by global concurrency limits") == 1  # pyright: ignore[reportUnusedExpression]

+    @pytest.mark.parametrize(
+        "run_coordinator_config",
+        [
+            {
+                "block_op_concurrency_limited_runs": {
+                    "enabled": True,
+                },
+                "concurrency_group_granularity": "run",
+            },
+        ],
+    )
+    def test_concurrency_run_granularity(
+        self,
+        concurrency_limited_workspace_context,
+        daemon,
+        instance,
+    ):
+        run_id_1, run_id_2 = [make_new_run_id() for _ in range(2)]
+        workspace = concurrency_limited_workspace_context.create_request_context()
+        # concurrency_limited_asset_job
+        remote_job = self.get_concurrency_job(workspace)
+        foo_key = AssetKey(["prefix", "foo_limited_asset"])
+        bar_key = AssetKey(["prefix", "bar_limited_asset"])
+        baz_key = AssetKey(["prefix", "baz_limited_asset"])
+        downstream_baz_key = AssetKey(["prefix", "baz_limited_asset_depends_on_foo"])
+
+        # first submit a run that occupies the baz slot
+        self.submit_run(
+            instance, remote_job, workspace, run_id=run_id_1, asset_selection=set([baz_key])
+        )
+        list(daemon.run_iteration(concurrency_limited_workspace_context))
+        assert set(self.get_run_ids(instance.run_launcher.queue())) == set([run_id_1])
+
+        # submit a run that has 2 root nodes, respectively with foo, bar concurrency groups
+        # also with a downstream extra baz concurrency group asset
+        self.submit_run(
+            instance,
+            remote_job,
+            workspace,
+            run_id=run_id_2,
+            asset_selection=set([foo_key, bar_key, downstream_baz_key]),
+        )
+
+        # even though the root nodes have slots available, the second run is not launched
+        list(daemon.run_iteration(concurrency_limited_workspace_context))
+        assert set(self.get_run_ids(instance.run_launcher.queue())) == set([run_id_1])
+
+        instance.event_log_storage.set_concurrency_slots("baz", 2)
+
+        # all group slots available, run launched, even though there is only one slot open for baz
+        # and the run has two baz-grouped nodes
+        list(daemon.run_iteration(concurrency_limited_workspace_context))
+        assert set(self.get_run_ids(instance.run_launcher.queue())) == set([run_id_1, run_id_2])
+

 class TestQueuedRunCoordinatorDaemon(QueuedRunCoordinatorDaemonTests):
     @pytest.fixture
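
As a rough illustration of what the parametrized run_coordinator_config maps onto, the coordinator can presumably also be constructed directly with the same values, given the __init__ signature added in the first file. A sketch under that assumption, not code from this commit:

from dagster._core.run_coordinator.queued_run_coordinator import QueuedRunCoordinator

# Keyword arguments mirror the parametrization in test_concurrency_run_granularity and
# the constructor signature shown in the queued_run_coordinator.py diff above.
coordinator = QueuedRunCoordinator(
    block_op_concurrency_limited_runs={"enabled": True},
    concurrency_group_granularity="run",
)

# The granularity is threaded through to the RunQueueConfig that the daemon consumes.
run_queue_config = coordinator.get_run_queue_config()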
