Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 555379f

Browse files
committed Sep 5, 2023
WIP: Only obtain a bearer token once at a time
Currently, on pushes, we can start several concurrent layer pushes; each one will check for a bearer token in tokenCache, find none, and ask the server for one, and then write it into the cache. So, we can hammer the server with 6 basically-concurrent token requests. That's unnecessary, slower than just asking once, and potentially might impact rate limiting heuristics. Instead, serialize writes to a bearerToken so that we only have one request in flight at a time. This does not apply to pulls, where the first request is for a manifest; that obtains a token, so subsequent concurrent layer pulls will not request a token again. WIP: Clean up the debugging log entries. Signed-off-by: Miloslav Trmač <mitr@redhat.com>
1 parent 6707866 commit 555379f

File tree

1 file changed

+75
-20
lines changed

1 file changed

+75
-20
lines changed
 

docker/docker_client.go

+75-20
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
digest "github.com/opencontainers/go-digest"
3333
imgspecv1 "github.com/opencontainers/image-spec/specs-go/v1"
3434
"github.com/sirupsen/logrus"
35+
"golang.org/x/sync/semaphore"
3536
)
3637

3738
const (
@@ -84,8 +85,19 @@ type extensionSignatureList struct {
8485
Signatures []extensionSignature `json:"signatures"`
8586
}
8687

87-
// bearerToken records a cached token we can use to authenticate.
88+
// bearerToken records a cached token we can use to authenticate, or a pending process to obtain one.
89+
//
90+
// The goroutine obtaining the token holds lock to block concurrent token requests, and fills the structure (err and possibly the other fields)
91+
// before releasing the lock.
92+
// Other goroutines obtain lock to block on the token request, if any; and then inspect err to see if the token is usable.
93+
// If it is not, they try to get a new one.
8894
type bearerToken struct {
95+
// lock is held while obtaining the token. Potentially nested inside dockerClient.tokenCacheLock.
96+
// This is a counting semaphore only because we need a cancellable lock operation.
97+
lock *semaphore.Weighted
98+
99+
// The following fields can only be accessed with lock held.
100+
err error // nil if the token was successfully obtained (but may be expired); an error if the next lock holder _must_ obtain a new token.
89101
token string
90102
expirationTime time.Time
91103
}
@@ -115,7 +127,7 @@ type dockerClient struct {
115127
supportsSignatures bool
116128

117129
// Private state for setupRequestAuth (key: string, value: bearerToken)
118-
tokenCacheLock sync.Mutex // Protects tokenCache
130+
tokenCacheLock sync.Mutex // Protects tokenCache.
119131
tokenCache map[string]*bearerToken
120132
// Private state for detectProperties:
121133
detectPropertiesOnce sync.Once // detectPropertiesOnce is used to execute detectProperties() at most once.
@@ -736,31 +748,74 @@ func (c *dockerClient) obtainBearerToken(ctx context.Context, challenge challeng
736748
scopes = append(scopes, *extraScope)
737749
}
738750

739-
var token *bearerToken
740-
var inCache bool
741-
func() { // A scope for defer
751+
logrus.Debugf("REMOVE: Checking token cache for key %q", cacheKey)
752+
token, newEntry, err := func() (*bearerToken, bool, error) { // A scope for defer
742753
c.tokenCacheLock.Lock()
743754
defer c.tokenCacheLock.Unlock()
744-
token, inCache = c.tokenCache[cacheKey]
745-
}()
746-
if !inCache || time.Now().After(token.expirationTime) {
747-
token = &bearerToken{}
748-
749-
var err error
750-
if c.auth.IdentityToken != "" {
751-
err = c.getBearerTokenOAuth2(ctx, token, challenge, scopes)
755+
token, ok := c.tokenCache[cacheKey]
756+
if ok {
757+
return token, false, nil
752758
} else {
753-
err = c.getBearerToken(ctx, token, challenge, scopes)
759+
logrus.Debugf("REMOVE: No token cache for key %q, allocating one…", cacheKey)
760+
token = &bearerToken{
761+
lock: semaphore.NewWeighted(1),
762+
}
763+
// If this is a new *bearerToken, lock the entry before adding it to the cache, so that any other goroutine that finds
764+
// this entry blocks until we obtain the token for the first time, and does not see an empty object
765+
// (and does not try to obtain the token itself when we are going to do so).
766+
if err := token.lock.Acquire(ctx, 1); err != nil {
767+
// We do not block on this Acquire, so we don’t really expect to fail here — but if ctx is canceled,
768+
// there is no point in trying to continue anyway.
769+
return nil, false, err
770+
}
771+
c.tokenCache[cacheKey] = token
772+
return token, true, nil
754773
}
755-
if err != nil {
774+
}()
775+
if err != nil {
776+
return "", err
777+
}
778+
if !newEntry {
779+
// If this is an existing *bearerToken, obtain the lock only after releasing c.tokenCacheLock,
780+
// so that users of other cacheKey values are not blocked for the whole duration of our HTTP roundtrip.
781+
logrus.Debugf("REMOVE: Found existing token cache for key %q, getting lock", cacheKey)
782+
if err := token.lock.Acquire(ctx, 1); err != nil {
756783
return "", err
757784
}
785+
logrus.Debugf("REMOVE: Locked existing token cache for key %q", cacheKey)
786+
}
758787

759-
func() { // A scope for defer
760-
c.tokenCacheLock.Lock()
761-
defer c.tokenCacheLock.Unlock()
762-
c.tokenCache[cacheKey] = token
763-
}()
788+
defer token.lock.Release(1)
789+
790+
// Determine if the bearerToken is usable: if it is not, log the cause and fall through, otherwise return early.
791+
switch {
792+
case newEntry:
793+
logrus.Debugf("REMOVE: New token cache entry for key %q, getting first token", cacheKey)
794+
case token.err != nil:
795+
// If obtaining a token fails for any reason, the request that triggered that will fail;
796+
// other requests will see token.err and try obtaining their own token, one goroutine at a time.
797+
// (Consider that a request can fail because a very short timeout was provided to _that one operation_ using a context.Context;
798+
// that clearly shouldn’t prevent other operations from trying with a longer timeout.)
799+
//
800+
// If we got here while holding token.lock, we are the goroutine responsible for trying again; others are blocked
801+
// on token.lock.
802+
logrus.Debugf("REMOVE: Token cache for key %q records failure %v, getting new token", cacheKey, token.err)
803+
case time.Now().After(token.expirationTime):
804+
logrus.Debugf("REMOVE: Token cache for key %q is expired, getting new token", cacheKey)
805+
806+
default:
807+
return token.token, nil
808+
}
809+
810+
if c.auth.IdentityToken != "" {
811+
err = c.getBearerTokenOAuth2(ctx, token, challenge, scopes)
812+
} else {
813+
err = c.getBearerToken(ctx, token, challenge, scopes)
814+
}
815+
logrus.Debugf("REMOVE: Obtaining a token for key %q, error %v", cacheKey, err)
816+
token.err = err
817+
if token.err != nil {
818+
return "", token.err
764819
}
765820
return token.token, nil
766821
}

0 commit comments

Comments (0)
Please sign in to comment.