Skip to content
Draft
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions accesslogs/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ type Options struct {
QueueLimit int `user:"true" help:"log file upload queue limit" default:"100"`
RetryLimit int `user:"true" help:"maximum number of retries for log file uploads" default:"3"`
ShutdownTimeout time.Duration `user:"true" help:"time limit waiting for queued logs to finish uploading when gateway is shutting down" default:"1m"`
UploadTimeout time.Duration `user:"true" help:"time limit for each individual log file upload" default:"5m"`
}
}

Expand All @@ -118,6 +119,9 @@ func NewProcessor(log *zap.Logger, opts Options) *Processor {
if opts.UploadingOptions.ShutdownTimeout <= 0 {
opts.UploadingOptions.ShutdownTimeout = time.Minute
}
if opts.UploadingOptions.UploadTimeout <= 0 {
opts.UploadingOptions.UploadTimeout = 5 * time.Minute
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@copilot a 5-minute default is a bit too much. Let's go with a slightly lower value, like 30 seconds.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed the default upload timeout from 5 minutes to 30 seconds in commit 9ef9a2c.

}

return &Processor{
log: log,
Expand All @@ -126,6 +130,7 @@ func NewProcessor(log *zap.Logger, opts Options) *Processor {
queueLimit: opts.UploadingOptions.QueueLimit,
retryLimit: opts.UploadingOptions.RetryLimit,
shutdownTimeout: opts.UploadingOptions.ShutdownTimeout,
uploadTimeout: opts.UploadingOptions.UploadTimeout,
}),

defaultEntryLimit: opts.DefaultEntryLimit,
Expand Down
10 changes: 7 additions & 3 deletions accesslogs/uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ type sequentialUploader struct {
queueLimit int
retryLimit int
shutdownTimeout time.Duration
uploadTimeout time.Duration

mu sync.Mutex
queue chan upload
Expand All @@ -94,6 +95,7 @@ type sequentialUploaderOptions struct {
queueLimit int
retryLimit int
shutdownTimeout time.Duration
uploadTimeout time.Duration
}

func newSequentialUploader(log *zap.Logger, opts sequentialUploaderOptions) *sequentialUploader {
Expand All @@ -103,6 +105,7 @@ func newSequentialUploader(log *zap.Logger, opts sequentialUploaderOptions) *seq
queueLimit: opts.queueLimit,
retryLimit: opts.retryLimit,
shutdownTimeout: opts.shutdownTimeout,
uploadTimeout: opts.uploadTimeout,
queue: make(chan upload, opts.queueLimit),
}
}
Expand Down Expand Up @@ -192,9 +195,10 @@ func (u *sequentialUploader) run() error {
for {
select {
case up := <-u.queue:
// TODO(artur): we need to figure out what context we want
// to pass here. WithTimeout(Background, …)?
if err := up.store.Put(context.TODO(), up.bucket, up.key, up.body); err != nil {
ctx, cancel := context.WithTimeout(context.Background(), u.uploadTimeout)
err := up.store.Put(ctx, up.bucket, up.key, up.body)
cancel()
if err != nil {
if up.retries == u.retryLimit {
mon.Event("upload_dropped")
u.log.Error("retry limit reached",
Expand Down
40 changes: 40 additions & 0 deletions accesslogs/uploader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func TestLimits(t *testing.T) {
queueLimit: 2,
retryLimit: 1,
shutdownTimeout: time.Second,
uploadTimeout: time.Minute,
})

for range 2 {
Expand All @@ -57,6 +58,7 @@ func TestQueueNoLimit(t *testing.T) {
queueLimit: 2,
retryLimit: 1,
shutdownTimeout: time.Second,
uploadTimeout: time.Minute,
})
defer ctx.Check(u.close)
ctx.Go(u.run)
Expand Down Expand Up @@ -88,6 +90,7 @@ func TestQueueNoLimitErroringStorage(t *testing.T) {
queueLimit: 10,
retryLimit: 1,
shutdownTimeout: time.Second,
uploadTimeout: time.Minute,
})
defer ctx.Check(u.close)
ctx.Go(u.run)
Expand All @@ -112,6 +115,7 @@ func TestQueueErroringStorage(t *testing.T) {
queueLimit: 10,
retryLimit: 1,
shutdownTimeout: time.Second,
uploadTimeout: time.Minute,
})
defer ctx.Check(u.close)
ctx.Go(u.run)
Expand All @@ -120,3 +124,39 @@ func TestQueueErroringStorage(t *testing.T) {
require.NoError(t, u.queueUpload(s, "test", "test", testrand.Bytes(memory.KiB)))
}
}

type slowStorage struct {
delay time.Duration
}

func (s slowStorage) Put(ctx context.Context, bucket, key string, data []byte) error {
select {
case <-time.After(s.delay):
return nil
case <-ctx.Done():
return ctx.Err()
}
}

func TestUploadTimeout(t *testing.T) {
t.Parallel()

ctx := testcontext.New(t)
defer ctx.Cleanup()

log := zaptest.NewLogger(t)
defer ctx.Check(log.Sync)

s := slowStorage{delay: 2 * time.Second}
u := newSequentialUploader(log, sequentialUploaderOptions{
entryLimit: 5 * memory.KiB,
queueLimit: 10,
retryLimit: 1,
shutdownTimeout: 5 * time.Second,
uploadTimeout: 500 * time.Millisecond,
})
defer ctx.Check(u.close)
ctx.Go(u.run)

require.NoError(t, u.queueUpload(s, "test", "test", testrand.Bytes(memory.KiB)))
}