Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions pkg/backup/backuppb/backup.proto
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,25 @@ message BackupProgressTraceEvent {
util.hlc.Timestamp revision_start_time = 3 [(gogoproto.nullable) = false];
}

// RestoreDescRef is an (ID, Version) tuple identifying a descriptor that a
// RESTORE job has materialized in OFFLINE state. The descriptor body is
// fetched from KV when needed; persisting only the reference avoids
// embedding the full descriptor in the job's RestoreDetails payload, which
// can exceed the per-row raft command size limit at large descriptor counts.
message RestoreDescRef {
uint32 id = 1 [(gogoproto.customname) = "ID",
(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb.ID"];
uint32 version = 2 [(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb.DescriptorVersion"];
}

// RestoreDescRefs is the value stored under each per-type info-key
// (restoreTableDescRefsKey, etc.) in system.job_info during a RESTORE.
// One row per descriptor type keeps individual rows well under the raft
// command size limit even at multi-million-descriptor scale.
message RestoreDescRefs {
repeated RestoreDescRef refs = 1 [(gogoproto.nullable) = false];
}

// ExportStats is a message containing information about each
// Export{Request,Response}.
message ExportStats {
Expand Down
302 changes: 302 additions & 0 deletions pkg/backup/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,17 @@ const (
// restoreTempSystemDBPrefix is the prefix used for the temporary system
// database created during restore.
restoreTempSystemDBPrefix = "crdb_temp_system"

// Per-type info-keys used by RESTORE to persist (ID, Version) tuples for
// every descriptor materialized in OFFLINE state. The descriptor body is
// fetched from KV when needed; persisting only references keeps the
// per-row size well under the raft command size limit at multi-million
// descriptor scale. See backuppb.RestoreDescRefs for the value shape.
restoreTableDescRefsKey = "restore_table_desc_refs"
restoreTypeDescRefsKey = "restore_type_desc_refs"
restoreSchemaDescRefsKey = "restore_schema_desc_refs"
restoreDatabaseDescRefsKey = "restore_database_desc_refs"
restoreFunctionDescRefsKey = "restore_function_desc_refs"
)

var restoreStatsInsertionConcurrency = settings.RegisterIntSetting(
Expand Down Expand Up @@ -168,6 +179,242 @@ func restoreTempSystemName(

var laggingRestoreProcErr = errors.New("try re-planning due to lagging restore processors")

// getDescRefs reads the RestoreDescRefs proto stored at the given info-key for
// the given restore job. Returns (nil, false, nil) when the key is absent.
//
// TODO (kev-cao): remove this helper and flatten call-sites in 27.1+.
func getDescRefs(
ctx context.Context, txn isql.Txn, jobID jobspb.JobID, infoKey string,
) ([]backuppb.RestoreDescRef, bool, error) {
raw, ok, err := jobs.InfoStorageForJob(txn, jobID).Get(ctx, "restore-desc-refs", infoKey)
if err != nil || !ok {
return nil, false, err
}
var msg backuppb.RestoreDescRefs
if err := protoutil.Unmarshal(raw, &msg); err != nil {
return nil, false, errors.Wrap(err, "decoding restore desc refs")
}
return msg.Refs, true, nil
}

// writeDescRefs marshals refs as a RestoreDescRefs proto and writes it to the
// given info-key for the restore job.
//
// TODO (kev-cao): remove this helper and flatten call-sites in 27.1+.
func writeDescRefs(
ctx context.Context,
txn isql.Txn,
jobID jobspb.JobID,
infoKey string,
refs []backuppb.RestoreDescRef,
) error {
bytes, err := protoutil.Marshal(&backuppb.RestoreDescRefs{Refs: refs})
if err != nil {
return errors.Wrap(err, "encoding restore desc refs")
}
return jobs.InfoStorageForJob(txn, jobID).Write(ctx, infoKey, bytes)
}

// descRefsFromTableDescs extracts (ID, Version) tuples from a table-descriptor
// slice for persisting via writeDescRefs.
//
// TODO (kev-cao): remove this helper and flatten call-sites in 27.1+.
func descRefsFromTableDescs(descs []*descpb.TableDescriptor) []backuppb.RestoreDescRef {
out := make([]backuppb.RestoreDescRef, len(descs))
for i, d := range descs {
out[i] = backuppb.RestoreDescRef{ID: d.ID, Version: d.Version}
}
return out
}

// descRefsFromTypeDescs is the type-descriptor analog of
// descRefsFromTableDescs.
//
// TODO (kev-cao): remove this helper and flatten call-sites in 27.1+.
func descRefsFromTypeDescs(descs []*descpb.TypeDescriptor) []backuppb.RestoreDescRef {
out := make([]backuppb.RestoreDescRef, len(descs))
for i, d := range descs {
out[i] = backuppb.RestoreDescRef{ID: d.ID, Version: d.Version}
}
return out
}

// descRefsFromSchemaDescs is the schema-descriptor analog of
// descRefsFromTableDescs.
//
// TODO (kev-cao): remove this helper and flatten call-sites in 27.1+.
func descRefsFromSchemaDescs(descs []*descpb.SchemaDescriptor) []backuppb.RestoreDescRef {
out := make([]backuppb.RestoreDescRef, len(descs))
for i, d := range descs {
out[i] = backuppb.RestoreDescRef{ID: d.ID, Version: d.Version}
}
return out
}

// descRefsFromDatabaseDescs is the database-descriptor analog of
// descRefsFromTableDescs.
//
// TODO (kev-cao): remove this helper and flatten call-sites in 27.1+.
func descRefsFromDatabaseDescs(descs []*descpb.DatabaseDescriptor) []backuppb.RestoreDescRef {
out := make([]backuppb.RestoreDescRef, len(descs))
for i, d := range descs {
out[i] = backuppb.RestoreDescRef{ID: d.ID, Version: d.Version}
}
return out
}

// descRefsFromFunctionDescs is the function-descriptor analog of
// descRefsFromTableDescs.
//
// TODO (kev-cao): remove this helper and flatten call-sites in 27.1+.
func descRefsFromFunctionDescs(descs []*descpb.FunctionDescriptor) []backuppb.RestoreDescRef {
out := make([]backuppb.RestoreDescRef, len(descs))
for i, d := range descs {
out[i] = backuppb.RestoreDescRef{ID: d.ID, Version: d.Version}
}
return out
}

// tableDescRefs returns (ID, Version) tuples for tables this restore job is
// materializing. Prefers the dedicated info-key row; falls back to the legacy
// details.TableDescs slice for jobs created before info-key writes existed.
//
// TODO (kev-cao): remove this helper and flatten call-sites in 27.1+.
func tableDescRefs(
ctx context.Context, txn isql.Txn, jobID jobspb.JobID, details jobspb.RestoreDetails,
) ([]backuppb.RestoreDescRef, error) {
refs, ok, err := getDescRefs(ctx, txn, jobID, restoreTableDescRefsKey)
if err != nil {
return nil, err
}
if ok {
return refs, nil
}
out := make([]backuppb.RestoreDescRef, len(details.TableDescs))
for i, d := range details.TableDescs {
out[i] = backuppb.RestoreDescRef{ID: d.ID, Version: d.Version}
}
return out, nil
}

// typeDescRefs is the type-descriptor analog of tableDescRefs.
//
// TODO (kev-cao): remove this helper and flatten call-sites in 27.1+.
func typeDescRefs(
ctx context.Context, txn isql.Txn, jobID jobspb.JobID, details jobspb.RestoreDetails,
) ([]backuppb.RestoreDescRef, error) {
refs, ok, err := getDescRefs(ctx, txn, jobID, restoreTypeDescRefsKey)
if err != nil {
return nil, err
}
if ok {
return refs, nil
}
out := make([]backuppb.RestoreDescRef, len(details.TypeDescs))
for i, d := range details.TypeDescs {
out[i] = backuppb.RestoreDescRef{ID: d.ID, Version: d.Version}
}
return out, nil
}

// schemaDescRefs is the schema-descriptor analog of tableDescRefs.
//
// TODO (kev-cao): remove this helper and flatten call-sites in 27.1+.
func schemaDescRefs(
ctx context.Context, txn isql.Txn, jobID jobspb.JobID, details jobspb.RestoreDetails,
) ([]backuppb.RestoreDescRef, error) {
refs, ok, err := getDescRefs(ctx, txn, jobID, restoreSchemaDescRefsKey)
if err != nil {
return nil, err
}
if ok {
return refs, nil
}
out := make([]backuppb.RestoreDescRef, len(details.SchemaDescs))
for i, d := range details.SchemaDescs {
out[i] = backuppb.RestoreDescRef{ID: d.ID, Version: d.Version}
}
return out, nil
}

// databaseDescRefs is the database-descriptor analog of tableDescRefs.
//
// TODO (kev-cao): remove this helper and flatten call-sites in 27.1+.
func databaseDescRefs(
ctx context.Context, txn isql.Txn, jobID jobspb.JobID, details jobspb.RestoreDetails,
) ([]backuppb.RestoreDescRef, error) {
refs, ok, err := getDescRefs(ctx, txn, jobID, restoreDatabaseDescRefsKey)
if err != nil {
return nil, err
}
if ok {
return refs, nil
}
out := make([]backuppb.RestoreDescRef, len(details.DatabaseDescs))
for i, d := range details.DatabaseDescs {
out[i] = backuppb.RestoreDescRef{ID: d.ID, Version: d.Version}
}
return out, nil
}

// functionDescRefs is the function-descriptor analog of tableDescRefs.
//
// TODO (kev-cao): remove this helper and flatten call-sites in 27.1+.
func functionDescRefs(
ctx context.Context, txn isql.Txn, jobID jobspb.JobID, details jobspb.RestoreDetails,
) ([]backuppb.RestoreDescRef, error) {
refs, ok, err := getDescRefs(ctx, txn, jobID, restoreFunctionDescRefsKey)
if err != nil {
return nil, err
}
if ok {
return refs, nil
}
out := make([]backuppb.RestoreDescRef, len(details.FunctionDescs))
for i, d := range details.FunctionDescs {
out[i] = backuppb.RestoreDescRef{ID: d.ID, Version: d.Version}
}
return out, nil
}

// allDescRefs returns (ID, Version) tuples for every descriptor this restore
// job is materializing — tables, types, schemas, databases, and functions —
// suitable for a single batched MutableByID().Descs fetch.
//
// TODO (kev-cao): remove this helper and flatten call-sites in 27.1+.
func allDescRefs(
ctx context.Context, txn isql.Txn, jobID jobspb.JobID, details jobspb.RestoreDetails,
) ([]backuppb.RestoreDescRef, error) {
tables, err := tableDescRefs(ctx, txn, jobID, details)
if err != nil {
return nil, err
}
types, err := typeDescRefs(ctx, txn, jobID, details)
if err != nil {
return nil, err
}
schemas, err := schemaDescRefs(ctx, txn, jobID, details)
if err != nil {
return nil, err
}
databases, err := databaseDescRefs(ctx, txn, jobID, details)
if err != nil {
return nil, err
}
functions, err := functionDescRefs(ctx, txn, jobID, details)
if err != nil {
return nil, err
}
out := make([]backuppb.RestoreDescRef, 0,
len(tables)+len(types)+len(schemas)+len(databases)+len(functions))
out = append(out, tables...)
out = append(out, types...)
out = append(out, schemas...)
out = append(out, databases...)
out = append(out, functions...)
return out, nil
}

// rewriteBackupSpanKey rewrites a backup span start key for the purposes of
// splitting up the target key-space to send out the actual work of restoring.
//
Expand Down Expand Up @@ -2079,6 +2326,36 @@ func createImportingDescriptors(
details.FunctionDescs[i] = fn.FuncDesc()
}

// Dual-write each descriptor type's (ID, Version) tuples to a
// dedicated system.job_info row alongside the legacy slice
// population above. The info-key rows are the long-term source of
// truth — later phases of the restore fetch descriptor bodies from
// KV by ID rather than relying on the full payloads on
// RestoreDetails — but the legacy slices remain populated here for
// mixed-version compatibility. A later commit gates the legacy
// writes off once the cluster has crossed the corresponding
// cluster version.
if err := writeDescRefs(ctx, txn, r.job.ID(), restoreTableDescRefsKey,
descRefsFromTableDescs(details.TableDescs)); err != nil {
return err
}
if err := writeDescRefs(ctx, txn, r.job.ID(), restoreTypeDescRefsKey,
descRefsFromTypeDescs(details.TypeDescs)); err != nil {
return err
}
if err := writeDescRefs(ctx, txn, r.job.ID(), restoreSchemaDescRefsKey,
descRefsFromSchemaDescs(details.SchemaDescs)); err != nil {
return err
}
if err := writeDescRefs(ctx, txn, r.job.ID(), restoreDatabaseDescRefsKey,
descRefsFromDatabaseDescs(details.DatabaseDescs)); err != nil {
return err
}
if err := writeDescRefs(ctx, txn, r.job.ID(), restoreFunctionDescRefsKey,
descRefsFromFunctionDescs(details.FunctionDescs)); err != nil {
return err
}

// Update the job once all descs have been prepared for ingestion.
//
//lint:ignore SA1019 TODO: migrate to job_info_storage.go API
Expand Down Expand Up @@ -3318,6 +3595,31 @@ func (r *restoreResumer) publishDescriptors(
if details.OnlineImpl() {
details.PostDownloadTableAutoStatsSettings = tableAutoStatsSettings
}
// Dual-write the published (ID, Version) tuples to the dedicated
// info-key rows alongside the legacy slice updates above. See the
// matching block in createImportingDescriptors for the rationale; a
// later commit gates the legacy writes off once the cluster has
// crossed the corresponding cluster version.
if err := writeDescRefs(ctx, txn, r.job.ID(), restoreTableDescRefsKey,
descRefsFromTableDescs(newTables)); err != nil {
return err
}
if err := writeDescRefs(ctx, txn, r.job.ID(), restoreTypeDescRefsKey,
descRefsFromTypeDescs(newTypes)); err != nil {
return err
}
if err := writeDescRefs(ctx, txn, r.job.ID(), restoreSchemaDescRefsKey,
descRefsFromSchemaDescs(newSchemas)); err != nil {
return err
}
if err := writeDescRefs(ctx, txn, r.job.ID(), restoreDatabaseDescRefsKey,
descRefsFromDatabaseDescs(newDBs)); err != nil {
return err
}
if err := writeDescRefs(ctx, txn, r.job.ID(), restoreFunctionDescRefsKey,
descRefsFromFunctionDescs(newFunctions)); err != nil {
return err
}
//lint:ignore SA1019 TODO: migrate to job_info_storage.go API
if err := r.job.DeprecatedWithTxn(txn).SetDetails(ctx, details); err != nil {
return errors.Wrap(err,
Expand Down
9 changes: 9 additions & 0 deletions pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,13 @@ const (
// to requiring REFERENCES on both the origin and referenced tables.
V26_3_GrantReferencesToUsersWithCreate

// V26_3_DescriptorIDsInRestoreDetails gates RESTORE's transition from
// embedding full descriptor payloads in RestoreDetails to persisting
// (ID, Version) tuples in dedicated system.job_info rows. Once a cluster
// is at this version, new RESTORE jobs only write the info-key rows and
// leave the legacy descriptor slices on RestoreDetails empty.
V26_3_DescriptorIDsInRestoreDetails

// *************************************************
// Step (1) Add new versions above this comment.
// Do not add new versions to a patch release.
Expand Down Expand Up @@ -394,6 +401,8 @@ var versionTable = [numKeys]roachpb.Version{
V26_3_CrdbInternalTSDB: {Major: 26, Minor: 2, Internal: 12},

V26_3_GrantReferencesToUsersWithCreate: {Major: 26, Minor: 2, Internal: 14},

V26_3_DescriptorIDsInRestoreDetails: {Major: 26, Minor: 2, Internal: 16},
// *************************************************
// Step (2): Add new versions above this comment.
// *************************************************
Expand Down
Loading