Skip to content

Commit

Permalink
server,etcdutl: Preserve etcd version in backend allowing etcdutl to …
Browse files Browse the repository at this point in the history
…read it from snapshot
  • Loading branch information
serathius committed Jun 25, 2021
1 parent 3f02686 commit e2740b4
Show file tree
Hide file tree
Showing 13 changed files with 439 additions and 27 deletions.
3 changes: 2 additions & 1 deletion etcdutl/etcdutl/printer.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,13 @@ func newPrinterUnsupported(n string) printer {
func (p *printerUnsupported) DBStatus(snapshot.Status) { p.p(nil) }

func makeDBStatusTable(ds snapshot.Status) (hdr []string, rows [][]string) {
hdr = []string{"hash", "revision", "total keys", "total size"}
hdr = []string{"hash", "revision", "total keys", "total size", "version"}
rows = append(rows, []string{
fmt.Sprintf("%x", ds.Hash),
fmt.Sprint(ds.Revision),
fmt.Sprint(ds.TotalKey),
humanize.Bytes(uint64(ds.TotalSize)),
ds.Version,
})
return hdr, rows
}
Expand Down
1 change: 1 addition & 0 deletions etcdutl/etcdutl/printer_fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@ func (p *fieldsPrinter) DBStatus(r snapshot.Status) {
fmt.Println(`"Revision" :`, r.Revision)
fmt.Println(`"Keys" :`, r.TotalKey)
fmt.Println(`"Size" :`, r.TotalSize)
fmt.Println(`"Version" :`, r.Version)
}
8 changes: 8 additions & 0 deletions etcdutl/snapshot/v3_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
"go.etcd.io/etcd/server/v3/etcdserver/cindex"
"go.etcd.io/etcd/server/v3/etcdserver/version"
"go.etcd.io/etcd/server/v3/mvcc/backend"
"go.etcd.io/etcd/server/v3/verify"
"go.etcd.io/etcd/server/v3/wal"
Expand Down Expand Up @@ -106,6 +107,9 @@ type Status struct {
Revision int64 `json:"revision"`
TotalKey int `json:"totalKey"`
TotalSize int64 `json:"totalSize"`
// Version is equal to storageVersion of the snapshot
// Empty if server does not supports versioned snapshots (<v3.6)
Version string `json:"version"`
}

// Status returns the snapshot file information.
Expand All @@ -132,6 +136,10 @@ func (s *v3Manager) Status(dbPath string) (ds Status, err error) {
return fmt.Errorf("snapshot file integrity check failed. %d errors found.\n"+strings.Join(dbErrStrings, "\n"), len(dbErrStrings))
}
ds.TotalSize = tx.Size()
v := version.ReadStorageVersionFromSnapshot(tx)
if v != nil {
ds.Version = v.String()
}
c := tx.Cursor()
for next, _ := c.First(); next != nil; next, _ = c.Next() {
b := tx.Bucket(next)
Expand Down
10 changes: 3 additions & 7 deletions server/etcdserver/api/membership/confstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,6 @@ import (
"go.uber.org/zap"
)

var (
confStateKey = []byte("confState")
)

// MustUnsafeSaveConfStateToBackend persists confState using given transaction (tx).
// confState in backend is persisted since etcd v3.5.
func MustUnsafeSaveConfStateToBackend(lg *zap.Logger, tx backend.BatchTx, confState *raftpb.ConfState) {
Expand All @@ -36,20 +32,20 @@ func MustUnsafeSaveConfStateToBackend(lg *zap.Logger, tx backend.BatchTx, confSt
lg.Panic("Cannot marshal raftpb.ConfState", zap.Stringer("conf-state", confState), zap.Error(err))
}

tx.UnsafePut(buckets.Meta, confStateKey, confStateBytes)
tx.UnsafePut(buckets.Meta, buckets.MetaConfStateName, confStateBytes)
}

// UnsafeConfStateFromBackend retrieves ConfState from the backend.
// Returns nil if confState in backend is not persisted (e.g. backend writen by <v3.5).
func UnsafeConfStateFromBackend(lg *zap.Logger, tx backend.ReadTx) *raftpb.ConfState {
keys, vals := tx.UnsafeRange(buckets.Meta, confStateKey, nil, 0)
keys, vals := tx.UnsafeRange(buckets.Meta, buckets.MetaConfStateName, nil, 0)
if len(keys) == 0 {
return nil
}

if len(keys) != 1 {
lg.Panic(
"unexpected number of key: "+string(confStateKey)+" when getting cluster version from backend",
"unexpected number of key: "+string(buckets.MetaConfStateName)+" when getting cluster version from backend",
zap.Int("number-of-key", len(keys)),
)
}
Expand Down
14 changes: 10 additions & 4 deletions server/etcdserver/api/v3rpc/maintenance.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"go.etcd.io/etcd/raft/v3"
"go.etcd.io/etcd/server/v3/auth"
"go.etcd.io/etcd/server/v3/etcdserver"
serverversion "go.etcd.io/etcd/server/v3/etcdserver/version"
"go.etcd.io/etcd/server/v3/mvcc"
"go.etcd.io/etcd/server/v3/mvcc/backend"

Expand Down Expand Up @@ -100,6 +101,11 @@ func (ms *maintenanceServer) Defragment(ctx context.Context, sr *pb.DefragmentRe
const snapshotSendBufferSize = 32 * 1024

func (ms *maintenanceServer) Snapshot(sr *pb.SnapshotRequest, srv pb.Maintenance_SnapshotServer) error {
ver := serverversion.ReadStorageVersion(ms.bg.Backend().ReadTx())
storageVersion := ""
if ver != nil {
storageVersion = ver.String()
}
snap := ms.bg.Backend().Snapshot()
pr, pw := io.Pipe()

Expand All @@ -125,7 +131,7 @@ func (ms *maintenanceServer) Snapshot(sr *pb.SnapshotRequest, srv pb.Maintenance
ms.lg.Info("sending database snapshot to client",
zap.Int64("total-bytes", total),
zap.String("size", size),
zap.String("etcd-version", version.Version),
zap.String("storage-version", storageVersion),
)
for total-sent > 0 {
// buffer just holds read bytes from stream
Expand All @@ -152,7 +158,7 @@ func (ms *maintenanceServer) Snapshot(sr *pb.SnapshotRequest, srv pb.Maintenance
resp := &pb.SnapshotResponse{
RemainingBytes: uint64(total - sent),
Blob: buf[:n],
Version: version.Version,
Version: storageVersion,
}
if err = srv.Send(resp); err != nil {
return togRPCError(err)
Expand All @@ -168,7 +174,7 @@ func (ms *maintenanceServer) Snapshot(sr *pb.SnapshotRequest, srv pb.Maintenance
zap.Int64("total-bytes", total),
zap.Int("checksum-size", len(sha)),
)
hresp := &pb.SnapshotResponse{RemainingBytes: 0, Blob: sha, Version: version.Version}
hresp := &pb.SnapshotResponse{RemainingBytes: 0, Blob: sha, Version: storageVersion}
if err := srv.Send(hresp); err != nil {
return togRPCError(err)
}
Expand All @@ -177,7 +183,7 @@ func (ms *maintenanceServer) Snapshot(sr *pb.SnapshotRequest, srv pb.Maintenance
zap.Int64("total-bytes", total),
zap.String("size", size),
zap.String("took", humanize.Time(start)),
zap.String("etcd-version", version.Version),
zap.String("storage-version", storageVersion),
)
return nil
}
Expand Down
9 changes: 9 additions & 0 deletions server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,9 @@ type EtcdServer struct {
firstCommitInTermC chan struct{}

*AccessController

// Ensure that storage version is updated only once.
storageVersionUpdated sync.Once
}

type backendHooks struct {
Expand Down Expand Up @@ -2371,6 +2374,12 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
"saved snapshot",
zap.Uint64("snapshot-index", snap.Metadata.Index),
)
s.storageVersionUpdated.Do(func() {
err := serverversion.UpdateStorageVersion(s.lg, s.be.BatchTx())
if err != nil {
s.lg.Warn("failed to update storage version", zap.Error(err))
}
})

// When sending a snapshot, etcd will pause compaction.
// After receives a snapshot, the slow follower needs to get all the entries right after
Expand Down
16 changes: 16 additions & 0 deletions server/etcdserver/version/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright 2021 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package version provides functions for getting/saving storage version.
package version
108 changes: 108 additions & 0 deletions server/etcdserver/version/version.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Copyright 2021 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package version

import (
"fmt"

"github.com/coreos/go-semver/semver"
"go.etcd.io/bbolt"
"go.uber.org/zap"

"go.etcd.io/etcd/server/v3/mvcc/backend"
"go.etcd.io/etcd/server/v3/mvcc/buckets"
)

var (
V3_5 = semver.Version{Major: 3, Minor: 5}
V3_6 = semver.Version{Major: 3, Minor: 6}
)

// UpdateStorageVersion updates storage version.
func UpdateStorageVersion(lg *zap.Logger, tx backend.BatchTx) error {
tx.Lock()
defer tx.Unlock()
v, err := detectStorageVersion(lg, tx)
if err != nil {
return fmt.Errorf("cannot determine storage version: %w", err)
}
switch *v {
case V3_5:
lg.Warn("setting storage version", zap.String("storage-version", V3_6.String()))
// All meta keys introduced in v3.6 should be filled in here.
unsafeSetStorageVersion(tx, &V3_6)
case V3_6:
default:
lg.Warn("unknown storage version", zap.String("storage-version", v.String()))
}
return nil
}

func detectStorageVersion(lg *zap.Logger, tx backend.ReadTx) (*semver.Version, error) {
v := unsafeReadStorageVersion(tx)
if v != nil {
return v, nil
}
_, cfs := tx.UnsafeRange(buckets.Meta, buckets.MetaConfStateName, nil, 0)
if len(cfs) == 0 {
return nil, fmt.Errorf("missing %q key", buckets.MetaConfStateName)
}
_, ts := tx.UnsafeRange(buckets.Meta, buckets.MetaTermKeyName, nil, 0)
if len(ts) == 0 {
return nil, fmt.Errorf("missing %q key", buckets.MetaTermKeyName)
}
copied := V3_5
return &copied, nil
}

// ReadStorageVersion loads storage version from given backend transaction.
// Populated since v3.6
func ReadStorageVersion(tx backend.ReadTx) *semver.Version {
tx.Lock()
defer tx.Unlock()
return unsafeReadStorageVersion(tx)
}

// unsafeReadStorageVersion loads storage version from given backend transaction.
// Populated since v3.6
func unsafeReadStorageVersion(tx backend.ReadTx) *semver.Version {
_, vs := tx.UnsafeRange(buckets.Meta, buckets.MetaStorageVersionName, nil, 1)
if len(vs) == 0 {
return nil
}
v, err := semver.NewVersion(string(vs[0]))
if err != nil {
return nil
}
return v
}

// ReadStorageVersionFromSnapshot loads storage version from given bbolt transaction.
// Populated since v3.6
func ReadStorageVersionFromSnapshot(tx *bbolt.Tx) *semver.Version {
v := tx.Bucket(buckets.Meta.Name()).Get(buckets.MetaStorageVersionName)
version, err := semver.NewVersion(string(v))
if err != nil {
return nil
}
return version
}

// unsafeSetStorageVersion updates etcd storage version in backend.
// Populated since v3.6
func unsafeSetStorageVersion(tx backend.BatchTx, v *semver.Version) {
sv := semver.Version{Major: v.Major, Minor: v.Minor}
tx.UnsafePut(buckets.Meta, buckets.MetaStorageVersionName, []byte(sv.String()))
}
Loading

0 comments on commit e2740b4

Please sign in to comment.