@@ -280,9 +280,14 @@ public DeltaWatch createDeltaWatch(
280
280
return watch ;
281
281
}
282
282
} else if (hasClusterChanged && requestResourceType .equals (ResourceType .ENDPOINT )) {
283
- ResponseState responseState = respondDeltaTracked (
283
+ Map <String , SnapshotResource <?>> snapshotResources = snapshot .resources (request .getResourceType ());
284
+ List <String > removedResources = findRemovedResources (watch ,
285
+ snapshotResources );
286
+ Map <String , SnapshotResource <?>> changedResources = findChangedResources (watch , snapshotResources );
287
+ ResponseState responseState = respondDelta (
284
288
watch ,
285
- snapshot .resources (request .getResourceType ()),
289
+ changedResources ,
290
+ removedResources ,
286
291
version ,
287
292
group );
288
293
if (responseState .equals (ResponseState .RESPONDED ) || responseState .equals (ResponseState .CANCELLED )) {
@@ -304,8 +309,13 @@ public DeltaWatch createDeltaWatch(
304
309
}
305
310
306
311
// Otherwise, version is different, the watch may be responded immediately
307
- ResponseState responseState = respondDeltaTracked (watch ,
308
- snapshot .resources (request .getResourceType ()),
312
+ Map <String , SnapshotResource <?>> snapshotResources = snapshot .resources (request .getResourceType ());
313
+ List <String > removedResources = findRemovedResources (watch ,
314
+ snapshotResources );
315
+ Map <String , SnapshotResource <?>> changedResources = findChangedResources (watch , snapshotResources );
316
+ ResponseState responseState = respondDelta (watch ,
317
+ changedResources ,
318
+ removedResources ,
309
319
version ,
310
320
group );
311
321
if (responseState .equals (ResponseState .RESPONDED ) || responseState .equals (ResponseState .CANCELLED )) {
@@ -470,8 +480,10 @@ protected void respondWithSpecificOrder(T group,
470
480
.filter (s -> watch .trackedResources ().get (s ) != null )
471
481
.collect (Collectors .toList ());
472
482
473
- ResponseState responseState = respondDeltaTracked (watch ,
474
- snapshotChangedResources ,
483
+ Map <String , SnapshotResource <?>> changedResources = findChangedResources (watch , snapshotChangedResources );
484
+
485
+ ResponseState responseState = respondDelta (watch ,
486
+ changedResources ,
475
487
removedResources ,
476
488
version ,
477
489
group );
@@ -551,22 +563,30 @@ private boolean respond(Watch watch, U snapshot, T group) {
551
563
return false ;
552
564
}
553
565
554
- /**
555
- * Responds a delta watch using resource version comparison.
556
- *
557
- * @return if the watch has been responded.
558
- */
559
- private ResponseState respondDeltaTracked (DeltaWatch watch ,
560
- Map <String , SnapshotResource <?>> snapshotResources ,
561
- String version ,
562
- T group ) {
566
+ private List <String > findRemovedResources (DeltaWatch watch , Map <String , SnapshotResource <?>> snapshotResources ) {
563
567
// remove resources for which client has a tracked version but do not exist in snapshot
564
- List < String > removedResources = watch .trackedResources ().keySet ()
568
+ return watch .trackedResources ().keySet ()
565
569
.stream ()
566
570
.filter (s -> !snapshotResources .containsKey (s ))
567
571
.collect (Collectors .toList ());
572
+ }
568
573
569
- return respondDeltaTracked (watch , snapshotResources , removedResources , version , group );
574
+ private Map <String , SnapshotResource <?>> findChangedResources (DeltaWatch watch ,
575
+ Map <String , SnapshotResource <?>> snapshotResources ) {
576
+ return snapshotResources .entrySet ()
577
+ .stream ()
578
+ .filter (entry -> {
579
+ if (watch .pendingResources ().contains (entry .getKey ())) {
580
+ return true ;
581
+ }
582
+ String resourceVersion = watch .trackedResources ().get (entry .getKey ());
583
+ if (resourceVersion == null ) {
584
+ // resource is not tracked, should respond it only if watch is wildcard
585
+ return watch .isWildcard ();
586
+ }
587
+ return !entry .getValue ().version ().equals (resourceVersion );
588
+ })
589
+ .collect (Collectors .toMap (Map .Entry ::getKey , Map .Entry ::getValue ));
570
590
}
571
591
572
592
private ResponseState respondDeltaTracked (DeltaWatch watch ,
0 commit comments