Skip to content

Commit c196a2a

Browse files
authored
select 1 as first stmt in event batch on YB (#1389)
1 parent 6df1fef commit c196a2a

File tree

1 file changed

+16
-1
lines changed

1 file changed

+16
-1
lines changed

yb-voyager/src/tgtdb/yugabytedb.go

+16-1
Original file line numberDiff line numberDiff line change
@@ -602,6 +602,16 @@ func (yb *TargetYugabyteDB) ExecuteBatch(migrationUUID uuid.UUID, batch *EventBa
602602
}
603603
}
604604

605+
// This is an additional safety net to workaround
606+
// issue in YB where in batched execution, transactions can be retried partially, breaking atomicity.
607+
// SELECT 1 causes the ysql layer to record that data was sent back to the user, thereby, preventing retries
608+
// https://yugabyte.slack.com/archives/CAR5BCH29/p1708320808330589
609+
res, err := tx.Exec(ctx, "SELECT 1")
610+
if err != nil || res.RowsAffected() == 0 {
611+
log.Errorf("error executing stmt: %v, rowsAffected: %v", err, res.RowsAffected())
612+
return false, fmt.Errorf("failed to run SELECT 1 query: %w, rowsAffected: %v",
613+
err, res.RowsAffected())
614+
}
605615
br := tx.SendBatch(ctx, &ybBatch)
606616
closeBatch := func() error {
607617
if closeErr := br.Close(); closeErr != nil {
@@ -624,7 +634,7 @@ func (yb *TargetYugabyteDB) ExecuteBatch(migrationUUID uuid.UUID, batch *EventBa
624634
}
625635

626636
updateVsnQuery := batch.GetChannelMetadataUpdateQuery(migrationUUID)
627-
res, err := tx.Exec(context.Background(), updateVsnQuery)
637+
res, err = tx.Exec(context.Background(), updateVsnQuery)
628638
if err != nil || res.RowsAffected() == 0 {
629639
log.Errorf("error executing stmt: %v, rowsAffected: %v", err, res.RowsAffected())
630640
return false, fmt.Errorf("failed to update vsn on target db via query-%s: %w, rowsAffected: %v",
@@ -884,6 +894,8 @@ const (
884894
SET_SESSION_REPLICATE_ROLE_TO_REPLICA = "SET session_replication_role TO replica" //Disable triggers or fkeys constraint checks.
885895
SET_YB_ENABLE_UPSERT_MODE = "SET yb_enable_upsert_mode to true"
886896
SET_YB_DISABLE_TRANSACTIONAL_WRITES = "SET yb_disable_transactional_writes to true" // Disable transactions to improve ingestion throughput.
897+
// The "SELECT 1" workaround introduced in ExecuteBatch does not work if isolation level is read_committed. Therefore, for now, we are forcing REPEATABLE READ.
898+
SET_DEFAULT_ISOLATION_LEVEL_REPEATABLE_READ = "SET default_transaction_isolation = 'repeatable read'"
887899
)
888900

889901
func getYBSessionInitScript(tconf *TargetConf) []string {
@@ -894,6 +906,9 @@ func getYBSessionInitScript(tconf *TargetConf) []string {
894906
if checkSessionVariableSupport(tconf, SET_SESSION_REPLICATE_ROLE_TO_REPLICA) {
895907
sessionVars = append(sessionVars, SET_SESSION_REPLICATE_ROLE_TO_REPLICA)
896908
}
909+
if checkSessionVariableSupport(tconf, SET_DEFAULT_ISOLATION_LEVEL_REPEATABLE_READ) {
910+
sessionVars = append(sessionVars, SET_DEFAULT_ISOLATION_LEVEL_REPEATABLE_READ)
911+
}
897912

898913
if tconf.EnableUpsert {
899914
// upsert_mode parameters was introduced later than yb_disable_transactional writes in yb releases

0 commit comments

Comments
 (0)