From 9e39cfa89d58918bc47ed5a1b846afaf102cfa21 Mon Sep 17 00:00:00 2001 From: Brian Tu Date: Tue, 22 Oct 2024 18:19:43 -0400 Subject: [PATCH] Make AutomationCondition evaluate optionally async (#25238) ## Summary & Motivation We now want to make each the `AutomationCondition.evaluate` function optionally async. This is because we still want to keep it non async if users want to make their own automation conditions. To handle this, added a wrapper async evaluate function on `AutomationContext` that checks if the evaluate function is async or not. ## How I Tested These Changes Existing tests should pass --- .../automation_condition_evaluator.py | 7 ++- .../automation_context.py | 7 +++ .../any_downstream_conditions_operator.py | 29 +++++------- .../operators/boolean_operators.py | 38 +++++++++------ .../operators/check_operators.py | 42 ++++++++--------- .../operators/dep_operators.py | 40 ++++++++-------- .../operators/newly_true_operator.py | 7 ++- .../operators/since_operator.py | 25 +++++----- .../builtins/test_check_condition.py | 16 ++++--- .../test_code_version_changed_condition.py | 18 +++---- .../builtins/test_dep_condition.py | 47 ++++++++++--------- .../builtins/test_eager_condition.py | 34 +++++++------- .../builtins/test_failed_condition.py | 23 +++++---- .../builtins/test_in_progress_condition.py | 25 +++++----- .../test_latest_time_window_condition.py | 39 ++++++++------- .../builtins/test_missing_condition.py | 19 ++++---- .../test_newly_requested_condition.py | 14 +++--- .../builtins/test_newly_true_condition.py | 22 +++++---- .../builtins/test_newly_updated_condition.py | 35 +++++++------- .../builtins/test_on_cron_condition.py | 37 ++++++++------- .../builtins/test_on_missing_condition.py | 46 +++++++++--------- .../builtins/test_since_condition.py | 20 ++++---- .../test_updated_since_cron_condition.py | 35 +++++++------- .../test_will_be_requested_condition.py | 26 +++++----- .../fundamentals/test_automation_condition.py | 22 +++++---- .../fundamentals/test_result_value_hash.py | 7 +-- .../automation_condition_scenario.py | 4 +- 27 files changed, 374 insertions(+), 310 deletions(-) diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_condition_evaluator.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_condition_evaluator.py index 51f7c175c4eb6..4818b419e6dfb 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_condition_evaluator.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_condition_evaluator.py @@ -118,7 +118,7 @@ async def _evaluate_entity_async(entity_key: EntityKey, offset: int): ) try: - self.evaluate_entity(entity_key) + await self.evaluate_entity(entity_key) except Exception as e: raise Exception( f"Error while evaluating conditions for {entity_key.to_user_string()}" @@ -150,10 +150,9 @@ async def _evaluate_entity_async(entity_key: EntityKey, offset: int): v for v in self.request_subsets_by_key.values() if not v.is_empty ] - def evaluate_entity(self, key: EntityKey) -> None: + async def evaluate_entity(self, key: EntityKey) -> None: # evaluate the condition of this asset - context = AutomationContext.create(key=key, evaluator=self) - result = context.condition.evaluate(context) + result = await AutomationContext.create(key=key, evaluator=self).evaluate_async() # update dictionaries to keep track of this result self.current_results_by_key[key] = result diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_context.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_context.py index 940a11b4bafcd..b0ff274a51f18 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_context.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_context.py @@ -1,4 +1,5 @@ import datetime +import inspect import logging from dataclasses import dataclass from typing import TYPE_CHECKING, Generic, Mapping, Optional, Type, TypeVar @@ -9,6 +10,7 @@ from dagster._core.definitions.asset_key import AssetCheckKey, AssetKey, EntityKey, T_EntityKey from dagster._core.definitions.declarative_automation.automation_condition import ( AutomationCondition, + AutomationResult, ) from dagster._core.definitions.declarative_automation.legacy.legacy_context import ( LegacyRuleEvaluationContext, @@ -109,6 +111,11 @@ def for_child_condition( _root_log=self._root_log, ) + async def evaluate_async(self) -> AutomationResult[T_EntityKey]: + if inspect.iscoroutinefunction(self.condition.evaluate): + return await self.condition.evaluate(self) + return self.condition.evaluate(self) + @property def log(self) -> logging.Logger: """The logger for the current condition evaluation.""" diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/any_downstream_conditions_operator.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/any_downstream_conditions_operator.py index 30cd79f1c1fbf..978a170ff21e0 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/any_downstream_conditions_operator.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/any_downstream_conditions_operator.py @@ -33,14 +33,13 @@ def children(self) -> Sequence[AutomationCondition]: def requires_cursor(self) -> bool: return False - def evaluate(self, context: AutomationContext[AssetKey]) -> AutomationResult[AssetKey]: - child_result = self.operand.evaluate( - context.for_child_condition( - child_condition=self.operand, - child_index=0, - candidate_subset=context.candidate_subset, - ) - ) + async def evaluate(self, context: AutomationContext[AssetKey]) -> AutomationResult[AssetKey]: + child_result = await context.for_child_condition( + child_condition=self.operand, + child_index=0, + candidate_subset=context.candidate_subset, + ).evaluate_async() + return AutomationResult( context=context, true_subset=child_result.true_subset, child_results=[child_result] ) @@ -82,7 +81,7 @@ def _get_validated_downstream_conditions( if not condition.has_rule_condition } - def evaluate(self, context: AutomationContext[AssetKey]) -> AutomationResult[AssetKey]: + async def evaluate(self, context: AutomationContext[AssetKey]) -> AutomationResult[AssetKey]: ignored_conditions = self._get_ignored_conditions(context) downstream_conditions = self._get_validated_downstream_conditions( context.asset_graph.get_downstream_automation_conditions(asset_key=context.key) @@ -95,15 +94,13 @@ def evaluate(self, context: AutomationContext[AssetKey]) -> AutomationResult[Ass ): if downstream_condition in ignored_conditions: continue - child_condition = DownstreamConditionWrapperCondition( - downstream_keys=list(sorted(asset_keys)), operand=downstream_condition - ) - child_context = context.for_child_condition( - child_condition=child_condition, + child_result = await context.for_child_condition( + child_condition=DownstreamConditionWrapperCondition( + downstream_keys=list(sorted(asset_keys)), operand=downstream_condition + ), child_index=i, candidate_subset=context.candidate_subset, - ) - child_result = child_condition.evaluate(child_context) + ).evaluate_async() child_results.append(child_result) true_subset = true_subset.compute_union(child_result.true_subset) diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/boolean_operators.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/boolean_operators.py index 23cf97fe84e20..564767f611a2b 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/boolean_operators.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/boolean_operators.py @@ -1,3 +1,4 @@ +import asyncio from typing import List, Sequence import dagster._check as check @@ -35,14 +36,15 @@ def children(self) -> Sequence[AutomationCondition[T_EntityKey]]: def requires_cursor(self) -> bool: return False - def evaluate(self, context: AutomationContext[T_EntityKey]) -> AutomationResult[T_EntityKey]: + async def evaluate( + self, context: AutomationContext[T_EntityKey] + ) -> AutomationResult[T_EntityKey]: child_results: List[AutomationResult] = [] true_subset = context.candidate_subset for i, child in enumerate(self.children): - child_context = context.for_child_condition( + child_result = await context.for_child_condition( child_condition=child, child_index=i, candidate_subset=true_subset - ) - child_result = child.evaluate(child_context) + ).evaluate_async() child_results.append(child_result) true_subset = true_subset.compute_intersection(child_result.true_subset) return AutomationResult(context, true_subset, child_results=child_results) @@ -83,15 +85,20 @@ def children(self) -> Sequence[AutomationCondition[T_EntityKey]]: def requires_cursor(self) -> bool: return False - def evaluate(self, context: AutomationContext[T_EntityKey]) -> AutomationResult[T_EntityKey]: - child_results: List[AutomationResult] = [] + async def evaluate( + self, context: AutomationContext[T_EntityKey] + ) -> AutomationResult[T_EntityKey]: true_subset = context.get_empty_subset() - for i, child in enumerate(self.children): - child_context = context.for_child_condition( + + coroutines = [ + context.for_child_condition( child_condition=child, child_index=i, candidate_subset=context.candidate_subset - ) - child_result = child.evaluate(child_context) - child_results.append(child_result) + ).evaluate_async() + for i, child in enumerate(self.children) + ] + + child_results = await asyncio.gather(*coroutines) + for child_result in child_results: true_subset = true_subset.compute_union(child_result.true_subset) return AutomationResult(context, true_subset, child_results=child_results) @@ -116,11 +123,12 @@ def name(self) -> str: def children(self) -> Sequence[AutomationCondition[T_EntityKey]]: return [self.operand] - def evaluate(self, context: AutomationContext[T_EntityKey]) -> AutomationResult[T_EntityKey]: - child_context = context.for_child_condition( + async def evaluate( + self, context: AutomationContext[T_EntityKey] + ) -> AutomationResult[T_EntityKey]: + child_result = await context.for_child_condition( child_condition=self.operand, child_index=0, candidate_subset=context.candidate_subset - ) - child_result = self.operand.evaluate(child_context) + ).evaluate_async() true_subset = context.candidate_subset.compute_difference(child_result.true_subset) return AutomationResult(context, true_subset, child_results=[child_result]) diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/check_operators.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/check_operators.py index 0f069c5f22bd0..fbf5cc5bc5784 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/check_operators.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/check_operators.py @@ -1,3 +1,4 @@ +import asyncio from abc import abstractmethod from typing import AbstractSet @@ -54,22 +55,22 @@ class AnyChecksCondition(ChecksAutomationCondition): def base_name(self) -> str: return "ANY_CHECKS_MATCH" - def evaluate(self, context: AutomationContext[AssetKey]) -> AutomationResult[AssetKey]: - check_results = [] + async def evaluate(self, context: AutomationContext[AssetKey]) -> AutomationResult[AssetKey]: true_subset = context.get_empty_subset() - for i, check_key in enumerate( - sorted(self._get_check_keys(context.key, context.asset_graph)) - ): - check_condition = EntityMatchesCondition(key=check_key, operand=self.operand) - check_result = check_condition.evaluate( - context.for_child_condition( - child_condition=check_condition, - child_index=i, - candidate_subset=context.candidate_subset, - ) + coroutines = [ + context.for_child_condition( + child_condition=EntityMatchesCondition(key=check_key, operand=self.operand), + child_index=i, + candidate_subset=context.candidate_subset, + ).evaluate_async() + for i, check_key in enumerate( + sorted(self._get_check_keys(context.key, context.asset_graph)) ) - check_results.append(check_result) + ] + + check_results = await asyncio.gather(*coroutines) + for check_result in check_results: true_subset = true_subset.compute_union(check_result.true_subset) true_subset = context.candidate_subset.compute_intersection(true_subset) @@ -83,21 +84,18 @@ class AllChecksCondition(ChecksAutomationCondition): def base_name(self) -> str: return "ALL_CHECKS_MATCH" - def evaluate(self, context: AutomationContext[AssetKey]) -> AutomationResult[AssetKey]: + async def evaluate(self, context: AutomationContext[AssetKey]) -> AutomationResult[AssetKey]: check_results = [] true_subset = context.candidate_subset for i, check_key in enumerate( sorted(self._get_check_keys(context.key, context.asset_graph)) ): - check_condition = EntityMatchesCondition(key=check_key, operand=self.operand) - check_result = check_condition.evaluate( - context.for_child_condition( - child_condition=check_condition, - child_index=i, - candidate_subset=context.candidate_subset, - ) - ) + check_result = await context.for_child_condition( + child_condition=EntityMatchesCondition(key=check_key, operand=self.operand), + child_index=i, + candidate_subset=context.candidate_subset, + ).evaluate_async() check_results.append(check_result) true_subset = true_subset.compute_intersection(check_result.true_subset) diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/dep_operators.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/dep_operators.py index 6bdb684fdab5c..d217f6e765ef3 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/dep_operators.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/dep_operators.py @@ -30,7 +30,9 @@ class EntityMatchesCondition( def name(self) -> str: return self.key.to_user_string() - def evaluate(self, context: AutomationContext[T_EntityKey]) -> AutomationResult[T_EntityKey]: + async def evaluate( + self, context: AutomationContext[T_EntityKey] + ) -> AutomationResult[T_EntityKey]: # if the key we're mapping to is a child of the key we're mapping from and is not # self-dependent, use the downstream mapping function, otherwise use upstream if ( @@ -48,7 +50,7 @@ def evaluate(self, context: AutomationContext[T_EntityKey]) -> AutomationResult[ child_condition=self.operand, child_index=0, candidate_subset=to_candidate_subset ) - to_result = self.operand.evaluate(to_context) + to_result = await to_context.evaluate_async() true_subset = to_result.true_subset.compute_mapped_subset( context.key, direction=directions[1] @@ -126,19 +128,18 @@ class AnyDepsCondition(DepsAutomationCondition[T_EntityKey]): def base_name(self) -> str: return "ANY_DEPS_MATCH" - def evaluate(self, context: AutomationContext[T_EntityKey]) -> AutomationResult[T_EntityKey]: + async def evaluate( + self, context: AutomationContext[T_EntityKey] + ) -> AutomationResult[T_EntityKey]: dep_results = [] true_subset = context.get_empty_subset() for i, dep_key in enumerate(sorted(self._get_dep_keys(context.key, context.asset_graph))): - dep_condition = EntityMatchesCondition(key=dep_key, operand=self.operand) - dep_result = dep_condition.evaluate( - context.for_child_condition( - child_condition=dep_condition, - child_index=i, - candidate_subset=context.candidate_subset, - ) - ) + dep_result = await context.for_child_condition( + child_condition=EntityMatchesCondition(key=dep_key, operand=self.operand), + child_index=i, + candidate_subset=context.candidate_subset, + ).evaluate_async() dep_results.append(dep_result) true_subset = true_subset.compute_union(dep_result.true_subset) @@ -152,19 +153,18 @@ class AllDepsCondition(DepsAutomationCondition[T_EntityKey]): def base_name(self) -> str: return "ALL_DEPS_MATCH" - def evaluate(self, context: AutomationContext[T_EntityKey]) -> AutomationResult[T_EntityKey]: + async def evaluate( + self, context: AutomationContext[T_EntityKey] + ) -> AutomationResult[T_EntityKey]: dep_results = [] true_subset = context.candidate_subset for i, dep_key in enumerate(sorted(self._get_dep_keys(context.key, context.asset_graph))): - dep_condition = EntityMatchesCondition(key=dep_key, operand=self.operand) - dep_result = dep_condition.evaluate( - context.for_child_condition( - child_condition=dep_condition, - child_index=i, - candidate_subset=context.candidate_subset, - ) - ) + dep_result = await context.for_child_condition( + child_condition=EntityMatchesCondition(key=dep_key, operand=self.operand), + child_index=i, + candidate_subset=context.candidate_subset, + ).evaluate_async() dep_results.append(dep_result) true_subset = true_subset.compute_intersection(dep_result.true_subset) diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/newly_true_operator.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/newly_true_operator.py index 32f7be9cccf38..1f359cf5f5a32 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/newly_true_operator.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/newly_true_operator.py @@ -37,15 +37,14 @@ def _get_previous_child_true_subset( return None return context.asset_graph_view.get_subset_from_serializable_subset(true_subset) - def evaluate(self, context: AutomationContext) -> AutomationResult: + async def evaluate(self, context: AutomationContext) -> AutomationResult: # evaluate child condition - child_context = context.for_child_condition( + child_result = await context.for_child_condition( self.operand, child_index=0, # must evaluate child condition over the entire subset to avoid missing state transitions candidate_subset=context.asset_graph_view.get_full_subset(key=context.key), - ) - child_result = self.operand.evaluate(child_context) + ).evaluate_async() # get the set of asset partitions of the child which newly became true newly_true_child_subset = child_result.true_subset.compute_difference( diff --git a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/since_operator.py b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/since_operator.py index de869baaad5ca..9bde13745a2d2 100644 --- a/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/since_operator.py +++ b/python_modules/dagster/dagster/_core/definitions/declarative_automation/operators/since_operator.py @@ -1,3 +1,4 @@ +import asyncio from typing import Sequence from dagster._core.definitions.asset_key import T_EntityKey @@ -25,21 +26,23 @@ def name(self) -> str: def children(self) -> Sequence[AutomationCondition[T_EntityKey]]: return [self.trigger_condition, self.reset_condition] - def evaluate(self, context: AutomationContext[T_EntityKey]) -> AutomationResult[T_EntityKey]: + async def evaluate( + self, context: AutomationContext[T_EntityKey] + ) -> AutomationResult[T_EntityKey]: # must evaluate child condition over the entire subset to avoid missing state transitions child_candidate_subset = context.asset_graph_view.get_full_subset(key=context.key) - # compute result for trigger condition - trigger_context = context.for_child_condition( - self.trigger_condition, child_index=0, candidate_subset=child_candidate_subset + # compute result for trigger and reset conditions + trigger_result, reset_result = await asyncio.gather( + *[ + context.for_child_condition( + self.trigger_condition, child_index=0, candidate_subset=child_candidate_subset + ).evaluate_async(), + context.for_child_condition( + self.reset_condition, child_index=1, candidate_subset=child_candidate_subset + ).evaluate_async(), + ] ) - trigger_result = self.trigger_condition.evaluate(trigger_context) - - # compute result for reset condition - reset_context = context.for_child_condition( - self.reset_condition, child_index=1, candidate_subset=child_candidate_subset - ) - reset_result = self.reset_condition.evaluate(reset_context) # take the previous subset that this was true for true_subset = context.previous_true_subset or context.get_empty_subset() diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_check_condition.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_check_condition.py index 71ce3823fad90..de0890fe8653b 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_check_condition.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_check_condition.py @@ -65,9 +65,10 @@ def evaluate(self, context: AutomationContext) -> AutomationResult: return HardcodedCondition(), true_set +@pytest.mark.asyncio @pytest.mark.parametrize("is_any", [True, False]) @pytest.mark.parametrize("blocking_only", [True, False]) -def test_check_operators_partitioned(is_any: bool, blocking_only: bool) -> None: +async def test_check_operators_partitioned(is_any: bool, blocking_only: bool) -> None: inner_condition, true_set = get_hardcoded_condition() condition = ( AutomationCondition.any_checks_match(inner_condition, blocking_only=blocking_only) @@ -79,25 +80,26 @@ def test_check_operators_partitioned(is_any: bool, blocking_only: bool) -> None: ).with_asset_properties(partitions_def=two_partitions_def) # no checks true - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 0 true_set.add(AssetCheckKey(AssetKey("A"), "a1")) - state, result = state.evaluate("A") + state, result = await state.evaluate("A") if is_any: assert result.true_subset.size == 2 else: assert result.true_subset.size == (2 if blocking_only else 0) true_set.add(AssetCheckKey(AssetKey("A"), "a2")) - state, result = state.evaluate("A") + state, result = await state.evaluate("A") if is_any: assert result.true_subset.size == 2 else: assert result.true_subset.size == 2 -def test_any_checks_match_basic() -> None: +@pytest.mark.asyncio +async def test_any_checks_match_basic() -> None: # always true true_condition = AutomationCondition.cron_tick_passed( "* * * * *" @@ -110,11 +112,11 @@ def test_any_checks_match_basic() -> None: state = AutomationConditionScenarioState(downstream_of_check, automation_condition=condition) # there is an upstream check for C - state, result = state.evaluate("C") + state, result = await state.evaluate("C") assert result.true_subset.size == 1 # there is no upstream check for D - state, result = state.evaluate("D") + state, result = await state.evaluate("D") assert result.true_subset.size == 0 diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_code_version_changed_condition.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_code_version_changed_condition.py index f0ac8c46fb295..4a8525c5be073 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_code_version_changed_condition.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_code_version_changed_condition.py @@ -1,3 +1,4 @@ +import pytest from dagster import AutomationCondition from dagster_tests.definitions_tests.declarative_automation_tests.scenario_utils.automation_condition_scenario import ( @@ -8,38 +9,39 @@ ) -def test_code_version_changed_condition() -> None: +@pytest.mark.asyncio +async def test_code_version_changed_condition() -> None: state = AutomationConditionScenarioState( one_asset, automation_condition=AutomationCondition.code_version_changed() ).with_asset_properties(code_version="1") # not changed - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 0 # still not changed - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 0 # newly changed state = state.with_asset_properties(code_version="2") - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 1 # not newly changed - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 0 # newly changed state = state.with_asset_properties(code_version="3") - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 1 # newly changed state = state.with_asset_properties(code_version="2") - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 1 # not newly changed - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 0 diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_dep_condition.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_dep_condition.py index 150581699518d..d9a2d95b877e9 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_dep_condition.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_dep_condition.py @@ -49,8 +49,9 @@ def evaluate(self, context: AutomationContext) -> AutomationResult: return HardcodedCondition(), true_set +@pytest.mark.asyncio @pytest.mark.parametrize("is_any", [True, False]) -def test_dep_missing_unpartitioned(is_any: bool) -> None: +async def test_dep_missing_unpartitioned(is_any: bool) -> None: inner_condition, true_set = get_hardcoded_condition() condition = ( AutomationCondition.any_deps_match(inner_condition) @@ -62,12 +63,12 @@ def test_dep_missing_unpartitioned(is_any: bool) -> None: ) # neither parent is true - state, result = state.evaluate("C") + state, result = await state.evaluate("C") assert result.true_subset.size == 0 # one parent true, still one false true_set.add(AssetKeyPartitionKey(AssetKey("A"))) - state, result = state.evaluate("C") + state, result = await state.evaluate("C") if is_any: assert result.true_subset.size == 1 else: @@ -75,12 +76,13 @@ def test_dep_missing_unpartitioned(is_any: bool) -> None: # both parents true true_set.add(AssetKeyPartitionKey(AssetKey("B"))) - state, result = state.evaluate("C") + state, result = await state.evaluate("C") assert result.true_subset.size == 1 +@pytest.mark.asyncio @pytest.mark.parametrize("is_any", [True, False]) -def test_dep_missing_partitioned(is_any: bool) -> None: +async def test_dep_missing_partitioned(is_any: bool) -> None: inner_condition, true_set = get_hardcoded_condition() condition = ( AutomationCondition.any_deps_match(inner_condition) @@ -92,11 +94,11 @@ def test_dep_missing_partitioned(is_any: bool) -> None: ).with_asset_properties(partitions_def=two_partitions_def) # no parents true - state, result = state.evaluate("C") + state, result = await state.evaluate("C") assert result.true_subset.size == 0 true_set.add(AssetKeyPartitionKey(AssetKey("A"), "1")) - state, result = state.evaluate("C") + state, result = await state.evaluate("C") if is_any: # one parent is true for partition 1 assert result.true_subset.size == 1 @@ -105,7 +107,7 @@ def test_dep_missing_partitioned(is_any: bool) -> None: assert result.true_subset.size == 0 true_set.add(AssetKeyPartitionKey(AssetKey("A"), "2")) - state, result = state.evaluate("C") + state, result = await state.evaluate("C") if is_any: # both partitions 1 and 2 have at least one true parent assert result.true_subset.size == 2 @@ -114,7 +116,7 @@ def test_dep_missing_partitioned(is_any: bool) -> None: assert result.true_subset.size == 0 true_set.add(AssetKeyPartitionKey(AssetKey("B"), "1")) - state, result = state.evaluate("C") + state, result = await state.evaluate("C") if is_any: assert result.true_subset.size == 2 else: @@ -122,7 +124,7 @@ def test_dep_missing_partitioned(is_any: bool) -> None: assert result.true_subset.size == 1 true_set.add(AssetKeyPartitionKey(AssetKey("B"), "2")) - state, result = state.evaluate("C") + state, result = await state.evaluate("C") if is_any: assert result.true_subset.size == 2 else: @@ -130,6 +132,7 @@ def test_dep_missing_partitioned(is_any: bool) -> None: assert result.true_subset.size == 2 +@pytest.mark.asyncio @pytest.mark.parametrize("is_any", [True, False]) @pytest.mark.parametrize("is_include", [True, False]) @pytest.mark.parametrize( @@ -143,7 +146,7 @@ def test_dep_missing_partitioned(is_any: bool) -> None: (2, ["B1", "B2"], 2), ], ) -def test_dep_missing_partitioned_selections( +async def test_dep_missing_partitioned_selections( is_any: bool, is_include: bool, expected_initial_result_size: int, @@ -166,10 +169,10 @@ def test_dep_missing_partitioned_selections( one_asset_depends_on_two, automation_condition=condition ).with_asset_properties(partitions_def=two_partitions_def) # all parents are missing - state, result = state.evaluate("C") + state, result = await state.evaluate("C") assert result.true_subset.size == expected_initial_result_size state = state.with_runs(*(run_request(s[0], s[1]) for s in materialized_asset_partitions)) - state, result = state.evaluate("C") + state, result = await state.evaluate("C") assert result.true_subset.size == expected_final_result_size @@ -185,7 +188,8 @@ def test_dep_missing_partitioned_selections( ) -def test_dep_missing_complex_include() -> None: +@pytest.mark.asyncio +async def test_dep_missing_complex_include() -> None: # true if any dependencies within the "bar" group are missing, or "A" is missing condition = AutomationCondition.any_deps_match( AutomationCondition.missing(), @@ -193,21 +197,22 @@ def test_dep_missing_complex_include() -> None: state = AutomationConditionScenarioState(complex_scenario_spec, automation_condition=condition) # all start off as missing - state, result = state.evaluate("downstream") + state, result = await state.evaluate("downstream") assert result.true_subset.size == 1 # A materialized, D and E still missing state = state.with_runs(run_request(["A"])) - state, result = state.evaluate("downstream") + state, result = await state.evaluate("downstream") assert result.true_subset.size == 1 # D and E materialized, and all the other missing things are in the exclude selection state = state.with_runs(run_request(["D", "E"])) - state, result = state.evaluate("downstream") + state, result = await state.evaluate("downstream") assert result.true_subset.size == 0 -def test_dep_missing_complex_exclude() -> None: +@pytest.mark.asyncio +async def test_dep_missing_complex_exclude() -> None: # true if any dependencies are missing, ignoring A and anything in the "bar" group condition = AutomationCondition.any_deps_match( AutomationCondition.missing(), @@ -215,15 +220,15 @@ def test_dep_missing_complex_exclude() -> None: state = AutomationConditionScenarioState(complex_scenario_spec, automation_condition=condition) # all start off as missing - state, result = state.evaluate("downstream") + state, result = await state.evaluate("downstream") assert result.true_subset.size == 1 # B materialized, C still missing state = state.with_runs(run_request(["B"])) - state, result = state.evaluate("downstream") + state, result = await state.evaluate("downstream") assert result.true_subset.size == 1 # C materialized, and all the other missing things are in the exclude selection state = state.with_runs(run_request(["C"])) - state, result = state.evaluate("downstream") + state, result = await state.evaluate("downstream") assert result.true_subset.size == 0 diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_eager_condition.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_eager_condition.py index 6cae4d24ed374..163c6e2d213bb 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_eager_condition.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_eager_condition.py @@ -31,7 +31,8 @@ ) -def test_eager_unpartitioned() -> None: +@pytest.mark.asyncio +async def test_eager_unpartitioned() -> None: state = AutomationConditionScenarioState( two_assets_in_sequence, automation_condition=AutomationCondition.eager(), @@ -39,20 +40,20 @@ def test_eager_unpartitioned() -> None: ) # parent hasn't updated yet - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 # parent updated, now can execute state = state.with_runs(run_request("A")) - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 1 # B has not yet materialized, but it has been requested, so don't request again - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 # same as above - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 # now B has been materialized, so really shouldn't execute again @@ -62,22 +63,23 @@ def test_eager_unpartitioned() -> None: for ak, pk in result.true_subset.expensively_compute_asset_partitions() ) ) - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 # A gets materialized again before the hour, execute B again state = state.with_runs(run_request("A")) - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 1 # however, B fails state = state.with_failed_run_for_asset("B") # do not try to materialize B again immediately - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 -def test_eager_hourly_partitioned() -> None: +@pytest.mark.asyncio +async def test_eager_hourly_partitioned() -> None: state = ( AutomationConditionScenarioState( two_assets_in_sequence, @@ -89,17 +91,17 @@ def test_eager_hourly_partitioned() -> None: ) # parent hasn't updated yet - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 # historical parent updated, doesn't matter state = state.with_runs(run_request("A", "2019-07-05-00:00")) - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 # latest parent updated, now can execute state = state.with_runs(run_request("A", "2020-02-02-00:00")) - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 1 state = state.with_runs( *( @@ -109,23 +111,23 @@ def test_eager_hourly_partitioned() -> None: ) # now B has been materialized, so don't execute again - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 # new partition comes into being, parent hasn't been materialized yet state = state.with_current_time_advanced(hours=1) - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 # parent gets materialized, B requested state = state.with_runs(run_request("A", "2020-02-02-01:00")) - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 1 # but it fails state = state.with_failed_run_for_asset("B", "2020-02-02-01:00") # B does not get immediately requested again - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_failed_condition.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_failed_condition.py index 9f20b4eb9b2b5..62c52c2247b92 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_failed_condition.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_failed_condition.py @@ -1,3 +1,4 @@ +import pytest from dagster import AutomationCondition from dagster._core.definitions.asset_key import AssetKey from dagster._core.definitions.events import AssetKeyPartitionKey @@ -14,38 +15,40 @@ ) -def test_failed_unpartitioned() -> None: +@pytest.mark.asyncio +async def test_failed_unpartitioned() -> None: state = AutomationConditionScenarioState( one_asset, automation_condition=AutomationCondition.execution_failed() ) # no failed partitions - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 0 # now a partition fails state = state.with_failed_run_for_asset("A") - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 1 # the next run completes successfully state = state.with_runs(run_request("A")) - _, result = state.evaluate("A") + _, result = await state.evaluate("A") assert result.true_subset.size == 0 -def test_in_progress_static_partitioned() -> None: +@pytest.mark.asyncio +async def test_in_progress_static_partitioned() -> None: state = AutomationConditionScenarioState( one_asset, automation_condition=AutomationCondition.execution_failed() ).with_asset_properties(partitions_def=two_partitions_def) # no failed_runs - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 0 # now one partition fails state = state.with_failed_run_for_asset("A", partition_key="1") - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 1 assert result.true_subset.expensively_compute_asset_partitions() == { AssetKeyPartitionKey(AssetKey("A"), "1") @@ -53,7 +56,7 @@ def test_in_progress_static_partitioned() -> None: # now that partition succeeds state = state.with_runs(run_request("A", partition_key="1")) - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 0 # now both partitions fail @@ -64,7 +67,7 @@ def test_in_progress_static_partitioned() -> None: "A", partition_key="2", ) - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 2 # now both partitions succeed @@ -72,5 +75,5 @@ def test_in_progress_static_partitioned() -> None: run_request("A", partition_key="1"), run_request("A", partition_key="2"), ) - _, result = state.evaluate("A") + _, result = await state.evaluate("A") assert result.true_subset.size == 0 diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_in_progress_condition.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_in_progress_condition.py index 0d375d3e26631..a6f781cb25a50 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_in_progress_condition.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_in_progress_condition.py @@ -1,3 +1,4 @@ +import pytest from dagster import AutomationCondition from dagster._core.definitions.asset_key import AssetKey from dagster._core.definitions.events import AssetKeyPartitionKey @@ -11,39 +12,41 @@ ) -def test_in_progress_unpartitioned() -> None: +@pytest.mark.asyncio +async def test_in_progress_unpartitioned() -> None: state = AutomationConditionScenarioState( one_asset, automation_condition=AutomationCondition.in_progress() ) # no run in progress - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 0 # run now in progress state = state.with_in_progress_run_for_asset("A") - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 1 # run completes state = state.with_in_progress_runs_completed() - _, result = state.evaluate("A") + _, result = await state.evaluate("A") assert result.true_subset.size == 0 -def test_in_progress_static_partitioned() -> None: +@pytest.mark.asyncio +async def test_in_progress_static_partitioned() -> None: state = AutomationConditionScenarioState( one_asset, automation_condition=AutomationCondition.in_progress() ).with_asset_properties(partitions_def=two_partitions_def) # no run in progress - state, result = state.evaluate("A") - state, result = state.evaluate("A") + state, result = await state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 0 # now in progress state = state.with_in_progress_run_for_asset("A", partition_key="1") - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 1 assert result.true_subset.expensively_compute_asset_partitions() == { AssetKeyPartitionKey(AssetKey("A"), "1") @@ -51,7 +54,7 @@ def test_in_progress_static_partitioned() -> None: # run completes state = state.with_in_progress_runs_completed() - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 0 # now both in progress @@ -62,10 +65,10 @@ def test_in_progress_static_partitioned() -> None: "A", partition_key="2", ) - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 2 # both runs complete state = state.with_in_progress_runs_completed() - _, result = state.evaluate("A") + _, result = await state.evaluate("A") assert result.true_subset.size == 0 diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_latest_time_window_condition.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_latest_time_window_condition.py index 51b65219c39a7..0f7dfa4eed171 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_latest_time_window_condition.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_latest_time_window_condition.py @@ -1,5 +1,6 @@ import datetime +import pytest from dagster import AutomationCondition from dagster._core.definitions.asset_key import AssetKey from dagster._core.definitions.events import AssetKeyPartitionKey @@ -15,16 +16,18 @@ ) -def test_in_latest_time_window_unpartitioned() -> None: +@pytest.mark.asyncio +async def test_in_latest_time_window_unpartitioned() -> None: state = AutomationConditionScenarioState( one_asset, automation_condition=AutomationCondition.in_latest_time_window() ) - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 1 -def test_in_latest_time_window_unpartitioned_lookback() -> None: +@pytest.mark.asyncio +async def test_in_latest_time_window_unpartitioned_lookback() -> None: state = AutomationConditionScenarioState( one_asset, automation_condition=AutomationCondition.in_latest_time_window( @@ -32,20 +35,22 @@ def test_in_latest_time_window_unpartitioned_lookback() -> None: ), ) - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 1 -def test_in_latest_time_window_static_partitioned() -> None: +@pytest.mark.asyncio +async def test_in_latest_time_window_static_partitioned() -> None: state = AutomationConditionScenarioState( one_asset, automation_condition=AutomationCondition.in_latest_time_window() ).with_asset_properties(partitions_def=two_partitions_def) - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 2 -def test_in_latest_time_window_static_partitioned_lookback() -> None: +@pytest.mark.asyncio +async def test_in_latest_time_window_static_partitioned_lookback() -> None: state = AutomationConditionScenarioState( one_asset, automation_condition=AutomationCondition.in_latest_time_window( @@ -53,36 +58,38 @@ def test_in_latest_time_window_static_partitioned_lookback() -> None: ), ).with_asset_properties(partitions_def=two_partitions_def) - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 2 -def test_in_latest_time_window_time_partitioned() -> None: +@pytest.mark.asyncio +async def test_in_latest_time_window_time_partitioned() -> None: state = AutomationConditionScenarioState( one_asset, automation_condition=AutomationCondition.in_latest_time_window() ).with_asset_properties(partitions_def=daily_partitions_def) # no partitions exist yet state = state.with_current_time(time_partitions_start_datetime) - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 0 state = state.with_current_time("2020-02-02T01:00:00") - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 1 assert result.true_subset.expensively_compute_asset_partitions() == { AssetKeyPartitionKey(AssetKey("A"), "2020-02-01") } state = state.with_current_time_advanced(days=5) - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 1 assert result.true_subset.expensively_compute_asset_partitions() == { AssetKeyPartitionKey(AssetKey("A"), "2020-02-06") } -def test_in_latest_time_window_time_partitioned_lookback() -> None: +@pytest.mark.asyncio +async def test_in_latest_time_window_time_partitioned_lookback() -> None: state = AutomationConditionScenarioState( one_asset, automation_condition=AutomationCondition.in_latest_time_window( @@ -92,11 +99,11 @@ def test_in_latest_time_window_time_partitioned_lookback() -> None: # no partitions exist yet state = state.with_current_time(time_partitions_start_datetime) - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 0 state = state.with_current_time("2020-02-07T01:00:00") - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 3 assert result.true_subset.expensively_compute_asset_partitions() == { AssetKeyPartitionKey(AssetKey("A"), "2020-02-06"), @@ -105,7 +112,7 @@ def test_in_latest_time_window_time_partitioned_lookback() -> None: } state = state.with_current_time_advanced(days=5) - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 3 assert result.true_subset.expensively_compute_asset_partitions() == { AssetKeyPartitionKey(AssetKey("A"), "2020-02-11"), diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_missing_condition.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_missing_condition.py index 81b90b575096b..3e6538c815942 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_missing_condition.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_missing_condition.py @@ -1,3 +1,4 @@ +import pytest from dagster import AutomationCondition from dagster_tests.definitions_tests.declarative_automation_tests.scenario_utils.automation_condition_scenario import ( @@ -12,36 +13,38 @@ ) -def test_missing_unpartitioned() -> None: +@pytest.mark.asyncio +async def test_missing_unpartitioned() -> None: state = AutomationConditionScenarioState( one_asset, automation_condition=AutomationCondition.missing() ) - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 1 state = state.with_runs(run_request("A")) - _, result = state.evaluate("A") + _, result = await state.evaluate("A") assert result.true_subset.size == 0 -def test_missing_partitioned() -> None: +@pytest.mark.asyncio +async def test_missing_partitioned() -> None: state = AutomationConditionScenarioState( one_asset, automation_condition=AutomationCondition.missing() ).with_asset_properties(partitions_def=two_partitions_def) - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 2 state = state.with_runs(run_request("A", "1")) - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 1 # same partition materialized again state = state.with_runs(run_request("A", "1")) - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 1 state = state.with_runs(run_request("A", "2")) - _, result = state.evaluate("A") + _, result = await state.evaluate("A") assert result.true_subset.size == 0 diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_newly_requested_condition.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_newly_requested_condition.py index 5ee3f15d88431..d363a8d676fdf 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_newly_requested_condition.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_newly_requested_condition.py @@ -1,3 +1,4 @@ +import pytest from dagster._core.definitions.declarative_automation.automation_condition import AutomationResult from dagster._core.definitions.declarative_automation.operands import NewlyRequestedCondition from dagster._core.definitions.events import AssetKey, AssetKeyPartitionKey @@ -13,7 +14,8 @@ ) -def test_requested_previous_tick() -> None: +@pytest.mark.asyncio +async def test_requested_previous_tick() -> None: false_condition, _ = get_hardcoded_condition() hardcoded_condition, true_set = get_hardcoded_condition() state = AutomationConditionScenarioState( @@ -29,24 +31,24 @@ def get_result(result: AutomationResult) -> AutomationResult: return result.child_results[0].child_results[0] # was not requested on the previous tick, as there was no tick - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert get_result(result).true_subset.size == 0 # still was not requested on the previous tick - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert get_result(result).true_subset.size == 0 # now we ensure that the asset does get requested this tick true_set.add(AssetKeyPartitionKey(AssetKey("A"))) - state, result = state.evaluate("A") + state, result = await state.evaluate("A") # requested this tick, not the previous tick assert get_result(result).true_subset.size == 0 true_set.remove(AssetKeyPartitionKey(AssetKey("A"))) # requested on the previous tick - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert get_result(result).true_subset.size == 1 # requested two ticks ago - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert get_result(result).true_subset.size == 0 diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_newly_true_condition.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_newly_true_condition.py index b2e378d8a5cc4..f1a329181d247 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_newly_true_condition.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_newly_true_condition.py @@ -1,3 +1,4 @@ +import pytest from dagster._core.definitions.asset_key import AssetKey from dagster._core.definitions.events import AssetKeyPartitionKey @@ -12,47 +13,48 @@ ) -def test_newly_true_condition() -> None: +@pytest.mark.asyncio +async def test_newly_true_condition() -> None: inner_condition, true_set = get_hardcoded_condition() condition = inner_condition.newly_true() state = AutomationConditionScenarioState(one_asset, automation_condition=condition) # nothing true - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 0 # becomes true true_set.add(AssetKeyPartitionKey(AssetKey("A"))) - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 1 # now on the next tick, this asset is no longer newly true - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 0 # see above - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 0 # see above - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 0 # now condition becomes false, result still false true_set.remove(AssetKeyPartitionKey(AssetKey("A"))) - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 0 # see above - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 0 # becomes true again true_set.add(AssetKeyPartitionKey(AssetKey("A"))) - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 1 # no longer newly true - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 0 diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_newly_updated_condition.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_newly_updated_condition.py index 712b30c7c22c3..d838401d75a0c 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_newly_updated_condition.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_newly_updated_condition.py @@ -1,3 +1,4 @@ +import pytest from dagster import ( AssetCheckResult, AssetMaterialization, @@ -18,43 +19,45 @@ ) -def test_newly_updated_condition() -> None: +@pytest.mark.asyncio +async def test_newly_updated_condition() -> None: state = AutomationConditionScenarioState( one_asset, automation_condition=AutomationCondition.newly_updated() ) # not updated - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 0 # newly updated state = state.with_reported_materialization("A") - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 1 # not newly updated - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 0 # still not newly updated - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 0 # newly updated twice in a row state = state.with_reported_materialization("A") - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 1 state = state.with_reported_materialization("A") - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 1 # not newly updated - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 0 -def test_newly_updated_condition_data_version() -> None: +@pytest.mark.asyncio +async def test_newly_updated_condition_data_version() -> None: state = AutomationConditionScenarioState( one_upstream_observable_asset, automation_condition=AutomationCondition.any_deps_match( @@ -63,35 +66,35 @@ def test_newly_updated_condition_data_version() -> None: ) # not updated - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 # newly updated state = state.with_reported_observation("A", data_version="1") - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 1 # not newly updated - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 # same data version, not newly updated state = state.with_reported_observation("A", data_version="1") - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 # new data version state = state.with_reported_observation("A", data_version="2") - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 1 # new data version state = state.with_reported_observation("A", data_version="3") - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 1 # no new data version - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_on_cron_condition.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_on_cron_condition.py index 02d9dddb5b8da..20eeda36c5042 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_on_cron_condition.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_on_cron_condition.py @@ -1,5 +1,6 @@ import datetime +import pytest from dagster import ( AssetMaterialization, AutomationCondition, @@ -23,7 +24,8 @@ ) -def test_on_cron_unpartitioned() -> None: +@pytest.mark.asyncio +async def test_on_cron_unpartitioned() -> None: state = AutomationConditionScenarioState( two_assets_in_sequence, automation_condition=AutomationCondition.on_cron(cron_schedule="0 * * * *"), @@ -31,17 +33,17 @@ def test_on_cron_unpartitioned() -> None: ).with_current_time("2020-02-02T00:55:00") # no cron boundary crossed - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 # now crossed a cron boundary parent hasn't updated yet state = state.with_current_time_advanced(minutes=10) - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 # parent updated, now can execute state = state.with_runs(run_request("A")) - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 1 state = state.with_runs( *( @@ -51,26 +53,27 @@ def test_on_cron_unpartitioned() -> None: ) # now B has been materialized, so don't execute again - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 # A gets materialized again before the hour, so don't execute B again state = state.with_runs(run_request("A")) - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 # now a new cron tick, but A still hasn't been materialized since the hour state = state.with_current_time_advanced(hours=1) - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 # A gets materialized again after the hour, so execute B again state = state.with_runs(run_request("A")) - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 1 -def test_on_cron_hourly_partitioned() -> None: +@pytest.mark.asyncio +async def test_on_cron_hourly_partitioned() -> None: state = ( AutomationConditionScenarioState( two_assets_in_sequence, @@ -82,22 +85,22 @@ def test_on_cron_hourly_partitioned() -> None: ) # no cron boundary crossed - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 # now crossed a cron boundary parent hasn't updated yet state = state.with_current_time_advanced(minutes=10) - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 # historical parent updated, doesn't matter state = state.with_runs(run_request("A", "2019-07-05-00:00")) - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 # latest parent updated, now can execute state = state.with_runs(run_request("A", "2020-02-02-00:00")) - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 1 state = state.with_runs( *( @@ -107,22 +110,22 @@ def test_on_cron_hourly_partitioned() -> None: ) # now B has been materialized, so don't execute again - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 # now a new cron tick, but A still hasn't been materialized since the hour state = state.with_current_time_advanced(hours=1) - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 # A gets materialized with the previous partition after the hour, but that doesn't matter state = state.with_runs(run_request("A", "2020-02-02-00:00")) - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 # A gets materialized with the latest partition, fire state = state.with_runs(run_request("A", "2020-02-02-01:00")) - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 1 diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_on_missing_condition.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_on_missing_condition.py index 578a3b78bb0f5..c3a96a285c262 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_on_missing_condition.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_on_missing_condition.py @@ -1,3 +1,4 @@ +import pytest from dagster import AutomationCondition from dagster._core.definitions.asset_key import AssetKey from dagster._core.definitions.events import AssetMaterialization @@ -14,7 +15,8 @@ ) -def test_on_missing_unpartitioned() -> None: +@pytest.mark.asyncio +async def test_on_missing_unpartitioned() -> None: state = AutomationConditionScenarioState( two_assets_in_sequence, automation_condition=AutomationCondition.on_missing(), @@ -23,30 +25,30 @@ def test_on_missing_unpartitioned() -> None: # B starts off as materialized state.instance.report_runless_asset_event(AssetMaterialization(asset_key=AssetKey("B"))) - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 # parent materialized, now could execute, but B is not missing state = state.with_runs(run_request("A")) - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 # now wipe B so that it is newly missing, should update state.instance.wipe_assets([AssetKey("B")]) - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 1 # B has not yet materialized, but it has been requested, so don't request again - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 # same as above - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 # parent materialized again, no impact state = state.with_runs(run_request("A")) - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 # now B has been materialized, so really shouldn't execute again @@ -56,16 +58,17 @@ def test_on_missing_unpartitioned() -> None: for ak, pk in result.true_subset.expensively_compute_asset_partitions() ) ) - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 # parent materialized again, no impact state = state.with_runs(run_request("A")) - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 -def test_on_missing_hourly_partitioned() -> None: +@pytest.mark.asyncio +async def test_on_missing_hourly_partitioned() -> None: state = ( AutomationConditionScenarioState( two_assets_in_sequence, @@ -77,42 +80,43 @@ def test_on_missing_hourly_partitioned() -> None: ) # parent hasn't updated yet - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 state = state.with_current_time_advanced(hours=1) # historical parent updated, doesn't matter state = state.with_runs(run_request("A", "2019-07-05-00:00")) - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 # latest parent updated, now can execute state = state.with_runs(run_request("A", "2020-02-02-00:00")) - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 1 # B has been requested, so don't request again - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 # new partition comes into being, parent hasn't been materialized yet state = state.with_current_time_advanced(hours=1) - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 # latest parent updated, now can execute state = state.with_runs(run_request("A", "2020-02-02-01:00")) - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 1 # latest parent updated again, don't re execute state = state.with_runs(run_request("A", "2020-02-02-01:00")) - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 -def test_on_missing_without_time_limit() -> None: +@pytest.mark.asyncio +async def test_on_missing_without_time_limit() -> None: state = ( AutomationConditionScenarioState( two_assets_in_sequence, @@ -126,7 +130,7 @@ def test_on_missing_without_time_limit() -> None: ) # parent hasn't updated yet - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 state = state.with_current_time_advanced(years=1) @@ -134,9 +138,9 @@ def test_on_missing_without_time_limit() -> None: # historical parents updated, matters state = state.with_runs(run_request("A", "2019-07-05-00:00")) state = state.with_runs(run_request("A", "2019-04-05-00:00")) - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 2 # B has been requested, so don't request again - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_since_condition.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_since_condition.py index 43c1a2ed3913b..1f4388c8a8424 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_since_condition.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_since_condition.py @@ -1,3 +1,4 @@ +import pytest from dagster._core.definitions.asset_key import AssetKey from dagster._core.definitions.declarative_automation.operators import SinceCondition from dagster._core.definitions.events import AssetKeyPartitionKey @@ -13,7 +14,8 @@ ) -def test_since_condition_unpartitioned() -> None: +@pytest.mark.asyncio +async def test_since_condition_unpartitioned() -> None: primary_condition, true_set_primary = get_hardcoded_condition() reference_condition, true_set_reference = get_hardcoded_condition() @@ -23,43 +25,43 @@ def test_since_condition_unpartitioned() -> None: state = AutomationConditionScenarioState(one_asset, automation_condition=condition) # nothing true - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 0 # primary becomes true, but reference has never been true true_set_primary.add(AssetKeyPartitionKey(AssetKey("A"))) - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 1 true_set_primary.remove(AssetKeyPartitionKey(AssetKey("A"))) # reference becomes true, and it's after primary true_set_reference.add(AssetKeyPartitionKey(AssetKey("A"))) - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 0 true_set_reference.remove(AssetKeyPartitionKey(AssetKey("A"))) # primary becomes true again, and it's since reference has become true true_set_primary.add(AssetKeyPartitionKey(AssetKey("A"))) - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 1 true_set_primary.remove(AssetKeyPartitionKey(AssetKey("A"))) # remains true on the neprimaryt evaluation - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 1 # primary becomes true again, still doesn't change anything true_set_primary.add(AssetKeyPartitionKey(AssetKey("A"))) - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 1 true_set_primary.remove(AssetKeyPartitionKey(AssetKey("A"))) # now reference becomes true again true_set_reference.add(AssetKeyPartitionKey(AssetKey("A"))) - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 0 true_set_reference.remove(AssetKeyPartitionKey(AssetKey("A"))) # remains false on the neprimaryt evaluation - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 0 diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_updated_since_cron_condition.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_updated_since_cron_condition.py index 0e5f6fd2bfd62..fe1fb3e9a560c 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_updated_since_cron_condition.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_updated_since_cron_condition.py @@ -1,3 +1,4 @@ +import pytest from dagster import AutomationCondition from dagster_tests.definitions_tests.declarative_automation_tests.scenario_utils.automation_condition_scenario import ( @@ -12,7 +13,8 @@ ) -def test_updated_since_cron_unpartitioned() -> None: +@pytest.mark.asyncio +async def test_updated_since_cron_unpartitioned() -> None: state = AutomationConditionScenarioState( one_asset, automation_condition=AutomationCondition.newly_updated().since( @@ -20,29 +22,30 @@ def test_updated_since_cron_unpartitioned() -> None: ), ).with_current_time("2020-02-02T00:55:00") - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 0 # now pass a cron tick, still haven't updated since that time state = state.with_current_time_advanced(minutes=10) - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 0 # now A is updated, so have been updated since cron tick state = state.with_runs(run_request("A")) - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 1 - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 1 # new cron tick, no longer materialized since it state = state.with_current_time_advanced(hours=1) - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 0 -def test_updated_since_cron_partitioned() -> None: +@pytest.mark.asyncio +async def test_updated_since_cron_partitioned() -> None: state = ( AutomationConditionScenarioState( one_asset, @@ -54,43 +57,43 @@ def test_updated_since_cron_partitioned() -> None: .with_current_time("2020-02-02T00:55:00") ) - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 0 # now pass a cron tick, still haven't updated since that time state = state.with_current_time_advanced(minutes=10) - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 0 # one materialized state = state.with_runs(run_request("A", "1")) - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 1 # now both materialized state = state.with_runs(run_request("A", "2")) - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 2 # nothing changed - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 2 # A 1 materialized again before the hour state = state.with_runs(run_request("A", "1")) - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 2 # new hour passes, nothing materialized since then state = state.with_current_time_advanced(hours=1) - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 0 # A 2 materialized again after the hour state = state.with_runs(run_request("A", "2")) - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 1 # nothing changed - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 1 diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_will_be_requested_condition.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_will_be_requested_condition.py index 815c31b71d507..76f1765a1ef1f 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_will_be_requested_condition.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_will_be_requested_condition.py @@ -1,3 +1,4 @@ +import pytest from dagster import AssetKey, AutomationCondition from dagster._core.definitions.events import AssetKeyPartitionKey @@ -10,33 +11,35 @@ ) -def test_will_be_requested_unpartitioned() -> None: +@pytest.mark.asyncio +async def test_will_be_requested_unpartitioned() -> None: condition = AutomationCondition.any_deps_match(AutomationCondition.will_be_requested()) state = AutomationConditionScenarioState(two_assets_in_sequence, automation_condition=condition) # no requested parents - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 # parent is requested state = state.with_requested_asset_partitions([AssetKeyPartitionKey(AssetKey("A"))]) - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 1 -def test_will_be_requested_static_partitioned() -> None: +@pytest.mark.asyncio +async def test_will_be_requested_static_partitioned() -> None: condition = AutomationCondition.any_deps_match(AutomationCondition.will_be_requested()) state = AutomationConditionScenarioState( two_assets_in_sequence, automation_condition=condition ).with_asset_properties(partitions_def=two_partitions_def) # no requested parents - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 # one requested parent state = state.with_requested_asset_partitions([AssetKeyPartitionKey(AssetKey("A"), "1")]) - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 1 assert result.true_subset.expensively_compute_asset_partitions() == { AssetKeyPartitionKey(AssetKey("B"), "1") @@ -46,28 +49,29 @@ def test_will_be_requested_static_partitioned() -> None: state = state.with_requested_asset_partitions( [AssetKeyPartitionKey(AssetKey("A"), "1"), AssetKeyPartitionKey(AssetKey("A"), "2")] ) - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 2 -def test_will_be_requested_different_partitions() -> None: +@pytest.mark.asyncio +async def test_will_be_requested_different_partitions() -> None: condition = AutomationCondition.any_deps_match(AutomationCondition.will_be_requested()) state = AutomationConditionScenarioState( two_assets_in_sequence, automation_condition=condition ).with_asset_properties("A", partitions_def=two_partitions_def) # no requested parents - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 # one requested parent, but can't execute in same run state = state.with_requested_asset_partitions([AssetKeyPartitionKey(AssetKey("A"), "1")]) - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 # two requested parents, but can't execute in same run state = state.with_requested_asset_partitions( [AssetKeyPartitionKey(AssetKey("A"), "1"), AssetKeyPartitionKey(AssetKey("A"), "2")] ) - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/fundamentals/test_automation_condition.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/fundamentals/test_automation_condition.py index 2f9096bdc1ae8..1486bfa1bec55 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/fundamentals/test_automation_condition.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/fundamentals/test_automation_condition.py @@ -23,31 +23,33 @@ ) -def test_missing_unpartitioned() -> None: +@pytest.mark.asyncio +async def test_missing_unpartitioned() -> None: state = AutomationConditionScenarioState( one_asset, automation_condition=AutomationCondition.missing() ) - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 1 original_value_hash = result.value_hash # still true - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 1 assert result.value_hash == original_value_hash # after a run of A it's now False - state, result = state.with_runs(run_request("A")).evaluate("A") + state, result = await state.with_runs(run_request("A")).evaluate("A") assert result.true_subset.size == 0 assert result.value_hash != original_value_hash # if we evaluate from scratch, it's also False - _, result = state.without_cursor().evaluate("A") + _, result = await state.without_cursor().evaluate("A") assert result.true_subset.size == 0 -def test_missing_time_partitioned() -> None: +@pytest.mark.asyncio +async def test_missing_time_partitioned() -> None: state = ( AutomationConditionScenarioState( one_asset, automation_condition=AutomationCondition.missing() @@ -57,22 +59,22 @@ def test_missing_time_partitioned() -> None: .with_current_time_advanced(days=6, minutes=1) ) - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 6 # still true - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 6 # after two runs of A those partitions are now False - state, result = state.with_runs( + state, result = await state.with_runs( run_request("A", day_partition_key(time_partitions_start_datetime, 1)), run_request("A", day_partition_key(time_partitions_start_datetime, 3)), ).evaluate("A") assert result.true_subset.size == 4 # if we evaluate from scratch, they're still False - _, result = state.without_cursor().evaluate("A") + _, result = await state.without_cursor().evaluate("A") assert result.true_subset.size == 4 diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/fundamentals/test_result_value_hash.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/fundamentals/test_result_value_hash.py index a677bb327f3c7..d6c13b875a9f0 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/fundamentals/test_result_value_hash.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/fundamentals/test_result_value_hash.py @@ -26,6 +26,7 @@ two_parents_daily = two_parents.with_asset_properties(partitions_def=daily_partitions) +@pytest.mark.asyncio @pytest.mark.parametrize( ["expected_value_hash", "condition", "scenario_spec", "materialize_A"], [ @@ -51,16 +52,16 @@ ("7f852ab7408c67e0830530d025505a37", SC.missing(), one_parent_daily, False), ], ) -def test_value_hash( +async def test_value_hash( condition: SC, scenario_spec: ScenarioSpec, expected_value_hash: str, materialize_A: bool ) -> None: state = AutomationConditionScenarioState( scenario_spec, automation_condition=condition ).with_current_time("2024-01-01T00:00") - state, _ = state.evaluate("downstream") + state, _ = await state.evaluate("downstream") if materialize_A: state = state.with_runs(run_request("A")) - state, result = state.evaluate("downstream") + state, result = await state.evaluate("downstream") assert result.value_hash == expected_value_hash diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/scenario_utils/automation_condition_scenario.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/scenario_utils/automation_condition_scenario.py index 38c7bf26f4fc9..ffaf223bc3e41 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/scenario_utils/automation_condition_scenario.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/scenario_utils/automation_condition_scenario.py @@ -64,7 +64,7 @@ def _get_request_subsets_by_key( for asset_key, aps in ap_by_key.items() } - def evaluate( + async def evaluate( self, asset: CoercibleToAssetKey ) -> Tuple["AutomationConditionScenarioState", AutomationResult]: asset_key = AssetKey.from_coercible(asset) @@ -100,7 +100,7 @@ def evaluate( ) # type: ignore context = AutomationContext.create(key=asset_key, evaluator=evaluator) - full_result = asset_condition.evaluate(context) + full_result = await asset_condition.evaluate(context) # type: ignore new_state = dataclasses.replace(self, condition_cursor=full_result.get_new_cursor()) result = full_result.child_results[0] if self.ensure_empty_result else full_result