Skip to content

Commit

Permalink
Fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
briantu committed Oct 22, 2024
1 parent b9e7487 commit 346ee2b
Show file tree
Hide file tree
Showing 19 changed files with 271 additions and 218 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,10 @@ def evaluate(self, context: AutomationContext) -> AutomationResult:
return HardcodedCondition(), true_set


@pytest.mark.asyncio
@pytest.mark.parametrize("is_any", [True, False])
@pytest.mark.parametrize("blocking_only", [True, False])
def test_check_operators_partitioned(is_any: bool, blocking_only: bool) -> None:
async def test_check_operators_partitioned(is_any: bool, blocking_only: bool) -> None:
inner_condition, true_set = get_hardcoded_condition()
condition = (
AutomationCondition.any_checks_match(inner_condition, blocking_only=blocking_only)
Expand All @@ -79,25 +80,26 @@ def test_check_operators_partitioned(is_any: bool, blocking_only: bool) -> None:
).with_asset_properties(partitions_def=two_partitions_def)

# no checks true
state, result = state.evaluate("A")
state, result = await state.evaluate("A")
assert result.true_subset.size == 0

true_set.add(AssetCheckKey(AssetKey("A"), "a1"))
state, result = state.evaluate("A")
state, result = await state.evaluate("A")
if is_any:
assert result.true_subset.size == 2
else:
assert result.true_subset.size == (2 if blocking_only else 0)

true_set.add(AssetCheckKey(AssetKey("A"), "a2"))
state, result = state.evaluate("A")
state, result = await state.evaluate("A")
if is_any:
assert result.true_subset.size == 2
else:
assert result.true_subset.size == 2


def test_any_checks_match_basic() -> None:
@pytest.mark.asyncio
async def test_any_checks_match_basic() -> None:
# always true
true_condition = AutomationCondition.cron_tick_passed(
"* * * * *"
Expand All @@ -110,11 +112,11 @@ def test_any_checks_match_basic() -> None:
state = AutomationConditionScenarioState(downstream_of_check, automation_condition=condition)

# there is an upstream check for C
state, result = state.evaluate("C")
state, result = await state.evaluate("C")
assert result.true_subset.size == 1

# there is no upstream check for D
state, result = state.evaluate("D")
state, result = await state.evaluate("D")
assert result.true_subset.size == 0


Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import pytest
from dagster import AutomationCondition

from dagster_tests.definitions_tests.declarative_automation_tests.scenario_utils.automation_condition_scenario import (
Expand All @@ -8,38 +9,39 @@
)


def test_code_version_changed_condition() -> None:
@pytest.mark.asyncio
async def test_code_version_changed_condition() -> None:
state = AutomationConditionScenarioState(
one_asset, automation_condition=AutomationCondition.code_version_changed()
).with_asset_properties(code_version="1")

# not changed
state, result = state.evaluate("A")
state, result = await state.evaluate("A")
assert result.true_subset.size == 0

# still not changed
state, result = state.evaluate("A")
state, result = await state.evaluate("A")
assert result.true_subset.size == 0

# newly changed
state = state.with_asset_properties(code_version="2")
state, result = state.evaluate("A")
state, result = await state.evaluate("A")
assert result.true_subset.size == 1

# not newly changed
state, result = state.evaluate("A")
state, result = await state.evaluate("A")
assert result.true_subset.size == 0

# newly changed
state = state.with_asset_properties(code_version="3")
state, result = state.evaluate("A")
state, result = await state.evaluate("A")
assert result.true_subset.size == 1

# newly changed
state = state.with_asset_properties(code_version="2")
state, result = state.evaluate("A")
state, result = await state.evaluate("A")
assert result.true_subset.size == 1

# not newly changed
state, result = state.evaluate("A")
state, result = await state.evaluate("A")
assert result.true_subset.size == 0
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ def evaluate(self, context: AutomationContext) -> AutomationResult:
return HardcodedCondition(), true_set


@pytest.mark.asyncio
@pytest.mark.parametrize("is_any", [True, False])
def test_dep_missing_unpartitioned(is_any: bool) -> None:
async def test_dep_missing_unpartitioned(is_any: bool) -> None:
inner_condition, true_set = get_hardcoded_condition()
condition = (
AutomationCondition.any_deps_match(inner_condition)
Expand All @@ -62,25 +63,26 @@ def test_dep_missing_unpartitioned(is_any: bool) -> None:
)

# neither parent is true
state, result = state.evaluate("C")
state, result = await state.evaluate("C")
assert result.true_subset.size == 0

# one parent true, still one false
true_set.add(AssetKeyPartitionKey(AssetKey("A")))
state, result = state.evaluate("C")
state, result = await state.evaluate("C")
if is_any:
assert result.true_subset.size == 1
else:
assert result.true_subset.size == 0

# both parents true
true_set.add(AssetKeyPartitionKey(AssetKey("B")))
state, result = state.evaluate("C")
state, result = await state.evaluate("C")
assert result.true_subset.size == 1


@pytest.mark.asyncio
@pytest.mark.parametrize("is_any", [True, False])
def test_dep_missing_partitioned(is_any: bool) -> None:
async def test_dep_missing_partitioned(is_any: bool) -> None:
inner_condition, true_set = get_hardcoded_condition()
condition = (
AutomationCondition.any_deps_match(inner_condition)
Expand All @@ -92,11 +94,11 @@ def test_dep_missing_partitioned(is_any: bool) -> None:
).with_asset_properties(partitions_def=two_partitions_def)

# no parents true
state, result = state.evaluate("C")
state, result = await state.evaluate("C")
assert result.true_subset.size == 0

true_set.add(AssetKeyPartitionKey(AssetKey("A"), "1"))
state, result = state.evaluate("C")
state, result = await state.evaluate("C")
if is_any:
# one parent is true for partition 1
assert result.true_subset.size == 1
Expand All @@ -105,7 +107,7 @@ def test_dep_missing_partitioned(is_any: bool) -> None:
assert result.true_subset.size == 0

true_set.add(AssetKeyPartitionKey(AssetKey("A"), "2"))
state, result = state.evaluate("C")
state, result = await state.evaluate("C")
if is_any:
# both partitions 1 and 2 have at least one true parent
assert result.true_subset.size == 2
Expand All @@ -114,22 +116,23 @@ def test_dep_missing_partitioned(is_any: bool) -> None:
assert result.true_subset.size == 0

true_set.add(AssetKeyPartitionKey(AssetKey("B"), "1"))
state, result = state.evaluate("C")
state, result = await state.evaluate("C")
if is_any:
assert result.true_subset.size == 2
else:
# now partition 1 has all parents true
assert result.true_subset.size == 1

true_set.add(AssetKeyPartitionKey(AssetKey("B"), "2"))
state, result = state.evaluate("C")
state, result = await state.evaluate("C")
if is_any:
assert result.true_subset.size == 2
else:
# now partition 2 has all parents true
assert result.true_subset.size == 2


@pytest.mark.asyncio
@pytest.mark.parametrize("is_any", [True, False])
@pytest.mark.parametrize("is_include", [True, False])
@pytest.mark.parametrize(
Expand All @@ -143,7 +146,7 @@ def test_dep_missing_partitioned(is_any: bool) -> None:
(2, ["B1", "B2"], 2),
],
)
def test_dep_missing_partitioned_selections(
async def test_dep_missing_partitioned_selections(
is_any: bool,
is_include: bool,
expected_initial_result_size: int,
Expand All @@ -166,10 +169,10 @@ def test_dep_missing_partitioned_selections(
one_asset_depends_on_two, automation_condition=condition
).with_asset_properties(partitions_def=two_partitions_def)
# all parents are missing
state, result = state.evaluate("C")
state, result = await state.evaluate("C")
assert result.true_subset.size == expected_initial_result_size
state = state.with_runs(*(run_request(s[0], s[1]) for s in materialized_asset_partitions))
state, result = state.evaluate("C")
state, result = await state.evaluate("C")
assert result.true_subset.size == expected_final_result_size


Expand All @@ -185,45 +188,47 @@ def test_dep_missing_partitioned_selections(
)


def test_dep_missing_complex_include() -> None:
@pytest.mark.asyncio
async def test_dep_missing_complex_include() -> None:
# true if any dependencies within the "bar" group are missing, or "A" is missing
condition = AutomationCondition.any_deps_match(
AutomationCondition.missing(),
).allow(AssetSelection.keys("A") | AssetSelection.groups("bar"))
state = AutomationConditionScenarioState(complex_scenario_spec, automation_condition=condition)

# all start off as missing
state, result = state.evaluate("downstream")
state, result = await state.evaluate("downstream")
assert result.true_subset.size == 1

# A materialized, D and E still missing
state = state.with_runs(run_request(["A"]))
state, result = state.evaluate("downstream")
state, result = await state.evaluate("downstream")
assert result.true_subset.size == 1

# D and E materialized, and all the other missing things are in the exclude selection
state = state.with_runs(run_request(["D", "E"]))
state, result = state.evaluate("downstream")
state, result = await state.evaluate("downstream")
assert result.true_subset.size == 0


def test_dep_missing_complex_exclude() -> None:
@pytest.mark.asyncio
async def test_dep_missing_complex_exclude() -> None:
# true if any dependencies are missing, ignoring A and anything in the "bar" group
condition = AutomationCondition.any_deps_match(
AutomationCondition.missing(),
).ignore(AssetSelection.keys("A") | AssetSelection.groups("bar"))
state = AutomationConditionScenarioState(complex_scenario_spec, automation_condition=condition)

# all start off as missing
state, result = state.evaluate("downstream")
state, result = await state.evaluate("downstream")
assert result.true_subset.size == 1

# B materialized, C still missing
state = state.with_runs(run_request(["B"]))
state, result = state.evaluate("downstream")
state, result = await state.evaluate("downstream")
assert result.true_subset.size == 1

# C materialized, and all the other missing things are in the exclude selection
state = state.with_runs(run_request(["C"]))
state, result = state.evaluate("downstream")
state, result = await state.evaluate("downstream")
assert result.true_subset.size == 0
Original file line number Diff line number Diff line change
Expand Up @@ -31,28 +31,29 @@
)


def test_eager_unpartitioned() -> None:
@pytest.mark.asyncio
async def test_eager_unpartitioned() -> None:
state = AutomationConditionScenarioState(
two_assets_in_sequence,
automation_condition=AutomationCondition.eager(),
ensure_empty_result=False,
)

# parent hasn't updated yet
state, result = state.evaluate("B")
state, result = await state.evaluate("B")
assert result.true_subset.size == 0

# parent updated, now can execute
state = state.with_runs(run_request("A"))
state, result = state.evaluate("B")
state, result = await state.evaluate("B")
assert result.true_subset.size == 1

# B has not yet materialized, but it has been requested, so don't request again
state, result = state.evaluate("B")
state, result = await state.evaluate("B")
assert result.true_subset.size == 0

# same as above
state, result = state.evaluate("B")
state, result = await state.evaluate("B")
assert result.true_subset.size == 0

# now B has been materialized, so really shouldn't execute again
Expand All @@ -62,22 +63,23 @@ def test_eager_unpartitioned() -> None:
for ak, pk in result.true_subset.expensively_compute_asset_partitions()
)
)
state, result = state.evaluate("B")
state, result = await state.evaluate("B")
assert result.true_subset.size == 0

# A gets materialized again before the hour, execute B again
state = state.with_runs(run_request("A"))
state, result = state.evaluate("B")
state, result = await state.evaluate("B")
assert result.true_subset.size == 1
# however, B fails
state = state.with_failed_run_for_asset("B")

# do not try to materialize B again immediately
state, result = state.evaluate("B")
state, result = await state.evaluate("B")
assert result.true_subset.size == 0


def test_eager_hourly_partitioned() -> None:
@pytest.mark.asyncio
async def test_eager_hourly_partitioned() -> None:
state = (
AutomationConditionScenarioState(
two_assets_in_sequence,
Expand All @@ -89,17 +91,17 @@ def test_eager_hourly_partitioned() -> None:
)

# parent hasn't updated yet
state, result = state.evaluate("B")
state, result = await state.evaluate("B")
assert result.true_subset.size == 0

# historical parent updated, doesn't matter
state = state.with_runs(run_request("A", "2019-07-05-00:00"))
state, result = state.evaluate("B")
state, result = await state.evaluate("B")
assert result.true_subset.size == 0

# latest parent updated, now can execute
state = state.with_runs(run_request("A", "2020-02-02-00:00"))
state, result = state.evaluate("B")
state, result = await state.evaluate("B")
assert result.true_subset.size == 1
state = state.with_runs(
*(
Expand All @@ -109,23 +111,23 @@ def test_eager_hourly_partitioned() -> None:
)

# now B has been materialized, so don't execute again
state, result = state.evaluate("B")
state, result = await state.evaluate("B")
assert result.true_subset.size == 0

# new partition comes into being, parent hasn't been materialized yet
state = state.with_current_time_advanced(hours=1)
state, result = state.evaluate("B")
state, result = await state.evaluate("B")
assert result.true_subset.size == 0

# parent gets materialized, B requested
state = state.with_runs(run_request("A", "2020-02-02-01:00"))
state, result = state.evaluate("B")
state, result = await state.evaluate("B")
assert result.true_subset.size == 1
# but it fails
state = state.with_failed_run_for_asset("B", "2020-02-02-01:00")

# B does not get immediately requested again
state, result = state.evaluate("B")
state, result = await state.evaluate("B")
assert result.true_subset.size == 0


Expand Down
Loading

0 comments on commit 346ee2b

Please sign in to comment.