Skip to content

Commit

Permalink
[PBM-1223] capture oldest active transaction timestamp
Browse files Browse the repository at this point in the history
  • Loading branch information
defbin committed Feb 21, 2024
1 parent 0190573 commit 19db163
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 2 deletions.
5 changes: 3 additions & 2 deletions pbm/backup/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/percona/percona-backup-mongodb/pbm/errors"
"github.com/percona/percona-backup-mongodb/pbm/lock"
"github.com/percona/percona-backup-mongodb/pbm/log"
"github.com/percona/percona-backup-mongodb/pbm/oplog"
"github.com/percona/percona-backup-mongodb/pbm/storage"
"github.com/percona/percona-backup-mongodb/pbm/topo"
"github.com/percona/percona-backup-mongodb/pbm/util"
Expand Down Expand Up @@ -174,7 +175,7 @@ func (b *Backup) Run(ctx context.Context, bcp *ctrl.BackupCmd, opid ctrl.OPID, l
return errors.Wrap(err, "get cluster info")
}

oplogTS, err := topo.OpTimeFromNodeInfo(inf, true)
oplogTS, err := oplog.GetOplogStartTime(ctx, b.nodeConn)
if err != nil {
return errors.Wrap(err, "define oplog start position")
}
Expand Down Expand Up @@ -622,7 +623,7 @@ func (b *Backup) waitForFirstLastWrite(
return first, last, errors.Errorf("backup stuck, last beat ts: %d", bmeta.Hb.T)
}

if bmeta.FirstWriteTS.T > 0 && bmeta.LastWriteTS.T > 0 {
if bmeta.FirstWriteTS.T > 1 && bmeta.LastWriteTS.T > 1 {
return bmeta.FirstWriteTS, bmeta.LastWriteTS, nil
}
case <-ctx.Done():
Expand Down
70 changes: 70 additions & 0 deletions pbm/oplog/oplog.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package oplog

import (
"context"

"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/mongo/readconcern"

"github.com/percona/percona-backup-mongodb/pbm/errors"
)

var errNoTransaction = errors.New("no transaction found")

func GetOplogStartTime(ctx context.Context, m *mongo.Client) (primitive.Timestamp, error) {
ts, err := findTransactionStartTime(ctx, m)
if errors.Is(err, errNoTransaction) {
return findLastOplogTS(ctx, m)
}

return ts, nil
}

func findTransactionStartTime(ctx context.Context, m *mongo.Client) (primitive.Timestamp, error) {
coll := m.Database("config").Collection("transactions", options.Collection().SetReadConcern(readconcern.Local()))
f := bson.D{{"state", bson.D{{"$in", bson.A{"prepared", "inProgress"}}}}}
o := options.FindOne().SetSort(bson.D{{"startOpTime", 1}})
doc, err := coll.FindOne(ctx, f, o).Raw()
if err != nil {
if errors.Is(err, mongo.ErrNoDocuments) {
return primitive.Timestamp{}, errNoTransaction
}
return primitive.Timestamp{}, errors.Wrap(err, "query transactions")
}

rawTS, err := doc.LookupErr("startOpTime", "ts")
if err != nil {
return primitive.Timestamp{}, errors.Wrap(err, "lookup timestamp")
}

t, i, ok := rawTS.TimestampOK()
if !ok {
return primitive.Timestamp{}, errors.Wrap(err, "parse timestamp")
}

return primitive.Timestamp{T: t, I: i}, nil
}

func findLastOplogTS(ctx context.Context, m *mongo.Client) (primitive.Timestamp, error) {
coll := m.Database("local").Collection("oplog.rs")
o := options.FindOne().SetSort(bson.M{"$natural": -1})
doc, err := coll.FindOne(ctx, bson.D{}, o).Raw()
if err != nil {
return primitive.Timestamp{}, errors.Wrap(err, "query oplog")
}

rawTS, err := doc.LookupErr("ts")
if err != nil {
return primitive.Timestamp{}, errors.Wrap(err, "lookup oplog ts")
}

t, i, ok := rawTS.TimestampOK()
if !ok {
return primitive.Timestamp{}, errors.Wrap(err, "parse oplog ts")
}

return primitive.Timestamp{T: t, I: i}, nil
}

0 comments on commit 19db163

Please sign in to comment.