Skip to content

Commit 53caaa1

Browse files
committed
treat delete as normal operations
Signed-off-by: Huabing Zhao <[email protected]>
1 parent 784e555 commit 53caaa1

File tree

2 files changed

+10
-38
lines changed

2 files changed

+10
-38
lines changed

internal/message/watchutil.go

Lines changed: 8 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -106,13 +106,6 @@ func HandleSubscription[K comparable, V any](
106106
}
107107
}
108108

109-
// updateKey is a helper struct used to uniquely identify updates by their key and deletion status.
110-
// This is to ensure that delete operations are treated distinctly from update operations.
111-
type updateKey[K comparable] struct {
112-
Key K
113-
Delete bool
114-
}
115-
116109
// coalesceUpdates merges multiple updates for the same key into a single update,
117110
// preserving the latest state for each key.
118111
// This helps reduce redundant processing and ensures that only the most recent update per key is handled.
@@ -122,25 +115,22 @@ func coalesceUpdates[K comparable, V any](runner string, updates []watchable.Upd
122115
}
123116

124117
result := make([]watchable.Update[K, V], 0, len(updates))
125-
indexByKey := make(map[updateKey[K]]int, len(updates))
118+
indexByKey := make(map[K]int, len(updates))
126119

127-
for i := len(updates) - 1; i >= 0; i-- {
128-
update := updates[i]
129-
key := updateKey[K]{Key: update.Key, Delete: update.Delete}
130-
if _, ok := indexByKey[key]; ok {
120+
for _, update := range updates {
121+
if idx, ok := indexByKey[update.Key]; ok {
122+
// Keep the latest update for this key.
123+
result[idx] = update
131124
continue
132125
}
133126

134-
indexByKey[key] = len(result)
127+
indexByKey[update.Key] = len(result)
135128
result = append(result, update)
136129
}
137130

138-
// Reverse the result slice to restore the original order
139-
for left, right := 0, len(result)-1; left < right; left, right = left+1, right-1 {
140-
result[left], result[right] = result[right], result[left]
131+
if len(result) != len(updates) {
132+
logger.WithValues("runner", runner).Info("coalesced updates", "count", len(result), "before", len(updates))
141133
}
142134

143-
logger.WithValues("runner", runner).Info("coalesced updates", "count", len(result), "before", len(updates))
144-
145135
return result
146136
}

internal/message/watchutil_internal_test.go

Lines changed: 2 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -40,37 +40,19 @@ func TestCoalesceUpdates(t *testing.T) {
4040
},
4141
{
4242
name: "latest update per key wins",
43-
input: []watchable.Update[string, int]{
44-
{Key: "foo", Value: 1},
45-
{Key: "bar", Value: 2},
46-
{Key: "baz", Value: 3},
47-
{Key: "bar", Value: 4},
48-
{Key: "foo", Value: 5},
49-
{Key: "baz", Value: 6},
50-
},
51-
expected: []watchable.Update[string, int]{
52-
{Key: "bar", Value: 4},
53-
{Key: "foo", Value: 5},
54-
{Key: "baz", Value: 6},
55-
},
56-
},
57-
{
58-
name: "do not coalesce delete and update for the same key",
5943
input: []watchable.Update[string, int]{
6044
{Key: "foo", Value: 1},
6145
{Key: "bar", Delete: true, Value: 2},
6246
{Key: "baz", Value: 3},
6347
{Key: "bar", Value: 4},
6448
{Key: "foo", Value: 5},
6549
{Key: "baz", Delete: true, Value: 6},
66-
{Key: "bar", Value: 6},
50+
{Key: "bar", Value: 7},
6751
},
6852
expected: []watchable.Update[string, int]{
69-
{Key: "bar", Delete: true, Value: 2},
70-
{Key: "baz", Value: 3},
7153
{Key: "foo", Value: 5},
54+
{Key: "bar", Value: 7},
7255
{Key: "baz", Delete: true, Value: 6},
73-
{Key: "bar", Value: 6},
7456
},
7557
},
7658
}

0 commit comments

Comments
 (0)