-
Notifications
You must be signed in to change notification settings - Fork 320
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: implement backoff for snowpipe streaming authorization errors #5399
base: master
Are you sure you want to change the base?
Conversation
@@ -26,6 +26,9 @@ import ( | |||
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils" | |||
) | |||
|
|||
// Creating an alias since "model.TableSchema" is defined in an internal module | |||
type ModelTableSchema = model.TableSchema |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any alternatives ? I can't import model.TableSchema
in the snowpipestreaming test since it is in an internal module.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can use whutils.ModelTableSchema
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #5399 +/- ##
==========================================
- Coverage 74.82% 74.80% -0.03%
==========================================
Files 438 438
Lines 61331 61404 +73
==========================================
+ Hits 45893 45933 +40
- Misses 12895 12932 +37
+ Partials 2543 2539 -4 ☔ View full report in Codecov by Sentry. |
9c22207
to
476cdb4
Compare
476cdb4
to
5644298
Compare
5644298
to
141b31f
Compare
@@ -26,6 +26,9 @@ import ( | |||
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils" | |||
) | |||
|
|||
// Creating an alias since "model.TableSchema" is defined in an internal module | |||
type ModelTableSchema = model.TableSchema |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can use whutils.ModelTableSchema
@@ -43,6 +48,29 @@ func (m *mockAPI) GetStatus(_ context.Context, channelID string) (*model.StatusR | |||
return m.getStatusOutputMap[channelID]() | |||
} | |||
|
|||
type mockManager struct { | |||
manager.Manager | |||
throwSchemaErr bool |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of a boolean, we can set the following error like createSchemaErr
, createTableErr
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
backoffDuration time.Duration | ||
initialBackoffDuration time.Duration |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we use github.com/cenkalti/backoff/v4
instead of implementing this logic?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Replaced the custom implementation with the library
var authzErr *snowpipeAuthzError | ||
if errors.As(err, &authzErr) { | ||
if shouldResetBackoff { | ||
shouldResetBackoff = false | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
var authzErr *snowpipeAuthzError | |
if errors.As(err, &authzErr) { | |
if shouldResetBackoff { | |
shouldResetBackoff = false | |
} | |
} | |
if errors.As(err, &authzErr) && !shouldResetBackoff { | |
shouldResetBackoff = false | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed shouldResetBackoff
since it was unnecessary
whutils "github.com/rudderlabs/rudder-server/warehouse/utils" | ||
) | ||
|
||
var json = jsoniter.ConfigCompatibleWithStandardLibrary | ||
|
||
func New( | ||
conf *config.Config, | ||
logger logger.Logger, | ||
mLogger logger.Logger, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
mLogger
Do you mean module logger? Can we rename it as log?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I had renamed it to mLogger
since managerCreator
had an argument of type logger.Logger
. Renaming it to log
logger.NewStringField("table", discardsTable()), | ||
obskit.Error(err), | ||
) | ||
shouldResetBackoff = false |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can return from here itself. No point in going forward.
shouldResetBackoff = false | |
m.stats.jobs.failed.Count(len(asyncDest.ImportingJobIDs)) | |
return common.AsyncUploadOutput{ | |
FailedJobIDs: asyncDest.ImportingJobIDs, | |
FailedCount: len(asyncDest.ImportingJobIDs), | |
FailedReason: fmt.Errorf("failed to initialize channel with schema: %w", err).Error(), | |
DestinationID: asyncDest.Destination.ID, | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added an early return with failed job reporting
aa4f042
to
9779ca7
Compare
9779ca7
to
e268f73
Compare
Description
This PR introduces a backoff mechanism when creating channels in the Snowpipe streaming destination. If an authorization error is encountered at the schema, table, or column level, the backoff ensures that we avoid repeatedly attempting to connect to Snowflake unnecessarily.
authzBackoff
in manager to store backoff related statemanagerCreator
as a field in the manager for making the code testable.Security