This repository was archived by the owner on Sep 2, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathcmd_fullcopy.go
125 lines (104 loc) · 2.7 KB
/
cmd_fullcopy.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package main
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/rs/zerolog/log"
"github.com/schollz/progressbar/v3"
)
// cmdFullCopy holds the dependencies and options for a full table-by-table
// copy from the MSSQL source database to the PostgreSQL destination.
type cmdFullCopy struct {
	srcDB     *mssql    // source database rows are read from
	dstDB     *postgres // destination database rows are written to
	metaDB    *sqlite   // local store used to stash dropped foreign keys and indexes during the copy
	batchSize uint      // number of rows per destination insert batch
	tables    []string  // names of the tables requested for copying
}
// newCmdFullCopy builds a cmdFullCopy command wired to the given source,
// destination, and metadata databases, copying the listed tables in batches
// of batchSize rows.
func newCmdFullCopy(srcDB *mssql, dstDB *postgres, metaDB *sqlite, batchSize uint, tables []string) *cmdFullCopy {
	cmd := new(cmdFullCopy)
	cmd.srcDB = srcDB
	cmd.dstDB = dstDB
	cmd.metaDB = metaDB
	cmd.batchSize = batchSize
	cmd.tables = tables
	return cmd
}
// start copies every requested table from the source to the destination.
// Destination foreign keys are stashed (saved to metaDB and dropped) first so
// truncate/insert order does not matter. Each table copy is retried every
// 5 minutes until it succeeds; the loop aborts only on context cancellation.
func (cmd *cmdFullCopy) start(ctx context.Context) error {
	tables, err := cmd.srcDB.getTables(cmd.tables, nil)
	if err != nil {
		return fmt.Errorf("get table list from source: %w", err)
	}
	// Keep only the trailing len(cmd.tables) entries of the result.
	// Guarded: the previous unconditional slice tables[len(tables)-len(cmd.tables):]
	// panicked with a negative index whenever the source reported fewer tables
	// than were requested (e.g. a requested table is missing). When drop <= 0
	// the full list is kept; all other cases behave exactly as before.
	if drop := len(tables) - len(cmd.tables); drop > 0 {
		tables = tables[drop:]
	}
	if err := stashDstForeignKeys(cmd.dstDB, cmd.metaDB); err != nil {
		return fmt.Errorf("backup & drop foreign keys: %w", err)
	}
	for _, t := range tables {
		log.Info().Msgf("[%s] Starting table copy...", t)
		for {
			err := cmd.truncateAndCopy(ctx, t)
			if err == nil {
				break
			}
			// A canceled context is a deliberate shutdown, not a transient
			// failure — propagate instead of retrying.
			if errors.Is(err, context.Canceled) {
				return fmt.Errorf("copy table: %w", err)
			}
			log.Err(err).Msgf("[%s] Error copying table (will retry).", t)
			// Wait out the retry delay, but abort immediately if the context
			// is canceled in the meantime.
			select {
			case <-ctx.Done():
				return fmt.Errorf("aborted, reason: %w", err)
			case <-time.After(5 * time.Minute):
			}
		}
		log.Info().Msgf("[%s] Table copied.", t)
	}
	return nil
}
// truncateAndCopy streams every row of source table t into the destination
// (insertRows is called with truncate=true), showing a progress bar sized by
// the source row count. Destination indexes are stashed (saved to metaDB and
// dropped) before the copy and rebuilt afterwards so bulk inserts are not
// slowed by index maintenance. A reader and a writer goroutine are linked by
// an unbuffered row channel; the first error from either side cancels the
// shared context so the other side stops too.
func (cmd *cmdFullCopy) truncateAndCopy(ctx context.Context, t tableInfo) error {
	if err := stashDstIndexes(cmd.dstDB, cmd.metaDB, t); err != nil {
		return fmt.Errorf("save and drop dst indexes: %w", err)
	}
	var (
		rowChan = make(chan rowData)  // unbuffered pipe: reader goroutine -> writer goroutine
		errChan = make(chan error, 2) // capacity 2 so both goroutines can report without blocking
		newCtx, cancel = context.WithCancel(ctx)
		wg = &sync.WaitGroup{}
	)
	wg.Add(2)
	defer cancel()
	// Best-effort: the count only sizes the progress bar, so the error is
	// deliberately ignored. NOTE(review): assumes getRowCount returns 0 (and
	// the bar degrades gracefully) on error — confirm.
	srcCount, _ := cmd.srcDB.getRowCount(t)
	bar := progressbar.NewOptions64(srcCount,
		progressbar.OptionThrottle(500*time.Millisecond),
		progressbar.OptionShowCount(),
		progressbar.OptionShowIts(),
		progressbar.OptionSetItsString("rows"),
	)
	// Per-row callback passed to insertRows; advances the progress bar.
	cb := func() {
		bar.Add64(1)
	}
	// Reader: streams rows from the source into rowChan. On error it cancels
	// the shared context so the writer stops. rowChan is closed here in all
	// cases (the reader is the sole sender).
	go func() {
		if err := cmd.srcDB.readRows(newCtx, t, rowChan, fmt.Sprintf("SELECT * FROM [%s].[%s]", t.schema, t.name)); err != nil {
			errChan <- err
			cancel()
		}
		close(rowChan)
		wg.Done()
	}()
	// Writer: truncates the destination table and inserts rows from rowChan
	// in batches of cmd.batchSize, calling cb once per row. On error it
	// cancels the shared context so the reader stops.
	go func() {
		if _, err := cmd.dstDB.insertRows(newCtx, dstTable(t), true, cmd.batchSize, rowChan, cb); err != nil {
			errChan <- err
			cancel()
		}
		wg.Done()
	}()
	wg.Wait()
	close(errChan)
	// Only the first error is reported; if both goroutines failed, the second
	// error is dropped (errChan is closed, and a closed-channel receive after
	// one buffered value yields nil).
	if err := <-errChan; err != nil {
		return fmt.Errorf("read/write data: %w", err)
	}
	log.Info().Msgf("[%s] Rebuilding indexes...", t)
	if err := restoreDstIndexes(ctx, cmd.dstDB, cmd.metaDB, t); err != nil {
		return fmt.Errorf("restore dst indexes: %w", err)
	}
	return nil
}