Skip to content

Commit

Permalink
Merge pull request #3131 from redpanda-data/snow_log
Browse files Browse the repository at this point in the history
snowflake: add logging around storage creds refresh
  • Loading branch information
rockwotj authored Jan 20, 2025
2 parents 6b9e0fd + d3f0e5e commit 2752d1b
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 1 deletion.
4 changes: 4 additions & 0 deletions internal/impl/snowflake/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ func SetupSnowflakeStream(t *testing.T, outputConfiguration string) (service.Mes
stream, err := streamBuilder.Build()
require.NoError(t, err)
license.InjectTestService(stream.Resources())
t.Cleanup(func() {
err := stream.Stop(context.Background())
require.NoError(t, err)
})
return produce, stream
}

Expand Down
6 changes: 5 additions & 1 deletion internal/impl/snowflake/streaming/streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,15 @@ func NewSnowflakeServiceClient(ctx context.Context, opts ClientOptions) (*Snowfl

uploader: uploaderAtomic,
// Tokens expire every hour, so refresh a bit before that
uploadRefreshLoop: asyncroutine.NewPeriodicWithContext(time.Hour-(2*time.Minute), func(ctx context.Context) {
uploadRefreshLoop: asyncroutine.NewPeriodicWithContext(time.Hour-(5*time.Minute), func(ctx context.Context) {
client.logger.Info("refreshing snowflake storage credentials")
resp, err := client.configureClient(ctx, clientConfigureRequest{Role: opts.Role})
if err != nil {
client.logger.Warnf("refreshing snowflake storage credentials failure: %v", err)
uploaderAtomic.Store(stageUploaderResult{err: err})
return
}
client.logger.Debug("refreshing snowflake storage credentials success")
// TODO: Do the other checks here that the Java SDK does (deploymentID, etc)
uploader, err := newUploader(resp.StageLocation)
uploaderAtomic.Store(stageUploaderResult{uploader: uploader, err: err})
Expand All @@ -128,6 +131,7 @@ func NewSnowflakeServiceClient(ctx context.Context, opts ClientOptions) (*Snowfl

// Close closes the client and future requests have undefined behavior.
func (c *SnowflakeServiceClient) Close() {
c.options.Logger.Debug("closing snowflake streaming output")
c.uploadRefreshLoop.Stop()
c.client.Close()
c.flusher.Close()
Expand Down

0 comments on commit 2752d1b

Please sign in to comment.