diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_check_condition.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_check_condition.py index 71ce3823fad90..de0890fe8653b 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_check_condition.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_check_condition.py @@ -65,9 +65,10 @@ def evaluate(self, context: AutomationContext) -> AutomationResult: return HardcodedCondition(), true_set +@pytest.mark.asyncio @pytest.mark.parametrize("is_any", [True, False]) @pytest.mark.parametrize("blocking_only", [True, False]) -def test_check_operators_partitioned(is_any: bool, blocking_only: bool) -> None: +async def test_check_operators_partitioned(is_any: bool, blocking_only: bool) -> None: inner_condition, true_set = get_hardcoded_condition() condition = ( AutomationCondition.any_checks_match(inner_condition, blocking_only=blocking_only) @@ -79,25 +80,26 @@ def test_check_operators_partitioned(is_any: bool, blocking_only: bool) -> None: ).with_asset_properties(partitions_def=two_partitions_def) # no checks true - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 0 true_set.add(AssetCheckKey(AssetKey("A"), "a1")) - state, result = state.evaluate("A") + state, result = await state.evaluate("A") if is_any: assert result.true_subset.size == 2 else: assert result.true_subset.size == (2 if blocking_only else 0) true_set.add(AssetCheckKey(AssetKey("A"), "a2")) - state, result = state.evaluate("A") + state, result = await state.evaluate("A") if is_any: assert result.true_subset.size == 2 else: assert result.true_subset.size == 2 -def test_any_checks_match_basic() -> None: +@pytest.mark.asyncio +async def test_any_checks_match_basic() -> None: # always true true_condition = AutomationCondition.cron_tick_passed( "* * * * *" @@ -110,11 +112,11 @@ def test_any_checks_match_basic() -> None: state = AutomationConditionScenarioState(downstream_of_check, automation_condition=condition) # there is an upstream check for C - state, result = state.evaluate("C") + state, result = await state.evaluate("C") assert result.true_subset.size == 1 # there is no upstream check for D - state, result = state.evaluate("D") + state, result = await state.evaluate("D") assert result.true_subset.size == 0 diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_code_version_changed_condition.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_code_version_changed_condition.py index f0ac8c46fb295..4a8525c5be073 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_code_version_changed_condition.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_code_version_changed_condition.py @@ -1,3 +1,4 @@ +import pytest from dagster import AutomationCondition from dagster_tests.definitions_tests.declarative_automation_tests.scenario_utils.automation_condition_scenario import ( @@ -8,38 +9,39 @@ ) -def test_code_version_changed_condition() -> None: +@pytest.mark.asyncio +async def test_code_version_changed_condition() -> None: state = AutomationConditionScenarioState( one_asset, automation_condition=AutomationCondition.code_version_changed() ).with_asset_properties(code_version="1") # not changed - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 0 # still not changed - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 0 # newly changed state = state.with_asset_properties(code_version="2") - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 1 # not newly changed - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 0 # newly changed state = state.with_asset_properties(code_version="3") - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 1 # newly changed state = state.with_asset_properties(code_version="2") - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 1 # not newly changed - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 0 diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_dep_condition.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_dep_condition.py index 150581699518d..d9a2d95b877e9 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_dep_condition.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_dep_condition.py @@ -49,8 +49,9 @@ def evaluate(self, context: AutomationContext) -> AutomationResult: return HardcodedCondition(), true_set +@pytest.mark.asyncio @pytest.mark.parametrize("is_any", [True, False]) -def test_dep_missing_unpartitioned(is_any: bool) -> None: +async def test_dep_missing_unpartitioned(is_any: bool) -> None: inner_condition, true_set = get_hardcoded_condition() condition = ( AutomationCondition.any_deps_match(inner_condition) @@ -62,12 +63,12 @@ def test_dep_missing_unpartitioned(is_any: bool) -> None: ) # neither parent is true - state, result = state.evaluate("C") + state, result = await state.evaluate("C") assert result.true_subset.size == 0 # one parent true, still one false true_set.add(AssetKeyPartitionKey(AssetKey("A"))) - state, result = state.evaluate("C") + state, result = await state.evaluate("C") if is_any: assert result.true_subset.size == 1 else: @@ -75,12 +76,13 @@ def test_dep_missing_unpartitioned(is_any: bool) -> None: # both parents true true_set.add(AssetKeyPartitionKey(AssetKey("B"))) - state, result = state.evaluate("C") + state, result = await state.evaluate("C") assert result.true_subset.size == 1 +@pytest.mark.asyncio @pytest.mark.parametrize("is_any", [True, False]) -def test_dep_missing_partitioned(is_any: bool) -> None: +async def test_dep_missing_partitioned(is_any: bool) -> None: inner_condition, true_set = get_hardcoded_condition() condition = ( AutomationCondition.any_deps_match(inner_condition) @@ -92,11 +94,11 @@ def test_dep_missing_partitioned(is_any: bool) -> None: ).with_asset_properties(partitions_def=two_partitions_def) # no parents true - state, result = state.evaluate("C") + state, result = await state.evaluate("C") assert result.true_subset.size == 0 true_set.add(AssetKeyPartitionKey(AssetKey("A"), "1")) - state, result = state.evaluate("C") + state, result = await state.evaluate("C") if is_any: # one parent is true for partition 1 assert result.true_subset.size == 1 @@ -105,7 +107,7 @@ def test_dep_missing_partitioned(is_any: bool) -> None: assert result.true_subset.size == 0 true_set.add(AssetKeyPartitionKey(AssetKey("A"), "2")) - state, result = state.evaluate("C") + state, result = await state.evaluate("C") if is_any: # both partitions 1 and 2 have at least one true parent assert result.true_subset.size == 2 @@ -114,7 +116,7 @@ def test_dep_missing_partitioned(is_any: bool) -> None: assert result.true_subset.size == 0 true_set.add(AssetKeyPartitionKey(AssetKey("B"), "1")) - state, result = state.evaluate("C") + state, result = await state.evaluate("C") if is_any: assert result.true_subset.size == 2 else: @@ -122,7 +124,7 @@ def test_dep_missing_partitioned(is_any: bool) -> None: assert result.true_subset.size == 1 true_set.add(AssetKeyPartitionKey(AssetKey("B"), "2")) - state, result = state.evaluate("C") + state, result = await state.evaluate("C") if is_any: assert result.true_subset.size == 2 else: @@ -130,6 +132,7 @@ def test_dep_missing_partitioned(is_any: bool) -> None: assert result.true_subset.size == 2 +@pytest.mark.asyncio @pytest.mark.parametrize("is_any", [True, False]) @pytest.mark.parametrize("is_include", [True, False]) @pytest.mark.parametrize( @@ -143,7 +146,7 @@ def test_dep_missing_partitioned(is_any: bool) -> None: (2, ["B1", "B2"], 2), ], ) -def test_dep_missing_partitioned_selections( +async def test_dep_missing_partitioned_selections( is_any: bool, is_include: bool, expected_initial_result_size: int, @@ -166,10 +169,10 @@ def test_dep_missing_partitioned_selections( one_asset_depends_on_two, automation_condition=condition ).with_asset_properties(partitions_def=two_partitions_def) # all parents are missing - state, result = state.evaluate("C") + state, result = await state.evaluate("C") assert result.true_subset.size == expected_initial_result_size state = state.with_runs(*(run_request(s[0], s[1]) for s in materialized_asset_partitions)) - state, result = state.evaluate("C") + state, result = await state.evaluate("C") assert result.true_subset.size == expected_final_result_size @@ -185,7 +188,8 @@ def test_dep_missing_partitioned_selections( ) -def test_dep_missing_complex_include() -> None: +@pytest.mark.asyncio +async def test_dep_missing_complex_include() -> None: # true if any dependencies within the "bar" group are missing, or "A" is missing condition = AutomationCondition.any_deps_match( AutomationCondition.missing(), @@ -193,21 +197,22 @@ def test_dep_missing_complex_include() -> None: state = AutomationConditionScenarioState(complex_scenario_spec, automation_condition=condition) # all start off as missing - state, result = state.evaluate("downstream") + state, result = await state.evaluate("downstream") assert result.true_subset.size == 1 # A materialized, D and E still missing state = state.with_runs(run_request(["A"])) - state, result = state.evaluate("downstream") + state, result = await state.evaluate("downstream") assert result.true_subset.size == 1 # D and E materialized, and all the other missing things are in the exclude selection state = state.with_runs(run_request(["D", "E"])) - state, result = state.evaluate("downstream") + state, result = await state.evaluate("downstream") assert result.true_subset.size == 0 -def test_dep_missing_complex_exclude() -> None: +@pytest.mark.asyncio +async def test_dep_missing_complex_exclude() -> None: # true if any dependencies are missing, ignoring A and anything in the "bar" group condition = AutomationCondition.any_deps_match( AutomationCondition.missing(), @@ -215,15 +220,15 @@ def test_dep_missing_complex_exclude() -> None: state = AutomationConditionScenarioState(complex_scenario_spec, automation_condition=condition) # all start off as missing - state, result = state.evaluate("downstream") + state, result = await state.evaluate("downstream") assert result.true_subset.size == 1 # B materialized, C still missing state = state.with_runs(run_request(["B"])) - state, result = state.evaluate("downstream") + state, result = await state.evaluate("downstream") assert result.true_subset.size == 1 # C materialized, and all the other missing things are in the exclude selection state = state.with_runs(run_request(["C"])) - state, result = state.evaluate("downstream") + state, result = await state.evaluate("downstream") assert result.true_subset.size == 0 diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_eager_condition.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_eager_condition.py index 6cae4d24ed374..163c6e2d213bb 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_eager_condition.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_eager_condition.py @@ -31,7 +31,8 @@ ) -def test_eager_unpartitioned() -> None: +@pytest.mark.asyncio +async def test_eager_unpartitioned() -> None: state = AutomationConditionScenarioState( two_assets_in_sequence, automation_condition=AutomationCondition.eager(), @@ -39,20 +40,20 @@ def test_eager_unpartitioned() -> None: ) # parent hasn't updated yet - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 # parent updated, now can execute state = state.with_runs(run_request("A")) - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 1 # B has not yet materialized, but it has been requested, so don't request again - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 # same as above - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 # now B has been materialized, so really shouldn't execute again @@ -62,22 +63,23 @@ def test_eager_unpartitioned() -> None: for ak, pk in result.true_subset.expensively_compute_asset_partitions() ) ) - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 # A gets materialized again before the hour, execute B again state = state.with_runs(run_request("A")) - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 1 # however, B fails state = state.with_failed_run_for_asset("B") # do not try to materialize B again immediately - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 -def test_eager_hourly_partitioned() -> None: +@pytest.mark.asyncio +async def test_eager_hourly_partitioned() -> None: state = ( AutomationConditionScenarioState( two_assets_in_sequence, @@ -89,17 +91,17 @@ def test_eager_hourly_partitioned() -> None: ) # parent hasn't updated yet - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 # historical parent updated, doesn't matter state = state.with_runs(run_request("A", "2019-07-05-00:00")) - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 # latest parent updated, now can execute state = state.with_runs(run_request("A", "2020-02-02-00:00")) - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 1 state = state.with_runs( *( @@ -109,23 +111,23 @@ def test_eager_hourly_partitioned() -> None: ) # now B has been materialized, so don't execute again - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 # new partition comes into being, parent hasn't been materialized yet state = state.with_current_time_advanced(hours=1) - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 # parent gets materialized, B requested state = state.with_runs(run_request("A", "2020-02-02-01:00")) - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 1 # but it fails state = state.with_failed_run_for_asset("B", "2020-02-02-01:00") # B does not get immediately requested again - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_failed_condition.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_failed_condition.py index 9f20b4eb9b2b5..62c52c2247b92 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_failed_condition.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_failed_condition.py @@ -1,3 +1,4 @@ +import pytest from dagster import AutomationCondition from dagster._core.definitions.asset_key import AssetKey from dagster._core.definitions.events import AssetKeyPartitionKey @@ -14,38 +15,40 @@ ) -def test_failed_unpartitioned() -> None: +@pytest.mark.asyncio +async def test_failed_unpartitioned() -> None: state = AutomationConditionScenarioState( one_asset, automation_condition=AutomationCondition.execution_failed() ) # no failed partitions - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 0 # now a partition fails state = state.with_failed_run_for_asset("A") - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 1 # the next run completes successfully state = state.with_runs(run_request("A")) - _, result = state.evaluate("A") + _, result = await state.evaluate("A") assert result.true_subset.size == 0 -def test_in_progress_static_partitioned() -> None: +@pytest.mark.asyncio +async def test_in_progress_static_partitioned() -> None: state = AutomationConditionScenarioState( one_asset, automation_condition=AutomationCondition.execution_failed() ).with_asset_properties(partitions_def=two_partitions_def) # no failed_runs - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 0 # now one partition fails state = state.with_failed_run_for_asset("A", partition_key="1") - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 1 assert result.true_subset.expensively_compute_asset_partitions() == { AssetKeyPartitionKey(AssetKey("A"), "1") @@ -53,7 +56,7 @@ def test_in_progress_static_partitioned() -> None: # now that partition succeeds state = state.with_runs(run_request("A", partition_key="1")) - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 0 # now both partitions fail @@ -64,7 +67,7 @@ def test_in_progress_static_partitioned() -> None: "A", partition_key="2", ) - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 2 # now both partitions succeed @@ -72,5 +75,5 @@ def test_in_progress_static_partitioned() -> None: run_request("A", partition_key="1"), run_request("A", partition_key="2"), ) - _, result = state.evaluate("A") + _, result = await state.evaluate("A") assert result.true_subset.size == 0 diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_in_progress_condition.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_in_progress_condition.py index 0d375d3e26631..a6f781cb25a50 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_in_progress_condition.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_in_progress_condition.py @@ -1,3 +1,4 @@ +import pytest from dagster import AutomationCondition from dagster._core.definitions.asset_key import AssetKey from dagster._core.definitions.events import AssetKeyPartitionKey @@ -11,39 +12,41 @@ ) -def test_in_progress_unpartitioned() -> None: +@pytest.mark.asyncio +async def test_in_progress_unpartitioned() -> None: state = AutomationConditionScenarioState( one_asset, automation_condition=AutomationCondition.in_progress() ) # no run in progress - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 0 # run now in progress state = state.with_in_progress_run_for_asset("A") - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 1 # run completes state = state.with_in_progress_runs_completed() - _, result = state.evaluate("A") + _, result = await state.evaluate("A") assert result.true_subset.size == 0 -def test_in_progress_static_partitioned() -> None: +@pytest.mark.asyncio +async def test_in_progress_static_partitioned() -> None: state = AutomationConditionScenarioState( one_asset, automation_condition=AutomationCondition.in_progress() ).with_asset_properties(partitions_def=two_partitions_def) # no run in progress - state, result = state.evaluate("A") - state, result = state.evaluate("A") + state, result = await state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 0 # now in progress state = state.with_in_progress_run_for_asset("A", partition_key="1") - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 1 assert result.true_subset.expensively_compute_asset_partitions() == { AssetKeyPartitionKey(AssetKey("A"), "1") @@ -51,7 +54,7 @@ def test_in_progress_static_partitioned() -> None: # run completes state = state.with_in_progress_runs_completed() - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 0 # now both in progress @@ -62,10 +65,10 @@ def test_in_progress_static_partitioned() -> None: "A", partition_key="2", ) - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 2 # both runs complete state = state.with_in_progress_runs_completed() - _, result = state.evaluate("A") + _, result = await state.evaluate("A") assert result.true_subset.size == 0 diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_latest_time_window_condition.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_latest_time_window_condition.py index 51b65219c39a7..0f7dfa4eed171 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_latest_time_window_condition.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_latest_time_window_condition.py @@ -1,5 +1,6 @@ import datetime +import pytest from dagster import AutomationCondition from dagster._core.definitions.asset_key import AssetKey from dagster._core.definitions.events import AssetKeyPartitionKey @@ -15,16 +16,18 @@ ) -def test_in_latest_time_window_unpartitioned() -> None: +@pytest.mark.asyncio +async def test_in_latest_time_window_unpartitioned() -> None: state = AutomationConditionScenarioState( one_asset, automation_condition=AutomationCondition.in_latest_time_window() ) - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 1 -def test_in_latest_time_window_unpartitioned_lookback() -> None: +@pytest.mark.asyncio +async def test_in_latest_time_window_unpartitioned_lookback() -> None: state = AutomationConditionScenarioState( one_asset, automation_condition=AutomationCondition.in_latest_time_window( @@ -32,20 +35,22 @@ def test_in_latest_time_window_unpartitioned_lookback() -> None: ), ) - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 1 -def test_in_latest_time_window_static_partitioned() -> None: +@pytest.mark.asyncio +async def test_in_latest_time_window_static_partitioned() -> None: state = AutomationConditionScenarioState( one_asset, automation_condition=AutomationCondition.in_latest_time_window() ).with_asset_properties(partitions_def=two_partitions_def) - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 2 -def test_in_latest_time_window_static_partitioned_lookback() -> None: +@pytest.mark.asyncio +async def test_in_latest_time_window_static_partitioned_lookback() -> None: state = AutomationConditionScenarioState( one_asset, automation_condition=AutomationCondition.in_latest_time_window( @@ -53,36 +58,38 @@ def test_in_latest_time_window_static_partitioned_lookback() -> None: ), ).with_asset_properties(partitions_def=two_partitions_def) - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 2 -def test_in_latest_time_window_time_partitioned() -> None: +@pytest.mark.asyncio +async def test_in_latest_time_window_time_partitioned() -> None: state = AutomationConditionScenarioState( one_asset, automation_condition=AutomationCondition.in_latest_time_window() ).with_asset_properties(partitions_def=daily_partitions_def) # no partitions exist yet state = state.with_current_time(time_partitions_start_datetime) - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 0 state = state.with_current_time("2020-02-02T01:00:00") - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 1 assert result.true_subset.expensively_compute_asset_partitions() == { AssetKeyPartitionKey(AssetKey("A"), "2020-02-01") } state = state.with_current_time_advanced(days=5) - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 1 assert result.true_subset.expensively_compute_asset_partitions() == { AssetKeyPartitionKey(AssetKey("A"), "2020-02-06") } -def test_in_latest_time_window_time_partitioned_lookback() -> None: +@pytest.mark.asyncio +async def test_in_latest_time_window_time_partitioned_lookback() -> None: state = AutomationConditionScenarioState( one_asset, automation_condition=AutomationCondition.in_latest_time_window( @@ -92,11 +99,11 @@ def test_in_latest_time_window_time_partitioned_lookback() -> None: # no partitions exist yet state = state.with_current_time(time_partitions_start_datetime) - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 0 state = state.with_current_time("2020-02-07T01:00:00") - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 3 assert result.true_subset.expensively_compute_asset_partitions() == { AssetKeyPartitionKey(AssetKey("A"), "2020-02-06"), @@ -105,7 +112,7 @@ def test_in_latest_time_window_time_partitioned_lookback() -> None: } state = state.with_current_time_advanced(days=5) - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 3 assert result.true_subset.expensively_compute_asset_partitions() == { AssetKeyPartitionKey(AssetKey("A"), "2020-02-11"), diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_missing_condition.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_missing_condition.py index 81b90b575096b..3e6538c815942 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_missing_condition.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_missing_condition.py @@ -1,3 +1,4 @@ +import pytest from dagster import AutomationCondition from dagster_tests.definitions_tests.declarative_automation_tests.scenario_utils.automation_condition_scenario import ( @@ -12,36 +13,38 @@ ) -def test_missing_unpartitioned() -> None: +@pytest.mark.asyncio +async def test_missing_unpartitioned() -> None: state = AutomationConditionScenarioState( one_asset, automation_condition=AutomationCondition.missing() ) - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 1 state = state.with_runs(run_request("A")) - _, result = state.evaluate("A") + _, result = await state.evaluate("A") assert result.true_subset.size == 0 -def test_missing_partitioned() -> None: +@pytest.mark.asyncio +async def test_missing_partitioned() -> None: state = AutomationConditionScenarioState( one_asset, automation_condition=AutomationCondition.missing() ).with_asset_properties(partitions_def=two_partitions_def) - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 2 state = state.with_runs(run_request("A", "1")) - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 1 # same partition materialized again state = state.with_runs(run_request("A", "1")) - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 1 state = state.with_runs(run_request("A", "2")) - _, result = state.evaluate("A") + _, result = await state.evaluate("A") assert result.true_subset.size == 0 diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_newly_requested_condition.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_newly_requested_condition.py index 5ee3f15d88431..d363a8d676fdf 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_newly_requested_condition.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_newly_requested_condition.py @@ -1,3 +1,4 @@ +import pytest from dagster._core.definitions.declarative_automation.automation_condition import AutomationResult from dagster._core.definitions.declarative_automation.operands import NewlyRequestedCondition from dagster._core.definitions.events import AssetKey, AssetKeyPartitionKey @@ -13,7 +14,8 @@ ) -def test_requested_previous_tick() -> None: +@pytest.mark.asyncio +async def test_requested_previous_tick() -> None: false_condition, _ = get_hardcoded_condition() hardcoded_condition, true_set = get_hardcoded_condition() state = AutomationConditionScenarioState( @@ -29,24 +31,24 @@ def get_result(result: AutomationResult) -> AutomationResult: return result.child_results[0].child_results[0] # was not requested on the previous tick, as there was no tick - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert get_result(result).true_subset.size == 0 # still was not requested on the previous tick - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert get_result(result).true_subset.size == 0 # now we ensure that the asset does get requested this tick true_set.add(AssetKeyPartitionKey(AssetKey("A"))) - state, result = state.evaluate("A") + state, result = await state.evaluate("A") # requested this tick, not the previous tick assert get_result(result).true_subset.size == 0 true_set.remove(AssetKeyPartitionKey(AssetKey("A"))) # requested on the previous tick - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert get_result(result).true_subset.size == 1 # requested two ticks ago - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert get_result(result).true_subset.size == 0 diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_newly_true_condition.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_newly_true_condition.py index b2e378d8a5cc4..f1a329181d247 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_newly_true_condition.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_newly_true_condition.py @@ -1,3 +1,4 @@ +import pytest from dagster._core.definitions.asset_key import AssetKey from dagster._core.definitions.events import AssetKeyPartitionKey @@ -12,47 +13,48 @@ ) -def test_newly_true_condition() -> None: +@pytest.mark.asyncio +async def test_newly_true_condition() -> None: inner_condition, true_set = get_hardcoded_condition() condition = inner_condition.newly_true() state = AutomationConditionScenarioState(one_asset, automation_condition=condition) # nothing true - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 0 # becomes true true_set.add(AssetKeyPartitionKey(AssetKey("A"))) - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 1 # now on the next tick, this asset is no longer newly true - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 0 # see above - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 0 # see above - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 0 # now condition becomes false, result still false true_set.remove(AssetKeyPartitionKey(AssetKey("A"))) - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 0 # see above - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 0 # becomes true again true_set.add(AssetKeyPartitionKey(AssetKey("A"))) - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 1 # no longer newly true - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 0 diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_newly_updated_condition.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_newly_updated_condition.py index 712b30c7c22c3..d838401d75a0c 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_newly_updated_condition.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_newly_updated_condition.py @@ -1,3 +1,4 @@ +import pytest from dagster import ( AssetCheckResult, AssetMaterialization, @@ -18,43 +19,45 @@ ) -def test_newly_updated_condition() -> None: +@pytest.mark.asyncio +async def test_newly_updated_condition() -> None: state = AutomationConditionScenarioState( one_asset, automation_condition=AutomationCondition.newly_updated() ) # not updated - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 0 # newly updated state = state.with_reported_materialization("A") - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 1 # not newly updated - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 0 # still not newly updated - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 0 # newly updated twice in a row state = state.with_reported_materialization("A") - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 1 state = state.with_reported_materialization("A") - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 1 # not newly updated - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 0 -def test_newly_updated_condition_data_version() -> None: +@pytest.mark.asyncio +async def test_newly_updated_condition_data_version() -> None: state = AutomationConditionScenarioState( one_upstream_observable_asset, automation_condition=AutomationCondition.any_deps_match( @@ -63,35 +66,35 @@ def test_newly_updated_condition_data_version() -> None: ) # not updated - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 # newly updated state = state.with_reported_observation("A", data_version="1") - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 1 # not newly updated - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 # same data version, not newly updated state = state.with_reported_observation("A", data_version="1") - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 # new data version state = state.with_reported_observation("A", data_version="2") - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 1 # new data version state = state.with_reported_observation("A", data_version="3") - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 1 # no new data version - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_on_cron_condition.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_on_cron_condition.py index 02d9dddb5b8da..20eeda36c5042 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_on_cron_condition.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_on_cron_condition.py @@ -1,5 +1,6 @@ import datetime +import pytest from dagster import ( AssetMaterialization, AutomationCondition, @@ -23,7 +24,8 @@ ) -def test_on_cron_unpartitioned() -> None: +@pytest.mark.asyncio +async def test_on_cron_unpartitioned() -> None: state = AutomationConditionScenarioState( two_assets_in_sequence, automation_condition=AutomationCondition.on_cron(cron_schedule="0 * * * *"), @@ -31,17 +33,17 @@ def test_on_cron_unpartitioned() -> None: ).with_current_time("2020-02-02T00:55:00") # no cron boundary crossed - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 # now crossed a cron boundary parent hasn't updated yet state = state.with_current_time_advanced(minutes=10) - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 # parent updated, now can execute state = state.with_runs(run_request("A")) - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 1 state = state.with_runs( *( @@ -51,26 +53,27 @@ def test_on_cron_unpartitioned() -> None: ) # now B has been materialized, so don't execute again - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 # A gets materialized again before the hour, so don't execute B again state = state.with_runs(run_request("A")) - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 # now a new cron tick, but A still hasn't been materialized since the hour state = state.with_current_time_advanced(hours=1) - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 # A gets materialized again after the hour, so execute B again state = state.with_runs(run_request("A")) - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 1 -def test_on_cron_hourly_partitioned() -> None: +@pytest.mark.asyncio +async def test_on_cron_hourly_partitioned() -> None: state = ( AutomationConditionScenarioState( two_assets_in_sequence, @@ -82,22 +85,22 @@ def test_on_cron_hourly_partitioned() -> None: ) # no cron boundary crossed - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 # now crossed a cron boundary parent hasn't updated yet state = state.with_current_time_advanced(minutes=10) - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 # historical parent updated, doesn't matter state = state.with_runs(run_request("A", "2019-07-05-00:00")) - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 # latest parent updated, now can execute state = state.with_runs(run_request("A", "2020-02-02-00:00")) - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 1 state = state.with_runs( *( @@ -107,22 +110,22 @@ def test_on_cron_hourly_partitioned() -> None: ) # now B has been materialized, so don't execute again - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 # now a new cron tick, but A still hasn't been materialized since the hour state = state.with_current_time_advanced(hours=1) - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 # A gets materialized with the previous partition after the hour, but that doesn't matter state = state.with_runs(run_request("A", "2020-02-02-00:00")) - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 # A gets materialized with the latest partition, fire state = state.with_runs(run_request("A", "2020-02-02-01:00")) - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 1 diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_on_missing_condition.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_on_missing_condition.py index 578a3b78bb0f5..c3a96a285c262 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_on_missing_condition.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_on_missing_condition.py @@ -1,3 +1,4 @@ +import pytest from dagster import AutomationCondition from dagster._core.definitions.asset_key import AssetKey from dagster._core.definitions.events import AssetMaterialization @@ -14,7 +15,8 @@ ) -def test_on_missing_unpartitioned() -> None: +@pytest.mark.asyncio +async def test_on_missing_unpartitioned() -> None: state = AutomationConditionScenarioState( two_assets_in_sequence, automation_condition=AutomationCondition.on_missing(), @@ -23,30 +25,30 @@ def test_on_missing_unpartitioned() -> None: # B starts off as materialized state.instance.report_runless_asset_event(AssetMaterialization(asset_key=AssetKey("B"))) - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 # parent materialized, now could execute, but B is not missing state = state.with_runs(run_request("A")) - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 # now wipe B so that it is newly missing, should update state.instance.wipe_assets([AssetKey("B")]) - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 1 # B has not yet materialized, but it has been requested, so don't request again - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 # same as above - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 # parent materialized again, no impact state = state.with_runs(run_request("A")) - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 # now B has been materialized, so really shouldn't execute again @@ -56,16 +58,17 @@ def test_on_missing_unpartitioned() -> None: for ak, pk in result.true_subset.expensively_compute_asset_partitions() ) ) - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 # parent materialized again, no impact state = state.with_runs(run_request("A")) - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 -def test_on_missing_hourly_partitioned() -> None: +@pytest.mark.asyncio +async def test_on_missing_hourly_partitioned() -> None: state = ( AutomationConditionScenarioState( two_assets_in_sequence, @@ -77,42 +80,43 @@ def test_on_missing_hourly_partitioned() -> None: ) # parent hasn't updated yet - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 state = state.with_current_time_advanced(hours=1) # historical parent updated, doesn't matter state = state.with_runs(run_request("A", "2019-07-05-00:00")) - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 # latest parent updated, now can execute state = state.with_runs(run_request("A", "2020-02-02-00:00")) - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 1 # B has been requested, so don't request again - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 # new partition comes into being, parent hasn't been materialized yet state = state.with_current_time_advanced(hours=1) - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 # latest parent updated, now can execute state = state.with_runs(run_request("A", "2020-02-02-01:00")) - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 1 # latest parent updated again, don't re execute state = state.with_runs(run_request("A", "2020-02-02-01:00")) - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 -def test_on_missing_without_time_limit() -> None: +@pytest.mark.asyncio +async def test_on_missing_without_time_limit() -> None: state = ( AutomationConditionScenarioState( two_assets_in_sequence, @@ -126,7 +130,7 @@ def test_on_missing_without_time_limit() -> None: ) # parent hasn't updated yet - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 state = state.with_current_time_advanced(years=1) @@ -134,9 +138,9 @@ def test_on_missing_without_time_limit() -> None: # historical parents updated, matters state = state.with_runs(run_request("A", "2019-07-05-00:00")) state = state.with_runs(run_request("A", "2019-04-05-00:00")) - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 2 # B has been requested, so don't request again - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_since_condition.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_since_condition.py index 43c1a2ed3913b..1f4388c8a8424 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_since_condition.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_since_condition.py @@ -1,3 +1,4 @@ +import pytest from dagster._core.definitions.asset_key import AssetKey from dagster._core.definitions.declarative_automation.operators import SinceCondition from dagster._core.definitions.events import AssetKeyPartitionKey @@ -13,7 +14,8 @@ ) -def test_since_condition_unpartitioned() -> None: +@pytest.mark.asyncio +async def test_since_condition_unpartitioned() -> None: primary_condition, true_set_primary = get_hardcoded_condition() reference_condition, true_set_reference = get_hardcoded_condition() @@ -23,43 +25,43 @@ def test_since_condition_unpartitioned() -> None: state = AutomationConditionScenarioState(one_asset, automation_condition=condition) # nothing true - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 0 # primary becomes true, but reference has never been true true_set_primary.add(AssetKeyPartitionKey(AssetKey("A"))) - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 1 true_set_primary.remove(AssetKeyPartitionKey(AssetKey("A"))) # reference becomes true, and it's after primary true_set_reference.add(AssetKeyPartitionKey(AssetKey("A"))) - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 0 true_set_reference.remove(AssetKeyPartitionKey(AssetKey("A"))) # primary becomes true again, and it's since reference has become true true_set_primary.add(AssetKeyPartitionKey(AssetKey("A"))) - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 1 true_set_primary.remove(AssetKeyPartitionKey(AssetKey("A"))) # remains true on the neprimaryt evaluation - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 1 # primary becomes true again, still doesn't change anything true_set_primary.add(AssetKeyPartitionKey(AssetKey("A"))) - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 1 true_set_primary.remove(AssetKeyPartitionKey(AssetKey("A"))) # now reference becomes true again true_set_reference.add(AssetKeyPartitionKey(AssetKey("A"))) - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 0 true_set_reference.remove(AssetKeyPartitionKey(AssetKey("A"))) # remains false on the neprimaryt evaluation - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 0 diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_updated_since_cron_condition.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_updated_since_cron_condition.py index 0e5f6fd2bfd62..fe1fb3e9a560c 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_updated_since_cron_condition.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_updated_since_cron_condition.py @@ -1,3 +1,4 @@ +import pytest from dagster import AutomationCondition from dagster_tests.definitions_tests.declarative_automation_tests.scenario_utils.automation_condition_scenario import ( @@ -12,7 +13,8 @@ ) -def test_updated_since_cron_unpartitioned() -> None: +@pytest.mark.asyncio +async def test_updated_since_cron_unpartitioned() -> None: state = AutomationConditionScenarioState( one_asset, automation_condition=AutomationCondition.newly_updated().since( @@ -20,29 +22,30 @@ def test_updated_since_cron_unpartitioned() -> None: ), ).with_current_time("2020-02-02T00:55:00") - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 0 # now pass a cron tick, still haven't updated since that time state = state.with_current_time_advanced(minutes=10) - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 0 # now A is updated, so have been updated since cron tick state = state.with_runs(run_request("A")) - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 1 - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 1 # new cron tick, no longer materialized since it state = state.with_current_time_advanced(hours=1) - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 0 -def test_updated_since_cron_partitioned() -> None: +@pytest.mark.asyncio +async def test_updated_since_cron_partitioned() -> None: state = ( AutomationConditionScenarioState( one_asset, @@ -54,43 +57,43 @@ def test_updated_since_cron_partitioned() -> None: .with_current_time("2020-02-02T00:55:00") ) - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 0 # now pass a cron tick, still haven't updated since that time state = state.with_current_time_advanced(minutes=10) - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 0 # one materialized state = state.with_runs(run_request("A", "1")) - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 1 # now both materialized state = state.with_runs(run_request("A", "2")) - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 2 # nothing changed - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 2 # A 1 materialized again before the hour state = state.with_runs(run_request("A", "1")) - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 2 # new hour passes, nothing materialized since then state = state.with_current_time_advanced(hours=1) - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 0 # A 2 materialized again after the hour state = state.with_runs(run_request("A", "2")) - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 1 # nothing changed - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 1 diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_will_be_requested_condition.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_will_be_requested_condition.py index 815c31b71d507..76f1765a1ef1f 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_will_be_requested_condition.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/builtins/test_will_be_requested_condition.py @@ -1,3 +1,4 @@ +import pytest from dagster import AssetKey, AutomationCondition from dagster._core.definitions.events import AssetKeyPartitionKey @@ -10,33 +11,35 @@ ) -def test_will_be_requested_unpartitioned() -> None: +@pytest.mark.asyncio +async def test_will_be_requested_unpartitioned() -> None: condition = AutomationCondition.any_deps_match(AutomationCondition.will_be_requested()) state = AutomationConditionScenarioState(two_assets_in_sequence, automation_condition=condition) # no requested parents - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 # parent is requested state = state.with_requested_asset_partitions([AssetKeyPartitionKey(AssetKey("A"))]) - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 1 -def test_will_be_requested_static_partitioned() -> None: +@pytest.mark.asyncio +async def test_will_be_requested_static_partitioned() -> None: condition = AutomationCondition.any_deps_match(AutomationCondition.will_be_requested()) state = AutomationConditionScenarioState( two_assets_in_sequence, automation_condition=condition ).with_asset_properties(partitions_def=two_partitions_def) # no requested parents - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 # one requested parent state = state.with_requested_asset_partitions([AssetKeyPartitionKey(AssetKey("A"), "1")]) - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 1 assert result.true_subset.expensively_compute_asset_partitions() == { AssetKeyPartitionKey(AssetKey("B"), "1") @@ -46,28 +49,29 @@ def test_will_be_requested_static_partitioned() -> None: state = state.with_requested_asset_partitions( [AssetKeyPartitionKey(AssetKey("A"), "1"), AssetKeyPartitionKey(AssetKey("A"), "2")] ) - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 2 -def test_will_be_requested_different_partitions() -> None: +@pytest.mark.asyncio +async def test_will_be_requested_different_partitions() -> None: condition = AutomationCondition.any_deps_match(AutomationCondition.will_be_requested()) state = AutomationConditionScenarioState( two_assets_in_sequence, automation_condition=condition ).with_asset_properties("A", partitions_def=two_partitions_def) # no requested parents - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 # one requested parent, but can't execute in same run state = state.with_requested_asset_partitions([AssetKeyPartitionKey(AssetKey("A"), "1")]) - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 # two requested parents, but can't execute in same run state = state.with_requested_asset_partitions( [AssetKeyPartitionKey(AssetKey("A"), "1"), AssetKeyPartitionKey(AssetKey("A"), "2")] ) - state, result = state.evaluate("B") + state, result = await state.evaluate("B") assert result.true_subset.size == 0 diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/fundamentals/test_automation_condition.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/fundamentals/test_automation_condition.py index 2f9096bdc1ae8..1486bfa1bec55 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/fundamentals/test_automation_condition.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/fundamentals/test_automation_condition.py @@ -23,31 +23,33 @@ ) -def test_missing_unpartitioned() -> None: +@pytest.mark.asyncio +async def test_missing_unpartitioned() -> None: state = AutomationConditionScenarioState( one_asset, automation_condition=AutomationCondition.missing() ) - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 1 original_value_hash = result.value_hash # still true - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 1 assert result.value_hash == original_value_hash # after a run of A it's now False - state, result = state.with_runs(run_request("A")).evaluate("A") + state, result = await state.with_runs(run_request("A")).evaluate("A") assert result.true_subset.size == 0 assert result.value_hash != original_value_hash # if we evaluate from scratch, it's also False - _, result = state.without_cursor().evaluate("A") + _, result = await state.without_cursor().evaluate("A") assert result.true_subset.size == 0 -def test_missing_time_partitioned() -> None: +@pytest.mark.asyncio +async def test_missing_time_partitioned() -> None: state = ( AutomationConditionScenarioState( one_asset, automation_condition=AutomationCondition.missing() @@ -57,22 +59,22 @@ def test_missing_time_partitioned() -> None: .with_current_time_advanced(days=6, minutes=1) ) - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 6 # still true - state, result = state.evaluate("A") + state, result = await state.evaluate("A") assert result.true_subset.size == 6 # after two runs of A those partitions are now False - state, result = state.with_runs( + state, result = await state.with_runs( run_request("A", day_partition_key(time_partitions_start_datetime, 1)), run_request("A", day_partition_key(time_partitions_start_datetime, 3)), ).evaluate("A") assert result.true_subset.size == 4 # if we evaluate from scratch, they're still False - _, result = state.without_cursor().evaluate("A") + _, result = await state.without_cursor().evaluate("A") assert result.true_subset.size == 4 diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/fundamentals/test_result_value_hash.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/fundamentals/test_result_value_hash.py index a677bb327f3c7..d6c13b875a9f0 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/fundamentals/test_result_value_hash.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/fundamentals/test_result_value_hash.py @@ -26,6 +26,7 @@ two_parents_daily = two_parents.with_asset_properties(partitions_def=daily_partitions) +@pytest.mark.asyncio @pytest.mark.parametrize( ["expected_value_hash", "condition", "scenario_spec", "materialize_A"], [ @@ -51,16 +52,16 @@ ("7f852ab7408c67e0830530d025505a37", SC.missing(), one_parent_daily, False), ], ) -def test_value_hash( +async def test_value_hash( condition: SC, scenario_spec: ScenarioSpec, expected_value_hash: str, materialize_A: bool ) -> None: state = AutomationConditionScenarioState( scenario_spec, automation_condition=condition ).with_current_time("2024-01-01T00:00") - state, _ = state.evaluate("downstream") + state, _ = await state.evaluate("downstream") if materialize_A: state = state.with_runs(run_request("A")) - state, result = state.evaluate("downstream") + state, result = await state.evaluate("downstream") assert result.value_hash == expected_value_hash diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/scenario_utils/automation_condition_scenario.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/scenario_utils/automation_condition_scenario.py index 38c7bf26f4fc9..ffaf223bc3e41 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/scenario_utils/automation_condition_scenario.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/scenario_utils/automation_condition_scenario.py @@ -64,7 +64,7 @@ def _get_request_subsets_by_key( for asset_key, aps in ap_by_key.items() } - def evaluate( + async def evaluate( self, asset: CoercibleToAssetKey ) -> Tuple["AutomationConditionScenarioState", AutomationResult]: asset_key = AssetKey.from_coercible(asset) @@ -100,7 +100,7 @@ def evaluate( ) # type: ignore context = AutomationContext.create(key=asset_key, evaluator=evaluator) - full_result = asset_condition.evaluate(context) + full_result = await asset_condition.evaluate(context) # type: ignore new_state = dataclasses.replace(self, condition_cursor=full_result.get_new_cursor()) result = full_result.child_results[0] if self.ensure_empty_result else full_result