Skip to content

Commit f15c2e7

Browse files
committed
keep intermediate delete updates
Signed-off-by: Huabing Zhao <[email protected]>
1 parent 0968300 commit f15c2e7

File tree

3 files changed

+54
-46
lines changed

3 files changed

+54
-46
lines changed

internal/message/watchutil.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,13 @@ func HandleSubscription[K comparable, V any](
106106
}
107107
}
108108

109+
// updateKey is a helper struct used to uniquely identify updates by their key and deletion status.
110+
// This is to ensure that deleted keys are treated distinctly from updated keys.
111+
type updateKey[K comparable] struct {
112+
Key K
113+
Delete bool
114+
}
115+
109116
// coalesceUpdates merges multiple updates for the same key into a single update,
110117
// preserving the latest state for each key.
111118
// This helps reduce redundant processing and ensures that only the most recent update per key is handled.
@@ -115,15 +122,16 @@ func coalesceUpdates[K comparable, V any](runner string, updates []watchable.Upd
115122
}
116123

117124
result := make([]watchable.Update[K, V], 0, len(updates))
118-
indexByKey := make(map[K]int, len(updates))
125+
indexByKey := make(map[updateKey[K]]int, len(updates))
119126

120127
for i := len(updates) - 1; i >= 0; i-- {
121128
update := updates[i]
122-
if _, ok := indexByKey[update.Key]; ok {
129+
key := updateKey[K]{Key: update.Key, Delete: update.Delete}
130+
if _, ok := indexByKey[key]; ok {
123131
continue
124132
}
125133

126-
indexByKey[update.Key] = len(result)
134+
indexByKey[key] = len(result)
127135
result = append(result, update)
128136
}
129137

internal/message/watchutil_internal_test.go

Lines changed: 42 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import (
1212
"github.com/telepresenceio/watchable"
1313
)
1414

15-
func TestMergeUpdates(t *testing.T) {
15+
func TestCoalesceUpdates(t *testing.T) {
1616
t.Parallel()
1717

1818
tests := []struct {
@@ -26,18 +26,51 @@ func TestMergeUpdates(t *testing.T) {
2626
expected: []watchable.Update[string, int]{},
2727
},
2828
{
29-
name: "latest update per key delete state wins",
29+
name: "simple updates without repeats",
3030
input: []watchable.Update[string, int]{
3131
{Key: "foo", Value: 1},
32-
{Key: "bar", Delete: true, Value: 10},
33-
{Key: "baz", Value: 5},
34-
{Key: "bar", Delete: true, Value: 11},
35-
{Key: "foo", Value: 2},
32+
{Key: "bar", Value: 2},
33+
{Key: "baz", Value: 3},
3634
},
3735
expected: []watchable.Update[string, int]{
38-
{Key: "baz", Value: 5},
39-
{Key: "bar", Delete: true, Value: 11},
40-
{Key: "foo", Value: 2},
36+
{Key: "foo", Value: 1},
37+
{Key: "bar", Value: 2},
38+
{Key: "baz", Value: 3},
39+
},
40+
},
41+
{
42+
name: "latest update per key wins",
43+
input: []watchable.Update[string, int]{
44+
{Key: "foo", Value: 1},
45+
{Key: "bar", Value: 2},
46+
{Key: "baz", Value: 3},
47+
{Key: "bar", Value: 4},
48+
{Key: "foo", Value: 5},
49+
{Key: "baz", Value: 6},
50+
},
51+
expected: []watchable.Update[string, int]{
52+
{Key: "bar", Value: 4},
53+
{Key: "foo", Value: 5},
54+
{Key: "baz", Value: 6},
55+
},
56+
},
57+
{
58+
name: "keep intermediate deletes",
59+
input: []watchable.Update[string, int]{
60+
{Key: "foo", Value: 1},
61+
{Key: "bar", Delete: true, Value: 2},
62+
{Key: "baz", Value: 3},
63+
{Key: "bar", Value: 4},
64+
{Key: "foo", Value: 5},
65+
{Key: "baz", Delete: true, Value: 6},
66+
{Key: "bar", Value: 6},
67+
},
68+
expected: []watchable.Update[string, int]{
69+
{Key: "bar", Delete: true, Value: 2},
70+
{Key: "baz", Value: 3},
71+
{Key: "foo", Value: 5},
72+
{Key: "baz", Delete: true, Value: 6},
73+
{Key: "bar", Value: 6},
4174
},
4275
},
4376
}

internal/message/watchutil_test.go

Lines changed: 1 addition & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -89,43 +89,10 @@ func TestHandleSubscriptionAlreadyInitialized(t *testing.T) {
8989
}
9090
},
9191
)
92-
assert.LessOrEqual(t, storeCalls, 2) // Can be coalesced TODO handle delete
92+
assert.Equal(t, 2, storeCalls)
9393
assert.Equal(t, 1, deleteCalls)
9494
}
9595

96-
func TestHandleSubscriptionCoalescesUpdates(t *testing.T) {
97-
snapshotC := make(chan watchable.Snapshot[string, any], 2)
98-
snapshotC <- watchable.Snapshot[string, any]{State: map[string]any{}}
99-
snapshotC <- watchable.Snapshot[string, any]{
100-
State: map[string]any{},
101-
Updates: []watchable.Update[string, any]{
102-
{Key: "foo", Value: "v1"},
103-
{Key: "foo", Value: "v2"},
104-
{Key: "foo", Delete: true},
105-
{Key: "foo", Value: "v3"},
106-
{Key: "bar", Value: "v1"},
107-
{Key: "bar", Value: "v2"},
108-
},
109-
}
110-
close(snapshotC)
111-
112-
var observed []watchable.Update[string, any]
113-
message.HandleSubscription[string, any](
114-
message.Metadata{Runner: "demo", Message: "demo"},
115-
snapshotC,
116-
func(update message.Update[string, any], errChans chan error) {
117-
observed = append(observed, watchable.Update[string, any](update))
118-
},
119-
)
120-
121-
assert.Len(t, observed, 2)
122-
assert.Equal(t, "foo", observed[0].Key)
123-
assert.Equal(t, "v3", observed[0].Value)
124-
assert.False(t, observed[0].Delete)
125-
assert.Equal(t, "bar", observed[1].Key)
126-
assert.Equal(t, "v2", observed[1].Value)
127-
}
128-
12996
func TestControllerResourceUpdate(t *testing.T) {
13097
tests := []struct {
13198
desc string

0 commit comments

Comments
 (0)