Skip to content

Commit 9c8b27a

Browse files
Add audit annotation for watch rejections due to storage initialization
Signed-off-by: Shaza Aldawamneh <[email protected]>
1 parent 188e34f commit 9c8b27a

File tree

4 files changed

+20
-42
lines changed

4 files changed

+20
-42
lines changed

staging/src/k8s.io/apiserver/pkg/audit/context.go

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -281,21 +281,6 @@ func AddAuditAnnotation(ctx context.Context, key, value string) {
281281
addAuditAnnotationLocked(ac, key, value)
282282
}
283283

284-
// AddAuditAnnotationForRejectWithReason records an audit annotation
285-
// indicating that a watch request was rejected for the given reason.
286-
func AddAuditAnnotationForRejectWithReason(ctx context.Context, reason string) {
287-
AddAuditAnnotation(ctx, "audit.k8s.io/watch-reject-reason", reason)
288-
}
289-
290-
// AddAuditAnnotationForRejectMessage adds a human-readable message annotation
291-
// truncated to 1024 characters to avoid excessive log size.
292-
func AddAuditAnnotationForRejectMessage(ctx context.Context, msg string) {
293-
if len(msg) > 1024 {
294-
msg = msg[:1024] + "…"
295-
}
296-
AddAuditAnnotation(ctx, "audit.k8s.io/watch-reject-message", msg)
297-
}
298-
299284
// AddAuditAnnotations is a bulk version of AddAuditAnnotation. Refer to AddAuditAnnotation for
300285
// restrictions on when this can be called.
301286
// keysAndValues are the key-value pairs to add, and must have an even number of items.

staging/src/k8s.io/apiserver/pkg/audit/context_test.go

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -182,18 +182,3 @@ func withAuditContextAndLevel(ctx context.Context, t *testing.T, l auditinternal
182182
}
183183
return ctx
184184
}
185-
func TestAddAuditAnnotationForRejectWithReason(t *testing.T) {
186-
ctx := WithAuditContext(context.Background())
187-
AddAuditAnnotationForRejectWithReason(ctx, "storage_initializing")
188-
AddAuditAnnotationForRejectMessage(ctx, "storage is (re)initializing")
189-
190-
ac := AuditContextFrom(ctx)
191-
annotations := ac.GetEventAnnotations()
192-
193-
if got := annotations["audit.k8s.io/watch-reject-reason"]; got != "storage_initializing" {
194-
t.Errorf("expected reason 'storage_initializing', got %q", got)
195-
}
196-
if got := annotations["audit.k8s.io/watch-reject-message"]; got != "storage is (re)initializing" {
197-
t.Errorf("expected message 'storage is (re)initializing', got %q", got)
198-
}
199-
}

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -506,6 +506,18 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
506506
} else {
507507
readyGeneration, err = c.ready.waitAndReadGeneration(ctx)
508508
if err != nil {
509+
if err == ErrStorageInitializing || strings.HasPrefix(err.Error(), "storage is (re)initializing") {
510+
// Add audit annotations with OpenShift prefix
511+
audit.AddAuditAnnotation(ctx, "openshift.io/watch-reject-reason", "storage_initializing")
512+
513+
msg := err.Error()
514+
if len(msg) > 1024 {
515+
msg = msg[:1024] + "…"
516+
}
517+
audit.AddAuditAnnotation(ctx, "openshift.io/watch-reject-message", msg)
518+
}
519+
520+
// Return HTTP 503 (ServiceUnavailable) to the client
509521
return nil, errors.NewServiceUnavailable(err.Error())
510522
}
511523
}

staging/src/k8s.io/apiserver/pkg/storage/cacher/ready.go

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import (
2323
"sync"
2424
"time"
2525

26-
"k8s.io/apiserver/pkg/audit"
2726
"k8s.io/utils/clock"
2827
)
2928

@@ -87,6 +86,7 @@ func (r *ready) wait(ctx context.Context) error {
8786
// of times we entered ready state if Ready and error otherwise.
8887
func (r *ready) waitAndReadGeneration(ctx context.Context) (int, error) {
8988
for {
89+
// r.done() only blocks if state is Pending
9090
select {
9191
case <-ctx.Done():
9292
return 0, ctx.Err()
@@ -95,22 +95,18 @@ func (r *ready) waitAndReadGeneration(ctx context.Context) (int, error) {
9595

9696
r.lock.RLock()
9797
if r.state == Pending {
98+
// since we allow to switch between the states Pending and Ready
99+
// if there is a quick transition from Pending -> Ready -> Pending
100+
// a process that was waiting can get unblocked and see a Pending
101+
// state again. If the state is Pending we have to wait again to
102+
// avoid an inconsistent state on the system, with some processes not
103+
// waiting despite the state moved back to Pending.
98104
r.lock.RUnlock()
99105
continue
100106
}
101-
102107
generation, err := r.readGenerationLocked()
103108
r.lock.RUnlock()
104-
105-
if err != nil {
106-
if errors.Is(err, ErrStorageInitializing) {
107-
audit.AddAuditAnnotationForRejectWithReason(ctx, "storage_initializing")
108-
audit.AddAuditAnnotationForRejectMessage(ctx, err.Error())
109-
}
110-
return 0, err
111-
}
112-
113-
return generation, nil
109+
return generation, err
114110
}
115111
}
116112

0 commit comments

Comments
 (0)