Skip to content

Commit 188e34f

Browse files
Add audit annotation for watch rejections due to storage initialization
Signed-off-by: Shaza Aldawamneh <[email protected]>
1 parent 96593f3 commit 188e34f

File tree

3 files changed

+50
-11
lines changed

3 files changed

+50
-11
lines changed

staging/src/k8s.io/apiserver/pkg/audit/context.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,21 @@ func AddAuditAnnotation(ctx context.Context, key, value string) {
281281
addAuditAnnotationLocked(ac, key, value)
282282
}
283283

284+
// AddAuditAnnotationForRejectWithReason records an audit annotation
285+
// indicating that a watch request was rejected for the given reason.
286+
func AddAuditAnnotationForRejectWithReason(ctx context.Context, reason string) {
287+
AddAuditAnnotation(ctx, "audit.k8s.io/watch-reject-reason", reason)
288+
}
289+
290+
// AddAuditAnnotationForRejectMessage adds a human-readable message annotation
291+
// truncated to 1024 characters to avoid excessive log size.
292+
func AddAuditAnnotationForRejectMessage(ctx context.Context, msg string) {
293+
if len(msg) > 1024 {
294+
msg = msg[:1024] + "…"
295+
}
296+
AddAuditAnnotation(ctx, "audit.k8s.io/watch-reject-message", msg)
297+
}
298+
284299
// AddAuditAnnotations is a bulk version of AddAuditAnnotation. Refer to AddAuditAnnotation for
285300
// restrictions on when this can be called.
286301
// keysAndValues are the key-value pairs to add, and must have an even number of items.

staging/src/k8s.io/apiserver/pkg/audit/context_test.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,3 +182,18 @@ func withAuditContextAndLevel(ctx context.Context, t *testing.T, l auditinternal
182182
}
183183
return ctx
184184
}
185+
func TestAddAuditAnnotationForRejectWithReason(t *testing.T) {
186+
ctx := WithAuditContext(context.Background())
187+
AddAuditAnnotationForRejectWithReason(ctx, "storage_initializing")
188+
AddAuditAnnotationForRejectMessage(ctx, "storage is (re)initializing")
189+
190+
ac := AuditContextFrom(ctx)
191+
annotations := ac.GetEventAnnotations()
192+
193+
if got := annotations["audit.k8s.io/watch-reject-reason"]; got != "storage_initializing" {
194+
t.Errorf("expected reason 'storage_initializing', got %q", got)
195+
}
196+
if got := annotations["audit.k8s.io/watch-reject-message"]; got != "storage is (re)initializing" {
197+
t.Errorf("expected message 'storage is (re)initializing', got %q", got)
198+
}
199+
}

staging/src/k8s.io/apiserver/pkg/storage/cacher/ready.go

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,20 @@ package cacher
1818

1919
import (
2020
"context"
21+
"errors"
2122
"fmt"
2223
"sync"
2324
"time"
2425

26+
"k8s.io/apiserver/pkg/audit"
2527
"k8s.io/utils/clock"
2628
)
2729

30+
// ErrStorageInitializing is returned when the cacher is still initializing.
31+
// This allows callers to detect this specific condition and handle it
32+
// (e.g., add an audit annotation or return HTTP 429).
33+
var ErrStorageInitializing = errors.New("storage is (re)initializing")
34+
2835
type status int
2936

3037
const (
@@ -80,7 +87,6 @@ func (r *ready) wait(ctx context.Context) error {
8087
// of times we entered ready state if Ready and error otherwise.
8188
func (r *ready) waitAndReadGeneration(ctx context.Context) (int, error) {
8289
for {
83-
// r.done() only blocks if state is Pending
8490
select {
8591
case <-ctx.Done():
8692
return 0, ctx.Err()
@@ -89,18 +95,22 @@ func (r *ready) waitAndReadGeneration(ctx context.Context) (int, error) {
8995

9096
r.lock.RLock()
9197
if r.state == Pending {
92-
// since we allow to switch between the states Pending and Ready
93-
// if there is a quick transition from Pending -> Ready -> Pending
94-
// a process that was waiting can get unblocked and see a Pending
95-
// state again. If the state is Pending we have to wait again to
96-
// avoid an inconsistent state on the system, with some processes not
97-
// waiting despite the state moved back to Pending.
9898
r.lock.RUnlock()
9999
continue
100100
}
101+
101102
generation, err := r.readGenerationLocked()
102103
r.lock.RUnlock()
103-
return generation, err
104+
105+
if err != nil {
106+
if errors.Is(err, ErrStorageInitializing) {
107+
audit.AddAuditAnnotationForRejectWithReason(ctx, "storage_initializing")
108+
audit.AddAuditAnnotationForRejectMessage(ctx, err.Error())
109+
}
110+
return 0, err
111+
}
112+
113+
return generation, nil
104114
}
105115
}
106116

@@ -122,10 +132,9 @@ func (r *ready) readGenerationLocked() (int, error) {
122132
switch r.state {
123133
case Pending:
124134
if r.lastErr == nil {
125-
return 0, fmt.Errorf("storage is (re)initializing")
126-
} else {
127-
return 0, fmt.Errorf("storage is (re)initializing: %w", r.lastErr)
135+
return 0, ErrStorageInitializing
128136
}
137+
return 0, fmt.Errorf("%w: %v", ErrStorageInitializing, r.lastErr)
129138
case Ready:
130139
return r.generation, nil
131140
case Stopped:

0 commit comments

Comments
 (0)