Skip to content

Commit d1a1c70

Browse files
authored
[fix][backend]fix backfill count (#289)
* fix backfill count * add log * add debug log * no lock * fix backfill * add lock * delete debug log * go fmt * add comment
1 parent bd543a8 commit d1a1c70

File tree

2 files changed

+7
-4
lines changed

2 files changed

+7
-4
lines changed

backend/modules/observability/domain/task/service/taskexe/processor/auto_evaluate.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ func (p *AutoEvaluteProcessor) OnCreateTaskChange(ctx context.Context, currentTa
191191
case task.TimeUnitWeek:
192192
runEndAt = runStartAt + (currentTask.Sampler.CycleInterval)*7*24*time.Hour.Milliseconds()
193193
default:
194-
runEndAt = runStartAt + (currentTask.Sampler.CycleInterval)*10*time.Minute.Milliseconds()
194+
runEndAt = runStartAt + (currentTask.Sampler.CycleInterval)*24*time.Hour.Milliseconds()
195195
}
196196
}
197197
err = p.OnCreateTaskRunChange(ctx, taskexe.OnCreateTaskRunChangeReq{

backend/modules/observability/domain/task/service/taskexe/tracehub/backfill.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ const (
2929
pageSize = 500
3030
backfillLockKeyTemplate = "observability:tracehub:backfill:%d"
3131
backfillLockMaxHold = 24 * time.Hour
32+
backfillLockTTL = 3 * time.Minute
3233
)
3334

3435
// 定时任务+锁
@@ -43,7 +44,7 @@ func (h *TraceHubServiceImpl) BackFill(ctx context.Context, event *entity.BackFi
4344
)
4445
if h.locker != nil && event != nil {
4546
lockKey = fmt.Sprintf(backfillLockKeyTemplate, event.TaskID)
46-
locked, lockCtx, cancel, lockErr := h.locker.LockWithRenew(ctx, lockKey, transformTaskStatusLockTTL, backfillLockMaxHold)
47+
locked, lockCtx, cancel, lockErr := h.locker.LockWithRenew(ctx, lockKey, backfillLockTTL, backfillLockMaxHold)
4748
if lockErr != nil {
4849
logs.CtxError(ctx, "backfill acquire lock failed", "task_id", event.TaskID, "err", lockErr)
4950
return lockErr
@@ -311,7 +312,7 @@ func (h *TraceHubServiceImpl) fetchAndSendSpans(ctx context.Context, listParam *
311312
logs.CtxInfo(ctx, "completed listing spans, total_count=%d, task_id=%d", totalCount, sub.t.GetID())
312313
break
313314
}
314-
315+
listParam.PageToken = result.PageToken
315316
pageToken = result.PageToken
316317
}
317318

@@ -417,7 +418,7 @@ func (h *TraceHubServiceImpl) applySampling(spans []*loop_span.Span, sub *spanSu
417418
// processSpansForBackfill handles spans for backfill
418419
func (h *TraceHubServiceImpl) processSpansForBackfill(ctx context.Context, spans []*loop_span.Span, sub *spanSubscriber) error {
419420
// Batch processing spans for efficiency
420-
const batchSize = 100
421+
const batchSize = 50
421422

422423
for i := 0; i < len(spans); i += batchSize {
423424
end := i + batchSize
@@ -432,6 +433,8 @@ func (h *TraceHubServiceImpl) processSpansForBackfill(ctx context.Context, spans
432433
// Continue with the next batch without stopping due to a single failure
433434
continue
434435
}
436+
// ml_flow rate-limited: 50/5s
437+
time.Sleep(5 * time.Second)
435438
}
436439

437440
return nil

0 commit comments

Comments
 (0)