Skip to content

Commit e6830e4

Browse files
Chief-Rishabravisuhag
authored andcommitted
feat: add support for retries on extractor error
1 parent e8b88cc commit e6830e4

File tree

3 files changed

+128
-3
lines changed

3 files changed

+128
-3
lines changed

agent/agent.go

+17-1
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,23 @@ func (r *Agent) Run(ctx context.Context, recipe recipe.Recipe) (run Run) {
176176
}
177177
stream.Shutdown()
178178
}()
179-
if err := runExtractor(); err != nil {
179+
180+
retryNotification := func(e error, d time.Duration) {
181+
r.logger.Warn(
182+
fmt.Sprintf("retrying extractor in %s", d),
183+
"retry_delay_ms", d.Milliseconds(),
184+
"extractor", recipe.Source.Name,
185+
"error", e,
186+
)
187+
}
188+
189+
err := r.retrier.retry(
190+
ctx,
191+
func() error { return runExtractor() },
192+
retryNotification,
193+
)
194+
195+
if err != nil {
180196
run.Error = errors.Wrap(err, "failed to run extractor")
181197
}
182198
}()

agent/agent_test.go

+107-1
Original file line numberDiff line numberDiff line change
@@ -541,7 +541,58 @@ func TestAgentRun(t *testing.T) {
541541
ProcessorFactory: pf,
542542
SinkFactory: sf,
543543
Logger: utils.Logger,
544-
StopOnSinkError: true,
544+
StopOnSinkError: false,
545+
Monitor: monitor,
546+
})
547+
548+
run := r.Run(ctx, validRecipe)
549+
assert.False(t, run.Success)
550+
assert.Error(t, run.Error)
551+
})
552+
553+
t.Run("should return error when sink fails to close", func(t *testing.T) {
554+
data := []models.Record{
555+
models.NewRecord(&v1beta2.Asset{}),
556+
}
557+
558+
extr := mocks.NewExtractor()
559+
extr.SetEmit(data)
560+
extr.On("Init", mockCtx, buildPluginConfig(validRecipe.Source)).Return(nil).Once()
561+
extr.On("Extract", mockCtx, mock.AnythingOfType("plugins.Emit")).Return(nil)
562+
ef := registry.NewExtractorFactory()
563+
if err := ef.Register("test-extractor", newExtractor(extr)); err != nil {
564+
t.Fatal(err)
565+
}
566+
567+
proc := mocks.NewProcessor()
568+
proc.On("Init", mockCtx, buildPluginConfig(validRecipe.Processors[0])).Return(nil).Once()
569+
proc.On("Process", mockCtx, data[0]).Return(data[0], nil)
570+
defer proc.AssertExpectations(t)
571+
pf := registry.NewProcessorFactory()
572+
if err := pf.Register("test-processor", newProcessor(proc)); err != nil {
573+
t.Fatal(err)
574+
}
575+
576+
sink := mocks.NewSink()
577+
sink.On("Init", mockCtx, buildPluginConfig(validRecipe.Sinks[0])).Return(nil).Once()
578+
sink.On("Sink", mockCtx, data).Return(nil)
579+
sink.On("Close").Return(errors.New("some error"))
580+
defer sink.AssertExpectations(t)
581+
sf := registry.NewSinkFactory()
582+
if err := sf.Register("test-sink", newSink(sink)); err != nil {
583+
t.Fatal(err)
584+
}
585+
586+
monitor := newMockMonitor()
587+
monitor.On("RecordRun", mock.AnythingOfType("agent.Run")).Once()
588+
defer monitor.AssertExpectations(t)
589+
590+
r := agent.NewAgent(agent.Config{
591+
ExtractorFactory: ef,
592+
ProcessorFactory: pf,
593+
SinkFactory: sf,
594+
Logger: utils.Logger,
595+
StopOnSinkError: false,
545596
Monitor: monitor,
546597
})
547598

@@ -671,6 +722,61 @@ func TestAgentRun(t *testing.T) {
671722
assert.Equal(t, validRecipe, run.Recipe)
672723
})
673724

725+
t.Run("should retry if extractor returns retry error", func(t *testing.T) {
726+
err := errors.New("some-error")
727+
data := []models.Record{
728+
models.NewRecord(&v1beta2.Asset{}),
729+
}
730+
731+
extr := mocks.NewExtractor()
732+
extr.SetEmit(data)
733+
extr.On("Init", mockCtx, buildPluginConfig(validRecipe.Source)).Return(nil).Once()
734+
extr.On("Extract", mockCtx, mock.AnythingOfType("plugins.Emit")).Return(plugins.NewRetryError(err)).Twice()
735+
extr.On("Extract", mockCtx, mock.AnythingOfType("plugins.Emit")).Return(nil)
736+
ef := registry.NewExtractorFactory()
737+
if err := ef.Register("test-extractor", newExtractor(extr)); err != nil {
738+
t.Fatal(err)
739+
}
740+
741+
proc := mocks.NewProcessor()
742+
proc.On("Init", mockCtx, buildPluginConfig(validRecipe.Processors[0])).Return(nil).Once()
743+
proc.On("Process", mockCtx, data[0]).Return(data[0], nil)
744+
defer proc.AssertExpectations(t)
745+
pf := registry.NewProcessorFactory()
746+
if err := pf.Register("test-processor", newProcessor(proc)); err != nil {
747+
t.Fatal(err)
748+
}
749+
750+
sink := mocks.NewSink()
751+
sink.On("Init", mockCtx, buildPluginConfig(validRecipe.Sinks[0])).Return(nil).Once()
752+
sink.On("Sink", mockCtx, data).Return(nil)
753+
sink.On("Close").Return(nil)
754+
defer sink.AssertExpectations(t)
755+
756+
sf := registry.NewSinkFactory()
757+
if err := sf.Register("test-sink", newSink(sink)); err != nil {
758+
t.Fatal(err)
759+
}
760+
761+
monitor := newMockMonitor()
762+
monitor.On("RecordRun", mock.AnythingOfType("agent.Run")).Once()
763+
monitor.On("RecordPlugin", mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("bool"))
764+
defer monitor.AssertExpectations(t)
765+
766+
r := agent.NewAgent(agent.Config{
767+
ExtractorFactory: ef,
768+
ProcessorFactory: pf,
769+
SinkFactory: sf,
770+
Logger: utils.Logger,
771+
Monitor: monitor,
772+
MaxRetries: 2, // need to retry "at least" 2 times since Extractor returns RetryError twice
773+
RetryInitialInterval: 1 * time.Millisecond, // this is to override default retry interval to reduce test time
774+
})
775+
run := r.Run(ctx, validRecipe)
776+
assert.NoError(t, run.Error)
777+
assert.Equal(t, validRecipe, run.Recipe)
778+
})
779+
674780
t.Run("should retry if sink returns retry error", func(t *testing.T) {
675781
err := errors.New("some-error")
676782
data := []models.Record{

plugins/extractors/caramlstore/caramlstore.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,6 @@ func (e *Extractor) Init(ctx context.Context, config plugins.Config) error {
8888

8989
// Extract checks if the table is valid and extracts the table schema
9090
func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) error {
91-
defer e.client.Close()
9291

9392
projects, err := e.client.Projects(ctx)
9493
if err != nil {
@@ -136,6 +135,10 @@ func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) error {
136135
return nil
137136
}
138137

138+
func (e *Extractor) Close() error {
139+
return e.client.Close()
140+
}
141+
139142
func shouldRetry(err error) bool {
140143
switch utils.StatusCode(err) {
141144
case codes.Canceled,

0 commit comments

Comments
 (0)