Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,38 @@ func main() {
}
```

### Streaming uploads

`PutObject` buffers the whole body in memory. For large or unbounded sources
(query exports, log streams), use `PutObjectStream`, which uploads from an
`io.Reader` via the AWS SDK transfer manager without buffering the full payload:

```go
out, err := s3Client.PutObjectStream(ctx, &client.PutObjectStreamInput{
Bucket: "my-bucket",
Key: "exports/large.csv",
Body: reader, // any io.Reader
ContentType: "text/csv",
MaxBytes: 500 * 1024 * 1024, // optional: abort past this many bytes
})
if err != nil {
if errors.Is(err, client.ErrStreamTooLarge) {
// stream exceeded MaxBytes and was aborted
}
log.Fatal(err)
}
log.Printf("uploaded, etag=%s", out.ETag)
```

Notes:

- The per-operation timeout (`S3_TIMEOUT`) is **not** applied to streaming
uploads, since they can legitimately run much longer than a normal request.
Control the deadline through the supplied `context.Context`.
- `MaxBytes` bounds the stream at the library level. The read-only and
size-limit MCP extensions guard the tool layer, not direct library calls;
`PutObjectStream` is currently a library-only capability (no MCP tool).

### Extensibility Patterns

**Middleware** wraps tool execution for cross-cutting concerns:
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/aws/aws-sdk-go-v2 v1.41.9
github.com/aws/aws-sdk-go-v2/config v1.32.20
github.com/aws/aws-sdk-go-v2/credentials v1.19.19
github.com/aws/aws-sdk-go-v2/feature/s3/transfermanager v0.2.3
github.com/aws/aws-sdk-go-v2/service/s3 v1.102.2
github.com/modelcontextprotocol/go-sdk v1.6.1
gopkg.in/yaml.v3 v3.0.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ github.com/aws/aws-sdk-go-v2/credentials v1.19.19 h1:yuFzSV1U0aRNYCQGVaTY2zW2M/L
github.com/aws/aws-sdk-go-v2/credentials v1.19.19/go.mod h1:7y63L1kGzeoDlJaQ3Z578KrnmfBut96JjvJUzGwR+YE=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.25 h1:0w6dCiO8iez+YKwRhRBlL1CH/E3GTfdkuzrwj1by8vo=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.25/go.mod h1:9FDWUothyr5RCRAHc45XOiVCzUR8n/IhCYX+uVqw6vk=
github.com/aws/aws-sdk-go-v2/feature/s3/transfermanager v0.2.3 h1:w5OoDiMN6x53ROmiIImGzmVcxXv2q1GXY+aKV4WAJYM=
github.com/aws/aws-sdk-go-v2/feature/s3/transfermanager v0.2.3/go.mod h1:dAhgYp776bX3LuWvnSCFwQEjNs6fuFg7YXIy5PXcP3Q=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.25 h1:Uii3frf9ztec/ABM2/FSH9/z7PLzxfpG8h4RpkUFflQ=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.25/go.mod h1:G6kntsA2GorAxDPbap6xgB2F+amSLUF8GJTi7PUoX44=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.25 h1:r1+/l6m+WaUJF9HISEsNOLHSNj5EXYQxK8VX6Cz9NlA=
Expand Down
7 changes: 7 additions & 0 deletions pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/feature/s3/transfermanager"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/s3/types"
)
Expand All @@ -19,6 +20,7 @@ import (
type Client struct {
s3Client S3API
presignClient PresignAPI
uploader ObjectUploader
config *Config
connectionName string
}
Expand Down Expand Up @@ -186,9 +188,14 @@ func New(ctx context.Context, cfg *Config) (*Client, error) {
// Create presign client
presignClient := s3.NewPresignClient(s3Client)

// Create the streaming/multipart uploader. It shares the same underlying
// S3 client so it honors the configured endpoint, credentials, and region.
uploader := transfermanager.New(s3Client)

return &Client{
s3Client: s3Client,
presignClient: presignClient,
uploader: uploader,
config: cfg.Clone(),
connectionName: cfg.Name,
}, nil
Expand Down
18 changes: 18 additions & 0 deletions pkg/client/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

v4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4"
"github.com/aws/aws-sdk-go-v2/feature/s3/transfermanager"
"github.com/aws/aws-sdk-go-v2/service/s3"
)

Expand Down Expand Up @@ -98,6 +99,23 @@ func (m *mockPresignAPI) PresignPutObject(ctx context.Context, params *s3.PutObj
}, nil
}

// mockUploader is a mock implementation of ObjectUploader for testing the
// streaming upload path.
type mockUploader struct {
uploadObjectFunc func(
ctx context.Context, input *transfermanager.UploadObjectInput, opts ...func(*transfermanager.Options),
) (*transfermanager.UploadObjectOutput, error)
}

func (m *mockUploader) UploadObject(
ctx context.Context, input *transfermanager.UploadObjectInput, opts ...func(*transfermanager.Options),
) (*transfermanager.UploadObjectOutput, error) {
if m.uploadObjectFunc != nil {
return m.uploadObjectFunc(ctx, input, opts...)
}
return &transfermanager.UploadObjectOutput{}, nil
}

// newMockClient creates a Client with mock S3 and presign APIs for testing.
func newMockClient(s3api *mockS3API, presignAPI *mockPresignAPI) *Client {
if s3api == nil {
Expand Down
15 changes: 13 additions & 2 deletions pkg/client/s3api.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"

v4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4"
"github.com/aws/aws-sdk-go-v2/feature/s3/transfermanager"
"github.com/aws/aws-sdk-go-v2/service/s3"
)

Expand All @@ -25,8 +26,18 @@ type PresignAPI interface {
PresignPutObject(ctx context.Context, params *s3.PutObjectInput, optFns ...func(*s3.PresignOptions)) (*v4.PresignedHTTPRequest, error)
}

// ObjectUploader abstracts a streaming/multipart upload. It is satisfied by
// *transfermanager.Client from the AWS SDK and is defined here, at the consumer,
// so the streaming path can be mocked in unit tests.
type ObjectUploader interface {
UploadObject(
ctx context.Context, input *transfermanager.UploadObjectInput, opts ...func(*transfermanager.Options),
) (*transfermanager.UploadObjectOutput, error)
}

// Compile-time interface checks.
var (
_ S3API = (*s3.Client)(nil)
_ PresignAPI = (*s3.PresignClient)(nil)
_ S3API = (*s3.Client)(nil)
_ PresignAPI = (*s3.PresignClient)(nil)
_ ObjectUploader = (*transfermanager.Client)(nil)
)
103 changes: 103 additions & 0 deletions pkg/client/stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package client

import (
"context"
"errors"
"fmt"
"io"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/feature/s3/transfermanager"
)

// ErrStreamTooLarge indicates that a streaming upload was aborted because the
// body exceeded the caller-supplied size limit. Callers can test for it with
// errors.Is.
var ErrStreamTooLarge = errors.New("stream exceeds maximum allowed size")

// PutObjectStreamInput contains the parameters for a streaming/multipart upload.
//
// Unlike PutObjectInput, the body is an io.Reader rather than a []byte, so the
// payload is never fully buffered in memory. This makes it suitable for large
// or unbounded sources such as query exports.
type PutObjectStreamInput struct {
Bucket string
Key string
Body io.Reader
ContentType string
Metadata map[string]string

// MaxBytes, when greater than zero, aborts the upload once more than
// MaxBytes have been read from Body, returning an error that wraps
// ErrStreamTooLarge. A value of zero means no limit is enforced here.
MaxBytes int64
}

// PutObjectStream uploads an object from an io.Reader using the AWS SDK transfer
// manager, which splits the body into parts and uploads them without buffering
// the full payload in memory.
//
// Unlike the buffered operations on Client, the per-operation timeout
// (S3_TIMEOUT) is intentionally NOT applied here: a streaming upload of a large
// object can legitimately run far longer than an ordinary request. Callers
// control the deadline through ctx.
//
// Like the other write methods on Client, PutObjectStream performs the upload
// directly; the read-only and size-limit MCP extensions guard the tool layer,
// not direct library calls. Use MaxBytes to bound a stream at the library level.
func (c *Client) PutObjectStream(ctx context.Context, input *PutObjectStreamInput) (*PutObjectOutput, error) {
if input == nil {
return nil, fmt.Errorf("put object stream: input is required")
}
if input.Body == nil {
return nil, fmt.Errorf("put object stream: body is required")
}
if c.uploader == nil {
return nil, fmt.Errorf("put object stream: uploader is not configured")
}

body := input.Body
if input.MaxBytes > 0 {
body = &limitReader{r: body, max: input.MaxBytes}
}

uploadInput := &transfermanager.UploadObjectInput{
Bucket: aws.String(input.Bucket),
Key: aws.String(input.Key),
Body: body,
}
if input.ContentType != "" {
uploadInput.ContentType = aws.String(input.ContentType)
}
if len(input.Metadata) > 0 {
uploadInput.Metadata = input.Metadata
}

output, err := c.uploader.UploadObject(ctx, uploadInput)
if err != nil {
return nil, fmt.Errorf("failed to stream object: %w", err)
}

return &PutObjectOutput{
ETag: aws.ToString(output.ETag),
VersionID: aws.ToString(output.VersionID),
}, nil
}

// limitReader wraps an io.Reader and returns an error wrapping ErrStreamTooLarge
// once more than max bytes have been read. It enforces an upper bound on a
// stream whose length is not known in advance.
type limitReader struct {
r io.Reader
max int64
read int64
}

func (l *limitReader) Read(p []byte) (int, error) {
n, err := l.r.Read(p)
l.read += int64(n)
if l.read > l.max {
return n, fmt.Errorf("read %d bytes: %w of %d bytes", l.read, ErrStreamTooLarge, l.max)
}
return n, err
}
Loading
Loading