@@ -320,9 +320,7 @@ func (ca *changeAggregator) Start(ctx context.Context) {
320320
321321 spans , err := ca .setupSpansAndFrontier ()
322322 if err != nil {
323- if log .V (2 ) {
324- log .Infof (ca .Ctx (), "change aggregator moving to draining due to error setting up spans and frontier: %v" , err )
325- }
323+ log .Dev .Warningf (ca .Ctx (), "moving to draining due to error setting up spans and frontier: %v" , err )
326324 ca .MoveToDraining (err )
327325 ca .cancel ()
328326 return
@@ -348,9 +346,7 @@ func (ca *changeAggregator) Start(ctx context.Context) {
348346 scope , _ := opts .GetMetricScope ()
349347 ca .sliMetrics , err = ca .metrics .getSLIMetrics (scope )
350348 if err != nil {
351- if log .V (2 ) {
352- log .Infof (ca .Ctx (), "change aggregator moving to draining due to error getting sli metrics: %v" , err )
353- }
349+ log .Dev .Warningf (ca .Ctx (), "moving to draining due to error getting sli metrics: %v" , err )
354350 ca .MoveToDraining (err )
355351 ca .cancel ()
356352 return
@@ -360,21 +356,16 @@ func (ca *changeAggregator) Start(ctx context.Context) {
360356 recorder := metricsRecorder (ca .sliMetrics )
361357 recorder , err = ca .wrapMetricsRecorderWithTelemetry (ctx , recorder )
362358 if err != nil {
363- if log .V (2 ) {
364- log .Infof (ca .Ctx (), "change aggregator moving to draining due to error wrapping metrics controller: %v" , err )
365- }
359+ log .Dev .Warningf (ca .Ctx (), "moving to draining due to error wrapping metrics controller: %v" , err )
366360 ca .MoveToDraining (err )
367361 ca .cancel ()
368- return
369362 }
370363
371364 ca .sink , err = getEventSink (ctx , ca .FlowCtx .Cfg , ca .spec .Feed , timestampOracle ,
372365 ca .spec .User (), ca .spec .JobID , recorder )
373366 if err != nil {
374367 err = changefeedbase .MarkRetryableError (err )
375- if log .V (2 ) {
376- log .Infof (ca .Ctx (), "change aggregator moving to draining due to error getting sink: %v" , err )
377- }
368+ log .Dev .Warningf (ca .Ctx (), "moving to draining due to error getting sink: %v" , err )
378369 ca .MoveToDraining (err )
379370 ca .cancel ()
380371 return
@@ -406,9 +397,7 @@ func (ca *changeAggregator) Start(ctx context.Context) {
406397 limit := changefeedbase .PerChangefeedMemLimit .Get (& ca .FlowCtx .Cfg .Settings .SV )
407398 ca .eventProducer , ca .kvFeedDoneCh , ca .errCh , err = ca .startKVFeed (ctx , spans , kvFeedHighWater , needsInitialScan , feed , pool , limit , opts )
408399 if err != nil {
409- if log .V (2 ) {
410- log .Infof (ca .Ctx (), "change aggregator moving to draining due to error starting kv feed: %v" , err )
411- }
400+ log .Dev .Warningf (ca .Ctx (), "moving to draining due to error starting kv feed: %v" , err )
412401 ca .MoveToDraining (err )
413402 ca .cancel ()
414403 return
@@ -418,9 +407,7 @@ func (ca *changeAggregator) Start(ctx context.Context) {
418407 ctx , ca .FlowCtx .Cfg , ca .spec , feed , ca .frontier , kvFeedHighWater ,
419408 ca .sink , ca .metrics , ca .sliMetrics , ca .knobs )
420409 if err != nil {
421- if log .V (2 ) {
422- log .Infof (ca .Ctx (), "change aggregator moving to draining due to error creating event consumer: %v" , err )
423- }
410+ log .Dev .Warningf (ca .Ctx (), "moving to draining due to error creating event consumer: %v" , err )
424411 ca .MoveToDraining (err )
425412 ca .cancel ()
426413 return
@@ -635,7 +622,7 @@ func (ca *changeAggregator) setupSpansAndFrontier() (spans []roachpb.Span, err e
635622 // Checkpointed spans are spans that were above the highwater mark, and we
636623 // must preserve that information in the frontier for future checkpointing.
637624 if err := checkpoint .Restore (ca .frontier , ca .spec .SpanLevelCheckpoint ); err != nil {
638- return nil , err
625+ return nil , errors . Wrapf ( err , "failed to restore span-level checkpoint" )
639626 }
640627
641628 return spans , nil
@@ -751,9 +738,7 @@ func (ca *changeAggregator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMet
751738 // NB: we do not invoke ca.cancel here -- just merely moving
752739 // to drain state so that the trailing metadata callback
753740 // has a chance to produce shutdown checkpoint.
754- if log .V (2 ) {
755- log .Infof (ca .Ctx (), "change aggregator moving to draining due to error while checking for node drain: %v" , err )
756- }
741+ log .Dev .Warningf (ca .Ctx (), "moving to draining due to error while checking for node drain: %v" , err )
757742 ca .MoveToDraining (err )
758743 break
759744 }
@@ -775,9 +760,7 @@ func (ca *changeAggregator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMet
775760 }
776761 // Shut down the poller if it wasn't already.
777762 ca .cancel ()
778- if log .V (2 ) {
779- log .Infof (ca .Ctx (), "change aggregator moving to draining due to error from tick: %v" , err )
780- }
763+ log .Dev .Warningf (ca .Ctx (), "moving to draining due to error from tick: %v" , err )
781764 ca .MoveToDraining (err )
782765 break
783766 }
@@ -1315,9 +1298,7 @@ func (cf *changeFrontier) Start(ctx context.Context) {
13151298 scope := cf .spec .Feed .Opts [changefeedbase .OptMetricsScope ]
13161299 sli , err := cf .metrics .getSLIMetrics (scope )
13171300 if err != nil {
1318- if log .V (2 ) {
1319- log .Infof (cf .Ctx (), "change frontier moving to draining due to error getting sli metrics: %v" , err )
1320- }
1301+ log .Dev .Warningf (cf .Ctx (), "moving to draining due to error getting sli metrics: %v" , err )
13211302 cf .MoveToDraining (err )
13221303 return
13231304 }
@@ -1327,9 +1308,7 @@ func (cf *changeFrontier) Start(ctx context.Context) {
13271308 cf .spec .User (), cf .spec .JobID , sli )
13281309 if err != nil {
13291310 err = changefeedbase .MarkRetryableError (err )
1330- if log .V (2 ) {
1331- log .Infof (cf .Ctx (), "change frontier moving to draining due to error getting sink: %v" , err )
1332- }
1311+ log .Dev .Warningf (cf .Ctx (), "moving to draining due to error getting sink: %v" , err )
13331312 cf .MoveToDraining (err )
13341313 return
13351314 }
@@ -1342,9 +1321,7 @@ func (cf *changeFrontier) Start(ctx context.Context) {
13421321
13431322 cf .highWaterAtStart = cf .spec .Feed .StatementTime
13441323 if cf .evalCtx .ChangefeedState == nil {
1345- if log .V (2 ) {
1346- log .Infof (cf .Ctx (), "change frontier moving to draining due to missing changefeed state" )
1347- }
1324+ log .Dev .Warningf (cf .Ctx (), "moving to draining due to missing changefeed state" )
13481325 cf .MoveToDraining (errors .AssertionFailedf ("expected initialized local state" ))
13491326 return
13501327 }
@@ -1356,9 +1333,7 @@ func (cf *changeFrontier) Start(ctx context.Context) {
13561333 if cf .spec .JobID != 0 {
13571334 job , err := cf .FlowCtx .Cfg .JobRegistry .LoadClaimedJob (ctx , cf .spec .JobID )
13581335 if err != nil {
1359- if log .V (2 ) {
1360- log .Infof (cf .Ctx (), "change frontier moving to draining due to error loading claimed job: %v" , err )
1361- }
1336+ log .Dev .Warningf (cf .Ctx (), "moving to draining due to error loading claimed job: %v" , err )
13621337 cf .MoveToDraining (err )
13631338 return
13641339 }
@@ -1403,15 +1378,16 @@ func (cf *changeFrontier) Start(ctx context.Context) {
14031378 // Set up the resolved span frontier.
14041379 cf .frontier , err = resolvedspan .NewCoordinatorFrontier (cf .spec .Feed .StatementTime , initialHighwater , cf .spec .TrackedSpans ... )
14051380 if err != nil {
1406- log .Infof (cf .Ctx (), "change frontier moving to draining due to error setting up frontier: %v" , err )
1381+ log .Dev . Warningf (cf .Ctx (), "moving to draining due to error setting up frontier: %v" , err )
14071382 cf .MoveToDraining (err )
14081383 return
14091384 }
14101385
14111386 if err := checkpoint .Restore (cf .frontier , cf .spec .SpanLevelCheckpoint ); err != nil {
1412- if log .V (2 ) {
1413- log .Infof (cf .Ctx (), "change frontier encountered error on checkpoint restore: %v" , err )
1414- }
1387+ log .Dev .Warningf (cf .Ctx (),
1388+ "moving to draining due to error restoring span-level checkpoint: %v" , err )
1389+ cf .MoveToDraining (err )
1390+ return
14151391 }
14161392
14171393 if cf .knobs .AfterCoordinatorFrontierRestore != nil {
@@ -1560,39 +1536,31 @@ func (cf *changeFrontier) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetad
15601536 }
15611537 }
15621538
1563- if log .V (2 ) {
1564- log .Infof (cf .Ctx (),
1565- "change frontier moving to draining after reaching resolved span boundary (%s): %v" ,
1566- boundaryType , err )
1567- }
1539+ log .Dev .Warningf (cf .Ctx (),
1540+ "moving to draining after reaching resolved span boundary (%s): %v" ,
1541+ boundaryType , err )
15681542 cf .MoveToDraining (err )
15691543 break
15701544 }
15711545
15721546 row , meta := cf .input .Next ()
15731547 if meta != nil {
15741548 if meta .Err != nil {
1575- if log .V (2 ) {
1576- log .Infof (cf .Ctx (), "change frontier moving to draining after getting error from aggregator: %v" , meta .Err )
1577- }
1549+ log .Dev .Warningf (cf .Ctx (), "moving to draining after getting error from aggregator: %v" , meta .Err )
15781550 cf .MoveToDraining (nil /* err */ )
15791551 }
15801552 if meta .Changefeed != nil && meta .Changefeed .DrainInfo != nil {
15811553 // Seeing changefeed drain info metadata from the aggregator means
15821554 // that the aggregator exited due to node shutdown. Transition to
15831555 // draining so that the remaining aggregators will shut down and
15841556 // transmit their up-to-date frontier.
1585- if log .V (2 ) {
1586- log .Infof (cf .Ctx (), "change frontier moving to draining due to aggregator shutdown: %s" , meta .Changefeed )
1587- }
1557+ log .Dev .Warningf (cf .Ctx (), "moving to draining due to aggregator shutdown: %s" , meta .Changefeed )
15881558 cf .MoveToDraining (changefeedbase .ErrNodeDraining )
15891559 }
15901560 return nil , meta
15911561 }
15921562 if row == nil {
1593- if log .V (2 ) {
1594- log .Infof (cf .Ctx (), "change frontier moving to draining after getting nil row from aggregator" )
1595- }
1563+ log .Dev .Warningf (cf .Ctx (), "moving to draining after getting nil row from aggregator" )
15961564 cf .MoveToDraining (nil /* err */ )
15971565 break
15981566 }
@@ -1607,9 +1575,7 @@ func (cf *changeFrontier) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetad
16071575 }
16081576
16091577 if err := cf .noteAggregatorProgress (row [0 ]); err != nil {
1610- if log .V (2 ) {
1611- log .Infof (cf .Ctx (), "change frontier moving to draining after error while processing aggregator progress: %v" , err )
1612- }
1578+ log .Dev .Warningf (cf .Ctx (), "moving to draining after error while processing aggregator progress: %v" , err )
16131579 cf .MoveToDraining (err )
16141580 break
16151581 }
0 commit comments