Skip to content

Commit

Permalink
new partition key test
Browse files Browse the repository at this point in the history
> Insert changelog entry or delete this section.
  • Loading branch information
gibsondan committed Dec 31, 2024
1 parent 85dbf52 commit 0362640
Showing 1 changed file with 179 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -1,4 +1,18 @@
from dagster import AssetKey, AssetSpec, Definitions, asset, multi_asset
from typing import cast

from dagster import (
AssetDep,
AssetKey,
AssetSpec,
DailyPartitionsDefinition,
Definitions,
HourlyPartitionsDefinition,
TimeWindow,
TimeWindowPartitionMapping,
TimeWindowPartitionsDefinition,
asset,
multi_asset,
)
from dagster._core.asset_graph_view.asset_graph_view import AssetGraphView
from dagster._core.asset_graph_view.bfs import (
AssetGraphViewBfsFilterConditionResult,
Expand All @@ -7,6 +21,7 @@
from dagster._core.definitions.asset_graph_subset import AssetGraphSubset
from dagster._core.definitions.events import AssetKeyPartitionKey
from dagster._core.definitions.partition_mapping import IdentityPartitionMapping
from dagster._time import create_datetime


def _get_subset_with_keys(graph_view, keys):
Expand Down Expand Up @@ -251,3 +266,166 @@ def condition_fn(subset, _visited):
"b is not welcome here",
),
]


def test_bfs_filter_time_window_partitions():
    """Check BFS filtering over a daily-partitioned asset feeding an hourly one.

    The traversal starts from three daily ``upstream`` partitions
    (2023-01-01 through 2023-01-03). The filter condition rejects every
    partition whose time window starts on a Saturday or Sunday. Since
    2023-01-01 was a Sunday, that partition is expected in the failure list
    with the reason "Weekend partitions are excluded", while the result is
    expected to equal the ``upstream`` and ``downstream`` partition subsets
    for the time window from 2023-01-02 to 2023-01-04.
    """
    daily_partitions = DailyPartitionsDefinition(start_date="2023-01-01")
    hourly_partitions = HourlyPartitionsDefinition(start_date="2023-01-01-00:00")

    @asset(partitions_def=daily_partitions)
    def upstream():
        pass

    @asset(partitions_def=hourly_partitions)
    def downstream(upstream) -> None:
        pass

    graph_view = AssetGraphView.for_test(Definitions(assets=[upstream, downstream]))
    asset_graph = graph_view.asset_graph
    upstream_key = AssetKey(["upstream"])

    # Seed the traversal with three consecutive daily partitions of the upstream asset.
    initial_subset = AssetGraphSubset.from_asset_partition_set(
        {
            AssetKeyPartitionKey(upstream_key, day)
            for day in ("2023-01-01", "2023-01-02", "2023-01-03")
        },
        asset_graph,
    )

    def condition_fn(subset, visited):
        # Sort every candidate partition into weekday (kept) or weekend (rejected).
        weekday_partitions = set()
        weekend_partitions = set()

        for entity_subset in graph_view.iterate_asset_subsets(subset):
            partitions_def = cast(
                TimeWindowPartitionsDefinition,
                asset_graph.get(entity_subset.key).partitions_def,
            )
            for asset_partition in entity_subset.expensively_compute_asset_partitions():
                window = partitions_def.time_window_for_partition_key(
                    asset_partition.partition_key
                )
                # weekday() counts from Monday = 0, so 5 and 6 are Saturday and Sunday.
                if window.start.weekday() >= 5:
                    weekend_partitions.add(asset_partition)
                else:
                    weekday_partitions.add(asset_partition)

        # Only report a failure entry when something was actually rejected.
        failures = []
        if weekend_partitions:
            failures.append(
                (
                    AssetGraphSubset.from_asset_partition_set(weekend_partitions, asset_graph),
                    "Weekend partitions are excluded",
                )
            )

        return AssetGraphViewBfsFilterConditionResult(
            AssetGraphSubset.from_asset_partition_set(weekday_partitions, asset_graph),
            failures,
        )

    result, failed = bfs_filter_asset_graph_view(
        graph_view, condition_fn, initial_subset, include_full_execution_set=True
    )

    # Jan 1, 2023 was a Sunday, so only the window from Jan 2 to Jan 4 should remain.
    surviving_window = TimeWindow(
        start=create_datetime(2023, 1, 2), end=create_datetime(2023, 1, 4)
    )
    expected_result = AssetGraphSubset(
        partitions_subsets_by_asset_key={
            upstream_key: daily_partitions.get_partition_subset_in_time_window(surviving_window),
            AssetKey(["downstream"]): hourly_partitions.get_partition_subset_in_time_window(
                surviving_window
            ),
        }
    )
    assert result == expected_result

    expected_failed_subset = AssetGraphSubset.from_asset_partition_set(
        {AssetKeyPartitionKey(upstream_key, "2023-01-01")},
        asset_graph,
    )
    assert failed == [(expected_failed_subset, "Weekend partitions are excluded")]


def test_bfs_filter_self_dependent_asset():
    """Check BFS filtering on an asset that depends on its own partitions.

    ``self_dependent`` is daily-partitioned and declares a dependency on
    itself through a ``TimeWindowPartitionMapping`` with start and end offsets
    of -1. The traversal starts from the 2023-01-01 and 2023-01-02 partitions,
    and the filter condition rejects only the "2023-01-05" partition key. The
    result is expected to equal the partition subset for the time window from
    2023-01-01 to 2023-01-05, and the failure list is expected to hold the
    2023-01-05 partition with the reason "2023-01-05 excluded".
    """
    daily_partitions_def = DailyPartitionsDefinition(start_date="2023-01-01")
    previous_partition_mapping = TimeWindowPartitionMapping(start_offset=-1, end_offset=-1)

    @asset(
        partitions_def=daily_partitions_def,
        deps=[AssetDep("self_dependent", partition_mapping=previous_partition_mapping)],
    )
    def self_dependent(context) -> None:
        pass

    graph_view = AssetGraphView.for_test(Definitions([self_dependent]))
    asset_graph = graph_view.asset_graph
    asset_key = AssetKey(["self_dependent"])

    initial_subset = AssetGraphSubset.from_asset_partition_set(
        {AssetKeyPartitionKey(asset_key, day) for day in ("2023-01-01", "2023-01-02")},
        asset_graph,
    )

    def condition_fn(subset, _visited):
        # Reject the 2023-01-05 partition and let every other candidate through.
        candidates = set(subset.iterate_asset_partitions())
        rejected = {
            candidate for candidate in candidates if candidate.partition_key == "2023-01-05"
        }
        accepted = candidates - rejected

        # Only report a failure entry when something was actually rejected.
        failures = []
        if rejected:
            failures.append(
                (
                    AssetGraphSubset.from_asset_partition_set(rejected, asset_graph),
                    "2023-01-05 excluded",
                )
            )

        return AssetGraphViewBfsFilterConditionResult(
            AssetGraphSubset.from_asset_partition_set(accepted, asset_graph),
            failures,
        )

    result, failed = bfs_filter_asset_graph_view(
        graph_view, condition_fn, initial_subset, include_full_execution_set=True
    )

    expected_window = TimeWindow(
        start=create_datetime(2023, 1, 1), end=create_datetime(2023, 1, 5)
    )
    expected_result = AssetGraphSubset(
        partitions_subsets_by_asset_key={
            asset_key: daily_partitions_def.get_partition_subset_in_time_window(expected_window),
        }
    )
    assert result == expected_result

    expected_failed_subset = AssetGraphSubset.from_asset_partition_set(
        {AssetKeyPartitionKey(asset_key, "2023-01-05")},
        asset_graph,
    )
    assert failed == [(expected_failed_subset, "2023-01-05 excluded")]

0 comments on commit 0362640

Please sign in to comment.