add pool as top-level definition arg #26256

Open
wants to merge 11 commits into
base: master
from

prha
Member

@prha prha commented Dec 3, 2024

Summary & Motivation

Tags are a bad mechanism for specifying concurrency controls. Adds a top-level argument pool to asset/op definitions to replace the use of op tags to specify concurrency conditions.

This provides more cues for what you can place concurrency limits on (e.g. op defs not graph defs, asset defs not asset specs)
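
To make the change concrete, here is a minimal standalone sketch of the idea. It is not Dagster's implementation: `OpDef` and `effective_pool` are hypothetical names, the tag key is assumed to be the value behind GLOBAL_CONCURRENCY_TAG, and the fallback from the explicit argument to the legacy tag is an assumption made for illustration.

```python
from dataclasses import dataclass, field
from typing import Mapping, Optional

# Assumption: the legacy op tag key used to express concurrency limits.
LEGACY_CONCURRENCY_TAG = "dagster/concurrency_key"


@dataclass(frozen=True)
class OpDef:
    """Hypothetical stand-in for an op definition; not Dagster's class."""

    name: str
    tags: Mapping[str, str] = field(default_factory=dict)
    pool: Optional[str] = None  # the new top-level argument


def effective_pool(op: OpDef) -> Optional[str]:
    """Prefer the explicit pool; fall back to the legacy tag if present."""
    if op.pool is not None:
        return op.pool
    return op.tags.get(LEGACY_CONCURRENCY_TAG)


# Before: the limit is buried in free-form tags.
legacy = OpDef(name="load_table", tags={LEGACY_CONCURRENCY_TAG: "database"})
# After: the limit is a first-class argument on the definition.
explicit = OpDef(name="load_table", pool="database")

print(effective_pool(legacy), effective_pool(explicit))
```

Because `pool` is a named argument rather than a tag, it only appears on definitions that can actually carry a limit, which is the "cue" the summary describes.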

How I Tested These Changes

BK

Changelog

Adds a top-level argument pool to asset/op definitions to replace the use of op tags to specify concurrency conditions.

@prha prha force-pushed the prha/asset_op_concurrency_key branch from 3e46d69 to 0c1183c Compare December 3, 2024 23:26
@prha prha force-pushed the prha/asset_op_concurrency_key branch from 0c1183c to 041872f Compare December 3, 2024 23:28
@prha prha force-pushed the prha/asset_op_concurrency_key branch 6 times, most recently from addf80e to d288e73 Compare December 4, 2024 19:30
@prha prha requested review from schrockn and deepyaman December 5, 2024 20:50
@prha prha force-pushed the prha/asset_op_concurrency_key branch 2 times, most recently from e51874e to 571ead4 Compare December 11, 2024 22:01
Member

@schrockn schrockn left a comment

So before we move forward with this, I think we need a naming session on concurrency group versus pool versus something else. This is an important decision with long-term ramifications.

@prha prha removed the request for review from deepyaman December 13, 2024 17:45
@prha prha force-pushed the prha/asset_op_concurrency_key branch from 571ead4 to 238ba49 Compare December 13, 2024 17:46
@prha
Member Author

prha commented Jan 9, 2025

Discussed offline, but putting here for posterity:

I don't have plans to add PoolDefinition, because that automatically ties pools to a code location, and we specifically want pools to be a concept governing across code locations. If we come up with a richer API at the deployment layer, pools would be a great use case for it.
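
One way to picture this design choice: pool limits live in a deployment-wide registry keyed only by pool name, so ops from different code locations that name the same pool share one limit. The sketch below is illustrative only. `DeploymentPools`, `claim`, and `release` are hypothetical names, not a Dagster API, and the default limit of 1 for unknown pools is an arbitrary assumption.

```python
from collections import defaultdict


class DeploymentPools:
    """Hypothetical deployment-level registry of pool limits and claimed slots."""

    def __init__(self, limits: dict[str, int]) -> None:
        self._limits = dict(limits)
        self._claimed: dict[str, set[tuple[str, str]]] = defaultdict(set)

    def claim(self, pool: str, code_location: str, step_key: str) -> bool:
        """Try to take a slot; the code location does not affect the limit."""
        holders = self._claimed[pool]
        # Assumption: pools without a configured limit default to 1 slot.
        if len(holders) >= self._limits.get(pool, 1):
            return False
        holders.add((code_location, step_key))
        return True

    def release(self, pool: str, code_location: str, step_key: str) -> None:
        """Free a previously claimed slot; a no-op if it was never claimed."""
        self._claimed[pool].discard((code_location, step_key))


pools = DeploymentPools({"database": 1})
first = pools.claim("database", "location_a", "load_users")
second = pools.claim("database", "location_b", "load_orders")
print(first, second)
```

A definition object owned by one code location could not express this sharing without picking an owner, which is the concern raised above.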

@prha prha force-pushed the prha/asset_op_concurrency_key branch from ba49ba7 to 80d098b Compare January 10, 2025 01:17
@prha
Member Author

prha commented Jan 10, 2025

Instead of hiding the pool arg in kwargs, I just issue a preview_warning, which I will remove once this is ready to go in 1.10.0.
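
The pattern described here can be sketched with the standard library alone: the argument stays in the public signature, and passing it emits a warning while the feature is in preview. `PreviewWarning` and `define_op` are hypothetical stand-ins, not Dagster's preview_warning helper or its op decorator.

```python
import warnings
from typing import Optional


class PreviewWarning(Warning):
    """Hypothetical warning category for preview features."""


def define_op(name: str, pool: Optional[str] = None) -> dict:
    """Hypothetical factory: accepts pool publicly but warns while it is in preview."""
    if pool is not None:
        warnings.warn(
            "Parameter `pool` is in preview and may change.",
            PreviewWarning,
            stacklevel=2,  # point the warning at the caller, not this helper
        )
    return {"name": name, "pool": pool}


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    op_with_pool = define_op("load_table", pool="database")
    op_without_pool = define_op("cleanup")

print(len(caught), caught[0].category.__name__)
```

Only the call that passes `pool` warns, so existing callers are unaffected and the warning can be deleted in one place at release time.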

Contributor

@OwenKephart OwenKephart left a comment

minor comments

@@ -808,6 +808,13 @@ def tags(self) -> Mapping[str, str]:
        """The tags associated with the graph."""
        return super().tags

    @property
    def pools(self) -> set[str]:
Contributor

nit: prefer from collections.abc import Set

Member Author

is that everyone's preference? the incidence of set is like 4x the count of Set:

(dagster) ➜  dagster git:(prha/asset_op_concurrency_key) s '\bset\[' | wc
     423    2458   58005
(dagster) ➜  dagster git:(prha/asset_op_concurrency_key) s '\bSet\[' | wc
      99     773   15437

Contributor

for typing yeah -- I imagine almost all of those lower case sets are at runtime

until recently this was from typing import AbstractSet; from collections.abc import Set is the same thing but is now preferred by our linter. This marks the return value as immutable, which has some value, but in general I haven't seen set[x] used as a return type anywhere else in the code, so we should be consistent
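
The difference between the two annotations can be shown with a small standalone sketch. `Op` and `Graph` here are hypothetical classes, and computing `pools` as the union of the ops' pools is an assumption, since the body of the real property is not shown in the diff excerpt.

```python
from collections.abc import Set
from dataclasses import dataclass
from typing import Optional, Sequence


@dataclass(frozen=True)
class Op:
    """Hypothetical op with an optional pool."""

    name: str
    pool: Optional[str] = None


@dataclass(frozen=True)
class Graph:
    """Hypothetical graph that collects the pools of its ops."""

    ops: Sequence[Op]

    @property
    def pools(self) -> Set[str]:
        # Set is the read-only ABC: it declares no add() or remove(),
        # so a type checker flags callers that try to mutate the result.
        return frozenset(op.pool for op in self.ops if op.pool is not None)


graph = Graph(ops=[Op("a", "database"), Op("b"), Op("c", "api")])
print(sorted(graph.pools))
```

Annotating the return as `set[str]` would instead advertise a mutable object, and a `frozenset` would no longer satisfy the annotation.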


@property
@abstractmethod
def pools(self) -> set[str]: ...
Contributor

see above

@@ -286,6 +294,17 @@ def with_retry_policy(self, retry_policy: RetryPolicy) -> "PendingNodeInvocation
        """Creates a copy of this op with the given retry policy."""
        return super().with_retry_policy(retry_policy)

    @property
    def pool(self) -> Optional[str]:
        """Optional[str]: The concurrency group for this op."""
Contributor

nit: should the docstring say pool instead of group?

@@ -644,9 +642,9 @@ def concurrency_event_iterator(
):
step = self.get_step_by_key(step_key)
step_context = plan_context.for_step(step)
-step_concurrency_key = cast(str, step.tags.get(GLOBAL_CONCURRENCY_TAG))
+pool = cast(str, step.pool)
Contributor

nit: pre-existing, but should this be a check.str() instead of a cast?
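
The distinction behind this nit is that `typing.cast` does nothing at runtime, while a check fails loudly when the value is not a string. A minimal sketch, where `check_str` is a hypothetical helper standing in for whatever runtime check the codebase provides:

```python
from typing import Optional, cast


def check_str(value: object, name: str) -> str:
    """Hypothetical runtime check; raises instead of silently passing bad values."""
    if not isinstance(value, str):
        raise TypeError(f"Expected {name} to be a str, got {type(value).__name__}")
    return value


step_pool: Optional[str] = None  # e.g. a step that has no pool set

# cast() only informs the type checker; at runtime it returns its argument unchanged.
unchecked = cast(str, step_pool)

try:
    check_str(step_pool, "pool")
    raised = False
except TypeError:
    raised = True

print(unchecked, raised)
```

With the cast, a missing pool flows onward as `None` and fails later, far from the cause; with the check, the failure happens at the point where the assumption is made.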

@@ -159,6 +159,7 @@ class GraphDefSnap:
dep_structure_snapshot: DependencyStructureSnapshot
input_mapping_snaps: Sequence[InputMappingSnap]
output_mapping_snaps: Sequence[OutputMappingSnap]
pools: set[str]
Contributor

see above

@@ -176,15 +177,39 @@ def get_output_snap(self, name: str) -> OutputDefSnap:


@whitelist_for_serdes(storage_name="SolidDefSnap")
-@record
-class OpDefSnap:
+@record_custom
Contributor

why does this now need to be custom? does the backcompat not work if pool is just pool: Optional[str] = None?
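
The general pattern the question refers to can be sketched with a plain dataclass: a new optional field with a default lets payloads written before the field existed still load. This is an illustration only. `OpSnapSketch` and `load_snap` are hypothetical, and whether Dagster's own record and serdes machinery behaves this way for OpDefSnap is exactly what the comment is asking.

```python
from dataclasses import dataclass, fields
from typing import Any, Mapping, Optional


@dataclass(frozen=True)
class OpSnapSketch:
    """Hypothetical snapshot record; not Dagster's OpDefSnap."""

    name: str
    description: Optional[str] = None
    pool: Optional[str] = None  # new field; the default keeps old payloads loadable


def load_snap(payload: Mapping[str, Any]) -> OpSnapSketch:
    """Build a record from a stored payload, ignoring unknown keys."""
    known = {f.name for f in fields(OpSnapSketch)}
    return OpSnapSketch(**{k: v for k, v in payload.items() if k in known})


old_payload = {"name": "load_table", "description": None}  # written before pool existed
new_payload = {"name": "load_table", "description": None, "pool": "database"}

print(load_snap(old_payload).pool, load_snap(new_payload).pool)
```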

@prha prha force-pushed the prha/asset_op_concurrency_key branch 2 times, most recently from a120802 to 58ac167 Compare January 16, 2025 00:18
@prha prha requested a review from OwenKephart January 16, 2025 06:26
@prha prha force-pushed the prha/asset_op_concurrency_key branch from 58ac167 to 20be47d Compare January 16, 2025 20:22
Contributor

@OwenKephart OwenKephart left a comment

approving but I do think the return types should be Set[str] instead of set[str]

@@ -177,14 +178,15 @@ def get_output_snap(self, name: str) -> OutputDefSnap:

@whitelist_for_serdes(storage_name="SolidDefSnap")
@record
-class OpDefSnap:
+class OpDefSnap(IHaveNew):
Contributor

why IHaveNew here?

@prha prha force-pushed the prha/asset_op_concurrency_key branch 3 times, most recently from 197d7c1 to b7bfa67 Compare January 17, 2025 02:54
@prha prha force-pushed the prha/asset_op_concurrency_key branch from b7bfa67 to 9d20984 Compare January 17, 2025 22:29