@@ -802,10 +802,7 @@ def build_data_response(
802802        self , initial_result_record : InitialResultRecord , data : dict [str , Any ] |  None 
803803    ) ->  ExecutionResult  |  ExperimentalIncrementalExecutionResults :
804804        """Build response for the given data.""" 
805-         for  child  in  initial_result_record .children :
806-             if  child .filtered :
807-                 continue 
808-             self ._publish (child )
805+         pending_sources  =  self ._publish (initial_result_record .children )
809806
810807        errors  =  initial_result_record .errors  or  None 
811808        if  errors :
@@ -816,14 +813,7 @@ def build_data_response(
816813                    error .message ,
817814                )
818815            )
819-         pending  =  self ._pending 
820-         if  pending :
821-             pending_sources : RefSet [DeferredFragmentRecord  |  StreamRecord ] =  RefSet (
822-                 subsequent_result_record .stream_record 
823-                 if  isinstance (subsequent_result_record , StreamItemsRecord )
824-                 else  subsequent_result_record 
825-                 for  subsequent_result_record  in  pending 
826-             )
816+         if  pending_sources :
827817            return  ExperimentalIncrementalExecutionResults (
828818                initial_result = InitialIncrementalExecutionResult (
829819                    data ,
@@ -994,17 +984,7 @@ def _process_pending(
994984        completed_results : list [CompletedResult ] =  []
995985        to_result  =  self ._completed_record_to_result 
996986        for  subsequent_result_record  in  completed_records :
997-             for  child  in  subsequent_result_record .children :
998-                 if  child .filtered :
999-                     continue 
1000-                 pending_source : DeferredFragmentRecord  |  StreamRecord  =  (
1001-                     child .stream_record 
1002-                     if  isinstance (child , StreamItemsRecord )
1003-                     else  child 
1004-                 )
1005-                 if  not  pending_source .pending_sent :
1006-                     new_pending_sources .add (pending_source )
1007-                 self ._publish (child )
987+             self ._publish (subsequent_result_record .children , new_pending_sources )
1008988            incremental_result : IncrementalResult 
1009989            if  isinstance (subsequent_result_record , StreamItemsRecord ):
1010990                if  subsequent_result_record .is_final_record :
@@ -1060,7 +1040,7 @@ def _get_incremental_defer_result(
10601040        max_length : int  |  None  =  None 
10611041        id_with_longest_path : str  |  None  =  None 
10621042        for  fragment_record  in  fragment_records :
1063-             if  fragment_record .id  is  None :
1043+             if  fragment_record .id  is  None :   # pragma: no cover 
10641044                continue 
10651045            length  =  len (fragment_record .path )
10661046            if  max_length  is  None  or  length  >  max_length :
@@ -1090,20 +1070,45 @@ def _completed_record_to_result(
10901070            completed_record .errors  or  None ,
10911071        )
10921072
1093-     def  _publish (self , subsequent_result_record : SubsequentResultRecord ) ->  None :
1094-         """Publish the given incremental data record.""" 
1095-         if  isinstance (subsequent_result_record , StreamItemsRecord ):
1096-             if  subsequent_result_record .is_completed :
1097-                 self ._push (subsequent_result_record )
1098-             else :
1073+     def  _publish (
1074+         self ,
1075+         subsequent_result_records : dict [SubsequentResultRecord , None ],
1076+         pending_sources : RefSet [DeferredFragmentRecord  |  StreamRecord ] |  None  =  None ,
1077+     ) ->  RefSet [DeferredFragmentRecord  |  StreamRecord ]:
1078+         """Publish the given set of incremental data record.""" 
1079+         if  pending_sources  is  None :
1080+             pending_sources  =  RefSet ()
1081+         empty_records : list [SubsequentResultRecord ] =  []
1082+ 
1083+         for  subsequent_result_record  in  subsequent_result_records :
1084+             if  subsequent_result_record .filtered :
1085+                 continue 
1086+             if  isinstance (subsequent_result_record , StreamItemsRecord ):
1087+                 if  subsequent_result_record .is_completed :
1088+                     self ._push (subsequent_result_record )
1089+                 else :
1090+                     self ._introduce (subsequent_result_record )
1091+ 
1092+                 stream  =  subsequent_result_record .stream_record 
1093+                 if  not  stream .pending_sent :
1094+                     pending_sources .add (stream )
1095+                 continue 
1096+ 
1097+             if  subsequent_result_record ._pending :  # noqa: SLF001 
10991098                self ._introduce (subsequent_result_record )
1100-         elif  subsequent_result_record ._pending :  # noqa: SLF001 
1101-             self ._introduce (subsequent_result_record )
1102-         elif  (
1103-             subsequent_result_record .deferred_grouped_field_set_records 
1104-             or  subsequent_result_record .children 
1105-         ):
1106-             self ._push (subsequent_result_record )
1099+             elif  not  subsequent_result_record .deferred_grouped_field_set_records :
1100+                 empty_records .append (subsequent_result_record )
1101+                 continue 
1102+             else :
1103+                 self ._push (subsequent_result_record )
1104+ 
1105+             if  not  subsequent_result_record .pending_sent :
1106+                 pending_sources .add (subsequent_result_record )
1107+ 
1108+         for  empty_record  in  empty_records :
1109+             self ._publish (empty_record .children , pending_sources )
1110+ 
1111+         return  pending_sources 
11071112
11081113    @staticmethod  
11091114    def  _get_children (
0 commit comments