@@ -386,10 +386,9 @@ func (s *Snapshotter) OnDeltaStreamOpen(ctx context.Context, id int64, typ strin
386
386
}
387
387
388
388
func (s * Snapshotter ) OnStreamDeltaRequest (id int64 , r * discovery.DeltaDiscoveryRequest ) error {
389
- log .Logger .Info ("OnStreamDeltaRequest" )
390
389
st , _ := s .streams .Load (id )
391
390
stream := st .(* Stream )
392
- log .Logger .Info ("OnStreamRequest " ,
391
+ log .Logger .Info ("OnStreamDeltaRequest " ,
393
392
"id" , id ,
394
393
"peer" , stream .peerAddress ,
395
394
"received" , r .GetTypeUrl (),
@@ -401,16 +400,20 @@ func (s *Snapshotter) OnStreamDeltaRequest(id int64, r *discovery.DeltaDiscovery
401
400
)
402
401
403
402
s .addOrUpdateNode (r .GetNode ().GetId (), stream .peerAddress , id )
404
- if len (r .GetResourceNamesSubscribe ()) > 0 || len ( r .GetResourceNamesUnsubscribe ()) > 0 {
403
+ if s . needToUpdateDeltaSubscriptions (r .GetNode (). GetId (), r . GetTypeUrl (), id , r . GetResourceNamesSubscribe (), r .GetResourceNamesUnsubscribe ()) {
405
404
if err := s .updateDeltaStreamNodeResources (r .GetNode ().GetId (), r .GetTypeUrl (), id , r .GetResourceNamesSubscribe (), r .GetResourceNamesUnsubscribe ()); err != nil {
406
405
return err
407
406
}
408
407
}
409
408
return nil
410
409
}
411
410
412
- func (s * Snapshotter ) OnStreamDeltaResponse (i int64 , request * discovery.DeltaDiscoveryRequest , response * discovery.DeltaDiscoveryResponse ) {
413
- log .Logger .Info ("OnStreamDeltaResponse" )
411
+ func (s * Snapshotter ) OnStreamDeltaResponse (id int64 , req * discovery.DeltaDiscoveryRequest , resp * discovery.DeltaDiscoveryResponse ) {
412
+ log .Logger .Info ("OnStreamDeltaResponse" ,
413
+ "id" , id ,
414
+ "type" , resp .GetTypeUrl (),
415
+ "resources" , len (resp .GetResources ()),
416
+ )
414
417
}
415
418
416
419
func (s * Snapshotter ) OnFetchResponse (req * discovery.DiscoveryRequest , resp * discovery.DiscoveryResponse ) {
@@ -587,42 +590,55 @@ func (s *Snapshotter) updateNodeDeltaStreamServiceResources(nodeID, typeURL stri
587
590
}
588
591
// Calculate new resources to track based on subscribed and unsubscribed list
589
592
currTypedResources := nodeResources [streamID ].servicesNames [typeURL ]
593
+ for _ , resourceName := range subscribe {
594
+ if ! slices .Contains (currTypedResources , resourceName ) {
595
+ currTypedResources = append (currTypedResources , resourceName )
596
+ // Snap on every change
597
+ newSnapResources , err := s .getResourcesFromCache (typeURL , currTypedResources )
598
+ if err != nil {
599
+ return fmt .Errorf ("Cannot get resources from cache: %s" , err )
600
+ }
601
+ nodeResources [streamID ].services [typeURL ] = newSnapResources
602
+ nodeResources [streamID ].servicesNames [typeURL ] = currTypedResources
603
+ updatedNode := & Node {
604
+ address : node .address ,
605
+ resources : nodeResources ,
606
+ serviceSnapVersion : node .serviceSnapVersion ,
607
+ endpointsSnapVersion : node .endpointsSnapVersion ,
608
+ }
609
+ s .nodes .Store (nodeID , updatedNode )
610
+ if err := s .nodeServiceSnapshot (nodeID ); err != nil {
611
+ return fmt .Errorf ("Failed to snap resources to cache for node %s: %s" , nodeID , err )
612
+ }
613
+ }
614
+
615
+ }
616
+
617
+ // Unsubscribe resources
590
618
newTypedResources := []string {}
591
619
for _ , resourceName := range currTypedResources {
592
620
if ! slices .Contains (unsubscribe , resourceName ) {
593
621
newTypedResources = append (newTypedResources , resourceName )
594
622
}
595
623
}
596
- for _ , resourceName := range subscribe {
597
- if ! slices .Contains (newTypedResources , resourceName ) {
598
- newTypedResources = append (newTypedResources , resourceName )
599
- }
600
- }
601
-
602
- var newSnapResources []types.Resource
603
- var err error
604
- if s .localhostEndpoints {
605
- newSnapResources , err = s .makeDummyResources (typeURL , newTypedResources )
606
- if err != nil {
607
- return fmt .Errorf ("Cannot make dummy resources for localhost mode: %s" , err )
608
- }
609
- log .Logger .Debug ("Created dummy resources" , "type" , typeURL , "resources" , newSnapResources , "count" , len (newSnapResources ))
610
- } else {
611
- newSnapResources , err = s .getResourcesFromCache (typeURL , newTypedResources )
624
+ if len (newTypedResources ) < len (currTypedResources ) {
625
+ newSnapResources , err := s .getResourcesFromCache (typeURL , newTypedResources )
612
626
if err != nil {
613
627
return fmt .Errorf ("Cannot get resources from cache: %s" , err )
614
628
}
629
+ nodeResources [streamID ].services [typeURL ] = newSnapResources
630
+ nodeResources [streamID ].servicesNames [typeURL ] = newTypedResources
631
+ updatedNode := & Node {
632
+ address : node .address ,
633
+ resources : nodeResources ,
634
+ serviceSnapVersion : node .serviceSnapVersion ,
635
+ endpointsSnapVersion : node .endpointsSnapVersion ,
636
+ }
637
+ s .nodes .Store (nodeID , updatedNode )
638
+ if err := s .nodeServiceSnapshot (nodeID ); err != nil {
639
+ return fmt .Errorf ("Failed to snap resources to cache for node %s: %s" , nodeID , err )
640
+ }
615
641
}
616
-
617
- nodeResources [streamID ].services [typeURL ] = newSnapResources
618
- nodeResources [streamID ].servicesNames [typeURL ] = newTypedResources
619
- updatedNode := & Node {
620
- address : node .address ,
621
- resources : nodeResources ,
622
- serviceSnapVersion : node .serviceSnapVersion ,
623
- endpointsSnapVersion : node .endpointsSnapVersion ,
624
- }
625
- s .nodes .Store (nodeID , updatedNode )
626
642
return nil
627
643
}
628
644
@@ -805,7 +821,7 @@ func (s *Snapshotter) needToUpdateSnapshot(nodeID, typeURL string, streamID int6
805
821
node := n .(* Node )
806
822
sNodeResources , ok := node .resources [streamID ]
807
823
if ! ok {
808
- log .Logger .Warn ("Cannot check if snapshot needs updating, strema not found" , "id" , streamID )
824
+ log .Logger .Warn ("Cannot check if snapshot needs updating, stream not found" , "id" , streamID )
809
825
return false
810
826
}
811
827
if mapTypeURL (typeURL ) == "services" {
@@ -817,6 +833,33 @@ func (s *Snapshotter) needToUpdateSnapshot(nodeID, typeURL string, streamID int6
817
833
return false
818
834
}
819
835
836
+ // needToUpdateDeltaSubscriptions returns true if a delta request changes the
837
+ // list of subscribed resources
838
+ func (s * Snapshotter ) needToUpdateDeltaSubscriptions (nodeID , typeURL string , streamID int64 , subscribe , unsubscribe []string ) bool {
839
+ n , ok := s .nodes .Load (nodeID )
840
+ if ! ok {
841
+ return false
842
+ }
843
+ node := n .(* Node )
844
+ sNodeResources , ok := node .resources [streamID ]
845
+ if ! ok {
846
+ log .Logger .Warn ("Cannot check if delta subscriptions changed, stream not found" , "id" , streamID )
847
+ return false
848
+ }
849
+ resources := sNodeResources .servicesNames [typeURL ]
850
+ for _ , r := range subscribe {
851
+ if ! slices .Contains (resources , r ) {
852
+ return true
853
+ }
854
+ }
855
+ for _ , r := range unsubscribe {
856
+ if slices .Contains (resources , r ) {
857
+ return true
858
+ }
859
+ }
860
+ return false
861
+ }
862
+
820
863
// makeDummyResources blindly creates default configuration resources to snapshot based on the requested names.
821
864
// For endpoints it will create a single localhost one
822
865
func (s * Snapshotter ) makeDummyResources (typeURL string , resources []string ) ([]types.Resource , error ) {
0 commit comments