diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index 7fc9b2d..ae86321 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -4,6 +4,10 @@ name: tests on: push: +permissions: + contents: read + packages: read + jobs: build-test: name: Build and test @@ -21,6 +25,19 @@ jobs: with: go-version: '1.24' + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + + - name: Login to Registry + uses: docker/login-action@v3 + with: + registry: ghcr.io + username: ${{ github.repository_owner }} + password: ${{ github.token }} + + - name: Start Workbench + uses: scality/workbench@v0.5.0 + - name: Start ClickHouse run: docker compose up -d @@ -52,6 +69,12 @@ jobs: name: log-courier path: ./bin/log-courier + - name: Stop Workbench + if: always() + run: | + workbench logs + workbench down + - name: Stop ClickHouse if: always() run: | diff --git a/docker-compose.yml b/docker-compose.yml index d05ecc7..71328f7 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -5,7 +5,7 @@ services: image: docker.io/clickhouse/clickhouse-server:24.3.2.23-alpine container_name: log-courier-clickhouse ports: - - "9000:9000" # Native protocol + - "9002:9000" # Native protocol - "8123:8123" # HTTP interface environment: CLICKHOUSE_DB: logs diff --git a/env/default/.gitignore b/env/default/.gitignore new file mode 100644 index 0000000..9001028 --- /dev/null +++ b/env/default/.gitignore @@ -0,0 +1,4 @@ +# Generated by workbench +/* +!values.yaml +!.gitignore diff --git a/env/default/values.yaml b/env/default/values.yaml new file mode 100644 index 0000000..2c7c30b --- /dev/null +++ b/env/default/values.yaml @@ -0,0 +1,17 @@ +global: + log_level: info + +features: + scuba: + enabled: false + enable_service_user: false + + +cloudserver: + image: ghcr.io/scality/cloudserver:7.70.77 + +vault: + image: ghcr.io/scality/vault:7.81.0 + +s3_metadata: + image: ghcr.io/scality/metadata:8.19.0-standalone diff --git a/go.mod b/go.mod index 3ebc94b..b5b4cbd 100644 --- a/go.mod +++ b/go.mod @@ -14,6 +14,24 @@ require ( github.com/ClickHouse/clickhouse-go/v2 v2.40.3 // indirect github.com/Masterminds/semver/v3 v3.4.0 // indirect github.com/andybalholm/brotli v1.2.0 // indirect + github.com/aws/aws-sdk-go-v2 v1.39.4 // indirect + github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.2 // indirect + github.com/aws/aws-sdk-go-v2/config v1.31.15 // indirect + github.com/aws/aws-sdk-go-v2/credentials v1.18.19 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.11 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.11 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.11 // indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4 // indirect + github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.11 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.2 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.2 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.11 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.11 // indirect + github.com/aws/aws-sdk-go-v2/service/s3 v1.89.0 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.29.8 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.3 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.38.9 // indirect + github.com/aws/smithy-go v1.23.1 // indirect github.com/fsnotify/fsnotify v1.9.0 // indirect github.com/go-faster/city v1.0.1 // indirect github.com/go-faster/errors v0.7.1 // indirect diff --git a/go.sum b/go.sum index 5f863d9..ec20b21 100644 --- a/go.sum +++ b/go.sum @@ -6,6 +6,42 @@ github.com/Masterminds/semver/v3 v3.4.0 h1:Zog+i5UMtVoCU8oKka5P7i9q9HgrJeGzI9SA1 github.com/Masterminds/semver/v3 v3.4.0/go.mod h1:4V+yj/TJE1HU9XfppCwVMZq3I84lprf4nC11bSS5beM= github.com/andybalholm/brotli v1.2.0 h1:ukwgCxwYrmACq68yiUqwIWnGY0cTPox/M94sVwToPjQ= github.com/andybalholm/brotli v1.2.0/go.mod h1:rzTDkvFWvIrjDXZHkuS16NPggd91W3kUSvPlQ1pLaKY= +github.com/aws/aws-sdk-go-v2 v1.39.4 h1:qTsQKcdQPHnfGYBBs+Btl8QwxJeoWcOcPcixK90mRhg= +github.com/aws/aws-sdk-go-v2 v1.39.4/go.mod h1:yWSxrnioGUZ4WVv9TgMrNUeLV3PFESn/v+6T/Su8gnM= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.2 h1:t9yYsydLYNBk9cJ73rgPhPWqOh/52fcWDQB5b1JsKSY= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.2/go.mod h1:IusfVNTmiSN3t4rhxWFaBAqn+mcNdwKtPcV16eYdgko= +github.com/aws/aws-sdk-go-v2/config v1.31.15 h1:gE3M4xuNXfC/9bG4hyowGm/35uQTi7bUKeYs5e/6uvU= +github.com/aws/aws-sdk-go-v2/config v1.31.15/go.mod h1:HvnvGJoE2I95KAIW8kkWVPJ4XhdrlvwJpV6pEzFQa8o= +github.com/aws/aws-sdk-go-v2/credentials v1.18.19 h1:Jc1zzwkSY1QbkEcLujwqRTXOdvW8ppND3jRBb/VhBQc= +github.com/aws/aws-sdk-go-v2/credentials v1.18.19/go.mod h1:DIfQ9fAk5H0pGtnqfqkbSIzky82qYnGvh06ASQXXg6A= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.11 h1:X7X4YKb+c0rkI6d4uJ5tEMxXgCZ+jZ/D6mvkno8c8Uw= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.11/go.mod h1:EqM6vPZQsZHYvC4Cai35UDg/f5NCEU+vp0WfbVqVcZc= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.11 h1:7AANQZkF3ihM8fbdftpjhken0TP9sBzFbV/Ze/Y4HXA= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.11/go.mod h1:NTF4QCGkm6fzVwncpkFQqoquQyOolcyXfbpC98urj+c= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.11 h1:ShdtWUZT37LCAA4Mw2kJAJtzaszfSHFb5n25sdcv4YE= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.11/go.mod h1:7bUb2sSr2MZ3M/N+VyETLTQtInemHXb/Fl3s8CLzm0Y= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4 h1:WKuaxf++XKWlHWu9ECbMlha8WOEGm0OUEZqm4K/Gcfk= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4/go.mod h1:ZWy7j6v1vWGmPReu0iSGvRiise4YI5SkR3OHKTZ6Wuc= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.11 h1:bKgSxk1TW//00PGQqYmrq83c+2myGidEclp+t9pPqVI= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.11/go.mod h1:vrPYCQ6rFHL8jzQA8ppu3gWX18zxjLIDGTeqDxkBmSI= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.2 h1:xtuxji5CS0JknaXoACOunXOYOQzgfTvGAc9s2QdCJA4= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.2/go.mod h1:zxwi0DIR0rcRcgdbl7E2MSOvxDyyXGBlScvBkARFaLQ= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.2 h1:DGFpGybmutVsCuF6vSuLZ25Vh55E3VmsnJmFfjeBx4M= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.2/go.mod h1:hm/wU1HDvXCFEDzOLorQnZZ/CVvPXvWEmHMSmqgQRuA= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.11 h1:GpMf3z2KJa4RnJ0ew3Hac+hRFYLZ9DDjfgXjuW+pB54= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.11/go.mod h1:6MZP3ZI4QQsgUCFTwMZA2V0sEriNQ8k2hmoHF3qjimQ= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.11 h1:weapBOuuFIBEQ9OX/NVW3tFQCvSutyjZYk/ga5jDLPo= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.11/go.mod h1:3C1gN4FmIVLwYSh8etngUS+f1viY6nLCDVtZmrFbDy0= +github.com/aws/aws-sdk-go-v2/service/s3 v1.89.0 h1:JbCUlVDEjmhpvpIgXP9QN+/jW61WWWj99cGmxMC49hM= +github.com/aws/aws-sdk-go-v2/service/s3 v1.89.0/go.mod h1:UHKgcRSx8PVtvsc1Poxb/Co3PD3wL7P+f49P0+cWtuY= +github.com/aws/aws-sdk-go-v2/service/sso v1.29.8 h1:M5nimZmugcZUO9wG7iVtROxPhiqyZX6ejS1lxlDPbTU= +github.com/aws/aws-sdk-go-v2/service/sso v1.29.8/go.mod h1:mbef/pgKhtKRwrigPPs7SSSKZgytzP8PQ6P6JAAdqyM= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.3 h1:S5GuJZpYxE0lKeMHKn+BRTz6PTFpgThyJ+5mYfux7BM= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.3/go.mod h1:X4OF+BTd7HIb3L+tc4UlWHVrpgwZZIVENU15pRDVTI0= +github.com/aws/aws-sdk-go-v2/service/sts v1.38.9 h1:Ekml5vGg6sHSZLZJQJagefnVe6PmqC2oiRkBq4F7fU0= +github.com/aws/aws-sdk-go-v2/service/sts v1.38.9/go.mod h1:/e15V+o1zFHWdH3u7lpI3rVBcxszktIKuHKCY2/py+k= +github.com/aws/smithy-go v1.23.1 h1:sLvcH6dfAFwGkHLZ7dGiYF7aK6mg4CgKA/iDKjLDt9M= +github.com/aws/smithy-go v1.23.1/go.mod h1:LEj2LM3rBRQJxPZTB4KuzZkaZYnZPnvgIhb4pu07mx0= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= diff --git a/pkg/logcourier/config.go b/pkg/logcourier/config.go index 727265f..79c8d27 100644 --- a/pkg/logcourier/config.go +++ b/pkg/logcourier/config.go @@ -29,5 +29,15 @@ func ValidateConfig() error { return fmt.Errorf("consumer.time-threshold-seconds must be positive, got %d", timeThreshold) } + maxRetryAttempts := ConfigSpec.GetInt("s3.max-retry-attempts") + if maxRetryAttempts <= 0 { + return fmt.Errorf("s3.max-retry-attempts must be positive, got %d", maxRetryAttempts) + } + + maxBackoffDelay := ConfigSpec.GetInt("s3.max-backoff-delay-seconds") + if maxBackoffDelay <= 0 { + return fmt.Errorf("s3.max-backoff-delay-seconds must be positive, got %d", maxBackoffDelay) + } + return nil } diff --git a/pkg/logcourier/configspec.go b/pkg/logcourier/configspec.go index 7918179..f81e7c6 100644 --- a/pkg/logcourier/configspec.go +++ b/pkg/logcourier/configspec.go @@ -9,7 +9,7 @@ var ConfigSpec = util.ConfigSpec{ // ClickHouse connection "clickhouse.url": util.ConfigVarSpec{ Help: "ClickHouse connection URL", - DefaultValue: "localhost:9000", + DefaultValue: "localhost:9002", EnvVar: "LOG_COURIER_CLICKHOUSE_URL", }, "clickhouse.username": util.ConfigVarSpec{ @@ -40,6 +40,33 @@ var ConfigSpec = util.ConfigSpec{ EnvVar: "LOG_COURIER_CONSUMER_TIME_THRESHOLD_SECONDS", }, + // S3 configuration + "s3.endpoint": util.ConfigVarSpec{ + Help: "S3 endpoint URL", + DefaultValue: "127.0.0.1:8000", + EnvVar: "S3_ENDPOINT", + }, + "s3.access-key-id": util.ConfigVarSpec{ + Help: "S3 access key ID", + DefaultValue: "", + EnvVar: "S3_ACCESS_KEY_ID", + }, + "s3.secret-access-key": util.ConfigVarSpec{ + Help: "S3 secret access key", + DefaultValue: "", + EnvVar: "S3_SECRET_ACCESS_KEY", + }, + "s3.max-retry-attempts": util.ConfigVarSpec{ + Help: "Maximum number of retry attempts for S3 operations (including initial request)", + DefaultValue: 3, + EnvVar: "S3_MAX_RETRY_ATTEMPTS", + }, + "s3.max-backoff-delay-seconds": util.ConfigVarSpec{ + Help: "Maximum backoff delay in seconds between S3 retry attempts", + DefaultValue: 20, + EnvVar: "S3_MAX_BACKOFF_DELAY_SECONDS", + }, + // General "log-level": util.ConfigVarSpec{ Help: "Log level (error|warn|info|debug)", diff --git a/pkg/s3/client.go b/pkg/s3/client.go new file mode 100644 index 0000000..f57a3f7 --- /dev/null +++ b/pkg/s3/client.go @@ -0,0 +1,87 @@ +package s3 + +import ( + "context" + "fmt" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/aws/retry" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/credentials" + "github.com/aws/aws-sdk-go-v2/service/s3" +) + +const ( + defaultRegion = "us-east-1" +) + +// Client wraps S3 client +type Client struct { + s3Client *s3.Client +} + +// Config holds S3 client configuration +type Config struct { + Endpoint string + AccessKeyID string + SecretAccessKey string + MaxRetryAttempts int + MaxBackoffDelay time.Duration +} + +// NewClient creates a new S3 client +func NewClient(ctx context.Context, cfg Config) (*Client, error) { + if cfg.AccessKeyID == "" || cfg.SecretAccessKey == "" { + return nil, fmt.Errorf("access key ID and secret access key are required") + } + + var optFns []func(*config.LoadOptions) error + + // Set region and credentials + optFns = append(optFns, + config.WithRegion(defaultRegion), + config.WithCredentialsProvider( + credentials.NewStaticCredentialsProvider( + cfg.AccessKeyID, + cfg.SecretAccessKey, + "", + ), + ), + ) + + // Set retry configuration if non-zero values provided + if cfg.MaxRetryAttempts > 0 || cfg.MaxBackoffDelay > 0 { + optFns = append(optFns, config.WithRetryer(func() aws.Retryer { + retryer := retry.NewStandard() + var result aws.Retryer = retryer + if cfg.MaxRetryAttempts > 0 { + result = retry.AddWithMaxAttempts(result, cfg.MaxRetryAttempts) + } + if cfg.MaxBackoffDelay > 0 { + result = retry.AddWithMaxBackoffDelay(result, cfg.MaxBackoffDelay) + } + return result + })) + } + + awsCfg, err := config.LoadDefaultConfig(ctx, optFns...) + if err != nil { + return nil, fmt.Errorf("failed to load AWS config: %w", err) + } + + // Create S3 client + s3ClientOpts := []func(*s3.Options){} + + // Set endpoint + if cfg.Endpoint != "" { + s3ClientOpts = append(s3ClientOpts, func(o *s3.Options) { + o.BaseEndpoint = aws.String(cfg.Endpoint) + o.UsePathStyle = true // Required for non-AWS S3-compatible services + }) + } + + s3Client := s3.NewFromConfig(awsCfg, s3ClientOpts...) + + return &Client{s3Client: s3Client}, nil +} diff --git a/pkg/s3/uploader.go b/pkg/s3/uploader.go new file mode 100644 index 0000000..dc05848 --- /dev/null +++ b/pkg/s3/uploader.go @@ -0,0 +1,37 @@ +package s3 + +import ( + "bytes" + "context" + "fmt" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/s3" +) + +// Uploader uploads log objects to S3 +type Uploader struct { + client *Client +} + +// NewUploader creates a new uploader +func NewUploader(client *Client) *Uploader { + return &Uploader{client: client} +} + +// Upload uploads a log object to the specified bucket. +// Retries are handled automatically by the SDK client based on its retry configuration. +func (u *Uploader) Upload(ctx context.Context, bucket, key string, content []byte) error { + _, err := u.client.s3Client.PutObject(ctx, &s3.PutObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(key), + Body: bytes.NewReader(content), + ContentType: aws.String("text/plain"), + }) + + if err != nil { + return fmt.Errorf("failed to upload to S3: bucket=%s, key=%s: %w", bucket, key, err) + } + + return nil +} diff --git a/pkg/s3/uploader_test.go b/pkg/s3/uploader_test.go new file mode 100644 index 0000000..ca1a319 --- /dev/null +++ b/pkg/s3/uploader_test.go @@ -0,0 +1,286 @@ +package s3_test + +import ( + "context" + "errors" + "fmt" + "net/http" + "net/http/httptest" + "sync/atomic" + "testing" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + awss3 "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/aws/aws-sdk-go-v2/service/s3/types" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "github.com/spf13/viper" + + "github.com/scality/log-courier/pkg/logcourier" + "github.com/scality/log-courier/pkg/s3" +) + +func TestS3(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "S3 Suite") +} + +const ( + testKey = "test-key" + testBucket = "test-log-courier" + testEndpoint = "http://127.0.0.1:8000" + testRegion = "us-east-1" + workbenchAccessKey = "LSOVSCTL01CME9OETI5A" + //nolint:gosec // Test credentials + workbenchSecretKey = "6xHQtgUX46WwfsxyhhdatdWqlZj0omlgVSLx4qNV" +) + +// createS3ClientAndBucket creates an AWS S3 client and ensures the bucket exists. +// This is used in test setup to create buckets using the AWS SDK directly. +func createS3ClientAndBucket(ctx context.Context, cfg s3.Config, bucketName string) error { + s3Client := awss3.NewFromConfig(aws.Config{ + Region: testRegion, + Credentials: aws.CredentialsProviderFunc(func(ctx context.Context) (aws.Credentials, error) { + return aws.Credentials{ + AccessKeyID: cfg.AccessKeyID, + SecretAccessKey: cfg.SecretAccessKey, + }, nil + }), + }, func(o *awss3.Options) { + if cfg.Endpoint != "" { + o.BaseEndpoint = aws.String(cfg.Endpoint) + o.UsePathStyle = true + } + }) + + _, err := s3Client.CreateBucket(ctx, &awss3.CreateBucketInput{ + Bucket: aws.String(bucketName), + }) + if err != nil { + var bucketAlreadyExists *types.BucketAlreadyExists + var bucketAlreadyOwnedByYou *types.BucketAlreadyOwnedByYou + if !errors.As(err, &bucketAlreadyExists) && !errors.As(err, &bucketAlreadyOwnedByYou) { + return fmt.Errorf("failed to create test bucket: %w", err) + } + } + + return nil +} + +var _ = Describe("S3 Uploader", func() { + var ( + ctx context.Context + client *s3.Client + uploader *s3.Uploader + bucket string + ) + + BeforeEach(func() { + ctx = context.Background() + + viper.Reset() + viper.SetDefault("s3.endpoint", testEndpoint) + viper.SetDefault("s3.access-key-id", workbenchAccessKey) + viper.SetDefault("s3.secret-access-key", workbenchSecretKey) + + _ = viper.BindEnv("s3.endpoint", "S3_ENDPOINT") + _ = viper.BindEnv("s3.access-key-id", "S3_ACCESS_KEY_ID") + _ = viper.BindEnv("s3.secret-access-key", "S3_SECRET_ACCESS_KEY") + _ = viper.BindEnv("s3.max-retry-attempts", "S3_MAX_RETRY_ATTEMPTS") + _ = viper.BindEnv("s3.max-backoff-delay-seconds", "S3_MAX_BACKOFF_DELAY_SECONDS") + + bucket = testBucket + + cfg := s3.Config{ + Endpoint: logcourier.ConfigSpec.GetString("s3.endpoint"), + AccessKeyID: logcourier.ConfigSpec.GetString("s3.access-key-id"), + SecretAccessKey: logcourier.ConfigSpec.GetString("s3.secret-access-key"), + MaxRetryAttempts: logcourier.ConfigSpec.GetInt("s3.max-retry-attempts"), + MaxBackoffDelay: time.Duration(logcourier.ConfigSpec.GetInt("s3.max-backoff-delay-seconds")) * time.Second, + } + + var err error + client, err = s3.NewClient(ctx, cfg) + Expect(err).NotTo(HaveOccurred()) + + err = createS3ClientAndBucket(ctx, cfg, bucket) + Expect(err).NotTo(HaveOccurred()) + + uploader = s3.NewUploader(client) + }) + + Describe("NewClient", func() { + It("should create client with valid config", func() { + cfg := s3.Config{ + Endpoint: testEndpoint, + AccessKeyID: "test-access-key", + SecretAccessKey: "test-secret-key", + } + + client, err := s3.NewClient(ctx, cfg) + Expect(err).NotTo(HaveOccurred()) + Expect(client).NotTo(BeNil()) + }) + + It("should create client with custom retry configuration", func() { + cfg := s3.Config{ + Endpoint: testEndpoint, + AccessKeyID: "test-access-key", + SecretAccessKey: "test-secret-key", + MaxRetryAttempts: 5, + MaxBackoffDelay: 10 * time.Second, + } + + client, err := s3.NewClient(ctx, cfg) + Expect(err).NotTo(HaveOccurred()) + Expect(client).NotTo(BeNil()) + }) + + It("should fail without access key", func() { + cfg := s3.Config{ + Endpoint: testEndpoint, + AccessKeyID: "", + SecretAccessKey: "test-secret-key", + } + + _, err := s3.NewClient(ctx, cfg) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("access key ID and secret access key are required")) + }) + + It("should fail without secret key", func() { + cfg := s3.Config{ + Endpoint: testEndpoint, + AccessKeyID: "test-access-key", + SecretAccessKey: "", + } + + _, err := s3.NewClient(ctx, cfg) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("access key ID and secret access key are required")) + }) + }) + + Describe("Upload", func() { + It("should upload object successfully", func() { + key := fmt.Sprintf("test-logs/%d.log", time.Now().UnixNano()) + content := []byte("test log content\n") + + err := uploader.Upload(ctx, bucket, key, content) + Expect(err).NotTo(HaveOccurred()) + }) + + It("should upload empty object", func() { + key := fmt.Sprintf("test-logs/empty-%d.log", time.Now().UnixNano()) + content := []byte("") + + err := uploader.Upload(ctx, bucket, key, content) + Expect(err).NotTo(HaveOccurred()) + }) + + It("should handle large objects", func() { + key := fmt.Sprintf("test-logs/large-%d.log", time.Now().UnixNano()) + // 1MB content + content := make([]byte, 1024*1024) + for i := range content { + content[i] = byte(i % 256) + } + + err := uploader.Upload(ctx, bucket, key, content) + Expect(err).NotTo(HaveOccurred()) + }) + + It("should fail with invalid bucket", func() { + content := []byte("test") + + err := uploader.Upload(ctx, "nonexistent-bucket-12345", testKey, content) + Expect(err).To(HaveOccurred()) + }) + + It("should retry on server errors and eventually succeed", func() { + var requestCount atomic.Int32 + + // Mock S3 server that fails first 2 requests, succeeds on 3rd + mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + count := requestCount.Add(1) + if count < 3 { + // Return 503 Service Unavailable (retryable error) + w.WriteHeader(http.StatusServiceUnavailable) + _, _ = w.Write([]byte(` + + ServiceUnavailable + Service is temporarily unavailable +`)) + return + } + // Success on 3rd attempt + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(` +`)) + })) + defer mockServer.Close() + + cfg := s3.Config{ + Endpoint: mockServer.URL, + AccessKeyID: workbenchAccessKey, + SecretAccessKey: workbenchSecretKey, + MaxRetryAttempts: 3, + MaxBackoffDelay: 1 * time.Second, // Short backoff for faster test + } + + retryClient, err := s3.NewClient(ctx, cfg) + Expect(err).NotTo(HaveOccurred()) + + retryUploader := s3.NewUploader(retryClient) + + content := []byte("test content") + + // Should succeed after retries + err = retryUploader.Upload(ctx, bucket, testKey, content) + Expect(err).NotTo(HaveOccurred()) + + // Verify exactly 3 requests were made + Expect(requestCount.Load()).To(Equal(int32(3))) + }) + + It("should exhaust retries and fail when all attempts fail", func() { + var requestCount atomic.Int32 + + // Mock S3 server that always fails + mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + requestCount.Add(1) + // Always return 503 Service Unavailable + w.WriteHeader(http.StatusServiceUnavailable) + _, _ = w.Write([]byte(` + + ServiceUnavailable + Service is temporarily unavailable +`)) + })) + defer mockServer.Close() + + cfg := s3.Config{ + Endpoint: mockServer.URL, + AccessKeyID: workbenchAccessKey, + SecretAccessKey: workbenchSecretKey, + MaxRetryAttempts: 3, + MaxBackoffDelay: 1 * time.Second, // Short backoff for faster test + } + + retryClient, err := s3.NewClient(ctx, cfg) + Expect(err).NotTo(HaveOccurred()) + + retryUploader := s3.NewUploader(retryClient) + + content := []byte("test content") + + // Should fail after exhausting retries + err = retryUploader.Upload(ctx, bucket, testKey, content) + Expect(err).To(HaveOccurred()) + + // Verify exactly 3 requests were made (initial + 2 retries) + Expect(requestCount.Load()).To(Equal(int32(3))) + }) + }) +}) diff --git a/pkg/testutil/clickhouse.go b/pkg/testutil/clickhouse.go index 4208b78..c04c669 100644 --- a/pkg/testutil/clickhouse.go +++ b/pkg/testutil/clickhouse.go @@ -3,10 +3,12 @@ package testutil import ( "context" "fmt" - "os" "time" + "github.com/spf13/viper" + "github.com/scality/log-courier/pkg/clickhouse" + "github.com/scality/log-courier/pkg/logcourier" ) const ( @@ -25,16 +27,18 @@ type ClickHouseTestHelper struct { // NewClickHouseTestHelper creates a new test helper with a unique database func NewClickHouseTestHelper(ctx context.Context) (*ClickHouseTestHelper, error) { - url := os.Getenv("LOG_COURIER_CLICKHOUSE_URL") - if url == "" { - url = "localhost:9000" + for key, spec := range logcourier.ConfigSpec { + viper.SetDefault(key, spec.DefaultValue) + if spec.EnvVar != "" { + _ = viper.BindEnv(key, spec.EnvVar) + } } cfg := clickhouse.Config{ - URL: url, - Username: "default", - Password: "", - Timeout: 10 * time.Second, + URL: logcourier.ConfigSpec.GetString("clickhouse.url"), + Username: logcourier.ConfigSpec.GetString("clickhouse.username"), + Password: logcourier.ConfigSpec.GetString("clickhouse.password"), + Timeout: time.Duration(logcourier.ConfigSpec.GetInt("clickhouse.timeout-seconds")) * time.Second, } client, err := clickhouse.NewClient(ctx, cfg)