Skip to content

Commit

Permalink
Make automation condition evaluator async (dagster-io#25014)
Browse files Browse the repository at this point in the history
## Summary & Motivation
Start to making the automation condition evaluator evaluate asynchronously. This won't reduce any latency yet, but will be useful once we can actually batch database queries instead of doing `AssetRecord.blocking_get`.

## How I Tested These Changes
Existing tests should pass
  • Loading branch information
briantu authored and Grzyblon committed Oct 26, 2024
1 parent 5725271 commit f44ac2f
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -303,16 +303,12 @@ def toposorted_asset_keys(self) -> Sequence[AssetKey]:
]

@cached_property
def toposorted_entity_keys(self) -> Sequence[EntityKey]:
"""Return topologically sorted entity keys in graph. Keys with the same topological level are
def toposorted_entity_keys_by_level(self) -> Sequence[Sequence[EntityKey]]:
"""Return topologically sorted levels for entity keys in graph. Keys with the same topological level are
sorted alphabetically to provide stability.
"""
sort_key = lambda e: (e, None) if isinstance(e, AssetKey) else (e.asset_key, e.name)
return [
item
for items_in_level in toposort(self.entity_dep_graph["upstream"], sort_key=sort_key)
for item in sorted(items_in_level, key=sort_key)
]
return toposort(self.entity_dep_graph["upstream"], sort_key=sort_key)

@cached_property
def toposorted_asset_keys_by_level(self) -> Sequence[AbstractSet[AssetKey]]:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import datetime
import logging
from collections import defaultdict
Expand Down Expand Up @@ -102,15 +103,18 @@ def prefetch(self) -> None:
self.logger.info("Done prefetching asset records.")

def evaluate(self) -> Tuple[Sequence[AutomationResult], Sequence[EntitySubset[EntityKey]]]:
return asyncio.run(self.async_evaluate())

async def async_evaluate(
self,
) -> Tuple[Sequence[AutomationResult], Sequence[EntitySubset[EntityKey]]]:
self.prefetch()
num_conditions = len(self.entity_keys)
num_evaluated = 0
for entity_key in self.asset_graph.toposorted_entity_keys:
if entity_key not in self.entity_keys:
continue

async def _evaluate_entity_async(entity_key: EntityKey, offset: int):
self.logger.debug(
f"Evaluating {entity_key.to_user_string()} ({num_evaluated+1}/{num_conditions})"
f"Evaluating {entity_key.to_user_string()} ({num_evaluated+offset}/{num_conditions})"
)

try:
Expand All @@ -132,7 +136,16 @@ def evaluate(self) -> Tuple[Sequence[AutomationResult], Sequence[EntitySubset[En
f"requested ({requested_str}) "
f"({format(result.end_timestamp - result.start_timestamp, '.3f')} seconds)"
)
num_evaluated += 1

for topo_level in self.asset_graph.toposorted_entity_keys_by_level:
coroutines = [
_evaluate_entity_async(entity_key, offset)
for offset, entity_key in enumerate(topo_level)
if entity_key in self.entity_keys
]
await asyncio.gather(*coroutines)
num_evaluated += len(coroutines)

return list(self.current_results_by_key.values()), [
v for v in self.request_subsets_by_key.values() if not v.is_empty
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -757,7 +757,11 @@ def Bc(): ...
asset_graph = asset_graph_from_assets([A, B, Ac, Bc])

assert asset_graph.toposorted_asset_keys == [A.key, B.key]
assert asset_graph.toposorted_entity_keys == [A.key, Ac.check_key, B.key, Bc.check_key]
assert asset_graph.toposorted_entity_keys_by_level == [
[A.key],
[Ac.check_key, B.key],
[Bc.check_key],
]


def test_required_assets_and_checks_by_key_asset_decorator(
Expand Down

0 comments on commit f44ac2f

Please sign in to comment.