diff --git a/accesslogs/processor.go b/accesslogs/processor.go index ca47374a..82507766 100644 --- a/accesslogs/processor.go +++ b/accesslogs/processor.go @@ -96,6 +96,7 @@ type Options struct { QueueLimit int `user:"true" help:"log file upload queue limit" default:"100"` RetryLimit int `user:"true" help:"maximum number of retries for log file uploads" default:"3"` ShutdownTimeout time.Duration `user:"true" help:"time limit waiting for queued logs to finish uploading when gateway is shutting down" default:"1m"` + UploadTimeout time.Duration `user:"true" help:"time limit for each individual log file upload" default:"30s"` } } @@ -118,6 +119,9 @@ func NewProcessor(log *zap.Logger, opts Options) *Processor { if opts.UploadingOptions.ShutdownTimeout <= 0 { opts.UploadingOptions.ShutdownTimeout = time.Minute } + if opts.UploadingOptions.UploadTimeout <= 0 { + opts.UploadingOptions.UploadTimeout = 30 * time.Second + } return &Processor{ log: log, @@ -126,6 +130,7 @@ func NewProcessor(log *zap.Logger, opts Options) *Processor { queueLimit: opts.UploadingOptions.QueueLimit, retryLimit: opts.UploadingOptions.RetryLimit, shutdownTimeout: opts.UploadingOptions.ShutdownTimeout, + uploadTimeout: opts.UploadingOptions.UploadTimeout, }), defaultEntryLimit: opts.DefaultEntryLimit, diff --git a/accesslogs/uploader.go b/accesslogs/uploader.go index 128a4e01..7d2fd59e 100644 --- a/accesslogs/uploader.go +++ b/accesslogs/uploader.go @@ -65,11 +65,12 @@ type uploader interface { var _ uploader = (*sequentialUploader)(nil) type upload struct { - store Storage - bucket string - key string - body []byte - retries int + store Storage + bucket string + key string + body []byte + retries int + uploadTimeout time.Duration } type sequentialUploader struct { @@ -79,6 +80,7 @@ type sequentialUploader struct { queueLimit int retryLimit int shutdownTimeout time.Duration + uploadTimeout time.Duration mu sync.Mutex queue chan upload @@ -94,6 +96,7 @@ type sequentialUploaderOptions struct { queueLimit int retryLimit int shutdownTimeout time.Duration + uploadTimeout time.Duration } func newSequentialUploader(log *zap.Logger, opts sequentialUploaderOptions) *sequentialUploader { @@ -103,6 +106,7 @@ func newSequentialUploader(log *zap.Logger, opts sequentialUploaderOptions) *seq queueLimit: opts.queueLimit, retryLimit: opts.retryLimit, shutdownTimeout: opts.shutdownTimeout, + uploadTimeout: opts.uploadTimeout, queue: make(chan upload, opts.queueLimit), } } @@ -129,11 +133,12 @@ func (u *sequentialUploader) queueUpload(store Storage, bucket, key string, body u.mu.Unlock() u.queue <- upload{ - store: store, - bucket: bucket, - key: key, - body: body, - retries: 0, + store: store, + bucket: bucket, + key: key, + body: body, + retries: 0, + uploadTimeout: u.uploadTimeout, } return nil @@ -154,11 +159,12 @@ func (u *sequentialUploader) queueUploadWithoutQueueLimit(store Storage, bucket, u.mu.Unlock() u.queue <- upload{ - store: store, - bucket: bucket, - key: key, - body: body, - retries: 0, + store: store, + bucket: bucket, + key: key, + body: body, + retries: 0, + uploadTimeout: u.uploadTimeout, } return nil @@ -192,9 +198,10 @@ func (u *sequentialUploader) run() error { for { select { case up := <-u.queue: - // TODO(artur): we need to figure out what context we want - // to pass here. WithTimeout(Background, …)? - if err := up.store.Put(context.TODO(), up.bucket, up.key, up.body); err != nil { + ctx, cancel := context.WithTimeout(context.Background(), up.uploadTimeout) + err := up.store.Put(ctx, up.bucket, up.key, up.body) + cancel() + if err != nil { if up.retries == u.retryLimit { mon.Event("upload_dropped") u.log.Error("retry limit reached", diff --git a/accesslogs/uploader_test.go b/accesslogs/uploader_test.go index a26b4ad4..614f1aac 100644 --- a/accesslogs/uploader_test.go +++ b/accesslogs/uploader_test.go @@ -32,6 +32,7 @@ func TestLimits(t *testing.T) { queueLimit: 2, retryLimit: 1, shutdownTimeout: time.Second, + uploadTimeout: time.Minute, }) for range 2 { @@ -57,6 +58,7 @@ func TestQueueNoLimit(t *testing.T) { queueLimit: 2, retryLimit: 1, shutdownTimeout: time.Second, + uploadTimeout: time.Minute, }) defer ctx.Check(u.close) ctx.Go(u.run) @@ -88,6 +90,7 @@ func TestQueueNoLimitErroringStorage(t *testing.T) { queueLimit: 10, retryLimit: 1, shutdownTimeout: time.Second, + uploadTimeout: time.Minute, }) defer ctx.Check(u.close) ctx.Go(u.run) @@ -112,6 +115,7 @@ func TestQueueErroringStorage(t *testing.T) { queueLimit: 10, retryLimit: 1, shutdownTimeout: time.Second, + uploadTimeout: time.Minute, }) defer ctx.Check(u.close) ctx.Go(u.run) @@ -120,3 +124,39 @@ func TestQueueErroringStorage(t *testing.T) { require.NoError(t, u.queueUpload(s, "test", "test", testrand.Bytes(memory.KiB))) } } + +type slowStorage struct { + delay time.Duration +} + +func (s slowStorage) Put(ctx context.Context, bucket, key string, data []byte) error { + select { + case <-time.After(s.delay): + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +func TestUploadTimeout(t *testing.T) { + t.Parallel() + + ctx := testcontext.New(t) + defer ctx.Cleanup() + + log := zaptest.NewLogger(t) + defer ctx.Check(log.Sync) + + s := slowStorage{delay: 2 * time.Second} + u := newSequentialUploader(log, sequentialUploaderOptions{ + entryLimit: 5 * memory.KiB, + queueLimit: 10, + retryLimit: 1, + shutdownTimeout: 5 * time.Second, + uploadTimeout: 500 * time.Millisecond, + }) + defer ctx.Check(u.close) + ctx.Go(u.run) + + require.NoError(t, u.queueUpload(s, "test", "test", testrand.Bytes(memory.KiB))) +}