diff --git a/python_modules/dagster/dagster/_core/scheduler/instigation.py b/python_modules/dagster/dagster/_core/scheduler/instigation.py index 536c147d73886..a42bc92381ac3 100644 --- a/python_modules/dagster/dagster/_core/scheduler/instigation.py +++ b/python_modules/dagster/dagster/_core/scheduler/instigation.py @@ -448,14 +448,28 @@ def dynamic_partitions_request_results( return self.tick_data.dynamic_partitions_request_results @property - def requested_asset_materialization_count(self) -> int: - if self.tick_data.status != TickStatus.SUCCESS: - return 0 + def submitted_run_requests(self) -> Sequence[RunRequest]: + """The run requests that were successfully submitted for execution by this tick. + May not be the same as self.run_requests if the tick was terminated mid-iteration. + """ + submitted = [] + requested_run_ids_and_requests = zip( + self.tick_data.reserved_run_ids or [], self.run_requests or [] + ) + submitted_run_ids = self.run_ids + for run_id, run_request in requested_run_ids_and_requests: + if run_id in submitted_run_ids: + submitted.append(run_request) + return submitted + + def _asset_materialization_count_for_run_requests( + self, run_requests: Optional[Sequence[RunRequest]] + ): asset_partitions_from_single_runs = set() num_assets_requested_from_backfill_runs = 0 num_requested_checks = 0 - for run_request in self.tick_data.run_requests or []: + for run_request in run_requests or []: if run_request.requires_backfill_daemon(): asset_graph_subset = check.not_none(run_request.asset_graph_subset) num_assets_requested_from_backfill_runs += ( @@ -466,7 +480,7 @@ def requested_asset_materialization_count(self) -> int: asset_partitions_from_single_runs.add( AssetKeyPartitionKey(asset_key, run_request.partition_key) ) - for asset_check_key in run_request.asset_check_keys or []: + for _ in run_request.asset_check_keys or []: num_requested_checks += 1 return ( len(asset_partitions_from_single_runs) @@ -475,12 +489,28 @@ def requested_asset_materialization_count(self) -> int: ) @property - def requested_assets_and_partitions(self) -> Mapping[AssetKey, AbstractSet[str]]: - if self.tick_data.status != TickStatus.SUCCESS: - return {} + def requested_asset_materialization_count(self) -> int: + """The number of asset materializations requested by this tick.""" + if self.tick_data.status == TickStatus.FAILURE: + return 0 + + return self._asset_materialization_count_for_run_requests(self.run_requests) + @property + def submitted_asset_materialization_count(self) -> int: + """The number of asset materializations that were successfully submitted for materialization + by this tick. May not be the same as requested_asset_materialization_count if the tick was terminated mid-iteration. + """ + if self.tick_data.status == TickStatus.FAILURE: + return 0 + + return self._asset_materialization_count_for_run_requests(self.submitted_run_requests) + + def _assets_and_partitions_for_run_requests( + self, run_requests: Optional[Sequence[RunRequest]] + ) -> Mapping[AssetKey, AbstractSet[str]]: partitions_by_asset_key = {} - for run_request in self.tick_data.run_requests or []: + for run_request in run_requests or []: if run_request.requires_backfill_daemon(): asset_graph_subset = check.not_none(run_request.asset_graph_subset) for asset_key_partition_key in asset_graph_subset.iterate_asset_partitions(): @@ -494,21 +524,37 @@ def requested_assets_and_partitions(self) -> Mapping[AssetKey, AbstractSet[str]] for asset_key in run_request.asset_selection or []: if asset_key not in partitions_by_asset_key: partitions_by_asset_key[asset_key] = set() - if run_request.partition_key: partitions_by_asset_key[asset_key].add(run_request.partition_key) for asset_check_key in run_request.asset_check_keys or []: asset_key = asset_check_key.asset_key if asset_key not in partitions_by_asset_key: partitions_by_asset_key[asset_key] = set() - partitions_by_asset_key[asset_key].add(asset_check_key.name) return partitions_by_asset_key + @property + def requested_assets_and_partitions(self) -> Mapping[AssetKey, AbstractSet[str]]: + """The assets and partitions that are requested by this tick.""" + if self.tick_data.status == TickStatus.FAILURE: + return {} + + return self._assets_and_partitions_for_run_requests(self.run_requests) + + @property + def submitted_assets_and_partitions(self) -> Mapping[AssetKey, AbstractSet[str]]: + """The assets and partitions that were successfully submitted for materialization by this tick. + May not be the same as requested_assets_and_partitions if the tick was terminated mid-iteration. + """ + if self.tick_data.status == TickStatus.FAILURE: + return {} + + return self._assets_and_partitions_for_run_requests(self.submitted_run_requests) + @property def requested_asset_keys(self) -> AbstractSet[AssetKey]: - if self.tick_data.status != TickStatus.SUCCESS: + if self.tick_data.status == TickStatus.FAILURE: return set() return set(self.requested_assets_and_partitions.keys())