Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 1 addition & 82 deletions pkg/backup/backup_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"strings"
"time"

"github.com/cockroachdb/cockroach/pkg/backup/backupbase"
"github.com/cockroachdb/cockroach/pkg/backup/backupdest"
"github.com/cockroachdb/cockroach/pkg/backup/backupencryption"
"github.com/cockroachdb/cockroach/pkg/backup/backupinfo"
Expand All @@ -24,7 +23,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/build"
"github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvfollowerreadsccl"
"github.com/cockroachdb/cockroach/pkg/cloud"
"github.com/cockroachdb/cockroach/pkg/cloud/cloudpb"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/joberror"
Expand Down Expand Up @@ -124,10 +122,8 @@ func backup(
details jobspb.BackupDetails,
settings *cluster.Settings,
defaultStore cloud.ExternalStorage,
storageByLocalityKV map[string]*cloudpb.ExternalStorage,
resumer *backupResumer,
backupManifest *backuppb.BackupManifest,
makeExternalStorage cloud.ExternalStorageFactory,
) (_ roachpb.RowCount, numBackupInstances int, _ error) {
resumerSpan := tracing.SpanFromContext(ctx)
var lastCheckpoint time.Time
Expand Down Expand Up @@ -384,77 +380,11 @@ func backup(
}
}

backupID := uuid.MakeV4()
backupManifest.ID = backupID
// Write additional partial descriptors to each node for partitioned backups.
if len(storageByLocalityKV) > 0 {
resumerSpan.RecordStructured(&types.StringValue{Value: "writing partition descriptors for partitioned backup"})
filesByLocalityKV := make(map[string][]backuppb.BackupManifest_File)
for _, file := range backupManifest.Files {
filesByLocalityKV[file.LocalityKV] = append(filesByLocalityKV[file.LocalityKV], file)
}

nextPartitionedDescFilenameID := 1
for kv, conf := range storageByLocalityKV {
backupManifest.LocalityKVs = append(backupManifest.LocalityKVs, kv)
// Set a unique filename for each partition backup descriptor. The ID
// ensures uniqueness, and the kv string appended to the end is for
// readability.
filename := fmt.Sprintf("%s_%d_%s", backupPartitionDescriptorPrefix,
nextPartitionedDescFilenameID, backupinfo.SanitizeLocalityKV(kv))
nextPartitionedDescFilenameID++
backupManifest.PartitionDescriptorFilenames = append(backupManifest.PartitionDescriptorFilenames, filename)
desc := backuppb.BackupPartitionDescriptor{
LocalityKV: kv,
Files: filesByLocalityKV[kv],
BackupID: backupID,
}

if err := func() error {
store, err := makeExternalStorage(ctx, *conf)
if err != nil {
return err
}
defer store.Close()
return backupinfo.WriteBackupPartitionDescriptor(ctx, store, filename,
encryption, &kmsEnv, &desc)
}(); err != nil {
return roachpb.RowCount{}, 0, err
}
}
}

// TODO(msbutler): version gate writing the old manifest once we can guarantee
// a cluster version that will not read the old manifest. This will occur when we delete
// LegacyFindPriorBackups and the fallback path in
// ListFullBackupsInCollection, which can occur when we completely rely on the
// backup index.
if err := backupinfo.WriteBackupManifest(ctx, defaultStore, backupbase.DeprecatedBackupManifestName,
encryption, &kmsEnv, backupManifest); err != nil {
return roachpb.RowCount{}, 0, err
}

if err := backupinfo.WriteMetadataWithExternalSSTs(ctx, defaultStore, encryption,
&kmsEnv, backupManifest); err != nil {
return roachpb.RowCount{}, 0, err
}

statsTable := getTableStatsForBackup(ctx, execCtx.ExecCfg().InternalDB.Executor(), backupManifest.Descriptors)
if err := backupinfo.WriteTableStatistics(ctx, defaultStore, encryption, &kmsEnv, &statsTable); err != nil {
if err := backupinfo.WriteBackupMetadata(ctx, execCtx, defaultStore, details, &kmsEnv, backupManifest, statsTable); err != nil {
return roachpb.RowCount{}, 0, err
}

if err := backupinfo.WriteBackupIndexMetadata(
ctx,
execCtx.ExecCfg(),
execCtx.User(),
execCtx.ExecCfg().DistSQLSrv.ExternalStorageFromURI,
details,
backupManifest.RevisionStartTime,
); err != nil {
return roachpb.RowCount{}, 0, errors.Wrapf(err, "writing backup index metadata")
}

return backupManifest.EntryCounts, numBackupInstances, nil
}

Expand Down Expand Up @@ -692,15 +622,6 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error {
}
}

storageByLocalityKV := make(map[string]*cloudpb.ExternalStorage)
for kv, uri := range details.URIsByLocalityKV {
conf, err := cloud.ExternalStorageConfFromURI(uri, p.User())
if err != nil {
return err
}
storageByLocalityKV[kv] = &conf
}

mem := p.ExecCfg().RootMemoryMonitor.MakeBoundAccount()
defer mem.Close(ctx)
var memSize int64
Expand Down Expand Up @@ -742,10 +663,8 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error {
details,
p.ExecCfg().Settings,
defaultStore,
storageByLocalityKV,
b,
backupManifest,
p.ExecCfg().DistSQLSrv.ExternalStorage,
)
if err == nil {
break
Expand Down
4 changes: 0 additions & 4 deletions pkg/backup/backup_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,6 @@ import (
)

const (
// backupPartitionDescriptorPrefix is the file name prefix for serialized
// BackupPartitionDescriptor protos.
backupPartitionDescriptorPrefix = "BACKUP_PART"

deprecatedPrivilegesBackupPreamble = "The existing privileges are being deprecated " +
"in favour of a fine-grained privilege model explained here " +
"https://www.cockroachlabs.com/docs/stable/backup.html#required-privileges. In a future release, to run"
Expand Down
4 changes: 4 additions & 0 deletions pkg/backup/backupbase/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ const (
// LATEST files will be stored as we no longer want to overwrite it.
LatestHistoryDirectory = backupMetadataDirectory + "/" + "latest"

// BackupPartitionDescriptorPrefix is the file name prefix for serialized
// BackupPartitionDescriptor protos.
BackupPartitionDescriptorPrefix = "BACKUP_PART"

// DateBasedIncFolderName is the date format used when creating sub-directories
// storing incremental backups for auto-appendable backups.
// It is exported for testing backup inspection tooling.
Expand Down
1 change: 1 addition & 0 deletions pkg/backup/backupinfo/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ go_library(
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/util/tracing",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_pebble//objstorage",
"@com_github_klauspost_compress//gzip",
Expand Down
97 changes: 97 additions & 0 deletions pkg/backup/backupinfo/manifest_handling.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
gzip "github.com/klauspost/compress/gzip"
)
Expand Down Expand Up @@ -1776,3 +1777,99 @@ func ConstructDateBasedIncrementalFolderName(start, end time.Time) string {
start.Format(backupbase.DateBasedIncFolderNameSuffix),
)
}

// WriteBackupMetadata writes the manifest, backup index, and statistics to
// external storage. It stamps backupManifest with a fresh backup ID, writes
// per-locality partition descriptors when the backup is partitioned, then
// writes the (deprecated) monolithic manifest, the SST-backed metadata, the
// table statistics, and finally the backup index entry.
//
// The caller retains ownership of backupManifest; this function mutates its
// ID, LocalityKVs, and PartitionDescriptorFilenames fields.
func WriteBackupMetadata(
	ctx context.Context,
	execCtx sql.JobExecContext,
	store cloud.ExternalStorage,
	details jobspb.BackupDetails,
	kmsEnv cloud.KMSEnv,
	backupManifest *backuppb.BackupManifest,
	statsTable backuppb.StatsTable,
) error {
	// Tag the manifest with a unique ID so the partition descriptors written
	// below can be matched back to this specific backup.
	backupID := uuid.MakeV4()
	backupManifest.ID = backupID

	// Write additional partial descriptors to each node for partitioned backups.
	if len(details.URIsByLocalityKV) > 0 {
		if err := writePartitionDescriptors(
			ctx, execCtx, details, kmsEnv, backupManifest, backupID,
		); err != nil {
			return err
		}
	}

	// TODO(msbutler): version gate writing the old manifest once we can guarantee
	// a cluster version that will not read the old manifest. This will occur when
	// we delete LegacyFindPriorBackups and the fallback path in
	// ListFullBackupsInCollection, which can occur when we completely rely on the
	// backup index.
	if err := WriteBackupManifest(ctx, store, backupbase.DeprecatedBackupManifestName,
		details.EncryptionOptions, kmsEnv, backupManifest); err != nil {
		return err
	}
	if err := WriteMetadataWithExternalSSTs(ctx, store, details.EncryptionOptions,
		kmsEnv, backupManifest); err != nil {
		return err
	}

	if err := WriteTableStatistics(
		ctx, store, details.EncryptionOptions, kmsEnv, &statsTable,
	); err != nil {
		// errors.Wrap (not Wrapf): no format arguments.
		return errors.Wrap(err, "writing table statistics")
	}

	return errors.Wrap(
		WriteBackupIndexMetadata(
			ctx,
			execCtx.ExecCfg(),
			execCtx.User(),
			execCtx.ExecCfg().DistSQLSrv.ExternalStorageFromURI,
			details,
			backupManifest.RevisionStartTime,
		),
		"writing backup index metadata",
	)
}

// writePartitionDescriptors writes one BackupPartitionDescriptor per locality
// to the locality-specific storage destinations recorded in details, and
// appends the corresponding locality KVs and descriptor filenames to
// backupManifest. backupID must be the ID already stamped on the manifest so
// each descriptor can be associated with this backup.
func writePartitionDescriptors(
	ctx context.Context,
	execCtx sql.JobExecContext,
	details jobspb.BackupDetails,
	kmsEnv cloud.KMSEnv,
	backupManifest *backuppb.BackupManifest,
	backupID uuid.UUID,
) error {
	// Resolve each locality URI to an external storage configuration up front
	// so a malformed URI fails before any descriptor is written.
	storageByLocalityKV := make(map[string]*cloudpb.ExternalStorage, len(details.URIsByLocalityKV))
	for kv, uri := range details.URIsByLocalityKV {
		conf, err := cloud.ExternalStorageConfFromURI(uri, execCtx.User())
		if err != nil {
			return err
		}
		storageByLocalityKV[kv] = &conf
	}

	// Bucket the manifest's files by the locality they were written to.
	filesByLocalityKV := make(map[string][]backuppb.BackupManifest_File)
	for _, file := range backupManifest.Files {
		filesByLocalityKV[file.LocalityKV] = append(filesByLocalityKV[file.LocalityKV], file)
	}

	nextPartitionedDescFilenameID := 1
	for kv, conf := range storageByLocalityKV {
		backupManifest.LocalityKVs = append(backupManifest.LocalityKVs, kv)
		// Set a unique filename for each partition backup descriptor. The ID
		// ensures uniqueness, and the kv string appended to the end is for
		// readability.
		filename := fmt.Sprintf("%s_%d_%s", backupbase.BackupPartitionDescriptorPrefix,
			nextPartitionedDescFilenameID, SanitizeLocalityKV(kv))
		nextPartitionedDescFilenameID++
		backupManifest.PartitionDescriptorFilenames = append(backupManifest.PartitionDescriptorFilenames, filename)
		desc := backuppb.BackupPartitionDescriptor{
			LocalityKV: kv,
			Files:      filesByLocalityKV[kv],
			BackupID:   backupID,
		}

		// Scope each store to its iteration so the deferred Close fires
		// before the next locality is processed.
		if err := func() error {
			localityStore, err := execCtx.ExecCfg().DistSQLSrv.ExternalStorage(ctx, *conf)
			if err != nil {
				return err
			}
			defer localityStore.Close()
			return WriteBackupPartitionDescriptor(ctx, localityStore, filename,
				details.EncryptionOptions, kmsEnv, &desc)
		}(); err != nil {
			return err
		}
	}
	return nil
}
50 changes: 3 additions & 47 deletions pkg/backup/compaction_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -778,50 +778,6 @@ func getBackupChain(
return manifests, localityInfo, baseEncryptionInfo, allIters, nil
}

// concludeBackupCompaction completes the backup compaction process after the backup has been
// completed by writing the manifest and associated metadata to the backup destination.
//
// It writes, in order: the (deprecated) monolithic backup manifest, the
// SST-backed metadata, the table statistics for the backed-up descriptors,
// and finally the backup index entry. It mutates backupManifest by stamping
// a freshly generated backup ID on it. Returns the first write error
// encountered; on error, earlier writes in the sequence may already have
// been persisted.
//
// TODO (kev-cao): Can move this helper to the backup code at some point.
func concludeBackupCompaction(
ctx context.Context,
execCtx sql.JobExecContext,
store cloud.ExternalStorage,
details jobspb.BackupDetails,
kmsEnv cloud.KMSEnv,
backupManifest *backuppb.BackupManifest,
) error {
// Give the compacted backup its own unique identity.
backupID := uuid.MakeV4()
backupManifest.ID = backupID

// Legacy manifest first: older readers look for this file by name.
if err := backupinfo.WriteBackupManifest(ctx, store, backupbase.DeprecatedBackupManifestName,
details.EncryptionOptions, kmsEnv, backupManifest); err != nil {
return err
}
if err := backupinfo.WriteMetadataWithExternalSSTs(ctx, store, details.EncryptionOptions,
kmsEnv, backupManifest); err != nil {
return err
}

// Collect and persist table statistics for the descriptors included in
// this compacted backup.
statsTable := getTableStatsForBackup(ctx, execCtx.ExecCfg().InternalDB.Executor(), backupManifest.Descriptors)
if err := backupinfo.WriteTableStatistics(
ctx, store, details.EncryptionOptions, kmsEnv, &statsTable,
); err != nil {
return errors.Wrapf(err, "writing table statistics")
}

// The index entry is written last so it only ever points at fully-written
// backup metadata.
return errors.Wrapf(
backupinfo.WriteBackupIndexMetadata(
ctx,
execCtx.ExecCfg(),
execCtx.User(),
execCtx.ExecCfg().DistSQLSrv.ExternalStorageFromURI,
details,
backupManifest.RevisionStartTime,
),
"writing backup index metadata",
)
}

// processProgress processes progress updates from the bulk processor for a backup and updates
// the associated manifest.
func processProgress(
Expand Down Expand Up @@ -922,9 +878,9 @@ func doCompaction(
); err != nil {
return err
}

return concludeBackupCompaction(
ctx, execCtx, defaultStore, details, kmsEnv, manifest,
statsTable := getTableStatsForBackup(ctx, execCtx.ExecCfg().InternalDB.Executor(), manifest.Descriptors)
return backupinfo.WriteBackupMetadata(
ctx, execCtx, defaultStore, details, kmsEnv, manifest, statsTable,
)
}

Expand Down