From 37187b7231d5d77b5a8374609bed8ce7a7bb1b43 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 21 Oct 2025 16:01:55 +0000 Subject: [PATCH 1/3] Initial plan From 63824b6c05f00f255285d608bdbcc84a7fb0d142 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 21 Oct 2025 16:09:09 +0000 Subject: [PATCH 2/3] Add configurable upload timeout for parcel uploads Co-authored-by: amwolff <30234756+amwolff@users.noreply.github.com> --- accesslogs/processor.go | 5 +++++ accesslogs/uploader.go | 10 +++++++--- accesslogs/uploader_test.go | 40 +++++++++++++++++++++++++++++++++++++ 3 files changed, 52 insertions(+), 3 deletions(-) diff --git a/accesslogs/processor.go b/accesslogs/processor.go index ca47374a..a6b7615a 100644 --- a/accesslogs/processor.go +++ b/accesslogs/processor.go @@ -96,6 +96,7 @@ type Options struct { QueueLimit int `user:"true" help:"log file upload queue limit" default:"100"` RetryLimit int `user:"true" help:"maximum number of retries for log file uploads" default:"3"` ShutdownTimeout time.Duration `user:"true" help:"time limit waiting for queued logs to finish uploading when gateway is shutting down" default:"1m"` + UploadTimeout time.Duration `user:"true" help:"time limit for each individual log file upload" default:"5m"` } } @@ -118,6 +119,9 @@ func NewProcessor(log *zap.Logger, opts Options) *Processor { if opts.UploadingOptions.ShutdownTimeout <= 0 { opts.UploadingOptions.ShutdownTimeout = time.Minute } + if opts.UploadingOptions.UploadTimeout <= 0 { + opts.UploadingOptions.UploadTimeout = 5 * time.Minute + } return &Processor{ log: log, @@ -126,6 +130,7 @@ func NewProcessor(log *zap.Logger, opts Options) *Processor { queueLimit: opts.UploadingOptions.QueueLimit, retryLimit: opts.UploadingOptions.RetryLimit, shutdownTimeout: opts.UploadingOptions.ShutdownTimeout, + uploadTimeout: opts.UploadingOptions.UploadTimeout, }), defaultEntryLimit: opts.DefaultEntryLimit, diff --git a/accesslogs/uploader.go b/accesslogs/uploader.go index 128a4e01..e9e6a63e 100644 --- a/accesslogs/uploader.go +++ b/accesslogs/uploader.go @@ -79,6 +79,7 @@ type sequentialUploader struct { queueLimit int retryLimit int shutdownTimeout time.Duration + uploadTimeout time.Duration mu sync.Mutex queue chan upload @@ -94,6 +95,7 @@ type sequentialUploaderOptions struct { queueLimit int retryLimit int shutdownTimeout time.Duration + uploadTimeout time.Duration } func newSequentialUploader(log *zap.Logger, opts sequentialUploaderOptions) *sequentialUploader { @@ -103,6 +105,7 @@ func newSequentialUploader(log *zap.Logger, opts sequentialUploaderOptions) *seq queueLimit: opts.queueLimit, retryLimit: opts.retryLimit, shutdownTimeout: opts.shutdownTimeout, + uploadTimeout: opts.uploadTimeout, queue: make(chan upload, opts.queueLimit), } } @@ -192,9 +195,10 @@ func (u *sequentialUploader) run() error { for { select { case up := <-u.queue: - // TODO(artur): we need to figure out what context we want - // to pass here. WithTimeout(Background, …)? - if err := up.store.Put(context.TODO(), up.bucket, up.key, up.body); err != nil { + ctx, cancel := context.WithTimeout(context.Background(), u.uploadTimeout) + err := up.store.Put(ctx, up.bucket, up.key, up.body) + cancel() + if err != nil { if up.retries == u.retryLimit { mon.Event("upload_dropped") u.log.Error("retry limit reached", diff --git a/accesslogs/uploader_test.go b/accesslogs/uploader_test.go index a26b4ad4..614f1aac 100644 --- a/accesslogs/uploader_test.go +++ b/accesslogs/uploader_test.go @@ -32,6 +32,7 @@ func TestLimits(t *testing.T) { queueLimit: 2, retryLimit: 1, shutdownTimeout: time.Second, + uploadTimeout: time.Minute, }) for range 2 { @@ -57,6 +58,7 @@ func TestQueueNoLimit(t *testing.T) { queueLimit: 2, retryLimit: 1, shutdownTimeout: time.Second, + uploadTimeout: time.Minute, }) defer ctx.Check(u.close) ctx.Go(u.run) @@ -88,6 +90,7 @@ func TestQueueNoLimitErroringStorage(t *testing.T) { queueLimit: 10, retryLimit: 1, shutdownTimeout: time.Second, + uploadTimeout: time.Minute, }) defer ctx.Check(u.close) ctx.Go(u.run) @@ -112,6 +115,7 @@ func TestQueueErroringStorage(t *testing.T) { queueLimit: 10, retryLimit: 1, shutdownTimeout: time.Second, + uploadTimeout: time.Minute, }) defer ctx.Check(u.close) ctx.Go(u.run) @@ -120,3 +124,39 @@ func TestQueueErroringStorage(t *testing.T) { require.NoError(t, u.queueUpload(s, "test", "test", testrand.Bytes(memory.KiB))) } } + +type slowStorage struct { + delay time.Duration +} + +func (s slowStorage) Put(ctx context.Context, bucket, key string, data []byte) error { + select { + case <-time.After(s.delay): + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +func TestUploadTimeout(t *testing.T) { + t.Parallel() + + ctx := testcontext.New(t) + defer ctx.Cleanup() + + log := zaptest.NewLogger(t) + defer ctx.Check(log.Sync) + + s := slowStorage{delay: 2 * time.Second} + u := newSequentialUploader(log, sequentialUploaderOptions{ + entryLimit: 5 * memory.KiB, + queueLimit: 10, + retryLimit: 1, + shutdownTimeout: 5 * time.Second, + uploadTimeout: 500 * time.Millisecond, + }) + defer ctx.Check(u.close) + ctx.Go(u.run) + + require.NoError(t, u.queueUpload(s, "test", "test", testrand.Bytes(memory.KiB))) +} From 8af8299ec72f1dd7d864a0693228c132152d3238 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 21 Oct 2025 17:01:50 +0000 Subject: [PATCH 3/3] Make upload timeout per-parcel and change default to 30s - Changed default upload timeout from 5 minutes to 30 seconds - Made timeout scoped to individual parcels instead of global - Added uploadTimeout field to upload struct - Each upload now carries its own timeout value Co-authored-by: amwolff <30234756+amwolff@users.noreply.github.com> --- accesslogs/processor.go | 4 ++-- accesslogs/uploader.go | 35 +++++++++++++++++++---------------- 2 files changed, 21 insertions(+), 18 deletions(-) diff --git a/accesslogs/processor.go b/accesslogs/processor.go index a6b7615a..82507766 100644 --- a/accesslogs/processor.go +++ b/accesslogs/processor.go @@ -96,7 +96,7 @@ type Options struct { QueueLimit int `user:"true" help:"log file upload queue limit" default:"100"` RetryLimit int `user:"true" help:"maximum number of retries for log file uploads" default:"3"` ShutdownTimeout time.Duration `user:"true" help:"time limit waiting for queued logs to finish uploading when gateway is shutting down" default:"1m"` - UploadTimeout time.Duration `user:"true" help:"time limit for each individual log file upload" default:"5m"` + UploadTimeout time.Duration `user:"true" help:"time limit for each individual log file upload" default:"30s"` } } @@ -120,7 +120,7 @@ func NewProcessor(log *zap.Logger, opts Options) *Processor { opts.UploadingOptions.ShutdownTimeout = time.Minute } if opts.UploadingOptions.UploadTimeout <= 0 { - opts.UploadingOptions.UploadTimeout = 5 * time.Minute + opts.UploadingOptions.UploadTimeout = 30 * time.Second } return &Processor{ diff --git a/accesslogs/uploader.go b/accesslogs/uploader.go index e9e6a63e..7d2fd59e 100644 --- a/accesslogs/uploader.go +++ b/accesslogs/uploader.go @@ -65,11 +65,12 @@ type uploader interface { var _ uploader = (*sequentialUploader)(nil) type upload struct { - store Storage - bucket string - key string - body []byte - retries int + store Storage + bucket string + key string + body []byte + retries int + uploadTimeout time.Duration } type sequentialUploader struct { @@ -132,11 +133,12 @@ func (u *sequentialUploader) queueUpload(store Storage, bucket, key string, body u.mu.Unlock() u.queue <- upload{ - store: store, - bucket: bucket, - key: key, - body: body, - retries: 0, + store: store, + bucket: bucket, + key: key, + body: body, + retries: 0, + uploadTimeout: u.uploadTimeout, } return nil @@ -157,11 +159,12 @@ func (u *sequentialUploader) queueUploadWithoutQueueLimit(store Storage, bucket, u.mu.Unlock() u.queue <- upload{ - store: store, - bucket: bucket, - key: key, - body: body, - retries: 0, + store: store, + bucket: bucket, + key: key, + body: body, + retries: 0, + uploadTimeout: u.uploadTimeout, } return nil @@ -195,7 +198,7 @@ func (u *sequentialUploader) run() error { for { select { case up := <-u.queue: - ctx, cancel := context.WithTimeout(context.Background(), u.uploadTimeout) + ctx, cancel := context.WithTimeout(context.Background(), up.uploadTimeout) err := up.store.Put(ctx, up.bucket, up.key, up.body) cancel() if err != nil {