diff --git a/main.go b/main.go index 6b2690a..8ead2d5 100644 --- a/main.go +++ b/main.go @@ -343,9 +343,19 @@ func main() { return err } - bucket := c.String("s3-bucket") - prefix := c.String("s3-prefix") - return migrate.MigrateLogsS3(source, bucket, prefix) + target, err := sql.Open( + c.GlobalString("target-database-driver"), + c.GlobalString("target-database-datasource"), + ) + + if err != nil { + return err + } + + return migrate.MigrateLogsS3( + source, target, + c.GlobalString("s3-bucket"), + c.GlobalString("s3-prefix")) }, }, { diff --git a/migrate/db/mysql/ddl_gen.go b/migrate/db/mysql/ddl_gen.go index 64e46d8..9661fe2 100644 --- a/migrate/db/mysql/ddl_gen.go +++ b/migrate/db/mysql/ddl_gen.go @@ -124,6 +124,14 @@ var migrations = []struct { name: "create-table-org-secrets", stmt: createTableOrgSecrets, }, + { + name: "create-table-last-migrated-log-id", + stmt: createTableLastMigratedLogID, + }, + { + name: "create-record-last-migrated-log-id", + stmt: createRecordLastMigratedLogID, + }, } // Migrate performs the database migration. If the migration fails @@ -577,3 +585,22 @@ CREATE TABLE IF NOT EXISTS orgsecrets ( ,UNIQUE(secret_namespace, secret_name) ); ` + +// +// 013_create_table_last_migrated_log_id.sql +// + +var createTableLastMigratedLogID = ` +CREATE TABLE IF NOT EXISTS last_migrated_log_id ( + id INTEGER PRIMARY KEY +,log_id INTEGER +); +` + +// +// 014_create_record_last_migrated_log_id.sql +// + +var createRecordLastMigratedLogID = ` +INSERT IGNORE INTO last_migrated_log_id (id, log_id) VALUES (1, -1); +` diff --git a/migrate/db/postgres/ddl_gen.go b/migrate/db/postgres/ddl_gen.go index 7246dad..2bb02c7 100644 --- a/migrate/db/postgres/ddl_gen.go +++ b/migrate/db/postgres/ddl_gen.go @@ -120,6 +120,14 @@ var migrations = []struct { name: "create-table-org-secrets", stmt: createTableOrgSecrets, }, + { + name: "create-table-last-migrated-log-id", + stmt: createTableLastMigratedLogID, + }, + { + name: "create-record-last-migrated-log-id", + stmt: createRecordLastMigratedLogID, + }, } // Migrate performs the database migration. If the migration fails @@ -555,3 +563,23 @@ CREATE TABLE IF NOT EXISTS orgsecrets ( ,UNIQUE(secret_namespace, secret_name) ); ` + +// +// 013_create_table_last_migrated_log_id.sql +// + +var createTableLastMigratedLogID = ` +CREATE TABLE IF NOT EXISTS last_migrated_log_id ( + id INTEGER PRIMARY KEY +,log_id INTEGER +); +` + +// +// 014_create_record_last_migrated_log_id.sql +// + +var createRecordLastMigratedLogID = ` +INSERT INTO last_migrated_log_id (id, log_id) VALUES (1, -1) +ON CONFLICT DO NOTHING; +` diff --git a/migrate/db/sqlite/ddl_gen.go b/migrate/db/sqlite/ddl_gen.go index 6c0bb8b..9dc6708 100644 --- a/migrate/db/sqlite/ddl_gen.go +++ b/migrate/db/sqlite/ddl_gen.go @@ -120,6 +120,14 @@ var migrations = []struct { name: "create-table-org-secrets", stmt: createTableOrgSecrets, }, + { + name: "create-table-last-migrated-log-id", + stmt: createTableLastMigratedLogID, + }, + { + name: "create-record-last-migrated-log-id", + stmt: createRecordLastMigratedLogID, + }, } // Migrate performs the database migration. If the migration fails @@ -557,3 +565,22 @@ CREATE TABLE IF NOT EXISTS orgsecrets ( ,UNIQUE(secret_namespace, secret_name) ); ` + +// +// 013_create_table_last_migrated_log_id.sql +// + +var createTableLastMigratedLogID = ` +CREATE TABLE IF NOT EXISTS last_migrated_log_id ( + id INTEGER PRIMARY KEY +,log_id INTEGER +); +` + +// +// 014_create_record_last_migrated_log_id.sql +// + +var createRecordLastMigratedLogID = ` +INSERT OR IGNORE INTO last_migrated_log_id (id, log_id) VALUES (1, -1); +` diff --git a/migrate/logs.go b/migrate/logs.go index 16e0c3e..793fbda 100644 --- a/migrate/logs.go +++ b/migrate/logs.go @@ -64,11 +64,29 @@ func MigrateLogs(source, target *sql.DB) error { } // MigrateLogsS3 migrates the steps from the V0 database to S3. -func MigrateLogsS3(source *sql.DB, bucket, prefix string) error { +func MigrateLogsS3(source, target *sql.DB, bucket, prefix string) error { stepsV0 := []*StepV0{} + // get the last migrated log id + lastMigratedLog := &LastMigratedLogID{} + if err := meddler.QueryRow(target, lastMigratedLog, lastMigratedLogIDQuery); err != nil { + return err + } + lastMigratedLogID := lastMigratedLog.LogID + + defer func() { + err := meddler.Update(target, "last_migrated_log_id", &LastMigratedLogID{ + ID: 1, + LogID: lastMigratedLogID, + }) + if err != nil { + logrus.WithError(err).Warnf("cannot update last migrated log id: %d", lastMigratedLogID) + } + }() + // 1. load all stages from the V0 database. - err := meddler.QueryAll(source, &stepsV0, stepListQuery) + err := meddler.QueryAll( + source, &stepsV0, stepListFilterByIDQuery, lastMigratedLogID) if err != nil { return err } @@ -109,6 +127,7 @@ func MigrateLogsS3(source *sql.DB, bucket, prefix string) error { logrus.WithError(err).Errorln("migration failed") return err } + lastMigratedLogID = logsV0.ProcID } logrus.Infof("migration complete") diff --git a/migrate/steps.go b/migrate/steps.go index 95e0f93..dd11733 100644 --- a/migrate/steps.go +++ b/migrate/steps.go @@ -86,7 +86,23 @@ WHERE proc_ppid != 0 AND repo_user_id > 0 ` +const stepListFilterByIDQuery = ` +SELECT procs.* +FROM procs +INNER JOIN builds ON procs.proc_build_id = builds.build_id +INNER JOIN repos ON builds.build_repo_id = repos.repo_id +WHERE proc_ppid != 0 + AND repo_user_id > 0 + AND proc_id > ? +ORDER BY proc_id +` + const updateStepSeq = ` ALTER SEQUENCE steps_step_id_seq RESTART WITH %d ` + +const lastMigratedLogIDQuery = ` +SELECT log_id FROM last_migrated_log_id +WHERE id = 1 +` diff --git a/migrate/types.go b/migrate/types.go index 8d60342..b4d24eb 100644 --- a/migrate/types.go +++ b/migrate/types.go @@ -296,6 +296,12 @@ type ( Token string `meddler:"registry_token"` } + // LastMigratedLogID is a last migrated log id. + LastMigratedLogID struct { + ID int64 `meddler:"id,pk"` + LogID int64 `meddler:"log_id"` + } + // DockerConfig defines required attributes from Docker registry credentials. DockerConfig struct { AuthConfigs map[string]AuthConfig `json:"auths"`