Skip to content

Commit

Permalink
always fetch storage_ids by partition for latest materializations call (#25511)
Browse files Browse the repository at this point in the history

## Summary & Motivation
Switches the `latestMaterializationByPartition` resolver to always use the
optimized `get_latest_storage_id_by_partition` call, instead of using it only
when all partitions (or more than 1000 partitions) are requested.

## How I Tested These Changes
BK
  • Loading branch information
prha authored Oct 28, 2024
1 parent 6288d02 commit 093f87d
Showing 1 changed file with 19 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -985,49 +985,29 @@ def resolve_latestMaterializationByPartition(
graphene_info: ResolveInfo,
partitions: Optional[Sequence[str]] = None,
) -> Sequence[Optional[GrapheneMaterializationEvent]]:
get_partition = (
lambda event: event.dagster_event.step_materialization_data.materialization.partition
latest_storage_ids = sorted(
(
graphene_info.context.instance.event_log_storage.get_latest_storage_id_by_partition(
self._asset_node_snap.asset_key,
DagsterEventType.ASSET_MATERIALIZATION,
set(partitions) if partitions else None,
)
).values()
)

query_all_partitions = partitions is None
partitions = self.get_partition_keys() if query_all_partitions else partitions

if query_all_partitions or len(partitions) > 1000:
# when there are many partitions, it's much more efficient to grab the latest storage
# id for all partitions and then query for the materialization events based on those
# storage ids
latest_storage_ids = sorted(
(
graphene_info.context.instance.event_log_storage.get_latest_storage_id_by_partition(
self._asset_node_snap.asset_key, DagsterEventType.ASSET_MATERIALIZATION
)
).values()
)
events_for_partitions = get_asset_materializations(
graphene_info,
asset_key=self._asset_node_snap.asset_key,
storage_ids=latest_storage_ids,
)
latest_materialization_by_partition = {
get_partition(event): event for event in events_for_partitions
}
else:
events_for_partitions = get_asset_materializations(
graphene_info,
self._asset_node_snap.asset_key,
partitions,
)

latest_materialization_by_partition = {}
for event in events_for_partitions: # events are sorted in order of newest to oldest
event_partition = get_partition(event)
if event_partition not in latest_materialization_by_partition:
latest_materialization_by_partition[event_partition] = event
if len(latest_materialization_by_partition) == len(partitions):
break
events_for_partitions = get_asset_materializations(
graphene_info,
asset_key=self._asset_node_snap.asset_key,
storage_ids=latest_storage_ids,
)
latest_materialization_by_partition = {
event.dagster_event.step_materialization_data.materialization.partition: event
for event in events_for_partitions
if event.dagster_event
}

# return materializations in the same order as the provided partitions, None if
# materialization does not exist
partitions = self.get_partition_keys() if partitions is None else partitions
ordered_materializations = [
latest_materialization_by_partition.get(partition) for partition in partitions
]
Expand Down

0 comments on commit 093f87d

Please sign in to comment.