Skip to content

Commit 1ace7f9

Browse files
craig[bot]pav-kv
andcommitted
Merge #155436
155436: kvserver: add replica info struct for destruction r=arulajmani a=pav-kv Related to #152845 Co-authored-by: Pavel Kalinnikov <[email protected]>
2 parents 4b80cd5 + 8f21d57 commit 1ace7f9

File tree

7 files changed

+105
-72
lines changed

7 files changed

+105
-72
lines changed

pkg/kv/kvserver/kvstorage/destroy.go

Lines changed: 36 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -108,49 +108,60 @@ func ClearRangeData(
108108
// atomically, and 1 is not written.
109109
const DestroyReplicaTODO = 0
110110

111+
// DestroyReplicaInfo contains the replica's metadata needed for its removal
112+
// from storage.
113+
//
114+
// TODO(pav-kv): for separated storage, add the applied raft log span. #152845
115+
type DestroyReplicaInfo struct {
116+
// FullReplicaID identifies the replica on its store.
117+
roachpb.FullReplicaID
118+
// Keys is the user key span of this replica, taken from its RangeDescriptor.
119+
// Non-empty iff the replica is initialized.
120+
Keys roachpb.RSpan
121+
}
122+
111123
// DestroyReplica destroys all or a part of the Replica's state, installing a
112-
// RangeTombstone in its place. Due to merges, splits, etc, there is a need
113-
// to control which part of the state this method actually gets to remove,
114-
// which is done via the provided options[^1]; the caller is always responsible
115-
// for managing the remaining disk state accordingly.
124+
// RangeTombstone in its place. Due to merges, splits, etc, there is a need to
125+
// control which part of the state this method actually gets to remove, which is
126+
// done via the provided options[^1]; the caller is always responsible for
127+
// managing the remaining disk state accordingly.
116128
//
117129
// [^1] e.g., on a merge, the user data moves to the subsuming replica and must
118130
// not be cleared.
119131
func DestroyReplica(
120132
ctx context.Context,
121-
id roachpb.FullReplicaID,
122133
reader storage.Reader,
123134
writer storage.Writer,
124-
nextReplicaID roachpb.ReplicaID,
135+
info DestroyReplicaInfo,
136+
next roachpb.ReplicaID,
125137
opts ClearRangeDataOptions,
126138
) error {
127-
diskReplicaID, err := stateloader.Make(id.RangeID).LoadRaftReplicaID(ctx, reader)
128-
if err != nil {
129-
return err
139+
if next <= info.ReplicaID {
140+
return errors.AssertionFailedf("%v must not survive its own tombstone", info.FullReplicaID)
130141
}
131-
if repID := diskReplicaID.ReplicaID; repID != id.ReplicaID {
132-
return errors.AssertionFailedf("replica %v has a mismatching ID %d", id, repID)
133-
} else if repID >= nextReplicaID {
134-
return errors.AssertionFailedf("replica %v must not survive its own tombstone", id)
142+
sl := stateloader.Make(info.RangeID)
143+
// Assert that the ReplicaID in storage matches the one being removed.
144+
if loaded, err := sl.LoadRaftReplicaID(ctx, reader); err != nil {
145+
return err
146+
} else if id := loaded.ReplicaID; id != info.ReplicaID {
147+
return errors.AssertionFailedf("%v has a mismatching ID %d", info.FullReplicaID, id)
135148
}
136-
_ = DestroyReplicaTODO // 2.1 + 2.2 + 3.1
137-
if err := ClearRangeData(ctx, id.RangeID, reader, writer, opts); err != nil {
149+
// Assert that the provided tombstone moves the existing one strictly forward.
150+
// A failure would indicate that something is wrong in the replica lifecycle.
151+
if ts, err := sl.LoadRangeTombstone(ctx, reader); err != nil {
138152
return err
153+
} else if next <= ts.NextReplicaID {
154+
return errors.AssertionFailedf("%v cannot rewind tombstone from %d to %d",
155+
info.FullReplicaID, ts.NextReplicaID, next)
139156
}
140157

141-
// Save a tombstone to ensure that replica IDs never get reused. Assert that
142-
// the provided tombstone moves the existing one strictly forward. Failure to
143-
// do so indicates that something is going wrong in the replica lifecycle.
144-
sl := stateloader.Make(id.RangeID)
145-
ts, err := sl.LoadRangeTombstone(ctx, reader)
146-
if err != nil {
158+
_ = DestroyReplicaTODO // 2.1 + 2.2 + 3.1
159+
if err := ClearRangeData(ctx, info.RangeID, reader, writer, opts); err != nil {
147160
return err
148-
} else if ts.NextReplicaID >= nextReplicaID {
149-
return errors.AssertionFailedf(
150-
"cannot rewind tombstone from %d to %d", ts.NextReplicaID, nextReplicaID)
151161
}
162+
// Save a tombstone to ensure that replica IDs never get reused.
152163
_ = DestroyReplicaTODO // 2.3
153164
return sl.SetRangeTombstone(ctx, writer, kvserverpb.RangeTombstone{
154-
NextReplicaID: nextReplicaID, // NB: nextReplicaID > 0
165+
NextReplicaID: next, // NB: NextReplicaID > 0
155166
})
156167
}

pkg/kv/kvserver/kvstorage/destroy_test.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -68,11 +68,15 @@ func TestDestroyReplica(t *testing.T) {
6868
r.createStateMachine(ctx, t, rw)
6969
})
7070
mutate("destroy", func(rw storage.ReadWriter) {
71-
require.NoError(t, DestroyReplica(ctx, r.id, rw, rw, r.id.ReplicaID+1, ClearRangeDataOptions{
72-
ClearUnreplicatedByRangeID: true,
73-
ClearReplicatedByRangeID: true,
74-
ClearReplicatedBySpan: r.keys,
75-
}))
71+
require.NoError(t, DestroyReplica(
72+
ctx, rw, rw,
73+
DestroyReplicaInfo{FullReplicaID: r.id, Keys: r.keys}, r.id.ReplicaID+1,
74+
ClearRangeDataOptions{
75+
ClearUnreplicatedByRangeID: true,
76+
ClearReplicatedByRangeID: true,
77+
ClearReplicatedBySpan: r.keys,
78+
},
79+
))
7680
})
7781

7882
str := strings.ReplaceAll(sb.String(), "\n\n", "\n")

pkg/kv/kvserver/replica_app_batch.go

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -364,10 +364,15 @@ func (b *replicaAppBatch) runPostAddTriggersReplicaOnly(
364364
// required for correctness, since the merge protocol should guarantee that
365365
// no new replicas of the RHS can ever be created, but it doesn't hurt to
366366
// be careful.
367-
if err := kvstorage.DestroyReplica(ctx, rhsRepl.ID(), b.batch, b.batch, mergedTombstoneReplicaID, kvstorage.ClearRangeDataOptions{
368-
ClearReplicatedByRangeID: true,
369-
ClearUnreplicatedByRangeID: true,
370-
}); err != nil {
367+
if err := kvstorage.DestroyReplica(
368+
ctx, b.batch, b.batch,
369+
rhsRepl.destroyInfoRaftMuLocked(),
370+
mergedTombstoneReplicaID,
371+
kvstorage.ClearRangeDataOptions{
372+
ClearReplicatedByRangeID: true,
373+
ClearUnreplicatedByRangeID: true,
374+
},
375+
); err != nil {
371376
return errors.Wrapf(err, "unable to destroy replica before merge")
372377
}
373378

@@ -455,11 +460,16 @@ func (b *replicaAppBatch) runPostAddTriggersReplicaOnly(
455460
// We've set the replica's in-mem status to reflect the pending destruction
456461
// above, and DestroyReplica will also add a range tombstone to the
457462
// batch, so that when we commit it, the removal is finalized.
458-
if err := kvstorage.DestroyReplica(ctx, b.r.ID(), b.batch, b.batch, change.NextReplicaID(), kvstorage.ClearRangeDataOptions{
459-
ClearReplicatedBySpan: span,
460-
ClearReplicatedByRangeID: true,
461-
ClearUnreplicatedByRangeID: true,
462-
}); err != nil {
463+
if err := kvstorage.DestroyReplica(
464+
ctx, b.batch, b.batch,
465+
b.r.destroyInfoRaftMuLocked(),
466+
change.NextReplicaID(),
467+
kvstorage.ClearRangeDataOptions{
468+
ClearReplicatedBySpan: span,
469+
ClearReplicatedByRangeID: true,
470+
ClearUnreplicatedByRangeID: true,
471+
},
472+
); err != nil {
463473
return errors.Wrapf(err, "unable to destroy replica before removal")
464474
}
465475
}

pkg/kv/kvserver/replica_destroy.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,10 @@ func (r *Replica) destroyRaftMuLocked(ctx context.Context, nextReplicaID roachpb
111111
ClearUnreplicatedByRangeID: true,
112112
}
113113
// TODO(sep-raft-log): need both engines separately here.
114-
if err := kvstorage.DestroyReplica(ctx, r.ID(), r.store.TODOEngine(), batch, nextReplicaID, opts); err != nil {
114+
if err := kvstorage.DestroyReplica(
115+
ctx, r.store.TODOEngine(), batch,
116+
r.destroyInfoRaftMuLocked(), nextReplicaID, opts,
117+
); err != nil {
115118
return err
116119
}
117120
preTime := timeutil.Now()

pkg/kv/kvserver/replica_raftstorage.go

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/cockroachdb/cockroach/pkg/keys"
1414
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
1515
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
16+
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage"
1617
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage/snaprecv"
1718
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer"
1819
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/readsummary"
@@ -568,7 +569,7 @@ func (r *Replica) applySnapshotRaftMuLocked(
568569
Term: kvpb.RaftTerm(nonemptySnap.Metadata.Term),
569570
}
570571

571-
subsume := make([]destroyReplicaInfo, 0, len(subsumedRepls))
572+
subsume := make([]kvstorage.DestroyReplicaInfo, 0, len(subsumedRepls))
572573
for _, sr := range subsumedRepls {
573574
// We mark the replica as destroyed so that new commands are not
574575
// accepted. This destroy status will be detected after the batch
@@ -582,19 +583,18 @@ func (r *Replica) applySnapshotRaftMuLocked(
582583
sr.shMu.destroyStatus.Set(
583584
kvpb.NewRangeNotFoundError(sr.RangeID, sr.store.StoreID()),
584585
destroyReasonRemoved)
585-
srDesc := sr.descRLocked()
586586
sr.mu.Unlock()
587587
sr.readOnlyCmdMu.Unlock()
588588

589-
subsume = append(subsume, destroyReplicaInfo{id: sr.ID(), desc: srDesc})
589+
subsume = append(subsume, sr.destroyInfoRaftMuLocked())
590590
}
591591

592-
// NB: subsumedDescs in snapWriteBuilder must be sorted by start key. This
592+
// NB: subsumed replicas in snapWriteBuilder must be sorted by start key. This
593593
// should be the case, by construction, but add a test-only assertion just in
594594
// case this ever changes.
595-
testingAssert(slices.IsSortedFunc(subsume, func(a, b destroyReplicaInfo) int {
596-
return a.desc.StartKey.Compare(b.desc.StartKey)
597-
}), "subsumedDescs must be sorted by start key")
595+
testingAssert(slices.IsSortedFunc(subsume, func(a, b kvstorage.DestroyReplicaInfo) int {
596+
return a.Keys.Key.Compare(b.Keys.Key)
597+
}), "subsumed replicas must be sorted by start key")
598598

599599
sb := snapWriteBuilder{
600600
id: r.ID(),
@@ -842,3 +842,17 @@ func testingAssert(cond bool, msg string) {
842842
panic(msg)
843843
}
844844
}
845+
846+
// destroyInfoRaftMuLocked returns the information necessary for constructing a
847+
// storage write destroying this replica.
848+
//
849+
// NB: since raftMu is locked, there is no concurrent write that would be able
850+
// to change this replica. In particular, no concurrent log truncations. The
851+
// caller must make sure to complete the destruction before raftMu is released.
852+
func (r *Replica) destroyInfoRaftMuLocked() kvstorage.DestroyReplicaInfo {
853+
r.raftMu.AssertHeld()
854+
return kvstorage.DestroyReplicaInfo{
855+
FullReplicaID: r.ID(),
856+
Keys: r.shMu.state.Desc.RSpan(),
857+
}
858+
}

pkg/kv/kvserver/snapshot_apply_prepare.go

Lines changed: 12 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,6 @@ import (
2020
"github.com/cockroachdb/errors"
2121
)
2222

23-
// destroyReplicaInfo contains the replica's metadata needed for its removal
24-
// from storage.
25-
// TODO(pav-kv): for WAG, add the truncated state and applied index. See #152845.
26-
type destroyReplicaInfo struct {
27-
id roachpb.FullReplicaID
28-
desc *roachpb.RangeDescriptor
29-
}
30-
3123
// snapWriteBuilder contains the data needed to prepare the on-disk state for a
3224
// snapshot.
3325
type snapWriteBuilder struct {
@@ -41,9 +33,8 @@ type snapWriteBuilder struct {
4133
hardState raftpb.HardState
4234
desc *roachpb.RangeDescriptor // corresponds to the range descriptor in the snapshot
4335
origDesc *roachpb.RangeDescriptor // pre-snapshot range descriptor
44-
// NB: subsume, if set, must be in sorted (by destroyReplicaInfo.desc start
45-
// key) order.
46-
subsume []destroyReplicaInfo
36+
// NB: subsume must be in sorted order by DestroyReplicaInfo start key.
37+
subsume []kvstorage.DestroyReplicaInfo
4738

4839
// cleared contains the spans that this snapshot application clears before
4940
// writing new state on top.
@@ -112,7 +103,7 @@ func (s *snapWriteBuilder) rewriteRaftState(ctx context.Context, w storage.Write
112103
// the Reader reflects the latest I/O each of the subsumed replicas has done
113104
// (i.e. Reader was instantiated after all raftMu were acquired).
114105
//
115-
// NB: does nothing if s.subsumedDescs is empty.
106+
// NB: does nothing if there are no subsumed replicas.
116107
func (s *snapWriteBuilder) clearSubsumedReplicaDiskData(ctx context.Context) error {
117108
if len(s.subsume) == 0 {
118109
return nil // no subsumed replicas to speak of; early return
@@ -123,10 +114,10 @@ func (s *snapWriteBuilder) clearSubsumedReplicaDiskData(ctx context.Context) err
123114
// the left implies that either we merged "to the left" (we don't), or that
124115
// we're applying a snapshot for another range (we don't do that either).
125116
// Something is severely wrong for this to happen, so perform a sanity check.
126-
if s.subsume[0].desc.StartKey.Compare(s.desc.StartKey) < 0 { // subsumedDescs are sorted by StartKey
117+
if s.subsume[0].Keys.Key.Compare(s.desc.StartKey) < 0 { // subsume is sorted by start key
127118
log.KvDistribution.Fatalf(ctx,
128119
"subsuming replica to our left; subsumed desc start key: %v; snapshot desc start key %v",
129-
s.subsume[0].desc.StartKey, s.desc.StartKey,
120+
s.subsume[0].Keys.Key, s.desc.StartKey,
130121
)
131122
}
132123

@@ -167,12 +158,12 @@ func (s *snapWriteBuilder) clearSubsumedReplicaDiskData(ctx context.Context) err
167158
ClearUnreplicatedByRangeID: true,
168159
MustUseClearRange: true,
169160
}
170-
s.cleared = append(s.cleared, rditer.Select(sub.id.RangeID, rditer.SelectOpts{
161+
s.cleared = append(s.cleared, rditer.Select(sub.RangeID, rditer.SelectOpts{
171162
ReplicatedByRangeID: opts.ClearReplicatedByRangeID,
172163
UnreplicatedByRangeID: opts.ClearUnreplicatedByRangeID,
173164
})...)
174165
// NB: Actually clear RangeID local key spans.
175-
return kvstorage.DestroyReplica(ctx, sub.id, reader, w, mergedTombstoneReplicaID, opts)
166+
return kvstorage.DestroyReplica(ctx, reader, w, sub, mergedTombstoneReplicaID, opts)
176167
}); err != nil {
177168
return err
178169
}
@@ -223,23 +214,22 @@ func (s *snapWriteBuilder) clearResidualDataOnNarrowSnapshot(ctx context.Context
223214
return nil
224215
}
225216

226-
rightMostDesc := s.origDesc
217+
endKey := s.origDesc.EndKey
227218
if len(s.subsume) != 0 {
228219
// NB: s.subsume are non-overlapping and sorted by start key. Pick the last
229220
// one to determine whether the snapshot is narrowing the keyspace or not.
230-
rightMostDesc = s.subsume[len(s.subsume)-1].desc
221+
endKey = s.subsume[len(s.subsume)-1].Keys.EndKey
231222
}
232223

233-
if rightMostDesc.EndKey.Compare(s.desc.EndKey) <= 0 {
224+
if endKey.Compare(s.desc.EndKey) <= 0 {
234225
return nil // we aren't narrowing anything; no-op
235226
}
236227

237228
// TODO(sep-raft-log): read from the state machine engine here.
238229
reader := storage.Reader(s.todoEng)
239230
for _, span := range rditer.Select(0, rditer.SelectOpts{
240-
Ranged: rditer.SelectRangedOptions{RSpan: roachpb.RSpan{
241-
Key: s.desc.EndKey, EndKey: rightMostDesc.EndKey,
242-
},
231+
Ranged: rditer.SelectRangedOptions{
232+
RSpan: roachpb.RSpan{Key: s.desc.EndKey, EndKey: endKey},
243233
SystemKeys: true,
244234
LockTable: true,
245235
UserKeys: true,

pkg/kv/kvserver/snapshot_apply_prepare_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/cockroachdb/cockroach/pkg/keys"
1515
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
1616
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
17+
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage"
1718
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/print"
1819
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer"
1920
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
@@ -93,9 +94,9 @@ func TestPrepareSnapApply(t *testing.T) {
9394
hardState: raftpb.HardState{Term: 20, Commit: 100},
9495
desc: desc(id.RangeID, "a", "k"),
9596
origDesc: desc(id.RangeID, "a", "k"),
96-
subsume: []destroyReplicaInfo{
97-
{id: roachpb.FullReplicaID{RangeID: descA.RangeID, ReplicaID: replicaID}, desc: descA},
98-
{id: roachpb.FullReplicaID{RangeID: descB.RangeID, ReplicaID: replicaID}, desc: descB},
97+
subsume: []kvstorage.DestroyReplicaInfo{
98+
{FullReplicaID: roachpb.FullReplicaID{RangeID: descA.RangeID, ReplicaID: replicaID}, Keys: descA.RSpan()},
99+
{FullReplicaID: roachpb.FullReplicaID{RangeID: descB.RangeID, ReplicaID: replicaID}, Keys: descB.RSpan()},
99100
},
100101
}
101102

0 commit comments

Comments
 (0)