Skip to content

Commit

Permalink
lightning: add IngestData.DecRef to make sure only call it after fini…
Browse files Browse the repository at this point in the history
…sh (#47288)

close #47284
  • Loading branch information
lance6716 authored Oct 7, 2023
1 parent 0b0032b commit a8bc790
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 19 deletions.
11 changes: 8 additions & 3 deletions br/pkg/lightning/backend/external/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,11 +436,16 @@ func (m *MemoryIngestData) IncRef() {
m.refCnt.Inc()
}

// DecRef implements IngestData.DecRef.
func (m *MemoryIngestData) DecRef() {
if m.refCnt.Dec() == 0 {
m.memBuf.Destroy()
}
}

// Finish implements IngestData.Finish.
func (m *MemoryIngestData) Finish(totalBytes, totalCount int64) {
m.importedKVSize.Add(totalBytes)
m.importedKVCount.Add(totalCount)
if m.refCnt.Dec() == 0 {
m.memBuf.Destroy()
}

}
3 changes: 3 additions & 0 deletions br/pkg/lightning/backend/local/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -1035,6 +1035,9 @@ func (e *Engine) GetTS() uint64 {
// IncRef implements IngestData interface.
func (*Engine) IncRef() {}

// DecRef implements IngestData interface.
func (*Engine) DecRef() {}

// Finish implements IngestData interface.
func (e *Engine) Finish(totalBytes, totalCount int64) {
e.importedKVSize.Add(totalBytes)
Expand Down
16 changes: 7 additions & 9 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -1215,12 +1215,11 @@ func (local *Backend) generateAndSendJob(
return err
}
for _, job := range jobs {
data.IncRef()
jobWg.Add(1)
job.ref(jobWg)
select {
case <-egCtx.Done():
// this job is not put into jobToWorkerCh
jobWg.Done()
job.done(jobWg)
// if the context is canceled, it means worker has error, the first error can be
// found by worker's error group LATER. if this function returns an error it will
// seize the "first error".
Expand Down Expand Up @@ -1345,14 +1344,13 @@ func (local *Backend) startWorker(
// Don't need to put the job back to retry, because generateJobForRange
// has done the retry internally. Here just done for the "needRescan"
// job and exit directly.
jobWg.Done()
job.done(jobWg)
return err2
}
// 1 "needRescan" job becomes len(jobs) "regionScanned" jobs.
newJobCnt := len(jobs) - 1
jobWg.Add(newJobCnt)
for newJobCnt > 0 {
job.ingestData.IncRef()
job.ref(jobWg)
newJobCnt--
}
for _, j := range jobs {
Expand Down Expand Up @@ -1635,7 +1633,7 @@ func (local *Backend) doImport(ctx context.Context, engine common.Engine, region
if job.retryCount > maxWriteAndIngestRetryTimes {
firstErr.Set(job.lastRetryableErr)
workerCancel()
jobWg.Done()
job.done(&jobWg)
continue
}
// max retry backoff time: 2+4+8+16+30*26=810s
Expand All @@ -1652,10 +1650,10 @@ func (local *Backend) doImport(ctx context.Context, engine common.Engine, region
zap.Time("waitUntil", job.waitUntil))
if !retryer.push(job) {
// retryer is closed by worker error
jobWg.Done()
job.done(&jobWg)
}
case ingested:
jobWg.Done()
job.done(&jobWg)
case needRescan:
panic("should not reach here")
}
Expand Down
2 changes: 2 additions & 0 deletions br/pkg/lightning/backend/local/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1209,6 +1209,8 @@ func (m mockIngestData) GetTS() uint64 { return 0 }

func (m mockIngestData) IncRef() {}

func (m mockIngestData) DecRef() {}

func (m mockIngestData) Finish(_, _ int64) {}

func TestCheckPeersBusy(t *testing.T) {
Expand Down
29 changes: 24 additions & 5 deletions br/pkg/lightning/backend/local/region_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,27 @@ func (j *regionJob) convertStageTo(stage jobStageTp) {
}
}

// ref means that the ingestData of job will be accessed soon.
func (j *regionJob) ref(wg *sync.WaitGroup) {
if wg != nil {
wg.Add(1)
}
if j.ingestData != nil {
j.ingestData.IncRef()
}
}

// done promises that the ingestData of job will not be accessed. Same amount of
// done should be called to release the ingestData.
func (j *regionJob) done(wg *sync.WaitGroup) {
if j.ingestData != nil {
j.ingestData.DecRef()
}
if wg != nil {
wg.Done()
}
}

// writeToTiKV writes the data to TiKV and mark this job as wrote stage.
// if any write logic has error, writeToTiKV will set job to a proper stage and return nil. TODO: <-check this
// if any underlying logic has error, writeToTiKV will return an error.
Expand Down Expand Up @@ -809,13 +830,11 @@ func (q *regionJobRetryer) close() {
defer q.protectedClosed.mu.Unlock()
q.protectedClosed.closed = true

count := len(q.protectedQueue.q)
if q.protectedToPutBack.toPutBack != nil {
count++
q.protectedToPutBack.toPutBack.done(q.jobWg)
}
for count > 0 {
q.jobWg.Done()
count--
for _, job := range q.protectedQueue.q {
job.done(q.jobWg)
}
}

Expand Down
8 changes: 6 additions & 2 deletions br/pkg/lightning/common/ingest_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,14 @@ type IngestData interface {
// GetTS will be used as the start/commit TS of the data.
GetTS() uint64
// IncRef should be called every time when IngestData is referred by regionJob.
// Multiple regionJob can share one IngestData. Same amount of Finish should be
// Multiple regionJob can share one IngestData. Same amount of DecRef should be
// called to release the IngestData.
IncRef()
// Finish will be called when the data is ingested successfully.
// DecRef is used to cooperate with IncRef to release IngestData.
DecRef()
// Finish will be called when the data is ingested successfully. Note that
// one IngestData maybe partially ingested, so this function may be called
// multiple times.
Finish(totalBytes, totalCount int64)
}

Expand Down

0 comments on commit a8bc790

Please sign in to comment.