Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion c-deps/geos
Submodule geos updated 473 files
2 changes: 1 addition & 1 deletion pkg/backup/backup_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ func backup(
numTotalSpans += len(spec.IntroducedSpans) + len(spec.Spans)
}

progressLogger := jobs.DeprecatedNewChunkProgressLoggerForJob(job, numTotalSpans, job.FractionCompleted(), jobs.DeprecatedProgressUpdateOnly)
progressLogger := jobs.DeprecatedNewChunkProgressLoggerForJob(job, numTotalSpans, job.FractionCompleted(), 1.0, jobs.DeprecatedProgressUpdateOnly)

requestFinishedCh := make(chan struct{}, numTotalSpans) // enough buffer to never block
var jobProgressLoop func(ctx context.Context) error
Expand Down
2 changes: 1 addition & 1 deletion pkg/backup/compaction_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -948,7 +948,7 @@ func doCompaction(
frac = 0
}
logger := jobs.NewChunkProgressLoggerForJob(
jobID, execCtx.ExecCfg().InternalDB, totalEntriesLeft, frac,
jobID, execCtx.ExecCfg().InternalDB, totalEntriesLeft, frac, 1.0,
)
progressLoop := func(ctx context.Context) error {
return logger.Loop(ctx, chunkFinishedCh)
Expand Down
6 changes: 5 additions & 1 deletion pkg/backup/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,11 @@ func restore(
// Only update the job progress on the main data bundle. This should account
// for the bulk of the data to restore. Other data (e.g. zone configs in
// cluster restores) may be restored first.
progressLogger := jobs.DeprecatedNewChunkProgressLoggerForJob(job, numImportSpans, job.FractionCompleted(), progressTracker.updateJobCallback)
fractionTarget := float32(1.0)
if details.ExperimentalCopy {
fractionTarget = experimentalCopyLinkFraction
}
progressLogger := jobs.DeprecatedNewChunkProgressLoggerForJob(job, numImportSpans, job.FractionCompleted(), fractionTarget, progressTracker.updateJobCallback)

jobProgressLoop := func(ctx context.Context) error {
ctx, progressSpan := tracing.ChildSpan(ctx, "progress-loop")
Expand Down
11 changes: 10 additions & 1 deletion pkg/backup/restore_online.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ import (

const defaultLinkWorkersPerNode = 2

// experimentalCopyLinkFraction is the fraction of overall job progress
// allocated to the link phase of an ExperimentalCopy restore. The link phase
// advances the job's reported fraction from its starting value up to this
// number; the download phase is then rescaled so that it covers the remaining
// 1-experimentalCopyLinkFraction of the progress range.
const experimentalCopyLinkFraction = 0.05

var onlineRestoreLinkWorkers = settings.RegisterIntSetting(
settings.ApplicationLevel,
"backup.restore.online_worker_count",
Expand Down Expand Up @@ -786,7 +790,12 @@ func (r *restoreResumer) waitForDownloadToComplete(
total = remaining
}

fractionComplete := float32(total-remaining) / float32(total)
rawFraction := float32(total-remaining) / float32(total)
fractionComplete := rawFraction
if details.ExperimentalCopy {
fractionComplete = experimentalCopyLinkFraction +
rawFraction*(1.0-experimentalCopyLinkFraction)
}
log.Dev.VInfof(ctx, 1, "restore download phase, %s downloaded, %s remaining of %s total (%.2f complete)",
sz(total-remaining), sz(remaining), sz(total), fractionComplete,
)
Expand Down
52 changes: 52 additions & 0 deletions pkg/backup/restore_online_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,58 @@ func TestOnlineRestoreRecovery(t *testing.T) {
})
}

// TestFastRestoreFractionCompleted checks the fraction_completed value shown
// by SHOW JOBS for a RESTORE ... WITH EXPERIMENTAL COPY job at two points:
// while the job is paused at the restore.before_download pausepoint it must
// equal experimentalCopyLinkFraction, and after the job succeeds it must be
// exactly 1.0.
func TestFastRestoreFractionCompleted(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	backuptestutils.EnableFastRestoreForTest(t)

	const numAccounts = 100
	_, sqlDB, _, cleanupFn := backupRestoreTestSetup(
		t, singleNode, numAccounts, InitManualReplication,
	)
	defer cleanupFn()

	// Take a backup to restore from.
	externalStorage := backuptestutils.GetExternalStorageURI(
		t, "nodelocal://1/backup", "backup", sqlDB,
	)
	backupStmt := fmt.Sprintf("BACKUP DATABASE data INTO '%s'", externalStorage)
	sqlDB.Exec(t, backupStmt)

	// Arrange for the restore job to pause just before its download phase.
	sqlDB.Exec(
		t,
		"SET CLUSTER SETTING jobs.debug.pausepoints = 'restore.before_download'",
	)

	var jobID int
	restoreStmt := fmt.Sprintf(
		"RESTORE DATABASE data FROM LATEST IN '%s' WITH EXPERIMENTAL COPY, new_db_name=data2, detached",
		externalStorage,
	)
	sqlDB.QueryRow(t, restoreStmt).Scan(&jobID)
	jobutils.WaitForJobToPause(t, sqlDB, jobspb.JobID(jobID))

	// readFraction fetches the job's current fraction_completed from SHOW JOBS.
	readFraction := func() float64 {
		var fraction float64
		query := fmt.Sprintf(
			"SELECT fraction_completed FROM [SHOW JOBS] WHERE job_id = %d",
			jobID,
		)
		sqlDB.QueryRow(t, query).Scan(&fraction)
		return fraction
	}

	// Paused before the download phase: progress should sit at the link
	// phase's share.
	require.InDelta(t, experimentalCopyLinkFraction, readFraction(), 1e-6)

	// Clear the pausepoint and let the job run to completion.
	sqlDB.Exec(t, "SET CLUSTER SETTING jobs.debug.pausepoints = ''")
	sqlDB.Exec(t, fmt.Sprintf("RESUME JOB %d", jobID))
	jobutils.WaitForJobToSucceed(t, sqlDB, jobspb.JobID(jobID))

	require.Equal(t, 1.0, readFraction())
}

// We run full cluster online restore recovery in a separate environment since
// it requires dropping all databases and will impact other tests.
func TestFullClusterOnlineRestoreRecovery(t *testing.T) {
Expand Down
21 changes: 14 additions & 7 deletions pkg/jobs/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,17 +56,19 @@ type ChunkProgressLogger struct {

// NewChunkProgressLoggerForJob returns a ChunkProgressLogger which updates a job's
// fraction_completed via ProgressStorage. expectedChunks is the expected number of chunks left
// for this job, and fraction_completed is calculated based on this number and startFraction.
// for this job, and fraction_completed is calculated based on this number, startFraction,
// and targetFraction. Progress is linearly interpolated from startFraction to targetFraction
// as chunks complete.
func NewChunkProgressLoggerForJob(
jobID jobspb.JobID, db isql.DB, expectedChunks int, startFraction float64,
jobID jobspb.JobID, db isql.DB, expectedChunks int, startFraction float64, targetFraction float64,
) *ChunkProgressLogger {
return NewChunkProgressLogger(
func(ctx context.Context, fraction float64) error {
return db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
return ProgressStorage(jobID).SetFraction(ctx, txn, fraction)
})
},
expectedChunks, startFraction,
expectedChunks, startFraction, targetFraction,
)
}

Expand All @@ -78,6 +80,7 @@ func DeprecatedNewChunkProgressLoggerForJob(
j *Job,
expectedChunks int,
startFraction float32,
targetFraction float32,
progressedFn func(context.Context, jobspb.ProgressDetails),
) *ChunkProgressLogger {
return NewChunkProgressLogger(
Expand All @@ -88,17 +91,21 @@ func DeprecatedNewChunkProgressLoggerForJob(
}
return float32(fraction)
})
}, expectedChunks, float64(startFraction))
}, expectedChunks, float64(startFraction), float64(targetFraction))
}

// NewChunkProgressLogger returns a ChunkProgressLogger.
// NewChunkProgressLogger returns a ChunkProgressLogger. Progress is linearly
// interpolated from startFraction to targetFraction as chunks complete.
func NewChunkProgressLogger(
report func(ctx context.Context, pct float64) error, expectedChunks int, startFraction float64,
report func(ctx context.Context, pct float64) error,
expectedChunks int,
startFraction float64,
targetFraction float64,
) *ChunkProgressLogger {
return &ChunkProgressLogger{
expectedChunks: expectedChunks,
batcher: ProgressUpdateBatcher{
perChunkContribution: (1.0 - startFraction) * 1.0 / float64(expectedChunks),
perChunkContribution: (targetFraction - startFraction) / float64(expectedChunks),
start: startFraction,
reported: startFraction,
Report: report,
Expand Down
28 changes: 25 additions & 3 deletions pkg/jobs/progress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func TestChunkProgressLogger(t *testing.T) {
requestFinishedCh := make(chan struct{}, 100)
loggerGroup.GoCtx(func(ctx context.Context) error {
return NewChunkProgressLoggerForJob(
jobID, s.InternalDB().(isql.DB), 100 /* expectedChunks */, 0, /* startFraction */
jobID, s.InternalDB().(isql.DB), 100 /* expectedChunks */, 0 /* startFraction */, 1.0, /* targetFraction */
).Loop(ctx, requestFinishedCh)
})

Expand Down Expand Up @@ -82,7 +82,7 @@ func TestChunkProgressLogger(t *testing.T) {
requestFinishedCh2 := make(chan struct{}, 100)
loggerGroup.GoCtx(func(ctx context.Context) error {
return NewChunkProgressLoggerForJob(
jobID, s.InternalDB().(isql.DB), 40 /* expectedChunks */, 0.6, /* startFraction */
jobID, s.InternalDB().(isql.DB), 40 /* expectedChunks */, 0.6 /* startFraction */, 1.0, /* targetFraction */
).Loop(ctx, requestFinishedCh2)
})

Expand Down Expand Up @@ -114,9 +114,31 @@ func TestChunkProgressLoggerLimitsFloatingPointError(t *testing.T) {
require.Less(t, pct, float64(1.01))
lastReported = pct
return nil
}, rangeCount, 0)
}, rangeCount, 0, 1.0)
for i := 0; i < rangeCount; i++ {
require.NoError(t, l.chunkFinished(ctx), "failed at update %d", i)
}
require.Greater(t, lastReported, float64(0.99))
}

// TestChunkProgressLoggerTargetFraction checks that a ChunkProgressLogger
// created with a targetFraction of 0.05 never reports a value more than 0.001
// above that target, and that the last value reported after all expected
// chunks have finished is within 0.001 of the target.
func TestChunkProgressLoggerTargetFraction(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)
	defer TestingSetProgressThresholds()()

	const (
		targetFraction = 0.05
		tolerance      = 0.001
		chunkCount     = 100
	)
	ctx := context.Background()

	// report records every progress value and rejects any that overshoot the
	// target by more than the tolerance.
	var lastReported float64
	report := func(_ context.Context, pct float64) error {
		require.LessOrEqual(t, pct, targetFraction+tolerance)
		lastReported = pct
		return nil
	}

	logger := NewChunkProgressLogger(report, chunkCount, 0, targetFraction)
	for i := 0; i < chunkCount; i++ {
		require.NoError(t, logger.chunkFinished(ctx), "failed at update %d", i)
	}
	require.InDelta(t, targetFraction, lastReported, tolerance)
}
Loading