-
Notifications
You must be signed in to change notification settings - Fork 1.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add support for run-granularity op concurrency #26458
base: prha/concurrency_config
Are you sure you want to change the base?
Conversation
Warning This pull request is not mergeable via GitHub because a downstack PR is open. Once all requirements are satisfied, merge this PR as a stack on Graphite.
This stack of pull requests is managed by Graphite. Learn more about stacking. |
571ead4
to
238ba49
Compare
34f9dcd
to
1816e2f
Compare
238ba49
to
a36d157
Compare
1816e2f
to
e7e5973
Compare
67085a7
to
21ae854
Compare
add1323
to
57f2db2
Compare
a32b759
to
be83c16
Compare
57f2db2
to
896bb19
Compare
f52347d
to
ce97f66
Compare
896bb19
to
960f7c2
Compare
Deploy preview for dagit-core-storybook ready! ✅ Preview Built with commit 960f7c2. |
f572156
to
8cfee9c
Compare
960f7c2
to
a85aa24
Compare
8cfee9c
to
152beca
Compare
a85aa24
to
d217d12
Compare
152beca
to
068a084
Compare
d217d12
to
72da007
Compare
068a084
to
6927f22
Compare
72da007
to
8782d96
Compare
6927f22
to
cb8dd1e
Compare
8782d96
to
5c83e2c
Compare
cb8dd1e
to
bf19552
Compare
5c83e2c
to
9e5e36d
Compare
bf19552
to
17ed4d9
Compare
9e5e36d
to
eb04b1b
Compare
17ed4d9
to
6a116d5
Compare
eb04b1b
to
d80e191
Compare
6a116d5
to
7309b15
Compare
d80e191
to
e5ad4c3
Compare
7309b15
to
256d726
Compare
e5ad4c3
to
111c97c
Compare
256d726
to
3efd17a
Compare
111c97c
to
4e2eb36
Compare
3efd17a
to
3d2b288
Compare
4e2eb36
to
647d5df
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
note comments, but looks good
@@ -307,6 +344,7 @@ def submit_run(self, context: SubmitRunContext) -> DagsterRun: | |||
run = self._instance.get_run_by_id(dagster_run.run_id) | |||
if run is None: | |||
check.failed(f"Failed to reload run {dagster_run.run_id}") | |||
assert run |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this necessary?
if not isinstance(workspace_process_context.instance.run_coordinator, QueuedRunCoordinator): | ||
check.failed( | ||
f"Expected QueuedRunCoordinator, got {workspace_process_context.instance.run_coordinator}" | ||
) | ||
run_coordinator = cast( | ||
QueuedRunCoordinator, workspace_process_context.instance.run_coordinator | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: is this fundamentally different from
run_coordinator = check.inst(workspace_process_context.instance.run_coordinator, QueuedRunCoordinator)
?
return {**run.run_op_concurrency.root_key_counts} | ||
|
||
else: | ||
assert self._pool_granularity == PoolGranularity.RUN |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: prefer check.invariant() to assert, or potentially just:
if granularity == OP: ...
elif granularity == RUN: ...
else: raise Exception()
|
||
def get_blocked_run_debug_info(self, run: DagsterRun) -> Mapping: | ||
if not run.run_op_concurrency: | ||
return {} | ||
|
||
log_info = {} | ||
for pool in run.run_op_concurrency.root_key_counts.keys(): | ||
concurrency_info = self._concurrency_info_by_pool.get(pool) | ||
concurrency_info = self._concurrency_info_by_key.get(pool) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
any particular reason for this name change? nbd either way just curious
Summary & Motivation
Enables setting run granularity enforcement of pools, to limit the concurrency of in-progress runs containing a particular op / asset.
How I Tested These Changes
BK
Changelog