Skip to content

Commit

Permalink
Update test_perf.py to include asset checks (dagster-io#25382)
Browse files Browse the repository at this point in the history
## Summary & Motivation
As title. Copied from dagster-io#25302

## How I Tested These Changes
Run test_perf.py
  • Loading branch information
briantu authored and Grzyblon committed Oct 26, 2024
1 parent e97c61b commit 5725271
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,34 +2,40 @@
from typing import List, NamedTuple, Optional, Sequence

from dagster import (
AssetCheckResult,
AssetCheckSpec,
AssetKey,
AssetsDefinition,
AutoMaterializePolicy,
AutomationCondition,
DailyPartitionsDefinition,
HourlyPartitionsDefinition,
MaterializeResult,
PartitionsDefinition,
StaticPartitionsDefinition,
asset,
repository,
)
from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy
from dagster._core.execution.context.asset_execution_context import AssetExecutionContext
from dagster._utils.warnings import disable_dagster_warnings


class AssetLayerConfig(NamedTuple):
n_assets: int
n_upstreams_per_asset: int = 0
partitions_def: Optional[PartitionsDefinition] = None
n_checks_per_asset: int = 0


def build_assets(
id: str,
layer_configs: Sequence[AssetLayerConfig],
automation_condition: Optional[AutomationCondition],
automation_condition: Optional[AutomationCondition] = AutomationCondition.eager(),
) -> List[AssetsDefinition]:
layers = []

with disable_dagster_warnings():
for layer_num, layer_config in enumerate(layer_configs):
for layer_config in layer_configs:
parent_index = 0
layer = []
for i in range(layer_config.n_assets):
Expand All @@ -44,15 +50,30 @@ def build_assets(
else:
non_argument_deps = set()

name = f"{id}_{len(layers)}_{i}"

@asset(
partitions_def=layer_config.partitions_def,
name=f"{id}_{len(layers)}_{i}",
name=name,
automation_condition=automation_condition,
non_argument_deps=non_argument_deps,
group_name=f"g{layer_num}",
check_specs=[
AssetCheckSpec(
name=f"check{k}",
asset=AssetKey(name),
automation_condition=automation_condition,
)
for k in range(layer_config.n_checks_per_asset)
],
)
def _asset():
pass
def _asset(context: AssetExecutionContext) -> MaterializeResult:
return MaterializeResult(
asset_key=context.asset_key,
check_results=[
AssetCheckResult(check_name=key.name, passed=True)
for key in context.selected_asset_check_keys
],
)

layer.append(_asset)
layers.append(layer)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,40 +1,71 @@
import datetime
import time

from dagster import (
AutomationCondition,
DagsterInstance,
DailyPartitionsDefinition,
Definitions,
HourlyPartitionsDefinition,
evaluate_automation_conditions,
)
from dagster._core.instance import DagsterInstance
from dagster._core.definitions.automation_tick_evaluation_context import build_run_requests
from dagster._time import get_current_datetime
from dagster_test.toys.auto_materializing.large_graph import AssetLayerConfig, build_assets


def test_eager_perf() -> None:
def run_declarative_automation_perf_simulation(instance: DagsterInstance) -> None:
hourly_partitions_def = HourlyPartitionsDefinition("2020-01-01-00:00")
daily_partitions_def = DailyPartitionsDefinition("2020-01-01")
assets = build_assets(
id="perf_test",
layer_configs=[
AssetLayerConfig(100, 0, hourly_partitions_def),
AssetLayerConfig(200, 2, hourly_partitions_def),
AssetLayerConfig(200, 4, hourly_partitions_def),
AssetLayerConfig(200, 4, daily_partitions_def),
AssetLayerConfig(200, 2, daily_partitions_def),
AssetLayerConfig(50, 0, hourly_partitions_def),
AssetLayerConfig(100, 2, hourly_partitions_def, n_checks_per_asset=1),
AssetLayerConfig(100, 4, hourly_partitions_def, n_checks_per_asset=2),
AssetLayerConfig(100, 4, daily_partitions_def, n_checks_per_asset=2),
AssetLayerConfig(100, 2, daily_partitions_def),
AssetLayerConfig(50, 2, daily_partitions_def),
],
automation_condition=AutomationCondition.eager(),
automation_condition=AutomationCondition.eager()
& AutomationCondition.all_deps_blocking_checks_passed(),
)
defs = Definitions(assets=assets)
asset_job = defs.get_implicit_global_asset_job_def()

instance = DagsterInstance.ephemeral()
cursor = None
start = time.time()
for _ in range(2):
cursor = evaluate_automation_conditions(
defs=assets, instance=instance, cursor=cursor
).cursor
evaluation_time = get_current_datetime() - datetime.timedelta(days=1)
for _ in range(3):
result = evaluate_automation_conditions(
defs=assets, instance=instance, cursor=cursor, evaluation_time=evaluation_time
)
cursor = result.cursor
end = time.time()
duration = end - start
# all iterations should take less than 20 seconds on this graph
assert duration < 20.0

# simulate the new events that would come from the requested runs
run_requests = build_run_requests(
entity_subsets=[r.true_subset for r in result.results],
asset_graph=defs.get_asset_graph(),
run_tags={},
emit_backfills=False,
)
for run_request in run_requests:
asset_job.get_subset(
asset_selection=set(run_request.asset_selection)
if run_request.asset_selection
else None,
asset_check_selection=set(run_request.asset_check_keys)
if run_request.asset_check_keys
else None,
).execute_in_process(instance=instance, partition_key=run_request.partition_key)

evaluation_time += datetime.timedelta(hours=1)
start = time.time()


def test_eager_perf() -> None:
run_declarative_automation_perf_simulation(DagsterInstance.ephemeral())

0 comments on commit 5725271

Please sign in to comment.