From f44ac2fc107ba5262d18d5eec453c8d7e2035d9b Mon Sep 17 00:00:00 2001 From: Brian Tu Date: Tue, 22 Oct 2024 18:15:41 -0400 Subject: [PATCH] Make automation condition evaluator async (#25014) ## Summary & Motivation Start to making the automation condition evaluator evaluate asynchronously. This won't reduce any latency yet, but will be useful once we can actually batch database queries instead of doing `AssetRecord.blocking_get`. ## How I Tested These Changes Existing tests should pass --- .../_core/definitions/base_asset_graph.py | 10 +++----- .../automation_condition_evaluator.py | 23 +++++++++++++++---- .../asset_defs_tests/test_asset_graph.py | 6 ++++- 3 files changed, 26 insertions(+), 13 deletions(-) diff --git a/python_modules/dagster/dagster/_core/definitions/base_asset_graph.py b/python_modules/dagster/dagster/_core/definitions/base_asset_graph.py index 1f07e290baf41..f7f30317042ae 100644 --- a/python_modules/dagster/dagster/_core/definitions/base_asset_graph.py +++ b/python_modules/dagster/dagster/_core/definitions/base_asset_graph.py @@ -303,16 +303,12 @@ def toposorted_asset_keys(self) -> Sequence[AssetKey]: ] @cached_property - def toposorted_entity_keys(self) -> Sequence[EntityKey]: - """Return topologically sorted entity keys in graph. Keys with the same topological level are + def toposorted_entity_keys_by_level(self) -> Sequence[Sequence[EntityKey]]: + """Return topologically sorted levels for entity keys in graph. Keys with the same topological level are sorted alphabetically to provide stability. """ sort_key = lambda e: (e, None) if isinstance(e, AssetKey) else (e.asset_key, e.name) - return [ - item - for items_in_level in toposort(self.entity_dep_graph["upstream"], sort_key=sort_key) - for item in sorted(items_in_level, key=sort_key) - ] + return toposort(self.entity_dep_graph["upstream"], sort_key=sort_key) @cached_property def toposorted_asset_keys_by_level(self) -> Sequence[AbstractSet[AssetKey]]: diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_condition_evaluator.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_condition_evaluator.py index 1f71c9dd4dd7a..51f7c175c4eb6 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_condition_evaluator.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_condition_evaluator.py @@ -1,3 +1,4 @@ +import asyncio import datetime import logging from collections import defaultdict @@ -102,15 +103,18 @@ def prefetch(self) -> None: self.logger.info("Done prefetching asset records.") def evaluate(self) -> Tuple[Sequence[AutomationResult], Sequence[EntitySubset[EntityKey]]]: + return asyncio.run(self.async_evaluate()) + + async def async_evaluate( + self, + ) -> Tuple[Sequence[AutomationResult], Sequence[EntitySubset[EntityKey]]]: self.prefetch() num_conditions = len(self.entity_keys) num_evaluated = 0 - for entity_key in self.asset_graph.toposorted_entity_keys: - if entity_key not in self.entity_keys: - continue + async def _evaluate_entity_async(entity_key: EntityKey, offset: int): self.logger.debug( - f"Evaluating {entity_key.to_user_string()} ({num_evaluated+1}/{num_conditions})" + f"Evaluating {entity_key.to_user_string()} ({num_evaluated+offset}/{num_conditions})" ) try: @@ -132,7 +136,16 @@ def evaluate(self) -> Tuple[Sequence[AutomationResult], Sequence[EntitySubset[En f"requested ({requested_str}) " f"({format(result.end_timestamp - result.start_timestamp, '.3f')} seconds)" ) - num_evaluated += 1 + + for topo_level in self.asset_graph.toposorted_entity_keys_by_level: + coroutines = [ + _evaluate_entity_async(entity_key, offset) + for offset, entity_key in enumerate(topo_level) + if entity_key in self.entity_keys + ] + await asyncio.gather(*coroutines) + num_evaluated += len(coroutines) + return list(self.current_results_by_key.values()), [ v for v in self.request_subsets_by_key.values() if not v.is_empty ] diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py index 3601587676d64..ab1724dddb8bd 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py @@ -757,7 +757,11 @@ def Bc(): ... asset_graph = asset_graph_from_assets([A, B, Ac, Bc]) assert asset_graph.toposorted_asset_keys == [A.key, B.key] - assert asset_graph.toposorted_entity_keys == [A.key, Ac.check_key, B.key, Bc.check_key] + assert asset_graph.toposorted_entity_keys_by_level == [ + [A.key], + [Ac.check_key, B.key], + [Bc.check_key], + ] def test_required_assets_and_checks_by_key_asset_decorator(