Skip to content

Commit 1aa4a47

Browse files
authored
Import-data: Reuse worker pool across tasks, implement sequential task picker (#2257)
- Reuse the worker pool for all tasks across all tables. - Implement a Sequential Task Picker, which picks one task at a time.
1 parent f66a641 commit 1aa4a47

9 files changed

+437
-40
lines changed

migtests/tests/import-file/validate

+3-1
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,9 @@ TABLE_NAME_DATA_MAP = {
132132
def file_import_done_checks(tgt):
133133
fetched_row_cnts = tgt.row_count_of_all_tables()
134134
print(f"Row counts after import data file: {fetched_row_cnts}")
135-
assert fetched_row_cnts == EXPECTED
135+
for table_name, rc in EXPECTED.items():
136+
assert fetched_row_cnts[table_name] == rc, f"Row count mismatch for table {table_name}, expected: {rc}, got: {fetched_row_cnts[table_name]}"
137+
136138

137139
fetched_row_cnts_non_public = tgt.row_count_of_all_tables("non_public")
138140
print(f"Row counts after import data file in non_public schema: {fetched_row_cnts_non_public}")

yb-voyager/cmd/importData.go

+99-15
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,7 @@ func importDataCommandFn(cmd *cobra.Command, args []string) {
155155
// TODO: handle case-sensitive in table names with oracle ff-db
156156
// quoteTableNameIfRequired()
157157
importFileTasks := discoverFilesToImport()
158+
log.Debugf("Discovered import file tasks: %v", importFileTasks)
158159
if importerRole == TARGET_DB_IMPORTER_ROLE {
159160

160161
importType = record.ExportType
@@ -316,6 +317,10 @@ type ImportFileTask struct {
316317
FileSize int64
317318
}
318319

320+
func (task *ImportFileTask) String() string {
321+
return fmt.Sprintf("{ID: %d, FilePath: %s, TableName: %s, RowCount: %d, FileSize: %d}", task.ID, task.FilePath, task.TableNameTup.ForOutput(), task.RowCount, task.FileSize)
322+
}
323+
319324
// func quoteTableNameIfRequired() {
320325
// if tconf.TargetDBType != ORACLE {
321326
// return
@@ -543,6 +548,8 @@ func importData(importFileTasks []*ImportFileTask) {
543548
utils.ErrExit("Failed to classify tasks: %s", err)
544549
}
545550
}
551+
log.Infof("pending tasks: %v", pendingTasks)
552+
log.Infof("completed tasks: %v", completedTasks)
546553

547554
//TODO: BUG: we are applying table-list filter on importFileTasks, but here we are considering all tables as per
548555
// export-data table-list. Should be fine because we are only disabling and re-enabling, but this is still not ideal.
@@ -584,28 +591,36 @@ func importData(importFileTasks []*ImportFileTask) {
584591
controlPlane.UpdateImportedRowCount(importDataAllTableMetrics)
585592
}
586593

587-
for _, task := range pendingTasks {
588-
// The code can produce `poolSize` number of batches at a time. But, it can consume only
589-
// `parallelism` number of batches at a time.
590-
batchImportPool = pool.New().WithMaxGoroutines(poolSize)
591-
log.Infof("created batch import pool of size: %d", poolSize)
592-
593-
taskImporter, err := NewFileTaskImporter(task, state, batchImportPool, progressReporter)
594+
useTaskPicker := utils.GetEnvAsBool("USE_TASK_PICKER_FOR_IMPORT", true)
595+
if useTaskPicker {
596+
err := importTasksViaTaskPicker(pendingTasks, state, progressReporter, poolSize)
594597
if err != nil {
595-
utils.ErrExit("Failed to create file task importer: %s", err)
598+
utils.ErrExit("Failed to import tasks via task picker: %s", err)
596599
}
600+
} else {
601+
for _, task := range pendingTasks {
602+
// The code can produce `poolSize` number of batches at a time. But, it can consume only
603+
// `parallelism` number of batches at a time.
604+
batchImportPool = pool.New().WithMaxGoroutines(poolSize)
605+
log.Infof("created batch import pool of size: %d", poolSize)
597606

598-
for !taskImporter.AllBatchesSubmitted() {
599-
err := taskImporter.SubmitNextBatch()
607+
taskImporter, err := NewFileTaskImporter(task, state, batchImportPool, progressReporter)
600608
if err != nil {
601-
utils.ErrExit("Failed to submit next batch: task:%v err: %s", task, err)
609+
utils.ErrExit("Failed to create file task importer: %s", err)
602610
}
603-
}
604611

605-
batchImportPool.Wait() // Wait for the file import to finish.
606-
taskImporter.PostProcess()
612+
for !taskImporter.AllBatchesSubmitted() {
613+
err := taskImporter.ProduceAndSubmitNextBatchToWorkerPool()
614+
if err != nil {
615+
utils.ErrExit("Failed to submit next batch: task:%v err: %s", task, err)
616+
}
617+
}
618+
619+
batchImportPool.Wait() // wait for file import to finish
620+
taskImporter.PostProcess()
621+
}
622+
time.Sleep(time.Second * 2)
607623
}
608-
time.Sleep(time.Second * 2)
609624
}
610625
utils.PrintAndLog("snapshot data import complete\n\n")
611626
}
@@ -684,6 +699,75 @@ func importData(importFileTasks []*ImportFileTask) {
684699

685700
}
686701

702+
/*
703+
1. Initialize a worker pool
704+
2. Create a task picker which helps the importer choose which task to process in each iteration.
705+
3. Loop until all tasks are done:
706+
- Pick a task from the task picker.
707+
- If the task is not already being processed, create a new FileTaskImporter for the task.
708+
- For the task that is picked, produce the next batch and submit it to the worker pool. Worker will asynchronously import the batch.
709+
- If task is done, mark it as done in the task picker.
710+
*/
711+
func importTasksViaTaskPicker(pendingTasks []*ImportFileTask, state *ImportDataState, progressReporter *ImportDataProgressReporter, poolSize int) error {
712+
// The code can produce `poolSize` number of batches at a time. But, it can consume only
713+
// `parallelism` number of batches at a time.
714+
batchImportPool = pool.New().WithMaxGoroutines(poolSize)
715+
log.Infof("created batch import pool of size: %d", poolSize)
716+
717+
taskPicker, err := NewSequentialTaskPicker(pendingTasks, state)
718+
if err != nil {
719+
return fmt.Errorf("create task picker: %w", err)
720+
}
721+
taskImporters := map[int]*FileTaskImporter{}
722+
723+
for taskPicker.HasMoreTasks() {
724+
task, err := taskPicker.Pick()
725+
if err != nil {
726+
return fmt.Errorf("get next task: %w", err)
727+
}
728+
var taskImporter *FileTaskImporter
729+
var ok bool
730+
taskImporter, ok = taskImporters[task.ID]
731+
if !ok {
732+
taskImporter, err = NewFileTaskImporter(task, state, batchImportPool, progressReporter)
733+
if err != nil {
734+
return fmt.Errorf("create file task importer: %s", err)
735+
}
736+
log.Infof("created file task importer for table: %s, task: %v", task.TableNameTup.ForOutput(), task)
737+
taskImporters[task.ID] = taskImporter
738+
}
739+
740+
if taskImporter.AllBatchesSubmitted() {
741+
// All batches for this task have been submitted.
742+
// task could have been completed (all batches imported) OR still in progress
743+
// in case task is done, we should inform task picker so that we stop picking that task.
744+
taskDone, err := taskImporter.AllBatchesImported()
745+
if err != nil {
746+
return fmt.Errorf("check if all batches are imported: task: %v err :%w", task, err)
747+
}
748+
if taskDone {
749+
taskImporter.PostProcess()
750+
err = taskPicker.MarkTaskAsDone(task)
751+
if err != nil {
752+
return fmt.Errorf("mark task as done: task: %v, err: %w", task, err)
753+
}
754+
continue
755+
} else {
756+
// some batches are still in progress, wait for them to complete as decided by the picker.
757+
// don't want to busy-wait, so in case of sequentialTaskPicker, we sleep.
758+
taskPicker.WaitForTasksBatchesTobeImported()
759+
continue
760+
}
761+
762+
}
763+
err = taskImporter.ProduceAndSubmitNextBatchToWorkerPool()
764+
if err != nil {
765+
return fmt.Errorf("submit next batch: task:%v err: %s", task, err)
766+
}
767+
}
768+
return nil
769+
}
770+
687771
func startAdaptiveParallelism() (bool, error) {
688772
yb, ok := tdb.(*tgtdb.TargetYugabyteDB)
689773
if !ok {

yb-voyager/cmd/importDataFileBatchProducer_test.go

+6-6
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ func TestBasicFileBatchProducer(t *testing.T) {
3939

4040
fileContents := `id,val
4141
1, "hello"`
42-
_, task, err := createFileAndTask(lexportDir, fileContents, ldataDir, "test_table")
42+
_, task, err := createFileAndTask(lexportDir, fileContents, ldataDir, "test_table", 1)
4343
assert.NoError(t, err)
4444

4545
batchproducer, err := NewFileBatchProducer(task, state)
@@ -71,7 +71,7 @@ func TestFileBatchProducerBasedOnRowsThreshold(t *testing.T) {
7171
2, "world"
7272
3, "foo"
7373
4, "bar"`
74-
_, task, err := createFileAndTask(lexportDir, fileContents, ldataDir, "test_table")
74+
_, task, err := createFileAndTask(lexportDir, fileContents, ldataDir, "test_table", 1)
7575
assert.NoError(t, err)
7676

7777
batchproducer, err := NewFileBatchProducer(task, state)
@@ -122,7 +122,7 @@ func TestFileBatchProducerBasedOnSizeThreshold(t *testing.T) {
122122
2, "ghijk"
123123
3, "mnopq"
124124
4, "stuvw"`
125-
_, task, err := createFileAndTask(lexportDir, fileContents, ldataDir, "test_table")
125+
_, task, err := createFileAndTask(lexportDir, fileContents, ldataDir, "test_table", 1)
126126
assert.NoError(t, err)
127127

128128
batchproducer, err := NewFileBatchProducer(task, state)
@@ -190,7 +190,7 @@ func TestFileBatchProducerThrowsErrorWhenSingleRowGreaterThanMaxBatchSize(t *tes
190190
2, "ghijk"
191191
3, "mnopq1234567899876543"
192192
4, "stuvw"`
193-
_, task, err := createFileAndTask(lexportDir, fileContents, ldataDir, "test_table")
193+
_, task, err := createFileAndTask(lexportDir, fileContents, ldataDir, "test_table", 1)
194194
assert.NoError(t, err)
195195

196196
batchproducer, err := NewFileBatchProducer(task, state)
@@ -226,7 +226,7 @@ func TestFileBatchProducerResumable(t *testing.T) {
226226
2, "world"
227227
3, "foo"
228228
4, "bar"`
229-
_, task, err := createFileAndTask(lexportDir, fileContents, ldataDir, "test_table")
229+
_, task, err := createFileAndTask(lexportDir, fileContents, ldataDir, "test_table", 1)
230230
assert.NoError(t, err)
231231

232232
batchproducer, err := NewFileBatchProducer(task, state)
@@ -282,7 +282,7 @@ func TestFileBatchProducerResumeAfterAllBatchesProduced(t *testing.T) {
282282
2, "world"
283283
3, "foo"
284284
4, "bar"`
285-
_, task, err := createFileAndTask(lexportDir, fileContents, ldataDir, "test_table")
285+
_, task, err := createFileAndTask(lexportDir, fileContents, ldataDir, "test_table", 1)
286286
assert.NoError(t, err)
287287

288288
batchproducer, err := NewFileBatchProducer(task, state)

yb-voyager/cmd/importDataFileCommand.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,8 @@ func getImportFileTasks(currFileTableMapping string) []*ImportFileTask {
179179
return result
180180
}
181181
kvs := strings.Split(currFileTableMapping, ",")
182-
for i, kv := range kvs {
182+
idCounter := 0
183+
for _, kv := range kvs {
183184
globPattern, table := strings.Split(kv, ":")[0], strings.Split(kv, ":")[1]
184185
filePaths, err := dataStore.Glob(globPattern)
185186
if err != nil {
@@ -198,12 +199,13 @@ func getImportFileTasks(currFileTableMapping string) []*ImportFileTask {
198199
utils.ErrExit("calculating file size in bytes: %q: %v", filePath, err)
199200
}
200201
task := &ImportFileTask{
201-
ID: i,
202+
ID: idCounter,
202203
FilePath: filePath,
203204
TableNameTup: tableNameTuple,
204205
FileSize: fileSize,
205206
}
206207
result = append(result, task)
208+
idCounter++
207209
}
208210
}
209211
return result

yb-voyager/cmd/importDataFileTaskImporter.go

+13-4
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,11 @@ import (
3030
"github.com/yugabyte/yb-voyager/yb-voyager/src/utils/sqlname"
3131
)
3232

33+
/*
34+
FileTaskImporter is responsible for importing an ImportFileTask.
35+
It uses a FileBatchProducer to produce batches. It submits each batch to a provided
36+
worker pool for processing. It also maintains and updates the progress of the task.
37+
*/
3338
type FileTaskImporter struct {
3439
task *ImportFileTask
3540
state *ImportDataState
@@ -75,12 +80,15 @@ func (fti *FileTaskImporter) AllBatchesSubmitted() bool {
7580
return fti.batchProducer.Done()
7681
}
7782

78-
func (fti *FileTaskImporter) AllBatchesImported() error {
79-
// TODO: check importDataState for status.
80-
panic("not implemented")
83+
func (fti *FileTaskImporter) AllBatchesImported() (bool, error) {
84+
taskStatus, err := fti.state.GetFileImportState(fti.task.FilePath, fti.task.TableNameTup)
85+
if err != nil {
86+
return false, fmt.Errorf("getting file import state: %s", err)
87+
}
88+
return taskStatus == FILE_IMPORT_COMPLETED, nil
8189
}
8290

83-
func (fti *FileTaskImporter) SubmitNextBatch() error {
91+
func (fti *FileTaskImporter) ProduceAndSubmitNextBatchToWorkerPool() error {
8492
if fti.AllBatchesSubmitted() {
8593
return fmt.Errorf("no more batches to submit")
8694
}
@@ -234,6 +242,7 @@ func getImportBatchArgsProto(tableNameTup sqlname.NameTuple, filePath string) *t
234242
}
235243
// If `columns` is unset at this point, no attribute list is passed in the COPY command.
236244
fileFormat := dataFileDescriptor.FileFormat
245+
237246
// from export data with ora2pg, it comes as an SQL file, with COPY command having data.
238247
// Import-data also reads it appropriately with the help of sqlDataFile.
239248
// But while running COPY for a batch, we need to set the format as TEXT (SQL does not make sense)

yb-voyager/cmd/importDataFileTaskImporter_test.go

+10-10
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ func TestBasicTaskImport(t *testing.T) {
4747
fileContents := `id,val
4848
1, "hello"
4949
2, "world"`
50-
_, task, err := createFileAndTask(lexportDir, fileContents, ldataDir, "test_table_basic")
50+
_, task, err := createFileAndTask(lexportDir, fileContents, ldataDir, "test_table_basic", 1)
5151
testutils.FatalIfError(t, err)
5252

5353
progressReporter := NewImportDataProgressReporter(true)
@@ -56,7 +56,7 @@ func TestBasicTaskImport(t *testing.T) {
5656
testutils.FatalIfError(t, err)
5757

5858
for !taskImporter.AllBatchesSubmitted() {
59-
err := taskImporter.SubmitNextBatch()
59+
err := taskImporter.ProduceAndSubmitNextBatchToWorkerPool()
6060
assert.NoError(t, err)
6161
}
6262

@@ -88,15 +88,15 @@ func TestImportAllBatchesAndResume(t *testing.T) {
8888
fileContents := `id,val
8989
1, "hello"
9090
2, "world"`
91-
_, task, err := createFileAndTask(lexportDir, fileContents, ldataDir, "test_table_all")
91+
_, task, err := createFileAndTask(lexportDir, fileContents, ldataDir, "test_table_all", 1)
9292
testutils.FatalIfError(t, err)
9393

9494
progressReporter := NewImportDataProgressReporter(true)
9595
workerPool := pool.New().WithMaxGoroutines(2)
9696
taskImporter, err := NewFileTaskImporter(task, state, workerPool, progressReporter)
9797

9898
for !taskImporter.AllBatchesSubmitted() {
99-
err := taskImporter.SubmitNextBatch()
99+
err := taskImporter.ProduceAndSubmitNextBatchToWorkerPool()
100100
assert.NoError(t, err)
101101
}
102102

@@ -139,7 +139,7 @@ func TestTaskImportResumable(t *testing.T) {
139139
2, "world"
140140
3, "foo"
141141
4, "bar"`
142-
_, task, err := createFileAndTask(lexportDir, fileContents, ldataDir, "test_table_resume")
142+
_, task, err := createFileAndTask(lexportDir, fileContents, ldataDir, "test_table_resume", 1)
143143
testutils.FatalIfError(t, err)
144144

145145
progressReporter := NewImportDataProgressReporter(true)
@@ -148,7 +148,7 @@ func TestTaskImportResumable(t *testing.T) {
148148
testutils.FatalIfError(t, err)
149149

150150
// submit 1 batch
151-
err = taskImporter.SubmitNextBatch()
151+
err = taskImporter.ProduceAndSubmitNextBatchToWorkerPool()
152152
assert.NoError(t, err)
153153

154154
// check that the first batch was imported
@@ -165,7 +165,7 @@ func TestTaskImportResumable(t *testing.T) {
165165
testutils.FatalIfError(t, err)
166166

167167
// submit second batch, not first batch again as it was already imported
168-
err = taskImporter.SubmitNextBatch()
168+
err = taskImporter.ProduceAndSubmitNextBatchToWorkerPool()
169169
assert.NoError(t, err)
170170

171171
assert.Equal(t, true, taskImporter.AllBatchesSubmitted())
@@ -198,7 +198,7 @@ func TestTaskImportResumableNoPK(t *testing.T) {
198198
2, "world"
199199
3, "foo"
200200
4, "bar"`
201-
_, task, err := createFileAndTask(lexportDir, fileContents, ldataDir, "test_table_resume_no_pk")
201+
_, task, err := createFileAndTask(lexportDir, fileContents, ldataDir, "test_table_resume_no_pk", 1)
202202
testutils.FatalIfError(t, err)
203203

204204
progressReporter := NewImportDataProgressReporter(true)
@@ -207,7 +207,7 @@ func TestTaskImportResumableNoPK(t *testing.T) {
207207
testutils.FatalIfError(t, err)
208208

209209
// submit 1 batch
210-
err = taskImporter.SubmitNextBatch()
210+
err = taskImporter.ProduceAndSubmitNextBatchToWorkerPool()
211211
assert.NoError(t, err)
212212

213213
// check that the first batch was imported
@@ -224,7 +224,7 @@ func TestTaskImportResumableNoPK(t *testing.T) {
224224
testutils.FatalIfError(t, err)
225225

226226
// submit second batch, not first batch again as it was already imported
227-
err = taskImporter.SubmitNextBatch()
227+
err = taskImporter.ProduceAndSubmitNextBatchToWorkerPool()
228228
assert.NoError(t, err)
229229

230230
assert.Equal(t, true, taskImporter.AllBatchesSubmitted())

0 commit comments

Comments
 (0)