From a8bc7908729ac7ef891f7e50e25dadcf2ea427b3 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Sat, 7 Oct 2023 17:43:52 +0800 Subject: [PATCH] lightning: add IngestData.DecRef to make sure only call it after finish (#47288) close pingcap/tidb#47284 --- br/pkg/lightning/backend/external/engine.go | 11 ++++++-- br/pkg/lightning/backend/local/engine.go | 3 ++ br/pkg/lightning/backend/local/local.go | 16 +++++------ br/pkg/lightning/backend/local/local_test.go | 2 ++ br/pkg/lightning/backend/local/region_job.go | 29 ++++++++++++++++---- br/pkg/lightning/common/ingest_data.go | 8 ++++-- 6 files changed, 50 insertions(+), 19 deletions(-) diff --git a/br/pkg/lightning/backend/external/engine.go b/br/pkg/lightning/backend/external/engine.go index cafa6ee8ada8a..a9d9234614e86 100644 --- a/br/pkg/lightning/backend/external/engine.go +++ b/br/pkg/lightning/backend/external/engine.go @@ -436,11 +436,16 @@ func (m *MemoryIngestData) IncRef() { m.refCnt.Inc() } +// DecRef implements IngestData.DecRef. +func (m *MemoryIngestData) DecRef() { + if m.refCnt.Dec() == 0 { + m.memBuf.Destroy() + } +} + // Finish implements IngestData.Finish. func (m *MemoryIngestData) Finish(totalBytes, totalCount int64) { m.importedKVSize.Add(totalBytes) m.importedKVCount.Add(totalCount) - if m.refCnt.Dec() == 0 { - m.memBuf.Destroy() - } + } diff --git a/br/pkg/lightning/backend/local/engine.go b/br/pkg/lightning/backend/local/engine.go index 468416a39be1c..e9cc49ce18c7f 100644 --- a/br/pkg/lightning/backend/local/engine.go +++ b/br/pkg/lightning/backend/local/engine.go @@ -1035,6 +1035,9 @@ func (e *Engine) GetTS() uint64 { // IncRef implements IngestData interface. func (*Engine) IncRef() {} +// DecRef implements IngestData interface. +func (*Engine) DecRef() {} + // Finish implements IngestData interface. func (e *Engine) Finish(totalBytes, totalCount int64) { e.importedKVSize.Add(totalBytes) diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go index 98abc7ecf02b4..bf2e08e54d85a 100644 --- a/br/pkg/lightning/backend/local/local.go +++ b/br/pkg/lightning/backend/local/local.go @@ -1215,12 +1215,11 @@ func (local *Backend) generateAndSendJob( return err } for _, job := range jobs { - data.IncRef() - jobWg.Add(1) + job.ref(jobWg) select { case <-egCtx.Done(): // this job is not put into jobToWorkerCh - jobWg.Done() + job.done(jobWg) // if the context is canceled, it means worker has error, the first error can be // found by worker's error group LATER. if this function returns an error it will // seize the "first error". @@ -1345,14 +1344,13 @@ func (local *Backend) startWorker( // Don't need to put the job back to retry, because generateJobForRange // has done the retry internally. Here just done for the "needRescan" // job and exit directly. - jobWg.Done() + job.done(jobWg) return err2 } // 1 "needRescan" job becomes len(jobs) "regionScanned" jobs. newJobCnt := len(jobs) - 1 - jobWg.Add(newJobCnt) for newJobCnt > 0 { - job.ingestData.IncRef() + job.ref(jobWg) newJobCnt-- } for _, j := range jobs { @@ -1635,7 +1633,7 @@ func (local *Backend) doImport(ctx context.Context, engine common.Engine, region if job.retryCount > maxWriteAndIngestRetryTimes { firstErr.Set(job.lastRetryableErr) workerCancel() - jobWg.Done() + job.done(&jobWg) continue } // max retry backoff time: 2+4+8+16+30*26=810s @@ -1652,10 +1650,10 @@ func (local *Backend) doImport(ctx context.Context, engine common.Engine, region zap.Time("waitUntil", job.waitUntil)) if !retryer.push(job) { // retryer is closed by worker error - jobWg.Done() + job.done(&jobWg) } case ingested: - jobWg.Done() + job.done(&jobWg) case needRescan: panic("should not reach here") } diff --git a/br/pkg/lightning/backend/local/local_test.go b/br/pkg/lightning/backend/local/local_test.go index c097c5e519b06..3f4047909f963 100644 --- a/br/pkg/lightning/backend/local/local_test.go +++ b/br/pkg/lightning/backend/local/local_test.go @@ -1209,6 +1209,8 @@ func (m mockIngestData) GetTS() uint64 { return 0 } func (m mockIngestData) IncRef() {} +func (m mockIngestData) DecRef() {} + func (m mockIngestData) Finish(_, _ int64) {} func TestCheckPeersBusy(t *testing.T) { diff --git a/br/pkg/lightning/backend/local/region_job.go b/br/pkg/lightning/backend/local/region_job.go index 7d26c0065840f..83ffa5b446e48 100644 --- a/br/pkg/lightning/backend/local/region_job.go +++ b/br/pkg/lightning/backend/local/region_job.go @@ -158,6 +158,27 @@ func (j *regionJob) convertStageTo(stage jobStageTp) { } } +// ref means that the ingestData of job will be accessed soon. +func (j *regionJob) ref(wg *sync.WaitGroup) { + if wg != nil { + wg.Add(1) + } + if j.ingestData != nil { + j.ingestData.IncRef() + } +} + +// done promises that the ingestData of job will not be accessed. Same amount of +// done should be called to release the ingestData. +func (j *regionJob) done(wg *sync.WaitGroup) { + if j.ingestData != nil { + j.ingestData.DecRef() + } + if wg != nil { + wg.Done() + } +} + // writeToTiKV writes the data to TiKV and mark this job as wrote stage. // if any write logic has error, writeToTiKV will set job to a proper stage and return nil. TODO: <-check this // if any underlying logic has error, writeToTiKV will return an error. @@ -809,13 +830,11 @@ func (q *regionJobRetryer) close() { defer q.protectedClosed.mu.Unlock() q.protectedClosed.closed = true - count := len(q.protectedQueue.q) if q.protectedToPutBack.toPutBack != nil { - count++ + q.protectedToPutBack.toPutBack.done(q.jobWg) } - for count > 0 { - q.jobWg.Done() - count-- + for _, job := range q.protectedQueue.q { + job.done(q.jobWg) } } diff --git a/br/pkg/lightning/common/ingest_data.go b/br/pkg/lightning/common/ingest_data.go index 13c22d8abeb48..a1567cb20857c 100644 --- a/br/pkg/lightning/common/ingest_data.go +++ b/br/pkg/lightning/common/ingest_data.go @@ -28,10 +28,14 @@ type IngestData interface { // GetTS will be used as the start/commit TS of the data. GetTS() uint64 // IncRef should be called every time when IngestData is referred by regionJob. - // Multiple regionJob can share one IngestData. Same amount of Finish should be + // Multiple regionJob can share one IngestData. Same amount of DecRef should be // called to release the IngestData. IncRef() - // Finish will be called when the data is ingested successfully. + // DecRef is used to cooperate with IncRef to release IngestData. + DecRef() + // Finish will be called when the data is ingested successfully. Note that + // one IngestData maybe partially ingested, so this function may be called + // multiple times. Finish(totalBytes, totalCount int64) }