Skip to content

Commit f873a04

Browse files
authored
🐛 Fix the issue of event loss caused by time filtering. (#1101)
* skipp the expired event Signed-off-by: myan <[email protected]> * fix the ut Signed-off-by: myan <[email protected]> * integration Signed-off-by: myan <[email protected]> --------- Signed-off-by: myan <[email protected]>
1 parent afe4255 commit f873a04

File tree

4 files changed

+18
-2
lines changed

4 files changed

+18
-2
lines changed

agent/pkg/status/controller/filter/time_filter.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ var (
2222
eventTimeCache = make(map[string]time.Time)
2323
lastEventTimeCache = make(map[string]time.Time)
2424
eventTimeCacheInterval = 5 * time.Second
25+
deltaDuration = 3 * time.Second
2526
)
2627

2728
// CacheTime cache the latest time
@@ -38,7 +39,10 @@ func Newer(key string, val time.Time) bool {
3839
if !ok {
3940
return true
4041
}
41-
return val.After(old)
42+
43+
// add noise to the time filter to ensure that events occurring very close together in time are not discarded
44+
older := old.Add(-deltaDuration)
45+
return val.After(older)
4246
}
4347

4448
// LaunchTimeFilter start a goroutine periodically sync the time filter cache to configMap

agent/pkg/status/controller/filter/time_filter_test.go

+10
Original file line numberDiff line numberDiff line change
@@ -141,4 +141,14 @@ func TestTimeFilter(t *testing.T) {
141141
assert.Nil(t, err)
142142
assert.True(t, cachedTime.Equal(expiredTime))
143143
cancel()
144+
145+
fmt.Println(">> verify5: don't lose events with similar time")
146+
similiarTime := time.Now().Add(10 * time.Second)
147+
assert.True(t, Newer(eventType, similiarTime))
148+
CacheTime(eventType, similiarTime)
149+
assert.True(t, Newer(eventType, similiarTime))
150+
CacheTime(eventType, similiarTime.Add(2*time.Second))
151+
assert.True(t, Newer(eventType, similiarTime))
152+
CacheTime(eventType, similiarTime.Add(5*time.Second))
153+
assert.False(t, Newer(eventType, similiarTime))
144154
}

agent/pkg/status/controller/policies/status_event_emitter.go

+2
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
cloudevents "github.com/cloudevents/sdk-go/v2"
1010
"github.com/go-logr/logr"
1111
corev1 "k8s.io/api/core/v1"
12+
"k8s.io/klog"
1213
policiesv1 "open-cluster-management.io/governance-policy-propagator/api/v1"
1314
ctrl "sigs.k8s.io/controller-runtime"
1415
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -102,6 +103,7 @@ func (h *statusEventEmitter) Update(obj client.Object) bool {
102103
for _, evt := range detail.History {
103104
// if the event time is older thant the filter cached sent event time, then skip it
104105
if !filter.Newer(h.name, evt.LastTimestamp.Time) {
106+
klog.Infof("skip the expired event: %s", evt.EventName)
105107
continue
106108
}
107109

test/integration/agent/status/localpolicy_event_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ var _ = Describe("LocalPolicyEventEmitter", Ordered, func() {
9191
Expect(err).Should(Succeed())
9292

9393
name := strings.Replace(string(enum.LocalRootPolicyEventType), enum.EventTypePrefix, "", -1)
94-
filter.CacheTime(name, cachedRootPolicyEvent.CreationTimestamp.Time.Add(5*time.Second))
94+
filter.CacheTime(name, cachedRootPolicyEvent.CreationTimestamp.Time.Add(9*time.Second)) // the delta is 3 seconds
9595

9696
By("Create a expired event")
9797
expiredEvent := &corev1.Event{

0 commit comments

Comments
 (0)