Skip to content

Commit febabf4

Browse files
Multiple import data COPY command error handling improvements. (#220)
- Ignore unique constraint violation in case of transactional mode - Add ROWS_PER_TRANSACTION in generated COPY command - Added an extra layer of checks before enabling upsert and transaction mode to find out whether they are supported on that server version. - Improved retry logic to retry based on retryable vs non-retryable errors - Improved logging for automation tests in case of failure - Enabled upsert mode by default
1 parent 3a6dba6 commit febabf4

12 files changed

+124
-69
lines changed

.github/workflows/mysql-migtests.yml

+2
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ jobs:
1616
- name: Run installer script
1717
run: |
1818
yes | ./installer_scripts/install-yb-voyager local
19+
env:
20+
ON_INSTALLER_ERROR_OUTPUT_LOG: Y
1921

2022
- name: Start MySQL
2123
run: |

.github/workflows/pg-migtests.yml

+2
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ jobs:
3232
run: |
3333
cd installer_scripts
3434
yes | ./install-yb-voyager local
35+
env:
36+
ON_INSTALLER_ERROR_OUTPUT_LOG: Y
3537

3638
- name: Test PostgreSQL Connection
3739
run: |

installer_scripts/install-yb-voyager

+5-6
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,10 @@ on_exit() {
4545
echo "Done!"
4646
else
4747
echo "Script failed. Check log file ${LOG_FILE} ."
48+
if [[ ${ON_INSTALLER_ERROR_OUTPUT_LOG:-N} = "Y" ]]
49+
then
50+
sudo cat ${LOG_FILE}
51+
fi
4852
fi
4953
}
5054

@@ -319,13 +323,8 @@ update_bashrc() {
319323

320324
create_yb_session_vars_file() {
321325
vars_file_name="/etc/yb-voyager/ybSessionVariables.sql"
322-
if [ -f "$vars_file_name" ]
323-
then
324-
output "No need to create ybSessionVariables.sql again. Skipping."
325-
return
326-
fi
327326
sudo mkdir -p /etc/yb-voyager
328-
sudo wget -qO $vars_file_name https://github.com/yugabyte/yb-voyager/raw/main/yb-voyager/files/ybSessionVariables.sql
327+
sudo touch $vars_file_name
329328
}
330329

331330
create_base_ora2pg_conf_file() {

yb-voyager/cmd/constants.go

+15-11
Original file line numberDiff line numberDiff line change
@@ -33,23 +33,27 @@ const (
3333
LAST_SPLIT_NUM = 0
3434
SPLIT_INFO_PATTERN = "[0-9]*.[0-9]*.[0-9]*.[0-9]*"
3535
LAST_SPLIT_PATTERN = "0.[0-9]*.[0-9]*.[0-9]*"
36-
COPY_MAX_RETRY_COUNT = 5
37-
MAX_SLEEP_SECOND = 10
36+
COPY_MAX_RETRY_COUNT = 10
37+
MAX_SLEEP_SECOND = 60
3838
)
3939

40-
var IMPORT_SESSION_SETTERS = []string{
41-
"SET client_encoding TO 'UTF8';",
42-
// Disable transactions to improve ingestion throughput.
43-
"SET yb_disable_transactional_writes to true;",
44-
//Disable triggers or fkeys constraint checks.
45-
"SET session_replication_role TO replica;",
46-
// Enable UPSERT mode instead of normal inserts into a table.
47-
"SET yb_enable_upsert_mode to true;",
48-
}
40+
// import session parameters
41+
const (
42+
SET_CLIENT_ENCODING_TO_UTF8 = "SET client_encoding TO 'UTF8'"
43+
SET_SESSION_REPLICATE_ROLE_TO_REPLICA = "SET session_replication_role TO replica" //Disable triggers or fkeys constraint checks.
44+
SET_YB_ENABLE_UPSERT_MODE = "SET yb_enable_upsert_mode to true"
45+
SET_YB_DISABLE_TRANSACTIONAL_WRITES = "SET yb_disable_transactional_writes to true" // Disable transactions to improve ingestion throughput.
46+
)
4947

5048
var supportedSourceDBTypes = []string{ORACLE, MYSQL, POSTGRESQL}
5149

5250
var validSSLModes = map[string][]string{
5351
"mysql": {"disable", "prefer", "require", "verify-ca", "verify-full"},
5452
"postgresql": {"disable", "allow", "prefer", "require", "verify-ca", "verify-full"},
5553
}
54+
55+
var NonRetryCopyErrors = []string{
56+
"Sending too long RPC message",
57+
"invalid input syntax",
58+
"violates unique constraint",
59+
}

yb-voyager/cmd/import.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -147,8 +147,8 @@ func registerCommonImportFlags(cmd *cobra.Command) {
147147
"For example: \"host1:port1,host2:port2\" or \"host1,host2\"\n"+
148148
"Note: use-public-ip flag will be ignored if this is used.")
149149

150-
cmd.Flags().BoolVar(&enableUpsert, "enable-upsert", false,
151-
"true - to enable upsert for insert in target tables (default false)")
150+
cmd.Flags().BoolVar(&enableUpsert, "enable-upsert", true,
151+
"true - to enable upsert for insert in target tables")
152152

153153
// flag existence depends on fix of this gh issue: https://github.com/yugabyte/yugabyte-db/issues/12464
154154
cmd.Flags().BoolVar(&disableTransactionalWrites, "disable-transactional-writes", false,

yb-voyager/cmd/importData.go

+78-38
Original file line numberDiff line numberDiff line change
@@ -838,10 +838,12 @@ func doOneImport(task *SplitFileImportTask, connPool *tgtdb.ConnectionPool) {
838838

839839
// copyCommand is empty when there are no rows for that table
840840
if copyCommand != "" {
841-
var rowsCount int64
841+
copyCommand = fmt.Sprintf(copyCommand, (task.OffsetEnd - task.OffsetStart))
842+
log.Infof("COPY command: %s", copyCommand)
843+
var rowsAffected int64
842844
var copyErr error
843845
// if retry=n, total try call will be n+1
844-
copyRetryCount := COPY_MAX_RETRY_COUNT + 1
846+
remainingRetries := COPY_MAX_RETRY_COUNT + 1
845847

846848
copyErr = connPool.WithConn(func(conn *pgx.Conn) (bool, error) {
847849
// reset the reader to begin for every call
@@ -855,39 +857,44 @@ func doOneImport(task *SplitFileImportTask, connPool *tgtdb.ConnectionPool) {
855857
}
856858
}
857859
res, err := conn.PgConn().CopyFrom(context.Background(), reader, copyCommand)
858-
rowsCount = res.RowsAffected()
860+
rowsAffected = res.RowsAffected()
859861

862+
if err != nil && utils.InsensitiveSliceContains(NonRetryCopyErrors, err.Error()) {
863+
return false, err
864+
}
865+
866+
/*
867+
Note: If a user retries after deleting some row(s) from a batch,
868+
yb-voyager will never be able to mark the batch as completed
869+
github issue: https://github.com/yugabyte/yb-voyager/issues/223
870+
*/
860871
if err != nil {
861872
log.Warnf("COPY FROM file %q: %s", inProgressFilePath, err)
862-
if !strings.Contains(err.Error(), "violates unique constraint") {
863-
log.Errorf("RETRYING.. COPY %q FROM file %q due to encountered error: %v ", task.TableName, inProgressFilePath, err)
864-
duration := time.Duration(math.Min(MAX_SLEEP_SECOND, math.Pow(2, float64(COPY_MAX_RETRY_COUNT+1-copyRetryCount))))
865-
log.Infof("sleep for duration %d before retrying...", duration)
866-
time.Sleep(time.Second * duration) // delay for 1 sec before retrying
867-
copyRetryCount--
868-
return copyRetryCount > 0, err
873+
log.Errorf("RETRYING.. COPY %q FROM file %q due to encountered error: %v ", task.TableName, inProgressFilePath, err)
874+
875+
remainingRetries--
876+
if remainingRetries > 0 {
877+
retryNum := COPY_MAX_RETRY_COUNT + 1 - remainingRetries
878+
duration := time.Duration(math.Min(MAX_SLEEP_SECOND, 10*float64(retryNum)))
879+
log.Infof("sleep for duration %d before retrying the file %s for %d time...",
880+
duration, inProgressFilePath, retryNum)
881+
time.Sleep(time.Second * duration)
869882
}
883+
return remainingRetries > 0, err
870884
}
871885

872886
return false, err
873887
})
874888

875-
log.Infof("%q => %d rows affected", copyCommand, rowsCount)
889+
log.Infof("%q => %d rows affected", copyCommand, rowsAffected)
876890
if copyErr != nil {
877-
log.Warnf("COPY FROM file %q: %s", inProgressFilePath, copyErr)
878-
if !strings.Contains(copyErr.Error(), "violates unique constraint") {
891+
if !disableTransactionalWrites && strings.Contains(copyErr.Error(), "violates unique constraint") {
892+
log.Infof("Ignoring encountered Error: %v, Assuming batch is already imported due to transactional mode", copyErr)
893+
} else {
879894
utils.ErrExit("COPY %q FROM file %q: %s", task.TableName, inProgressFilePath, copyErr)
880-
} else { //in case of unique key violation error take row count from the split task
881-
rowsCount = task.OffsetEnd - task.OffsetStart
882-
log.Infof("got error:%v, assuming affected rows count %v for %q", copyErr, rowsCount, task.TableName)
883895
}
884896
}
885897

886-
if rowsCount != task.OffsetEnd-task.OffsetStart {
887-
// TODO: print info/details about missed rows on the screen after progress bar is complete
888-
log.Warnf("Expected to import %v records from %s. Imported %v.",
889-
task.OffsetEnd-task.OffsetStart, inProgressFilePath, rowsCount)
890-
}
891898
incrementImportProgressBar(task.TableName, inProgressFilePath)
892899
}
893900
doneFilePath := getDoneFilePath(task)
@@ -1001,6 +1008,7 @@ func extractCopyStmtForTable(table string, fileToSearchIn string) {
10011008
utils.ErrExit("error while readline for extraction of copy stmt from file %q: %v", fileToSearchIn, err)
10021009
}
10031010
if copyCommandRegex.MatchString(line) {
1011+
line = strings.Trim(line, ";") + ` WITH (ROWS_PER_TRANSACTION %v)`
10041012
copyTableFromCommands[table] = line
10051013
log.Infof("copyTableFromCommand for table %q is %q", table, line)
10061014
return
@@ -1038,43 +1046,75 @@ func getProgressAmount(filePath string) int64 {
10381046
}
10391047

10401048
func getYBSessionInitScript() []string {
1041-
sessionVarsPath := "/etc/yb-voyager/ybSessionVariables.sql"
10421049
var sessionVars []string
1043-
disableTransactionalWritesCmd := fmt.Sprintf("SET yb_disable_transactional_writes to %v", disableTransactionalWrites)
1044-
enableUpsertCmd := fmt.Sprintf("SET yb_enable_upsert_mode to %v", enableUpsert)
1045-
defaultSessionVars := []string{
1046-
"SET client_encoding to 'UTF-8'",
1047-
"SET session_replication_role to replica",
1048-
disableTransactionalWritesCmd,
1049-
enableUpsertCmd,
1050+
if checkSessionVariableSupport(SET_CLIENT_ENCODING_TO_UTF8) {
1051+
sessionVars = append(sessionVars, SET_CLIENT_ENCODING_TO_UTF8)
1052+
}
1053+
if checkSessionVariableSupport(SET_SESSION_REPLICATE_ROLE_TO_REPLICA) {
1054+
sessionVars = append(sessionVars, SET_SESSION_REPLICATE_ROLE_TO_REPLICA)
10501055
}
10511056

1057+
if enableUpsert {
1058+
// the upsert_mode parameter was introduced later than yb_disable_transactional_writes in yb releases
1059+
// hence if upsert_mode is supported then it's safe to assume yb_disable_transactional_writes is already there
1060+
if checkSessionVariableSupport(SET_YB_ENABLE_UPSERT_MODE) {
1061+
sessionVars = append(sessionVars, SET_YB_ENABLE_UPSERT_MODE)
1062+
// SET_YB_DISABLE_TRANSACTIONAL_WRITES is used only with & if upsert_mode is supported
1063+
if disableTransactionalWrites {
1064+
if checkSessionVariableSupport(SET_YB_DISABLE_TRANSACTIONAL_WRITES) {
1065+
sessionVars = append(sessionVars, SET_YB_DISABLE_TRANSACTIONAL_WRITES)
1066+
} else {
1067+
disableTransactionalWrites = false
1068+
}
1069+
}
1070+
} else {
1071+
log.Infof("Falling back to transactional inserts of batches during data import")
1072+
}
1073+
}
1074+
1075+
sessionVarsPath := "/etc/yb-voyager/ybSessionVariables.sql"
10521076
if !utils.FileOrFolderExists(sessionVarsPath) {
1053-
return defaultSessionVars
1077+
log.Infof("YBSessionInitScript: %v\n", sessionVars)
1078+
return sessionVars
10541079
}
10551080

10561081
varsFile, err := os.Open(sessionVarsPath)
10571082
if err != nil {
10581083
utils.PrintAndLog("Unable to open %s : %v. Using default values.", sessionVarsPath, err)
1059-
return defaultSessionVars
1084+
log.Infof("YBSessionInitScript: %v\n", sessionVars)
1085+
return sessionVars
10601086
}
10611087
defer varsFile.Close()
10621088
fileScanner := bufio.NewScanner(varsFile)
10631089

10641090
var curLine string
10651091
for fileScanner.Scan() {
10661092
curLine = strings.TrimSpace(fileScanner.Text())
1067-
sessionVars = append(sessionVars, curLine)
1093+
if curLine != "" && checkSessionVariableSupport(curLine) {
1094+
sessionVars = append(sessionVars, curLine)
1095+
}
10681096
}
1097+
log.Infof("YBSessionInitScript: %v\n", sessionVars)
1098+
return sessionVars
1099+
}
10691100

1070-
//Only override the file if the flags are explicitly true (default false)
1071-
if enableUpsert {
1072-
sessionVars = append(sessionVars, enableUpsertCmd)
1101+
func checkSessionVariableSupport(sqlStmt string) bool {
1102+
conn, err := pgx.Connect(context.Background(), target.GetConnectionUri())
1103+
if err != nil {
1104+
utils.ErrExit("error while creating connection for checking session parameter(%q) support: %v", sqlStmt, err)
10731105
}
1074-
if disableTransactionalWrites {
1075-
sessionVars = append(sessionVars, disableTransactionalWritesCmd)
1106+
defer conn.Close(context.Background())
1107+
1108+
_, err = conn.Exec(context.Background(), sqlStmt)
1109+
if err != nil {
1110+
if !strings.Contains(err.Error(), "unrecognized configuration parameter") {
1111+
utils.ErrExit("error while executing sqlStatement=%q: %v", sqlStmt, err)
1112+
} else {
1113+
log.Warnf("Warning: %q is not supported: %v", sqlStmt, err)
1114+
}
10761115
}
1077-
return sessionVars
1116+
1117+
return err == nil
10781118
}
10791119

10801120
func removeExcludeTables(tableList []string, excludeTableList []string) []string {

yb-voyager/cmd/importDataFileCommand.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -110,17 +110,18 @@ func prepareCopyCommands() {
110110
if err != nil {
111111
utils.ErrExit("opening datafile %q to prepare copy command: %v", err)
112112
}
113-
copyTableFromCommands[table] = fmt.Sprintf(`COPY %s(%s) FROM STDIN WITH (FORMAT %s, DELIMITER '%c', ESCAPE '%s', QUOTE '%s', HEADER)`,
113+
copyTableFromCommands[table] = fmt.Sprintf(`COPY %s(%s) FROM STDIN WITH (FORMAT %s, DELIMITER '%c', ESCAPE '%s', QUOTE '%s', HEADER,`,
114114
table, df.GetHeader(), fileFormat, []rune(delimiter)[0], fileOptsMap["escape_char"], fileOptsMap["quote_char"])
115115
} else {
116-
copyTableFromCommands[table] = fmt.Sprintf(`COPY %s FROM STDIN WITH (FORMAT %s, DELIMITER '%c', ESCAPE '%s', QUOTE '%s')`,
116+
copyTableFromCommands[table] = fmt.Sprintf(`COPY %s FROM STDIN WITH (FORMAT %s, DELIMITER '%c', ESCAPE '%s', QUOTE '%s', `,
117117
table, fileFormat, []rune(delimiter)[0], fileOptsMap["escape_char"], fileOptsMap["quote_char"])
118118
}
119119
} else if fileFormat == datafile.TEXT {
120-
copyTableFromCommands[table] = fmt.Sprintf(`COPY %s FROM STDIN WITH (FORMAT %s, DELIMITER '%c')`, table, fileFormat, []rune(delimiter)[0])
120+
copyTableFromCommands[table] = fmt.Sprintf(`COPY %s FROM STDIN WITH (FORMAT %s, DELIMITER '%c', `, table, fileFormat, []rune(delimiter)[0])
121121
} else {
122122
panic(fmt.Sprintf("File Type %q not implemented\n", fileFormat))
123123
}
124+
copyTableFromCommands[table] += ` ROWS_PER_TRANSACTION %v)`
124125
}
125126

126127
log.Infof("copyTableFromCommands map: %+v", copyTableFromCommands)

yb-voyager/cmd/importSchemaYugabyteDB.go

+3-4
Original file line numberDiff line numberDiff line change
@@ -72,11 +72,10 @@ func YugabyteDBImportSchema(target *tgtdb.Target, exportDir string) {
7272
utils.ErrExit("Failed to run %q on target DB: %s", setSchemaQuery, err)
7373
}
7474

75-
setClientEncQuery := IMPORT_SESSION_SETTERS[0]
76-
log.Infof("Running query %q on the target DB", setClientEncQuery)
77-
_, err = conn.Exec(context.Background(), setClientEncQuery)
75+
log.Infof("Running query %q on the target DB", SET_CLIENT_ENCODING_TO_UTF8)
76+
_, err = conn.Exec(context.Background(), SET_CLIENT_ENCODING_TO_UTF8)
7877
if err != nil {
79-
utils.ErrExit("Failed to run %q on target DB: %s", setClientEncQuery, err)
78+
utils.ErrExit("Failed to run %q on target DB: %s", SET_CLIENT_ENCODING_TO_UTF8, err)
8079
}
8180
}
8281

yb-voyager/files/ybSessionVariables.sql

-2
This file was deleted.

yb-voyager/src/srcdb/postgres.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ func (pg *PostgreSQL) ExportDataPostProcessing(exportDir string, tablesProgressM
135135
renameDataFiles(tablesProgressMetadata)
136136
exportedRowCount := getExportedRowCount(tablesProgressMetadata)
137137
dfd := datafile.Descriptor{
138-
FileFormat: datafile.CSV,
138+
FileFormat: datafile.TEXT,
139139
TableRowCount: exportedRowCount,
140140
Delimiter: "\t",
141141
HasHeader: false,

yb-voyager/src/tgtdb/conn_pool.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package tgtdb
33
import (
44
"context"
55
"math/rand"
6-
"strings"
76
"sync"
87

98
"github.com/jackc/pgx/v4"
@@ -121,7 +120,7 @@ func (pool *ConnectionPool) getNextUriIndex() int {
121120
func (pool *ConnectionPool) initSession(conn *pgx.Conn) error {
122121
for _, v := range pool.params.SessionInitScript {
123122
_, err := conn.Exec(context.Background(), v)
124-
if err != nil && !strings.Contains(err.Error(), "unrecognized configuration parameter") {
123+
if err != nil {
125124
return err
126125
}
127126
}

yb-voyager/src/utils/utils.go

+11
Original file line numberDiff line numberDiff line change
@@ -278,3 +278,14 @@ func LookupIP(name string) []string {
278278
}
279279
return result
280280
}
281+
282+
func InsensitiveSliceContains(slice []string, s string) bool {
283+
for i := 0; i < len(slice); i++ {
284+
if strings.Contains(strings.ToLower(s), strings.ToLower(slice[i])) {
285+
log.Infof("string s=%q contains slice[i]=%q", s, slice[i])
286+
return true
287+
}
288+
}
289+
log.Infof("string s=%q did not match with any string in %v", s, slice)
290+
return false
291+
}

0 commit comments

Comments
 (0)