From ca51060d45d7b9eaff6561d51884531e0f4d0870 Mon Sep 17 00:00:00 2001 From: Michael Butler Date: Tue, 5 May 2026 16:55:37 -0400 Subject: [PATCH 1/2] jobs: add targetFraction parameter to ChunkProgressLogger Add a `targetFraction` parameter to `NewChunkProgressLogger`, `NewChunkProgressLoggerForJob`, and `DeprecatedNewChunkProgressLoggerForJob`. Previously, progress was always interpolated from `startFraction` to 1.0. The new parameter allows callers to specify a different upper bound, which is needed for multi-phase jobs where each phase occupies a portion of the overall progress range. All existing callers pass 1.0 to preserve current behavior. We modify the deprecated API rather than migrating restore off it because the deprecated chunk progress logger is currently coupled to checkpoint frontier persistence via `updateJobCallback`. Migrating to the new API would also require migrating frontier persistence to the job frontier API -- work for another day. Epic: none Release note: None Co-Authored-By: roachdev-claude --- pkg/backup/backup_job.go | 2 +- pkg/backup/compaction_job.go | 2 +- pkg/backup/restore_job.go | 2 +- pkg/jobs/progress.go | 21 ++++++++++++++------- pkg/jobs/progress_test.go | 28 +++++++++++++++++++++++++--- 5 files changed, 42 insertions(+), 13 deletions(-) diff --git a/pkg/backup/backup_job.go b/pkg/backup/backup_job.go index 77ef28dbd214..567ce271ead1 100644 --- a/pkg/backup/backup_job.go +++ b/pkg/backup/backup_job.go @@ -365,7 +365,7 @@ func backup( numTotalSpans += len(spec.IntroducedSpans) + len(spec.Spans) } - progressLogger := jobs.DeprecatedNewChunkProgressLoggerForJob(job, numTotalSpans, job.FractionCompleted(), jobs.DeprecatedProgressUpdateOnly) + progressLogger := jobs.DeprecatedNewChunkProgressLoggerForJob(job, numTotalSpans, job.FractionCompleted(), 1.0, jobs.DeprecatedProgressUpdateOnly) requestFinishedCh := make(chan struct{}, numTotalSpans) // enough buffer to never block var jobProgressLoop func(ctx context.Context) error diff 
--git a/pkg/backup/compaction_job.go b/pkg/backup/compaction_job.go index ca9c7fca8e64..29e856daae1c 100644 --- a/pkg/backup/compaction_job.go +++ b/pkg/backup/compaction_job.go @@ -948,7 +948,7 @@ func doCompaction( frac = 0 } logger := jobs.NewChunkProgressLoggerForJob( - jobID, execCtx.ExecCfg().InternalDB, totalEntriesLeft, frac, + jobID, execCtx.ExecCfg().InternalDB, totalEntriesLeft, frac, 1.0, ) progressLoop := func(ctx context.Context) error { return logger.Loop(ctx, chunkFinishedCh) diff --git a/pkg/backup/restore_job.go b/pkg/backup/restore_job.go index 688d39708b8c..ad99b307b728 100644 --- a/pkg/backup/restore_job.go +++ b/pkg/backup/restore_job.go @@ -568,7 +568,7 @@ func restore( // Only update the job progress on the main data bundle. This should account // for the bulk of the data to restore. Other data (e.g. zone configs in // cluster restores) may be restored first. - progressLogger := jobs.DeprecatedNewChunkProgressLoggerForJob(job, numImportSpans, job.FractionCompleted(), progressTracker.updateJobCallback) + progressLogger := jobs.DeprecatedNewChunkProgressLoggerForJob(job, numImportSpans, job.FractionCompleted(), 1.0, progressTracker.updateJobCallback) jobProgressLoop := func(ctx context.Context) error { ctx, progressSpan := tracing.ChildSpan(ctx, "progress-loop") diff --git a/pkg/jobs/progress.go b/pkg/jobs/progress.go index 0986769f3090..9032d548dcf5 100644 --- a/pkg/jobs/progress.go +++ b/pkg/jobs/progress.go @@ -56,9 +56,11 @@ type ChunkProgressLogger struct { // NewChunkProgressLoggerForJob returns a ChunkProgressLogger which updates a job's // fraction_completed via ProgressStorage. expectedChunks is the expected number of chunks left -// for this job, and fraction_completed is calculated based on this number and startFraction. +// for this job, and fraction_completed is calculated based on this number, startFraction, +// and targetFraction. Progress is linearly interpolated from startFraction to targetFraction +// as chunks complete. 
func NewChunkProgressLoggerForJob( - jobID jobspb.JobID, db isql.DB, expectedChunks int, startFraction float64, + jobID jobspb.JobID, db isql.DB, expectedChunks int, startFraction float64, targetFraction float64, ) *ChunkProgressLogger { return NewChunkProgressLogger( func(ctx context.Context, fraction float64) error { @@ -66,7 +68,7 @@ func NewChunkProgressLoggerForJob( return ProgressStorage(jobID).SetFraction(ctx, txn, fraction) }) }, - expectedChunks, startFraction, + expectedChunks, startFraction, targetFraction, ) } @@ -78,6 +80,7 @@ func DeprecatedNewChunkProgressLoggerForJob( j *Job, expectedChunks int, startFraction float32, + targetFraction float32, progressedFn func(context.Context, jobspb.ProgressDetails), ) *ChunkProgressLogger { return NewChunkProgressLogger( @@ -88,17 +91,21 @@ func DeprecatedNewChunkProgressLoggerForJob( } return float32(fraction) }) - }, expectedChunks, float64(startFraction)) + }, expectedChunks, float64(startFraction), float64(targetFraction)) } -// NewChunkProgressLogger returns a ChunkProgressLogger. +// NewChunkProgressLogger returns a ChunkProgressLogger. Progress is linearly +// interpolated from startFraction to targetFraction as chunks complete. 
func NewChunkProgressLogger( - report func(ctx context.Context, pct float64) error, expectedChunks int, startFraction float64, + report func(ctx context.Context, pct float64) error, + expectedChunks int, + startFraction float64, + targetFraction float64, ) *ChunkProgressLogger { return &ChunkProgressLogger{ expectedChunks: expectedChunks, batcher: ProgressUpdateBatcher{ - perChunkContribution: (1.0 - startFraction) * 1.0 / float64(expectedChunks), + perChunkContribution: (targetFraction - startFraction) / float64(expectedChunks), start: startFraction, reported: startFraction, Report: report, diff --git a/pkg/jobs/progress_test.go b/pkg/jobs/progress_test.go index aaf7da59a88f..28cdc2c0602d 100644 --- a/pkg/jobs/progress_test.go +++ b/pkg/jobs/progress_test.go @@ -48,7 +48,7 @@ func TestChunkProgressLogger(t *testing.T) { requestFinishedCh := make(chan struct{}, 100) loggerGroup.GoCtx(func(ctx context.Context) error { return NewChunkProgressLoggerForJob( - jobID, s.InternalDB().(isql.DB), 100 /* expectedChunks */, 0, /* startFraction */ + jobID, s.InternalDB().(isql.DB), 100 /* expectedChunks */, 0 /* startFraction */, 1.0, /* targetFraction */ ).Loop(ctx, requestFinishedCh) }) @@ -82,7 +82,7 @@ func TestChunkProgressLogger(t *testing.T) { requestFinishedCh2 := make(chan struct{}, 100) loggerGroup.GoCtx(func(ctx context.Context) error { return NewChunkProgressLoggerForJob( - jobID, s.InternalDB().(isql.DB), 40 /* expectedChunks */, 0.6, /* startFraction */ + jobID, s.InternalDB().(isql.DB), 40 /* expectedChunks */, 0.6 /* startFraction */, 1.0, /* targetFraction */ ).Loop(ctx, requestFinishedCh2) }) @@ -114,9 +114,31 @@ func TestChunkProgressLoggerLimitsFloatingPointError(t *testing.T) { require.Less(t, pct, float64(1.01)) lastReported = pct return nil - }, rangeCount, 0) + }, rangeCount, 0, 1.0) for i := 0; i < rangeCount; i++ { require.NoError(t, l.chunkFinished(ctx), "failed at update %d", i) } require.Greater(t, lastReported, float64(0.99)) } + +func 
TestChunkProgressLoggerTargetFraction(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + + defer TestingSetProgressThresholds()() + + const targetFraction = 0.05 + chunkCount := 100 + + var lastReported float64 + l := NewChunkProgressLogger(func(_ context.Context, pct float64) error { + require.LessOrEqual(t, pct, targetFraction+0.001) + lastReported = pct + return nil + }, chunkCount, 0, targetFraction) + for i := 0; i < chunkCount; i++ { + require.NoError(t, l.chunkFinished(ctx), "failed at update %d", i) + } + require.InDelta(t, targetFraction, lastReported, 0.001) +} From 941b57fe436dc3b228013e8027ff0327f71e9066 Mon Sep 17 00:00:00 2001 From: Michael Butler Date: Tue, 5 May 2026 16:58:02 -0400 Subject: [PATCH 2/2] backup: normalize fast restore fractionCompleted across phases Previously, ExperimentalCopy (fast restore) reported fractionCompleted from 0 to 1.0 during the link phase, then reset to 0 and went back to 1.0 during the download phase, causing a visible regression in the UI. Normalize progress so the link phase occupies the first 5% (0.0-0.05) and the download phase occupies the remaining 95% (0.05-1.0), ensuring fractionCompleted increases monotonically across both phases. 
Epic: none Release note: None Co-Authored-By: roachdev-claude --- pkg/backup/restore_job.go | 6 +++- pkg/backup/restore_online.go | 11 ++++++- pkg/backup/restore_online_test.go | 52 +++++++++++++++++++++++++++++++ 3 files changed, 67 insertions(+), 2 deletions(-) diff --git a/pkg/backup/restore_job.go b/pkg/backup/restore_job.go index ad99b307b728..0629c8c7ea15 100644 --- a/pkg/backup/restore_job.go +++ b/pkg/backup/restore_job.go @@ -568,7 +568,11 @@ func restore( // Only update the job progress on the main data bundle. This should account // for the bulk of the data to restore. Other data (e.g. zone configs in // cluster restores) may be restored first. - progressLogger := jobs.DeprecatedNewChunkProgressLoggerForJob(job, numImportSpans, job.FractionCompleted(), 1.0, progressTracker.updateJobCallback) + fractionTarget := float32(1.0) + if details.ExperimentalCopy { + fractionTarget = experimentalCopyLinkFraction + } + progressLogger := jobs.DeprecatedNewChunkProgressLoggerForJob(job, numImportSpans, job.FractionCompleted(), fractionTarget, progressTracker.updateJobCallback) jobProgressLoop := func(ctx context.Context) error { ctx, progressSpan := tracing.ChildSpan(ctx, "progress-loop") diff --git a/pkg/backup/restore_online.go b/pkg/backup/restore_online.go index 1d3975cb7797..dc3b6a5ebe95 100644 --- a/pkg/backup/restore_online.go +++ b/pkg/backup/restore_online.go @@ -54,6 +54,10 @@ import ( const defaultLinkWorkersPerNode = 2 +// experimentalCopyLinkFraction is the fraction of overall job progress +// allocated to the link phase of an ExperimentalCopy restore. 
+const experimentalCopyLinkFraction = 0.05 + var onlineRestoreLinkWorkers = settings.RegisterIntSetting( settings.ApplicationLevel, "backup.restore.online_worker_count", @@ -786,7 +790,12 @@ func (r *restoreResumer) waitForDownloadToComplete( total = remaining } - fractionComplete := float32(total-remaining) / float32(total) + rawFraction := float32(total-remaining) / float32(total) + fractionComplete := rawFraction + if details.ExperimentalCopy { + fractionComplete = experimentalCopyLinkFraction + + rawFraction*(1.0-experimentalCopyLinkFraction) + } log.Dev.VInfof(ctx, 1, "restore download phase, %s downloaded, %s remaining of %s total (%.2f complete)", sz(total-remaining), sz(remaining), sz(total), fractionComplete, ) diff --git a/pkg/backup/restore_online_test.go b/pkg/backup/restore_online_test.go index 65a55128aabb..0564321678b3 100644 --- a/pkg/backup/restore_online_test.go +++ b/pkg/backup/restore_online_test.go @@ -207,6 +207,58 @@ func TestOnlineRestoreRecovery(t *testing.T) { }) } +func TestFastRestoreFractionCompleted(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + backuptestutils.EnableFastRestoreForTest(t) + + const numAccounts = 100 + _, sqlDB, _, cleanupFn := backupRestoreTestSetup( + t, singleNode, numAccounts, InitManualReplication, + ) + defer cleanupFn() + + externalStorage := backuptestutils.GetExternalStorageURI( + t, "nodelocal://1/backup", "backup", sqlDB, + ) + sqlDB.Exec(t, fmt.Sprintf("BACKUP DATABASE data INTO '%s'", externalStorage)) + + sqlDB.Exec( + t, + "SET CLUSTER SETTING jobs.debug.pausepoints = 'restore.before_download'", + ) + var jobID int + sqlDB.QueryRow(t, fmt.Sprintf( + "RESTORE DATABASE data FROM LATEST IN '%s' WITH EXPERIMENTAL COPY, new_db_name=data2, detached", + externalStorage, + )).Scan(&jobID) + jobutils.WaitForJobToPause(t, sqlDB, jobspb.JobID(jobID)) + + var fractionCompleted float64 + sqlDB.QueryRow( + t, + fmt.Sprintf( + "SELECT fraction_completed FROM [SHOW JOBS] WHERE job_id = 
%d", + jobID, + ), + ).Scan(&fractionCompleted) + require.InDelta(t, experimentalCopyLinkFraction, fractionCompleted, 1e-6) + + sqlDB.Exec(t, "SET CLUSTER SETTING jobs.debug.pausepoints = ''") + sqlDB.Exec(t, fmt.Sprintf("RESUME JOB %d", jobID)) + jobutils.WaitForJobToSucceed(t, sqlDB, jobspb.JobID(jobID)) + + sqlDB.QueryRow( + t, + fmt.Sprintf( + "SELECT fraction_completed FROM [SHOW JOBS] WHERE job_id = %d", + jobID, + ), + ).Scan(&fractionCompleted) + require.Equal(t, 1.0, fractionCompleted) +} + // We run full cluster online restore recovery in a separate environment since // it requires dropping all databases and will impact other tests. func TestFullClusterOnlineRestoreRecovery(t *testing.T) {