From 093f87dbb467e7c68ca933816ae20b5e22b848f2 Mon Sep 17 00:00:00 2001 From: prha <1040172+prha@users.noreply.github.com> Date: Mon, 28 Oct 2024 10:10:50 -0700 Subject: [PATCH] always fetch storage_ids by partition for latest materializations call (#25511) ## Summary & Motivation Switches the latest materializations by partition resolver to always use the optimized latest storage ids by partition call. ## How I Tested These Changes BK --- .../dagster_graphql/schema/asset_graph.py | 58 ++++++------------- 1 file changed, 19 insertions(+), 39 deletions(-) diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py b/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py index 89b0d2e063c7b..2aca55e8c52a4 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py @@ -985,49 +985,29 @@ def resolve_latestMaterializationByPartition( graphene_info: ResolveInfo, partitions: Optional[Sequence[str]] = None, ) -> Sequence[Optional[GrapheneMaterializationEvent]]: - get_partition = ( - lambda event: event.dagster_event.step_materialization_data.materialization.partition + latest_storage_ids = sorted( + ( + graphene_info.context.instance.event_log_storage.get_latest_storage_id_by_partition( + self._asset_node_snap.asset_key, + DagsterEventType.ASSET_MATERIALIZATION, + set(partitions) if partitions else None, + ) + ).values() ) - - query_all_partitions = partitions is None - partitions = self.get_partition_keys() if query_all_partitions else partitions - - if query_all_partitions or len(partitions) > 1000: - # when there are many partitions, it's much more efficient to grab the latest storage - # id for all partitions and then query for the materialization events based on those - # storage ids - latest_storage_ids = sorted( - ( - graphene_info.context.instance.event_log_storage.get_latest_storage_id_by_partition( - self._asset_node_snap.asset_key, DagsterEventType.ASSET_MATERIALIZATION - ) - ).values() - ) - events_for_partitions = get_asset_materializations( - graphene_info, - asset_key=self._asset_node_snap.asset_key, - storage_ids=latest_storage_ids, - ) - latest_materialization_by_partition = { - get_partition(event): event for event in events_for_partitions - } - else: - events_for_partitions = get_asset_materializations( - graphene_info, - self._asset_node_snap.asset_key, - partitions, - ) - - latest_materialization_by_partition = {} - for event in events_for_partitions: # events are sorted in order of newest to oldest - event_partition = get_partition(event) - if event_partition not in latest_materialization_by_partition: - latest_materialization_by_partition[event_partition] = event - if len(latest_materialization_by_partition) == len(partitions): - break + events_for_partitions = get_asset_materializations( + graphene_info, + asset_key=self._asset_node_snap.asset_key, + storage_ids=latest_storage_ids, + ) + latest_materialization_by_partition = { + event.dagster_event.step_materialization_data.materialization.partition: event + for event in events_for_partitions + if event.dagster_event + } # return materializations in the same order as the provided partitions, None if # materialization does not exist + partitions = self.get_partition_keys() if partitions is None else partitions ordered_materializations = [ latest_materialization_by_partition.get(partition) for partition in partitions ]