Skip to content

Commit

Permalink
use last second copy rate to project ETA (#1231)
Browse files Browse the repository at this point in the history
* use current copy rate to project ETA

* make linter happy

* Replace `tests.ExpectEquals` with `require.Equal` in tests

---------

Co-authored-by: meiji163 <[email protected]>
  • Loading branch information
morgo and meiji163 authored Dec 18, 2024
1 parent 690b1e1 commit 09052e6
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 3 deletions.
1 change: 1 addition & 0 deletions go/base/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ type MigrationContext struct {
CurrentLag int64
currentProgress uint64
etaNanoseonds int64
EtaRowsPerSecond int64
ThrottleHTTPIntervalMillis int64
ThrottleHTTPStatusCode int64
ThrottleHTTPTimeoutMillis int64
Expand Down
24 changes: 21 additions & 3 deletions go/logic/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -819,11 +819,18 @@ func (this *Migrator) initiateStatus() {
this.printStatus(ForcePrintStatusAndHintRule)
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
var previousCount int64
for range ticker.C {
if atomic.LoadInt64(&this.finishedMigrating) > 0 {
return
}
go this.printStatus(HeuristicPrintStatusRule)
totalCopied := atomic.LoadInt64(&this.migrationContext.TotalRowsCopied)
if previousCount > 0 {
copiedThisLoop := totalCopied - previousCount
atomic.StoreInt64(&this.migrationContext.EtaRowsPerSecond, copiedThisLoop)
}
previousCount = totalCopied
}
}

Expand Down Expand Up @@ -925,9 +932,20 @@ func (this *Migrator) getMigrationETA(rowsEstimate int64) (eta string, duration
duration = 0
} else if progressPct >= 0.1 {
totalRowsCopied := this.migrationContext.GetTotalRowsCopied()
elapsedRowCopySeconds := this.migrationContext.ElapsedRowCopyTime().Seconds()
totalExpectedSeconds := elapsedRowCopySeconds * float64(rowsEstimate) / float64(totalRowsCopied)
etaSeconds := totalExpectedSeconds - elapsedRowCopySeconds
etaRowsPerSecond := atomic.LoadInt64(&this.migrationContext.EtaRowsPerSecond)
var etaSeconds float64
// If there is data available on our current row-copies-per-second rate, use it.
// Otherwise we can fallback to the total elapsed time and extrapolate.
// This is going to be less accurate on a longer copy as the insert rate
// will tend to slow down.
if etaRowsPerSecond > 0 {
remainingRows := float64(rowsEstimate) - float64(totalRowsCopied)
etaSeconds = remainingRows / float64(etaRowsPerSecond)
} else {
elapsedRowCopySeconds := this.migrationContext.ElapsedRowCopyTime().Seconds()
totalExpectedSeconds := elapsedRowCopySeconds * float64(rowsEstimate) / float64(totalRowsCopied)
etaSeconds = totalExpectedSeconds - elapsedRowCopySeconds
}
if etaSeconds >= 0 {
duration = time.Duration(etaSeconds) * time.Second
} else {
Expand Down
9 changes: 9 additions & 0 deletions go/logic/migrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,15 @@ func TestMigratorGetMigrationStateAndETA(t *testing.T) {
require.Equal(t, "4h29m44s", eta)
require.Equal(t, "4h29m44s", etaDuration.String())
}
{
// Test using rows-per-second added data.
migrationContext.TotalRowsCopied = 456
migrationContext.EtaRowsPerSecond = 100
state, eta, etaDuration := migrator.getMigrationStateAndETA(123456)
require.Equal(t, "migrating", state)
require.Equal(t, "20m30s", eta)
require.Equal(t, "20m30s", etaDuration.String())
}
{
migrationContext.TotalRowsCopied = 456
state, eta, etaDuration := migrator.getMigrationStateAndETA(456)
Expand Down

0 comments on commit 09052e6

Please sign in to comment.