Skip to content

Commit 9255492

Browse files
authored
Name registry integration - Import Data + Stats (#1384)
1 parent c196a2a commit 9255492

34 files changed, with 1308 additions and 1169 deletions.

.github/workflows/go.yml

+2-2
Original file line number | Diff line number | Diff line change
@@ -2,9 +2,9 @@ name: Go
22

33
on:
44
push:
5-
branches: [ main ]
5+
branches: [main]
66
pull_request:
7-
branches: [ main ]
7+
branches: [main]
88

99
jobs:
1010

.gitignore

+3-1
Original file line number | Diff line number | Diff line change
@@ -37,4 +37,6 @@ migtests/tests/analyze-schema/dummy-export-dir/.*.lck
3737

3838
# debezium server
3939
debezium-server-voyager/**/target/
40-
*.iml
40+
*.iml
41+
42+
**/dummy_name_registry.json

migtests/scripts/functions.sh

+1-1
Original file line number | Diff line number | Diff line change
@@ -477,7 +477,7 @@ end_migration() {
477477
yb-voyager end migration --export-dir ${EXPORT_DIR} \
478478
--backup-dir ${BACKUP_DIR} --backup-schema-files true \
479479
--backup-data-files true --backup-log-files true \
480-
--save-migration-reports true $* || {
480+
--save-migration-reports true $* || {
481481
cat ${EXPORT_DIR}/logs/yb-voyager-end-migration.log
482482
exit 1
483483
}

migtests/tests/pg/partitions/cleanup-db

100644 → 100755
File mode changed.

migtests/tests/pg/partitions/fix-schema

+1-1
Original file line number | Diff line number | Diff line change
@@ -9,4 +9,4 @@ sed -i 's/p2\.sydney/p2.sydney_region/g' $TEST_DIR/export-dir/schema/tables/tab
99

1010
sed -i 's/p2\.boston/p2.boston_region/g' $TEST_DIR/export-dir/schema/tables/INDEXES_table.sql
1111
sed -i 's/p2\.london/p2.london_region/g' $TEST_DIR/export-dir/schema/tables/INDEXES_table.sql
12-
sed -i 's/p2\.sydney/p2.sydney_region/g' $TEST_DIR/export-dir/schema/tables/INDEXES_table.sql
12+
sed -i 's/p2\.sydney/p2.sydney_region/g' $TEST_DIR/export-dir/schema/tables/INDEXES_table.sql

migtests/tests/pg/views-and-rules/cleanup-db

100644 → 100755
File mode changed.

yb-voyager/cmd/common.go

+93-68
Original file line number | Diff line number | Diff line change
@@ -44,7 +44,9 @@ import (
4444
"github.com/yugabyte/yb-voyager/yb-voyager/src/datafile"
4545
"github.com/yugabyte/yb-voyager/yb-voyager/src/dbzm"
4646
"github.com/yugabyte/yb-voyager/yb-voyager/src/metadb"
47+
"github.com/yugabyte/yb-voyager/yb-voyager/src/namereg"
4748
"github.com/yugabyte/yb-voyager/yb-voyager/src/srcdb"
49+
"github.com/yugabyte/yb-voyager/yb-voyager/src/tgtdb"
4850
"github.com/yugabyte/yb-voyager/yb-voyager/src/utils"
4951
"github.com/yugabyte/yb-voyager/yb-voyager/src/utils/jsonfile"
5052
"github.com/yugabyte/yb-voyager/yb-voyager/src/utils/sqlname"
@@ -340,7 +342,7 @@ func displayImportedRowCountSnapshot(state *ImportDataState, tasks []*ImportFile
340342
} else {
341343
fmt.Printf("snapshot import report\n")
342344
}
343-
tableList := importFileTasksToTableNames(tasks)
345+
tableList := importFileTasksToTableNameTuples(tasks)
344346
err := retrieveMigrationUUID()
345347
if err != nil {
346348
utils.ErrExit("could not retrieve migration UUID: %w", err)
@@ -352,15 +354,15 @@ func displayImportedRowCountSnapshot(state *ImportDataState, tasks []*ImportFile
352354
dbType = "source-replica"
353355
}
354356

355-
snapshotRowCount := make(map[string]int64)
357+
snapshotRowCount := utils.NewStructMap[sqlname.NameTuple, int64]()
356358

357359
if importerRole == IMPORT_FILE_ROLE {
358360
for _, tableName := range tableList {
359361
tableRowCount, err := state.GetImportedSnapshotRowCountForTable(tableName)
360362
if err != nil {
361363
utils.ErrExit("could not fetch snapshot row count for table %q: %w", tableName, err)
362364
}
363-
snapshotRowCount[tableName] = tableRowCount
365+
snapshotRowCount.Put(tableName, tableRowCount)
364366
}
365367
} else {
366368
snapshotRowCount, err = getImportedSnapshotRowsMap(dbType, tableList)
@@ -373,11 +375,10 @@ func displayImportedRowCountSnapshot(state *ImportDataState, tasks []*ImportFile
373375
if i == 0 {
374376
addHeader(uitable, "SCHEMA", "TABLE", "IMPORTED ROW COUNT")
375377
}
376-
table := tableName
377-
if len(strings.Split(tableName, ".")) == 2 {
378-
table = strings.Split(tableName, ".")[1]
379-
}
380-
uitable.AddRow(getTargetSchemaName(tableName), table, snapshotRowCount[tableName])
378+
s, t := tableName.ForCatalogQuery()
379+
380+
rowCount, _ := snapshotRowCount.Get(tableName)
381+
uitable.AddRow(s, t, rowCount)
381382
}
382383
if len(tableList) > 0 {
383384
fmt.Printf("\n")
@@ -452,6 +453,49 @@ func initMetaDB() {
452453
}
453454
}
454455

456+
func InitNameRegistry(
457+
exportDir string, role string,
458+
sconf *srcdb.Source, sdb srcdb.SourceDB,
459+
tconf *tgtdb.TargetConf, tdb tgtdb.TargetDB) error {
460+
461+
var sdbReg namereg.SourceDBInterface
462+
var ybdb namereg.YBDBInterface
463+
var sourceDbType, sourceDbSchema, sourceDbName string
464+
var targetDBSchema string
465+
466+
if sconf != nil {
467+
sourceDbType = sconf.DBType
468+
sourceDbName = sconf.DBName
469+
sourceDbSchema = sconf.Schema
470+
}
471+
if sdb != nil {
472+
sdbReg = sdb.(namereg.SourceDBInterface)
473+
}
474+
475+
if tconf != nil {
476+
targetDBSchema = tconf.Schema
477+
}
478+
var ok bool
479+
if tdb != nil && lo.Contains([]string{TARGET_DB_IMPORTER_ROLE, IMPORT_FILE_ROLE}, role) {
480+
ybdb, ok = tdb.(namereg.YBDBInterface)
481+
if !ok {
482+
return fmt.Errorf("expected targetDB to adhere to YBDBRegirsty")
483+
}
484+
}
485+
nameregistryParams := namereg.NameRegistryParams{
486+
FilePath: fmt.Sprintf("%s/metainfo/name_registry.json", exportDir),
487+
Role: role,
488+
SourceDBType: sourceDbType,
489+
SourceDBSchema: sourceDbSchema,
490+
SourceDBName: sourceDbName,
491+
TargetDBSchema: targetDBSchema,
492+
SDB: sdbReg,
493+
YBDB: ybdb,
494+
}
495+
496+
return namereg.InitNameRegistry(nameregistryParams)
497+
}
498+
455499
// sets the global variable migrationUUID after retrieving it from MigrationStatusRecord
456500
func retrieveMigrationUUID() error {
457501
if migrationUUID != uuid.Nil {
@@ -588,22 +632,20 @@ func validateMetaDBCreated() {
588632
}
589633
}
590634

591-
func getImportTableList(sourceTableList []string) []string {
635+
func getImportTableList(sourceTableList []string) ([]sqlname.NameTuple, error) {
592636
if importerRole == IMPORT_FILE_ROLE {
593-
return nil
637+
return nil, nil
594638
}
595-
var tableList []string
639+
var tableList []sqlname.NameTuple
596640
sqlname.SourceDBType = source.DBType
597641
for _, qualifiedTableName := range sourceTableList {
598-
// TODO: handle case sensitivity?
599-
tableName := sqlname.NewSourceNameFromQualifiedName(qualifiedTableName)
600-
table := tableName.ObjectName.MinQuoted
601-
if source.DBType == POSTGRESQL && tableName.SchemaName.MinQuoted != "public" {
602-
table = tableName.Qualified.MinQuoted
642+
table, err := namereg.NameReg.LookupTableName(qualifiedTableName)
643+
if err != nil {
644+
return nil, fmt.Errorf("lookup table %s in name registry : %v", qualifiedTableName, err)
603645
}
604646
tableList = append(tableList, table)
605647
}
606-
return tableList
648+
return tableList, nil
607649
}
608650

609651
func hideImportFlagsInFallForwardOrBackCmds(cmd *cobra.Command) {
@@ -769,25 +811,30 @@ func renameTableIfRequired(table string) (string, bool) {
769811
return table, false
770812
}
771813

772-
func getExportedSnapshotRowsMap(tableList []string, exportSnapshotStatus *ExportSnapshotStatus) (map[string]int64, map[string][]string, error) {
773-
snapshotRowsMap := make(map[string]int64)
774-
snapshotStatusMap := make(map[string][]string)
775-
for _, table := range tableList {
776-
tableStatus := exportSnapshotStatus.GetTableStatusByTableName(table)
777-
table = strings.TrimPrefix(table, "public.") //safely can remove it for now. TODO: fix with NameRegistry all such occurrences
778-
for _, status := range tableStatus {
779-
if status.FileName == "" {
780-
//in case of root table as well in the tablelist during export an entry with empty file name is there
781-
continue
782-
}
783-
snapshotRowsMap[table] += status.ExportedRowCountSnapshot
784-
snapshotStatusMap[table] = append(snapshotStatusMap[table], status.Status)
814+
func getExportedSnapshotRowsMap(exportSnapshotStatus *ExportSnapshotStatus) (*utils.StructMap[sqlname.NameTuple, int64], *utils.StructMap[sqlname.NameTuple, []string], error) {
815+
snapshotRowsMap := utils.NewStructMap[sqlname.NameTuple, int64]()
816+
snapshotStatusMap := utils.NewStructMap[sqlname.NameTuple, []string]()
817+
818+
for _, tableStatus := range exportSnapshotStatus.Tables {
819+
if tableStatus.FileName == "" {
820+
//in case of root table as well in the tablelist during export an entry with empty file name is there
821+
continue
822+
}
823+
nt, err := namereg.NameReg.LookupTableName(tableStatus.TableName)
824+
if err != nil {
825+
return nil, nil, fmt.Errorf("lookup table [%s] from name registry: %v", tableStatus.TableName, err)
785826
}
827+
existingSnapshotRows, _ := snapshotRowsMap.Get(nt)
828+
snapshotRowsMap.Put(nt, existingSnapshotRows+tableStatus.ExportedRowCountSnapshot)
829+
existingStatuses, _ := snapshotStatusMap.Get(nt)
830+
existingStatuses = append(existingStatuses, tableStatus.Status)
831+
snapshotStatusMap.Put(nt, existingStatuses)
786832
}
833+
787834
return snapshotRowsMap, snapshotStatusMap, nil
788835
}
789836

790-
func getImportedSnapshotRowsMap(dbType string, tableList []string) (map[string]int64, error) {
837+
func getImportedSnapshotRowsMap(dbType string, tableList []sqlname.NameTuple) (*utils.StructMap[sqlname.NameTuple, int64], error) {
791838
switch dbType {
792839
case "target":
793840
importerRole = TARGET_DB_IMPORTER_ROLE
@@ -802,45 +849,23 @@ func getImportedSnapshotRowsMap(dbType string, tableList []string) (map[string]i
802849
snapshotDataFileDescriptor = datafile.OpenDescriptor(exportDir)
803850
}
804851

805-
msr, err := metaDB.GetMigrationStatusRecord()
806-
if err != nil {
807-
return nil, fmt.Errorf("get migration status record: %w", err)
808-
}
809-
sourceSchemaCount := len(strings.Split(msr.SourceDBConf.Schema, "|"))
810-
811-
snapshotRowsMap := make(map[string]int64)
812-
for _, table := range tableList {
813-
parts := strings.Split(table, ".")
814-
schemaName := ""
815-
tableName := parts[0]
816-
if len(parts) > 1 {
817-
schemaName = parts[0]
818-
tableName = parts[1]
819-
}
820-
if sourceSchemaCount <= 1 && source.DBType != POSTGRESQL { //this check is for Oracle case
821-
schemaName = ""
822-
}
823-
if schemaName == "public" || schemaName == "" {
824-
table = tableName
825-
}
826-
//Now multiple files can be there for a table in case of partitions
827-
dataFiles := snapshotDataFileDescriptor.GetDataFileEntriesByTableName(table)
828-
if len(dataFiles) == 0 {
829-
dataFile := &datafile.FileEntry{
830-
FilePath: "",
831-
TableName: table,
832-
RowCount: 0,
833-
FileSize: 0,
834-
}
835-
dataFiles = append(dataFiles, dataFile)
852+
snapshotRowsMap := utils.NewStructMap[sqlname.NameTuple, int64]()
853+
dataFilePathNtMap := map[string]sqlname.NameTuple{}
854+
for _, fileEntry := range snapshotDataFileDescriptor.DataFileList {
855+
nt, err := namereg.NameReg.LookupTableName(fileEntry.TableName)
856+
if err != nil {
857+
return nil, fmt.Errorf("lookup table name from data file descriptor %s : %v", fileEntry.TableName, err)
836858
}
837-
for _, dataFile := range dataFiles {
838-
snapshotRowCount, err := state.GetImportedRowCount(dataFile.FilePath, dataFile.TableName)
839-
if err != nil {
840-
return nil, fmt.Errorf("could not fetch snapshot row count for table %q: %w", table, err)
841-
}
842-
snapshotRowsMap[table] += snapshotRowCount
859+
dataFilePathNtMap[fileEntry.FilePath] = nt
860+
}
861+
862+
for dataFilePath, nt := range dataFilePathNtMap {
863+
snapshotRowCount, err := state.GetImportedRowCount(dataFilePath, nt)
864+
if err != nil {
865+
return nil, fmt.Errorf("could not fetch snapshot row count for table %q: %w", nt, err)
843866
}
867+
existingRows, _ := snapshotRowsMap.Get(nt)
868+
snapshotRowsMap.Put(nt, existingRows+snapshotRowCount)
844869
}
845870
return snapshotRowsMap, nil
846871
}

yb-voyager/cmd/conflictDetectionCache.go

+10-18
Original file line number | Diff line number | Diff line change
@@ -16,12 +16,13 @@ limitations under the License.
1616
package cmd
1717

1818
import (
19-
"fmt"
2019
"sync"
2120

2221
"github.com/samber/lo"
2322
log "github.com/sirupsen/logrus"
2423
"github.com/yugabyte/yb-voyager/yb-voyager/src/tgtdb"
24+
"github.com/yugabyte/yb-voyager/yb-voyager/src/utils"
25+
"github.com/yugabyte/yb-voyager/yb-voyager/src/utils/sqlname"
2526
)
2627

2728
/*
@@ -98,12 +99,12 @@ type ConflictDetectionCache struct {
9899
*/
99100
m map[int64]*tgtdb.Event
100101
cond *sync.Cond
101-
tableToUniqueKeyColumns map[string][]string
102+
tableToUniqueKeyColumns *utils.StructMap[sqlname.NameTuple, []string]
102103
evChans []chan *tgtdb.Event
103104
sourceDBType string
104105
}
105106

106-
func NewConflictDetectionCache(tableToIdentityColumnNames map[string][]string, evChans []chan *tgtdb.Event, sourceDBType string) *ConflictDetectionCache {
107+
func NewConflictDetectionCache(tableToIdentityColumnNames *utils.StructMap[sqlname.NameTuple, []string], evChans []chan *tgtdb.Event, sourceDBType string) *ConflictDetectionCache {
107108
c := &ConflictDetectionCache{}
108109
c.m = make(map[int64]*tgtdb.Event)
109110
c.cond = sync.NewCond(&c.Mutex)
@@ -117,6 +118,7 @@ func (c *ConflictDetectionCache) Put(event *tgtdb.Event) {
117118
c.Lock()
118119
defer c.Unlock()
119120
c.m[event.Vsn] = event.Copy()
121+
log.Infof("adding event vsn(%d) to conflict cache", event.Vsn)
120122
}
121123

122124
func (c *ConflictDetectionCache) WaitUntilNoConflict(incomingEvent *tgtdb.Event) {
@@ -163,11 +165,8 @@ func (c *ConflictDetectionCache) eventsConfict(cachedEvent *tgtdb.Event, incomin
163165
if !c.eventsAreOfSameTable(cachedEvent, incomingEvent) {
164166
return false
165167
}
166-
maybeQualifiedName := cachedEvent.TableName
167-
if (c.sourceDBType == "postgresql" || c.sourceDBType == "yugabytedb") && cachedEvent.SchemaName != "public" {
168-
maybeQualifiedName = fmt.Sprintf("%s.%s", cachedEvent.SchemaName, cachedEvent.TableName)
169-
}
170-
uniqueKeyColumns := c.tableToUniqueKeyColumns[maybeQualifiedName]
168+
169+
uniqueKeyColumns, _ := c.tableToUniqueKeyColumns.Get(cachedEvent.TableNameTup)
171170
/*
172171
Not checking for value of unique key values conflict in case of export from yb because of inconsistency issues in before values of events provided by yb-cdc
173172
TODO(future): Fix this in our debezium voyager plugin
@@ -189,7 +188,7 @@ func (c *ConflictDetectionCache) eventsConfict(cachedEvent *tgtdb.Event, incomin
189188
}
190189

191190
if conflict {
192-
log.Infof("conflict detected for table %s, between event1(vsn=%d) and event2(vsn=%d)", cachedEvent.TableName, cachedEvent.Vsn, incomingEvent.Vsn)
191+
log.Infof("conflict detected for table %s, between event1(vsn=%d) and event2(vsn=%d)", cachedEvent.TableNameTup, cachedEvent.Vsn, incomingEvent.Vsn)
193192
}
194193
return conflict
195194
}
@@ -201,20 +200,13 @@ func (c *ConflictDetectionCache) eventsConfict(cachedEvent *tgtdb.Event, incomin
201200

202201
if *cachedEvent.BeforeFields[column] == *incomingEvent.Fields[column] {
203202
log.Infof("conflict detected for table %s, column %s, between value of event1(vsn=%d, colVal=%s) and event2(vsn=%d, colVal=%s)",
204-
maybeQualifiedName, column, cachedEvent.Vsn, *cachedEvent.BeforeFields[column], incomingEvent.Vsn, *incomingEvent.Fields[column])
203+
cachedEvent.TableNameTup.ForKey(), column, cachedEvent.Vsn, *cachedEvent.BeforeFields[column], incomingEvent.Vsn, *incomingEvent.Fields[column])
205204
return true
206205
}
207206
}
208207
return false
209208
}
210209

211210
func (c *ConflictDetectionCache) eventsAreOfSameTable(event1 *tgtdb.Event, event2 *tgtdb.Event) bool {
212-
switch c.sourceDBType {
213-
case "oracle":
214-
return event1.TableName == event2.TableName
215-
case "postgresql", "yugabytedb":
216-
return event1.SchemaName == event2.SchemaName && event1.TableName == event2.TableName
217-
default:
218-
panic(fmt.Sprintf("unknown source database type %q for unique key conflict detection", c.sourceDBType))
219-
}
211+
return event1.TableNameTup.Equals(event2.TableNameTup)
220212
}

yb-voyager/cmd/exportData.go

+1-7
Original file line number | Diff line number | Diff line change
@@ -41,7 +41,6 @@ import (
4141
"github.com/yugabyte/yb-voyager/yb-voyager/src/datafile"
4242
"github.com/yugabyte/yb-voyager/yb-voyager/src/dbzm"
4343
"github.com/yugabyte/yb-voyager/yb-voyager/src/metadb"
44-
"github.com/yugabyte/yb-voyager/yb-voyager/src/namereg"
4544
"github.com/yugabyte/yb-voyager/yb-voyager/src/srcdb"
4645
"github.com/yugabyte/yb-voyager/yb-voyager/src/utils"
4746
"github.com/yugabyte/yb-voyager/yb-voyager/src/utils/jsonfile"
@@ -156,7 +155,7 @@ func exportData() bool {
156155
source.DB().CheckRequiredToolsAreInstalled()
157156
saveSourceDBConfInMSR()
158157
saveExportTypeInMSR()
159-
err = namereg.InitNameRegistry(exportDir, exporterRole, &source, source.DB(), nil, nil)
158+
err = InitNameRegistry(exportDir, exporterRole, &source, source.DB(), nil, nil)
160159
if err != nil {
161160
utils.ErrExit("initialize name registry: %v", err)
162161
}
@@ -493,11 +492,6 @@ func reportUnsupportedTables(finalTableList []*sqlname.SourceName) {
493492
}
494493
var nonPKTables []string
495494
for _, table := range finalTableList {
496-
if source.DBType == POSTGRESQL {
497-
if table.ObjectName.MinQuoted != table.ObjectName.Unquoted {
498-
caseSensitiveTables = append(caseSensitiveTables, table.Qualified.MinQuoted)
499-
}
500-
}
501495
if lo.Contains(allNonPKTables, table.Qualified.MinQuoted) {
502496
nonPKTables = append(nonPKTables, table.Qualified.MinQuoted)
503497
}

0 commit comments

Comments
 (0)