Skip to content

Commit

Permalink
add properties to get the runs and assets that were submitted by a tick
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed Jan 21, 2025
1 parent 5708bee commit 3f750a2
Showing 1 changed file with 58 additions and 12 deletions.
70 changes: 58 additions & 12 deletions python_modules/dagster/dagster/_core/scheduler/instigation.py
Original file line number Diff line number Diff line change
Expand Up @@ -448,14 +448,28 @@ def dynamic_partitions_request_results(
return self.tick_data.dynamic_partitions_request_results

@property
def requested_asset_materialization_count(self) -> int:
if self.tick_data.status != TickStatus.SUCCESS:
return 0
def submitted_run_requests(self) -> Sequence[RunRequest]:
"""The run requests that were successfully submitted for execution by this tick.
May not be the same as self.run_requests if the tick was terminated mid-iteration.
"""
submitted = []
requested_run_ids_and_requests = zip(
self.tick_data.reserved_run_ids or [], self.run_requests or []
)
submitted_run_ids = self.run_ids
for run_id, run_request in requested_run_ids_and_requests:
if run_id in submitted_run_ids:
submitted.append(run_request)

return submitted

def _asset_materialization_count_for_run_requests(
self, run_requests: Optional[Sequence[RunRequest]]
):
asset_partitions_from_single_runs = set()
num_assets_requested_from_backfill_runs = 0
num_requested_checks = 0
for run_request in self.tick_data.run_requests or []:
for run_request in run_requests or []:
if run_request.requires_backfill_daemon():
asset_graph_subset = check.not_none(run_request.asset_graph_subset)
num_assets_requested_from_backfill_runs += (
Expand All @@ -466,7 +480,7 @@ def requested_asset_materialization_count(self) -> int:
asset_partitions_from_single_runs.add(
AssetKeyPartitionKey(asset_key, run_request.partition_key)
)
for asset_check_key in run_request.asset_check_keys or []:
for _ in run_request.asset_check_keys or []:
num_requested_checks += 1
return (
len(asset_partitions_from_single_runs)
Expand All @@ -475,12 +489,28 @@ def requested_asset_materialization_count(self) -> int:
)

@property
def requested_assets_and_partitions(self) -> Mapping[AssetKey, AbstractSet[str]]:
if self.tick_data.status != TickStatus.SUCCESS:
return {}
def requested_asset_materialization_count(self) -> int:
"""The number of asset materializations requested by this tick."""
if self.tick_data.status == TickStatus.FAILURE:
return 0

return self._asset_materialization_count_for_run_requests(self.run_requests)

@property
def submitted_asset_materialization_count(self) -> int:
"""The number of asset materializations that were successfully submitted for materialization
by this tick. May not be the same as requested_asset_materialization_count if the tick was terminated mid-iteration.
"""
if self.tick_data.status == TickStatus.FAILURE:
return 0

return self._asset_materialization_count_for_run_requests(self.submitted_run_requests)

def _assets_and_partitions_for_run_requests(
self, run_requests: Optional[Sequence[RunRequest]]
) -> Mapping[AssetKey, AbstractSet[str]]:
partitions_by_asset_key = {}
for run_request in self.tick_data.run_requests or []:
for run_request in run_requests or []:
if run_request.requires_backfill_daemon():
asset_graph_subset = check.not_none(run_request.asset_graph_subset)
for asset_key_partition_key in asset_graph_subset.iterate_asset_partitions():
Expand All @@ -494,21 +524,37 @@ def requested_assets_and_partitions(self) -> Mapping[AssetKey, AbstractSet[str]]
for asset_key in run_request.asset_selection or []:
if asset_key not in partitions_by_asset_key:
partitions_by_asset_key[asset_key] = set()

if run_request.partition_key:
partitions_by_asset_key[asset_key].add(run_request.partition_key)
for asset_check_key in run_request.asset_check_keys or []:
asset_key = asset_check_key.asset_key
if asset_key not in partitions_by_asset_key:
partitions_by_asset_key[asset_key] = set()

partitions_by_asset_key[asset_key].add(asset_check_key.name)

return partitions_by_asset_key

@property
def requested_assets_and_partitions(self) -> Mapping[AssetKey, AbstractSet[str]]:
"""The assets and partitions that are requested by this tick."""
if self.tick_data.status == TickStatus.FAILURE:
return {}

return self._assets_and_partitions_for_run_requests(self.run_requests)

@property
def submitted_assets_and_partitions(self) -> Mapping[AssetKey, AbstractSet[str]]:
"""The assets and partitions that were successfully submitted for materialization by this tick.
May not be the same as requested_assets_and_partitions if the tick was terminated mid-iteration.
"""
if self.tick_data.status == TickStatus.FAILURE:
return {}

return self._assets_and_partitions_for_run_requests(self.submitted_run_requests)

@property
def requested_asset_keys(self) -> AbstractSet[AssetKey]:
if self.tick_data.status != TickStatus.SUCCESS:
if self.tick_data.status == TickStatus.FAILURE:
return set()

return set(self.requested_assets_and_partitions.keys())
Expand Down

0 comments on commit 3f750a2

Please sign in to comment.