diff --git a/python_modules/dagster-test/dagster_test/toys/auto_materializing/large_graph.py b/python_modules/dagster-test/dagster_test/toys/auto_materializing/large_graph.py index 0d0f20cdd3032..561125cbe8fc4 100644 --- a/python_modules/dagster-test/dagster_test/toys/auto_materializing/large_graph.py +++ b/python_modules/dagster-test/dagster_test/toys/auto_materializing/large_graph.py @@ -2,16 +2,21 @@ from typing import List, NamedTuple, Optional, Sequence from dagster import ( + AssetCheckResult, + AssetCheckSpec, + AssetKey, AssetsDefinition, + AutoMaterializePolicy, AutomationCondition, DailyPartitionsDefinition, HourlyPartitionsDefinition, + MaterializeResult, PartitionsDefinition, StaticPartitionsDefinition, asset, repository, ) -from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy +from dagster._core.execution.context.asset_execution_context import AssetExecutionContext from dagster._utils.warnings import disable_dagster_warnings @@ -19,17 +24,18 @@ class AssetLayerConfig(NamedTuple): n_assets: int n_upstreams_per_asset: int = 0 partitions_def: Optional[PartitionsDefinition] = None + n_checks_per_asset: int = 0 def build_assets( id: str, layer_configs: Sequence[AssetLayerConfig], - automation_condition: Optional[AutomationCondition], + automation_condition: Optional[AutomationCondition] = AutomationCondition.eager(), ) -> List[AssetsDefinition]: layers = [] with disable_dagster_warnings(): - for layer_num, layer_config in enumerate(layer_configs): + for layer_config in layer_configs: parent_index = 0 layer = [] for i in range(layer_config.n_assets): @@ -44,15 +50,30 @@ def build_assets( else: non_argument_deps = set() + name = f"{id}_{len(layers)}_{i}" + @asset( partitions_def=layer_config.partitions_def, - name=f"{id}_{len(layers)}_{i}", + name=name, automation_condition=automation_condition, non_argument_deps=non_argument_deps, - group_name=f"g{layer_num}", + check_specs=[ + AssetCheckSpec( + name=f"check{k}", + asset=AssetKey(name), + automation_condition=automation_condition, + ) + for k in range(layer_config.n_checks_per_asset) + ], ) - def _asset(): - pass + def _asset(context: AssetExecutionContext) -> MaterializeResult: + return MaterializeResult( + asset_key=context.asset_key, + check_results=[ + AssetCheckResult(check_name=key.name, passed=True) + for key in context.selected_asset_check_keys + ], + ) layer.append(_asset) layers.append(layer) diff --git a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/perf_tests/test_perf.py b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/perf_tests/test_perf.py index 4d950fea52bf7..3b4817223dd74 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/perf_tests/test_perf.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/perf_tests/test_perf.py @@ -1,40 +1,71 @@ +import datetime import time from dagster import ( AutomationCondition, + DagsterInstance, DailyPartitionsDefinition, + Definitions, HourlyPartitionsDefinition, evaluate_automation_conditions, ) -from dagster._core.instance import DagsterInstance +from dagster._core.definitions.automation_tick_evaluation_context import build_run_requests +from dagster._time import get_current_datetime from dagster_test.toys.auto_materializing.large_graph import AssetLayerConfig, build_assets -def test_eager_perf() -> None: +def run_declarative_automation_perf_simulation(instance: DagsterInstance) -> None: hourly_partitions_def = HourlyPartitionsDefinition("2020-01-01-00:00") daily_partitions_def = DailyPartitionsDefinition("2020-01-01") assets = build_assets( id="perf_test", layer_configs=[ - AssetLayerConfig(100, 0, hourly_partitions_def), - AssetLayerConfig(200, 2, hourly_partitions_def), - AssetLayerConfig(200, 4, hourly_partitions_def), - AssetLayerConfig(200, 4, daily_partitions_def), - AssetLayerConfig(200, 2, daily_partitions_def), + AssetLayerConfig(50, 0, hourly_partitions_def), + AssetLayerConfig(100, 2, hourly_partitions_def, n_checks_per_asset=1), + AssetLayerConfig(100, 4, hourly_partitions_def, n_checks_per_asset=2), + AssetLayerConfig(100, 4, daily_partitions_def, n_checks_per_asset=2), AssetLayerConfig(100, 2, daily_partitions_def), + AssetLayerConfig(50, 2, daily_partitions_def), ], - automation_condition=AutomationCondition.eager(), + automation_condition=AutomationCondition.eager() + & AutomationCondition.all_deps_blocking_checks_passed(), ) + defs = Definitions(assets=assets) + asset_job = defs.get_implicit_global_asset_job_def() - instance = DagsterInstance.ephemeral() cursor = None start = time.time() - for _ in range(2): - cursor = evaluate_automation_conditions( - defs=assets, instance=instance, cursor=cursor - ).cursor + evaluation_time = get_current_datetime() - datetime.timedelta(days=1) + for _ in range(3): + result = evaluate_automation_conditions( + defs=assets, instance=instance, cursor=cursor, evaluation_time=evaluation_time + ) + cursor = result.cursor end = time.time() duration = end - start # all iterations should take less than 20 seconds on this graph assert duration < 20.0 + + # simulate the new events that would come from the requested runs + run_requests = build_run_requests( + entity_subsets=[r.true_subset for r in result.results], + asset_graph=defs.get_asset_graph(), + run_tags={}, + emit_backfills=False, + ) + for run_request in run_requests: + asset_job.get_subset( + asset_selection=set(run_request.asset_selection) + if run_request.asset_selection + else None, + asset_check_selection=set(run_request.asset_check_keys) + if run_request.asset_check_keys + else None, + ).execute_in_process(instance=instance, partition_key=run_request.partition_key) + + evaluation_time += datetime.timedelta(hours=1) start = time.time() + + +def test_eager_perf() -> None: + run_declarative_automation_perf_simulation(DagsterInstance.ephemeral())