Skip to content

Commit

Permalink
delete backups by type
Browse files Browse the repository at this point in the history
  • Loading branch information
defbin committed Nov 27, 2023
1 parent e718479 commit fc56fe0
Show file tree
Hide file tree
Showing 26 changed files with 1,377 additions and 895 deletions.
243 changes: 0 additions & 243 deletions cmd/pbm-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,20 @@ import (
"bytes"
"context"
"fmt"
"runtime"
"sync"
"sync/atomic"
"time"

"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"golang.org/x/sync/errgroup"

"github.com/percona/percona-backup-mongodb/internal"
"github.com/percona/percona-backup-mongodb/internal/backup"
"github.com/percona/percona-backup-mongodb/internal/config"
"github.com/percona/percona-backup-mongodb/internal/connect"
"github.com/percona/percona-backup-mongodb/internal/ctrl"
"github.com/percona/percona-backup-mongodb/internal/defs"
"github.com/percona/percona-backup-mongodb/internal/errors"
"github.com/percona/percona-backup-mongodb/internal/lock"
"github.com/percona/percona-backup-mongodb/internal/log"
"github.com/percona/percona-backup-mongodb/internal/oplog/oplogtmp"
"github.com/percona/percona-backup-mongodb/internal/resync"
"github.com/percona/percona-backup-mongodb/internal/storage"
"github.com/percona/percona-backup-mongodb/internal/topo"
Expand Down Expand Up @@ -158,244 +153,6 @@ func (a *Agent) Start(ctx context.Context) error {
}
}

// Delete deletes backup(s) from the store and cleans up its metadata
func (a *Agent) Delete(ctx context.Context, d *ctrl.DeleteBackupCmd, opid ctrl.OPID, ep config.Epoch) {
logger := log.FromContext(ctx)

if d == nil {
l := logger.NewEvent(string(ctrl.CmdDeleteBackup), "", opid.String(), ep.TS())
l.Error("missed command")
return
}

l := logger.NewEvent(string(ctrl.CmdDeleteBackup), "", opid.String(), ep.TS())
ctx = log.SetLogEventToContext(ctx, l)

nodeInfo, err := topo.GetNodeInfoExt(ctx, a.nodeConn)
if err != nil {
l.Error("get node info data: %v", err)
return
}

if !nodeInfo.IsLeader() {
l.Info("not a member of the leader rs, skipping")
return
}

epts := ep.TS()
lock := lock.NewOpLock(a.leadConn, lock.LockHeader{
Replset: a.brief.SetName,
Node: a.brief.Me,
Type: ctrl.CmdDeleteBackup,
OPID: opid.String(),
Epoch: &epts,
})

got, err := a.acquireLock(ctx, lock, l, nil)
if err != nil {
l.Error("acquire lock: %v", err)
return
}
if !got {
l.Debug("skip: lock not acquired")
return
}
defer func() {
if err := lock.Release(); err != nil {
l.Error("release lock: %v", err)
}
}()

switch {
case d.OlderThan > 0:
t := time.Unix(d.OlderThan, 0).UTC()
obj := t.Format("2006-01-02T15:04:05Z")
l = logger.NewEvent(string(ctrl.CmdDeleteBackup), obj, opid.String(), ep.TS())
l.Info("deleting backups older than %v", t)
err := backup.DeleteOlderThan(ctx, a.leadConn, t, l)
if err != nil {
l.Error("deleting: %v", err)
return
}
case d.Backup != "":
l = logger.NewEvent(string(ctrl.CmdDeleteBackup), d.Backup, opid.String(), ep.TS())
l.Info("deleting backup")
err := backup.DeleteBackup(ctx, a.leadConn, d.Backup, l)
if err != nil {
l.Error("deleting: %v", err)
return
}
default:
l.Error("malformed command received in Delete() of backup: %v", d)
return
}

l.Info("done")
}

// DeletePITR deletes PITR chunks from the store and cleans up its metadata
func (a *Agent) DeletePITR(ctx context.Context, d *ctrl.DeletePITRCmd, opid ctrl.OPID, ep config.Epoch) {
logger := log.FromContext(ctx)

if d == nil {
l := logger.NewEvent(string(ctrl.CmdDeletePITR), "", opid.String(), ep.TS())
l.Error("missed command")
return
}

l := logger.NewEvent(string(ctrl.CmdDeletePITR), "", opid.String(), ep.TS())
ctx = log.SetLogEventToContext(ctx, l)

nodeInfo, err := topo.GetNodeInfoExt(ctx, a.nodeConn)
if err != nil {
l.Error("get node info data: %v", err)
return
}

if !nodeInfo.IsLeader() {
l.Info("not a member of the leader rs, skipping")
return
}

epts := ep.TS()
lock := lock.NewOpLock(a.leadConn, lock.LockHeader{
Replset: a.brief.SetName,
Node: a.brief.Me,
Type: ctrl.CmdDeletePITR,
OPID: opid.String(),
Epoch: &epts,
})

got, err := a.acquireLock(ctx, lock, l, nil)
if err != nil {
l.Error("acquire lock: %v", err)
return
}
if !got {
l.Debug("skip: lock not acquired")
return
}
defer func() {
if err := lock.Release(); err != nil {
l.Error("release lock: %v", err)
}
}()

if d.OlderThan > 0 {
t := time.Unix(d.OlderThan, 0).UTC()
obj := t.Format("2006-01-02T15:04:05Z")
l = logger.NewEvent(string(ctrl.CmdDeletePITR), obj, opid.String(), ep.TS())
l.Info("deleting pitr chunks older than %v", t)
err = oplogtmp.DeletePITR(ctx, a.leadConn, &t, l)
} else {
l = logger.NewEvent(string(ctrl.CmdDeletePITR), "_all_", opid.String(), ep.TS())
l.Info("deleting all pitr chunks")
err = oplogtmp.DeletePITR(ctx, a.leadConn, nil, l)
}
if err != nil {
l.Error("deleting: %v", err)
return
}

l.Info("done")
}

// Cleanup deletes backups and PITR chunks from the store and cleans up its metadata
func (a *Agent) Cleanup(ctx context.Context, d *ctrl.CleanupCmd, opid ctrl.OPID, ep config.Epoch) {
logger := log.FromContext(ctx)

if d == nil {
l := logger.NewEvent(string(ctrl.CmdCleanup), "", opid.String(), ep.TS())
l.Error("missed command")
return
}

l := logger.NewEvent(string(ctrl.CmdCleanup), "", opid.String(), ep.TS())
ctx = log.SetLogEventToContext(ctx, l)

if d == nil {
l.Error("missed command")
return
}

nodeInfo, err := topo.GetNodeInfoExt(ctx, a.nodeConn)
if err != nil {
l.Error("get node info data: %v", err)
return
}
if !nodeInfo.IsLeader() {
l.Info("not a member of the leader rs, skipping")
return
}

epts := ep.TS()
lock := lock.NewOpLock(a.leadConn, lock.LockHeader{
Replset: a.brief.SetName,
Node: a.brief.Me,
Type: ctrl.CmdCleanup,
OPID: opid.String(),
Epoch: &epts,
})

got, err := a.acquireLock(ctx, lock, l, nil)
if err != nil {
l.Error("acquire lock: %v", err)
return
}
if !got {
l.Debug("skip: lock not acquired")
return
}
defer func() {
if err := lock.Release(); err != nil {
l.Error("release lock: %v", err)
}
}()

stg, err := util.GetStorage(ctx, a.leadConn, l)
if err != nil {
l.Error("get storage: " + err.Error())
}

eg := errgroup.Group{}
eg.SetLimit(runtime.NumCPU())

cr, err := internal.MakeCleanupInfo(ctx, a.leadConn, d.OlderThan)
if err != nil {
l.Error("make cleanup report: " + err.Error())
return
}

for i := range cr.Chunks {
name := cr.Chunks[i].FName

eg.Go(func() error {
err := stg.Delete(name)
return errors.Wrapf(err, "delete chunk file %q", name)
})
}
if err := eg.Wait(); err != nil {
l.Error(err.Error())
}

for i := range cr.Backups {
bcp := &cr.Backups[i]

eg.Go(func() error {
err := backup.DeleteBackupFiles(bcp, stg)
return errors.Wrapf(err, "delete backup files %q", bcp.Name)
})
}
if err := eg.Wait(); err != nil {
l.Error(err.Error())
}

err = resync.ResyncStorage(ctx, a.leadConn, l)
if err != nil {
l.Error("storage resync: " + err.Error())
}
}

// Resync uploads a backup list from the remote store
func (a *Agent) Resync(ctx context.Context, opid ctrl.OPID, ep config.Epoch) {
logger := log.FromContext(ctx)
Expand Down
2 changes: 1 addition & 1 deletion cmd/pbm-agent/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (a *Agent) Backup(ctx context.Context, cmd *ctrl.BackupCmd, opid ctrl.OPID,
}

l := logger.NewEvent(string(ctrl.CmdBackup), cmd.Name, opid.String(), ep.TS())
ctx = log.SetLoggerToContext(ctx, logger)
ctx = log.SetLogEventToContext(ctx, l)

nodeInfo, err := topo.GetNodeInfoExt(ctx, a.nodeConn)
if err != nil {
Expand Down
Loading

0 comments on commit fc56fe0

Please sign in to comment.