Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix inconsistent state between WAL and saved Snapshot #3584

Open
wants to merge 2 commits into
base: release-2.2
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 61 additions & 36 deletions orderer/consensus/etcdraft/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,22 +75,9 @@ func CreateStorage(
return nil, err
}

snapshot, err := sn.Load()
snapshot, w, st, ents, err := loadNewestAvailableSnapshot(lg, walDir, snapDir)
if err != nil {
if err == snap.ErrNoSnapshot {
lg.Debugf("No snapshot found at %s", snapDir)
} else {
return nil, errors.Errorf("failed to load snapshot: %s", err)
}
} else {
// snapshot found
lg.Debugf("Loaded snapshot at Term %d and Index %d, Nodes: %+v",
snapshot.Metadata.Term, snapshot.Metadata.Index, snapshot.Metadata.ConfState.Nodes)
}

w, st, ents, err := createOrReadWAL(lg, walDir, snapshot)
if err != nil {
return nil, errors.Errorf("failed to create or read WAL: %s", err)
return nil, errors.Errorf("Failed to load snapshot and WAL: %s", err)
}

if snapshot != nil {
Expand Down Expand Up @@ -120,26 +107,11 @@ func CreateStorage(
// ListSnapshots returns a list of RaftIndex of snapshots stored on disk.
// If a file is corrupted, rename the file.
func ListSnapshots(logger *flogging.FabricLogger, snapDir string) []uint64 {
dir, err := os.Open(snapDir)
snapfiles, err := listSnapshotFiles(logger, snapDir)
if err != nil {
logger.Errorf("Failed to open snapshot directory %s: %s", snapDir, err)
logger.Errorf("Failed to list snapshot files from %s: %s", snapDir, err)
return nil
}
defer dir.Close()

filenames, err := dir.Readdirnames(-1)
if err != nil {
logger.Errorf("Failed to read snapshot files: %s", err)
return nil
}

snapfiles := []string{}
for i := range filenames {
if strings.HasSuffix(filenames[i], ".snap") {
snapfiles = append(snapfiles, filenames[i])
}
}
sort.Strings(snapfiles)

var snapshots []uint64
for _, snapfile := range snapfiles {
Expand Down Expand Up @@ -242,15 +214,17 @@ func (rs *RaftStorage) Snapshot() raftpb.Snapshot {

// Store persists etcd/raft data
func (rs *RaftStorage) Store(entries []raftpb.Entry, hardstate raftpb.HardState, snapshot raftpb.Snapshot) error {
if err := rs.wal.Save(hardstate, entries); err != nil {
return err
}

if !raft.IsEmptySnap(snapshot) {
if err := rs.saveSnap(snapshot); err != nil {
Copy link
Contributor

@Param-S Param-S Aug 18, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think, we need to swap the order of writing the snapshot entries and snapshot file(needs change in saveSnap) as it is done in etcdserver https://github.com/etcd-io/etcd/blob/6c2f5dc78af6b6970d48cecaac515c58a91efca8/server/storage/storage.go#L66

return err
}
}

if err := rs.wal.Save(hardstate, entries); err != nil {
return err
}

if !raft.IsEmptySnap(snapshot) {
if err := rs.ram.ApplySnapshot(snapshot); err != nil {
if err == raft.ErrSnapOutOfDate {
rs.lg.Warnf("Attempted to apply out-of-date snapshot at Term %d and Index %d",
Expand Down Expand Up @@ -447,3 +421,54 @@ func (rs *RaftStorage) Close() error {

return nil
}

func loadNewestAvailableSnapshot(lg *flogging.FabricLogger, walDir, snapDir string) (*raftpb.Snapshot, *wal.WAL, raftpb.HardState, []raftpb.Entry, error) {
snapfiles, err := listSnapshotFiles(lg, snapDir)
if err != nil {
lg.Errorf("Failed to list snapshot files from %s: %s", snapDir, err)
}
for i := len(snapfiles) - 1; i >= 0; i-- {
snapshot, err := snap.Read(lg.Zap(), filepath.Join(snapDir, snapfiles[i]))
if err != nil {
lg.Warnf("Can not read snapshot from %s: %s", snapfiles[i], err)
continue
}
w, st, ents, err := createOrReadWAL(lg, walDir, snapshot)
if err != nil {
lg.Warnf("Create or read wal error: %s", err)
continue
}
if snapshot.Metadata.Index <= st.Commit {
return snapshot, w, st, ents, nil
}
if err := w.Close(); err != nil {
return nil, nil, raftpb.HardState{}, nil, err
}
}
lg.Warnf("Not available snapshot found in %s", snapDir)
w, st, ents, err := createOrReadWAL(lg, walDir, nil)
return nil, w, st, ents, err
}

func listSnapshotFiles(logging *flogging.FabricLogger, snapDir string) ([]string, error) {
dir, err := os.Open(snapDir)
if err != nil {
return nil, err
}
defer dir.Close()

filenames, err := dir.Readdirnames(-1)
if err != nil {
return nil, err
}

snapfiles := []string{}
for i := range filenames {
if strings.HasSuffix(filenames[i], ".snap") {
snapfiles = append(snapfiles, filenames[i])
}
}
sort.Strings(snapfiles)

return snapfiles, nil
}
89 changes: 84 additions & 5 deletions orderer/consensus/etcdraft/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func TestOpenWAL(t *testing.T) {
for i := 0; i < 10; i++ {
store.Store(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand how this test demonstrates the problem you describe.

Can you reproduce the problem in a unit test?

[]raftpb.Entry{{Index: uint64(i), Data: make([]byte, 10)}},
raftpb.HardState{},
raftpb.HardState{Commit: uint64(i)},
raftpb.Snapshot{},
)
}
Expand Down Expand Up @@ -155,7 +155,7 @@ func TestTakeSnapshot(t *testing.T) {
for i := 0; i < 10; i++ {
store.Store(
[]raftpb.Entry{{Index: uint64(i), Data: make([]byte, 100)}},
raftpb.HardState{},
raftpb.HardState{Commit: uint64(i)},
raftpb.Snapshot{},
)
}
Expand Down Expand Up @@ -216,7 +216,7 @@ func TestTakeSnapshot(t *testing.T) {
for i := 0; i < 10; i++ {
store.Store(
[]raftpb.Entry{{Index: uint64(i), Data: make([]byte, 100)}},
raftpb.HardState{},
raftpb.HardState{Commit: uint64(i)},
raftpb.Snapshot{},
)
}
Expand Down Expand Up @@ -282,7 +282,7 @@ func TestTakeSnapshot(t *testing.T) {
for i := 0; i < 10; i++ {
store.Store(
[]raftpb.Entry{{Index: uint64(i), Data: make([]byte, 100)}},
raftpb.HardState{},
raftpb.HardState{Commit: uint64(i)},
raftpb.Snapshot{},
)
}
Expand Down Expand Up @@ -369,7 +369,7 @@ func TestApplyOutOfDateSnapshot(t *testing.T) {
for i := 0; i < 10; i++ {
store.Store(
[]raftpb.Entry{{Index: uint64(i), Data: make([]byte, 100)}},
raftpb.HardState{},
raftpb.HardState{Commit: uint64(i)},
raftpb.Snapshot{},
)
}
Expand All @@ -395,3 +395,82 @@ func TestApplyOutOfDateSnapshot(t *testing.T) {
assertFileCount(t, 12, 1)
})
}

func TestAbortWhenWritingSnapshot(t *testing.T) {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I add the unit test to reproduce the problem.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks but I meant a unit test that uses an instance of a Fabric Raft chain, in orderer/consensus/etcdraft/chain.go, not a unit test that uses pure etcd.io/raft packages.

We need to be sure that a Fabric Raft chain instance can encounter the bespoken problem, so that:

  1. We know we really have a problem, because it might be that Fabric sidesteps this problem via some mechanism and etcd has this problem.
  2. If the problem occurs in a later point due to a code change, a unit test will notify us.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem can be reproduced with the following steps:

  1. Orderer A has not participated in consensus for a while.

  2. Other orderers generate new blocks in consensus.

  3. Other orderers generate a new snapshot.

  4. Orderer A back to normal and receives the new snapshot from other orderers.

  5. Orderer A persists the new snapshot but crashes before calling rs.saveSnap(snapshot).

  6. Orderer A restarts.

t.Run("Abort when writing snapshot", func(t *testing.T) {
setup(t)
defer clean(t)

// set SegmentSizeBytes to a small value so that
// every entry persisted to wal would result in
// a new wal being created.
oldSegmentSizeBytes := wal.SegmentSizeBytes
wal.SegmentSizeBytes = 10
defer func() {
wal.SegmentSizeBytes = oldSegmentSizeBytes
}()

// create 5 new entry
for i := 0; i < 5; i++ {
store.Store(
[]raftpb.Entry{{Index: uint64(i), Data: make([]byte, 100)}},
raftpb.HardState{Commit: uint64(i)},
raftpb.Snapshot{},
)
}
assertFileCount(t, 6, 0)

// Assume an orderer missed some records due to exceptions and receives a new snapshot from other orderers.
commit := 10
store.Store(
[]raftpb.Entry{},
raftpb.HardState{Commit: uint64(commit)},
raftpb.Snapshot{
Metadata: raftpb.SnapshotMetadata{
Index: uint64(commit),
},
Data: make([]byte, 100),
},
)
err = store.Close()
assert.NoError(t, err)

// In old logic, it will use rs.wal.Save(hardstate, entries) to save the state firstly, so we remove the snapshot files.
// sd, err := os.Open(snapDir)
// assert.NoError(t, err)
// defer sd.Close()
// names, err := sd.Readdirnames(-1)
// assert.NoError(t, err)
// sort.Sort(sort.Reverse(sort.StringSlice(names)))
// os.Remove(filepath.Join(snapDir, names[0]))
// wd, err := os.Open(walDir)
// assert.NoError(t, err)
// defer wd.Close()
// names, err = wd.Readdirnames(-1)
// assert.NoError(t, err)
// sort.Sort(sort.Reverse(sort.StringSlice(names)))
// os.Remove(filepath.Join(walDir, names[0]))

// But in the new logic, it will use rs.saveSnap(snapshot) to save the snapshot firstly, so we remove the WAL files.
wd, err := os.Open(walDir)
assert.NoError(t, err)
defer wd.Close()
names, err := wd.Readdirnames(-1)
assert.NoError(t, err)
sort.Sort(sort.Reverse(sort.StringSlice(names)))
os.Remove(filepath.Join(walDir, names[0]))

// Then restart the orderer.
ram := raft.NewMemoryStorage()
store, err = CreateStorage(logger, walDir, snapDir, ram)
assert.NoError(t, err)

// Check the state from go.etcd.io/etcd/raft/raft.go
// func (r *raft) loadState(state pb.HardState)
hd, _, err := store.ram.InitialState()
assert.NoError(t, err)
lastIndex, err := store.ram.LastIndex()
assert.NoError(t, err)
assert.False(t, hd.Commit > lastIndex)
})
}