99 "time"
1010
1111 "github.com/pkg/errors"
12+ "github.com/raystack/meteor/metrics/otelmw"
1213 "github.com/raystack/meteor/models"
1314 "github.com/raystack/meteor/plugins"
1415 "github.com/raystack/meteor/recipe"
@@ -26,7 +27,7 @@ type Agent struct {
2627 extractorFactory * registry.ExtractorFactory
2728 processorFactory * registry.ProcessorFactory
2829 sinkFactory * registry.SinkFactory
29- monitor Monitor
30+ monitor [] Monitor
3031 logger log.Logger
3132 retrier * retrier
3233 stopOnSinkError bool
@@ -36,9 +37,6 @@ type Agent struct {
3637// NewAgent returns an Agent with plugin factories.
3738func NewAgent (config Config ) * Agent {
3839 mt := config .Monitor
39- if isNilMonitor (mt ) {
40- mt = new (defaultMonitor )
41- }
4240
4341 timerFn := config .TimerFn
4442 if timerFn == nil {
@@ -59,7 +57,8 @@ func NewAgent(config Config) *Agent {
5957}
6058
6159// Validate checks the recipe for linting errors.
62- func (r * Agent ) Validate (rcp recipe.Recipe ) (errs []error ) {
60+ func (r * Agent ) Validate (rcp recipe.Recipe ) []error {
61+ var errs []error
6362 if ext , err := r .extractorFactory .Get (rcp .Source .Name ); err != nil {
6463 errs = append (errs , err )
6564 } else {
@@ -92,7 +91,8 @@ func (r *Agent) Validate(rcp recipe.Recipe) (errs []error) {
9291 errs = append (errs , r .enrichInvalidConfigError (err , p .Name , plugins .PluginTypeProcessor ))
9392 }
9493 }
95- return
94+
95+ return errs
9696}
9797
9898// RunMultiple executes multiple recipes.
@@ -120,34 +120,37 @@ func (r *Agent) Run(ctx context.Context, recipe recipe.Recipe) (run Run) {
120120 r .logger .Info ("running recipe" , "recipe" , run .Recipe .Name )
121121
122122 var (
123- getDuration = r .timerFn ()
124- stream = newStream ()
125- recordCnt int64
123+ getDuration = r .timerFn ()
124+ stream = newStream ()
125+ recordCnt int64
126+ extractorRetryCnt int64
126127 )
127128
128129 defer func () {
129130 run .DurationInMs = getDuration ()
130- r .logAndRecordMetrics (run )
131+ run .ExtractorRetries = int (extractorRetryCnt )
132+ run .AssetsExtracted = int (recordCnt )
133+ r .logAndRecordMetrics (ctx , run )
131134 }()
132135
133136 runExtractor , err := r .setupExtractor (ctx , recipe .Source , stream )
134137 if err != nil {
135- run .Error = errors . Wrap ( err , "failed to setup extractor" )
136- return
138+ run .Error = fmt . Errorf ( " setup extractor: %w" , err )
139+ return run
137140 }
138141
139142 for _ , pr := range recipe .Processors {
140- if err := r .setupProcessor (ctx , pr , stream ); err != nil {
141- run .Error = errors . Wrap ( err , "failed to setup processor" )
142- return
143+ if err := r .setupProcessor (ctx , pr , stream , recipe . Name ); err != nil {
144+ run .Error = fmt . Errorf ( " setup processor: %w" , err )
145+ return run
143146 }
144147 }
145148
146149 for _ , sr := range recipe .Sinks {
147- err := r .setupSink (ctx , sr , stream , recipe )
150+ err := r .setupSink (ctx , sr , stream , recipe . Name )
148151 if err != nil {
149- run .Error = errors . Wrap ( err , "failed to setup sink" )
150- return
152+ run .Error = fmt . Errorf ( " setup sink: %w" , err )
153+ return run
151154 }
152155 }
153156
@@ -178,6 +181,7 @@ func (r *Agent) Run(ctx context.Context, recipe recipe.Recipe) (run Run) {
178181 }()
179182
180183 retryNotification := func (e error , d time.Duration ) {
184+ atomic .AddInt64 (& extractorRetryCnt , 1 )
181185 r .logger .Warn (
182186 fmt .Sprintf ("retrying extractor in %s" , d ),
183187 "retry_delay_ms" , d .Milliseconds (),
@@ -191,55 +195,60 @@ func (r *Agent) Run(ctx context.Context, recipe recipe.Recipe) (run Run) {
191195 func () error { return runExtractor () },
192196 retryNotification ,
193197 )
194-
195198 if err != nil {
196- run .Error = errors . Wrap ( err , "failed to run extractor" )
199+ run .Error = fmt . Errorf ( " run extractor: %w" , err )
197200 }
198201 }()
199202 defer stream .Close ()
200203
201204 // start listening.
202205 // this process is blocking
203206 if err := stream .broadcast (); err != nil {
204- run .Error = errors . Wrap ( err , "failed to broadcast stream" )
207+ run .Error = fmt . Errorf ( " broadcast stream: %w" , err )
205208 }
206209
207210 // code will reach here stream.Listen() is done.
208211 run .RecordCount = (int )(recordCnt )
209212 run .Success = run .Error == nil
210- return
213+ return run
211214}
212215
213216func (r * Agent ) setupExtractor (ctx context.Context , sr recipe.PluginRecipe , str * stream ) (runFn func () error , err error ) {
214217 extractor , err := r .extractorFactory .Get (sr .Name )
215218 if err != nil {
216- return nil , errors . Wrapf ( err , "could not find extractor \" %s \" " , sr .Name )
219+ return nil , fmt . Errorf ( " find extractor %q: %w" , sr .Name , err )
217220 }
218221 if err := extractor .Init (ctx , recipeToPluginConfig (sr )); err != nil {
219- return nil , errors . Wrapf ( err , "could not initiate extractor \" %s \" " , sr .Name )
222+ return nil , fmt . Errorf ( " initiate extractor %q: %w" , sr .Name , err )
220223 }
221224
222225 return func () error {
223226 if err := extractor .Extract (ctx , str .push ); err != nil {
224- return errors . Wrapf ( err , "error running extractor \" %s \" " , sr .Name )
227+ return fmt . Errorf ( "run extractor %q: %w" , sr .Name , err )
225228 }
226229 return nil
227230 }, nil
228231}
229232
230- func (r * Agent ) setupProcessor (ctx context.Context , pr recipe.PluginRecipe , str * stream ) error {
233+ func (r * Agent ) setupProcessor (ctx context.Context , pr recipe.PluginRecipe , str * stream , recipeName string ) ( err error ) {
231234 proc , err := r .processorFactory .Get (pr .Name )
232235 if err != nil {
233- return errors .Wrapf (err , "could not find processor \" %s\" " , pr .Name )
236+ return fmt .Errorf ("find processor %q: %w" , pr .Name , err )
237+ }
238+
239+ proc , err = otelmw .WithProcessorMW (proc , pr .Name , recipeName )
240+ if err != nil {
241+ return fmt .Errorf ("wrap processor %q: %w" , pr .Name , err )
234242 }
243+
235244 if err := proc .Init (ctx , recipeToPluginConfig (pr )); err != nil {
236- return errors . Wrapf ( err , "could not initiate processor \" %s \" " , pr .Name )
245+ return fmt . Errorf ( " initiate processor %q: %w" , pr .Name , err )
237246 }
238247
239248 str .setMiddleware (func (src models.Record ) (models.Record , error ) {
240249 dst , err := proc .Process (ctx , src )
241250 if err != nil {
242- return models.Record {}, errors . Wrapf ( err , "error running processor \" %s \" " , pr .Name )
251+ return models.Record {}, fmt . Errorf ( "run processor %q: %w" , pr .Name , err )
243252 }
244253
245254 return dst , nil
@@ -248,16 +257,32 @@ func (r *Agent) setupProcessor(ctx context.Context, pr recipe.PluginRecipe, str
248257 return nil
249258}
250259
251- func (r * Agent ) setupSink (ctx context.Context , sr recipe.PluginRecipe , stream * stream , recipe recipe.Recipe ) error {
260+ func (r * Agent ) setupSink (ctx context.Context , sr recipe.PluginRecipe , stream * stream , recipeName string ) error {
261+ pluginInfo := PluginInfo {
262+ RecipeName : recipeName ,
263+ PluginName : sr .Name ,
264+ PluginType : "sink" ,
265+ }
266+
252267 sink , err := r .sinkFactory .Get (sr .Name )
253268 if err != nil {
254- return errors . Wrapf ( err , "could not find sink \" %s \" " , sr .Name )
269+ return fmt . Errorf ( " find sink %q: %w" , sr .Name , err )
255270 }
271+
272+ sink , err = otelmw .WithSinkMW (sink , sr .Name , recipeName )
273+ if err != nil {
274+ return fmt .Errorf ("wrap otel sink %q: %w" , sr .Name , err )
275+ }
276+
256277 if err := sink .Init (ctx , recipeToPluginConfig (sr )); err != nil {
257- return errors . Wrapf ( err , "could not initiate sink \" %s \" " , sr .Name )
278+ return fmt . Errorf ( " initiate sink %q: %w" , sr .Name , err )
258279 }
259280
260281 retryNotification := func (e error , d time.Duration ) {
282+ for _ , mt := range r .monitor {
283+ mt .RecordSinkRetryCount (ctx , pluginInfo )
284+ }
285+
261286 r .logger .Warn (
262287 fmt .Sprintf ("retrying sink in %s" , d ),
263288 "retry_delay_ms" , d .Milliseconds (),
@@ -266,14 +291,20 @@ func (r *Agent) setupSink(ctx context.Context, sr recipe.PluginRecipe, stream *s
266291 )
267292 }
268293 stream .subscribe (func (records []models.Record ) error {
294+ pluginInfo .BatchSize = len (records )
295+
269296 err := r .retrier .retry (
270297 ctx ,
271- func () error { return sink .Sink (ctx , records ) },
298+ func () error {
299+ return sink .Sink (ctx , records )
300+ },
272301 retryNotification ,
273302 )
274303
275- success := err == nil
276- r .monitor .RecordPlugin (recipe .Name , sr .Name , "sink" , success )
304+ pluginInfo .Success = err == nil
305+ for _ , mt := range r .monitor {
306+ mt .RecordPlugin (ctx , pluginInfo ) // this can be deleted when statsd is removed
307+ }
277308 if err != nil {
278309 // once it reaches here, it means that the retry has been exhausted and still got error
279310 r .logger .Error ("error running sink" , "sink" , sr .Name , "error" , err .Error ())
@@ -283,7 +314,7 @@ func (r *Agent) setupSink(ctx context.Context, sr recipe.PluginRecipe, stream *s
283314 return nil
284315 }
285316
286- r .logger .Info ("Successfully published record" , "sink" , sr .Name , "recipe" , recipe . Name )
317+ r .logger .Info ("Successfully published record" , "sink" , sr .Name , "recipe" , recipeName )
287318 return nil
288319 }, defaultBatchSize )
289320
@@ -296,8 +327,11 @@ func (r *Agent) setupSink(ctx context.Context, sr recipe.PluginRecipe, stream *s
296327 return nil
297328}
298329
299- func (r * Agent ) logAndRecordMetrics (run Run ) {
300- r .monitor .RecordRun (run )
330+ func (r * Agent ) logAndRecordMetrics (ctx context.Context , run Run ) {
331+ for _ , monitor := range r .monitor {
332+ monitor .RecordRun (ctx , run )
333+ }
334+
301335 if run .Success {
302336 r .logger .Info ("done running recipe" ,
303337 "recipe" , run .Recipe .Name ,
0 commit comments