Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 15 additions & 8 deletions pkg/kv/kvserver/client_merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3865,7 +3865,7 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) {

testutils.RunTrueAndFalse(t, "rebalanceRHSAway", func(t *testing.T, rebalanceRHSAway bool) {
// We will be testing the SSTs written on store3's engine.
var receivingEng, sendingEng storage.Engine
var receivingStateEng, receivingLogEng, sendingStateEng storage.Engine
// All of these variables will be populated later, after starting the
// cluster.
var keyStart, keyA, keyB, keyC, keyD, keyEnd roachpb.Key
Expand Down Expand Up @@ -3925,7 +3925,7 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) {

// Construct SSTs for the first 4 bullets as numbered above, but
// only ultimately keep the last one.
snapReader := sendingEng.NewSnapshot()
snapReader := sendingStateEng.NewSnapshot()
defer snapReader.Close()

// Write a Pebble range deletion tombstone to each of the SSTs then
Expand Down Expand Up @@ -4018,9 +4018,15 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) {
kvserverpb.RangeTombstone{NextReplicaID: math.MaxInt32},
))
// Ditto for the unreplicated RangeID keys. Note that it is also split
// into two range clears, to work around the RaftReplicaID key.
// into three range clears, to work around the RaftReplicaID key and
// to force all raft log clearing to go through logstore.
require.NoError(t, sst.ClearRawRange(
keys.RaftHardStateKey(rangeID), sl.RaftReplicaIDKey(), true, false,
keys.RaftHardStateKey(rangeID), sl.RaftLogPrefix(), true, false,
))
require.NoError(t, storage.ClearRangeWithHeuristic(
ctx, receivingLogEng, &sst,
sl.RaftLogPrefix(), sl.RaftLogPrefix().PrefixEnd(),
kvstorage.ClearRangeThresholdPointKeys(),
))
require.NoError(t, sl.ClearRaftReplicaID(&sst))
require.NoError(t, sst.ClearRawRange(
Expand All @@ -4042,7 +4048,7 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) {
EndKey: roachpb.RKey(keyEnd),
}
require.NoError(t, storage.ClearRangeWithHeuristic(
ctx, receivingEng, &sst,
ctx, receivingStateEng, &sst,
desc.StartKey.AsRawKey(), desc.EndKey.AsRawKey(),
kvstorage.ClearRangeThresholdPointKeys(),
))
Expand All @@ -4053,7 +4059,7 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) {
// byte-by-byte equal.
var dumpDir string
for i := range sstNamesSubset {
actualSST, err := fs.ReadFile(receivingEng.Env(), sstNamesSubset[i])
actualSST, err := fs.ReadFile(receivingStateEng.Env(), sstNamesSubset[i])
require.NoError(t, err)
if !bytes.Equal(expectedSSTs[i], actualSST) { // intentionally not printing
t.Logf("%d=%s", i, sstNamesSubset[i])
Expand Down Expand Up @@ -4094,8 +4100,9 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) {
})
defer tc.Stopper().Stop(ctx)
store1, store3 := tc.GetFirstStoreFromServer(t, 0), tc.GetFirstStoreFromServer(t, 2)
sendingEng = store1.StateEngine()
receivingEng = store3.StateEngine()
sendingStateEng = store1.StateEngine()
receivingStateEng = store3.StateEngine()
receivingLogEng = store3.LogEngine()
distSender := tc.Servers[0].DistSenderI().(kv.Sender)

// This test works across 5 ranges in total. We start with a scratch
Expand Down
39 changes: 28 additions & 11 deletions pkg/kv/kvserver/kvstorage/destroy.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,19 +171,32 @@ func destroyReplicaImpl(
); err != nil {
return err
}
// TODO(ibrahim): We could know `hi` if DestroyReplicaInfo passes down the
// log's last index.
if err := logstore.ClearRange(
ctx, rw.Raft.RO, rw.Raft.WO, buf.RaftLogPrefix(),
info.RaftAppliedIndex+1 /* lo */, math.MaxUint64 /* hi */, ClearRangeThresholdPointKeys(),
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could make hi known here if the DestroyReplicaInfo passes down the log's last index. Dunno if we want to do it (it's "safer" to clear the whole suffix pretending we don't know where it ends), but it's a possibility.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True. I want this PR to be as much of a mechanical refactor as possible. I left a TODO to address this in another PR

); err != nil {
return err
}
} else {
if err := storage.ClearRangeWithHeuristic(
ctx, rw.Raft.RO, rw.Raft.WO,
buf.RaftLogKey(info.RaftAppliedIndex+1), sl.RaftReplicaIDKey(),
buf.RangeTombstoneKey().Next(), sl.RaftLogPrefix(),
ClearRangeThresholdPointKeys(),
); err != nil {
return err
}
} else if err := storage.ClearRangeWithHeuristic(
ctx, rw.Raft.RO, rw.Raft.WO,
buf.RangeTombstoneKey().Next(), sl.RaftReplicaIDKey(),
ClearRangeThresholdPointKeys(),
); err != nil {
return err
// Note: We could just clear the whole raft log in the
// ClearRangeWithHeuristic() above. However, we want to funnel all raft log
// deletions through the logstore package to make it easier to reason about
// them.
if err := logstore.ClearRange(
ctx, rw.Raft.RO, rw.Raft.WO, buf.RaftLogPrefix(),
0 /* lo */, math.MaxUint64 /* hi */, ClearRangeThresholdPointKeys(),
); err != nil {
return err
}
}
if err := sl.ClearRaftReplicaID(rw.State.WO); err != nil {
return err
Expand Down Expand Up @@ -265,10 +278,14 @@ func RewriteRaftState(
if err := sl.SetHardState(ctx, raftWO, hs); err != nil {
return errors.Wrapf(err, "unable to write HardState")
}
// Clear the raft log. Note that there are no Pebble range keys in this span.
raftLog := sl.RaftLogPrefix() // NB: use only until next StateLoader call
if err := raftWO.ClearRawRange(
raftLog, raftLog.PrefixEnd(), true /* pointKeys */, false, /* rangeKeys */
// Clear the raft log via the logstore. Note that there are no Pebble range
// keys in this span. We use ClearRangeSizeKnown with pointKeyThreshold=0 to
// force a single range tombstone over the whole log span, without scanning.
// TODO(ibrahim): We can actually know the log bounds using truncIndex and lastIndex.
if err := logstore.ClearRangeSizeKnown(
raftWO, sl.RaftLogPrefix(),
0 /* lo */, math.MaxUint64 /* hi */, 0, /* pointKeyThreshold */
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can probably make it a "size known" case. If we are applying a snapshot to an existing replica, it knows its log. Otherwise, the log is empty.

Btw, I just realized we need more code here for the separated engines case. We shouldn't be deleting the entire log - the applied part is deferred to the WAG truncator.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want this PR to be as much of a mechanical refactor as possible. I left a TODO to address this in another PR

false, /* maybeUseSingleDel */
); err != nil {
return errors.Wrapf(err, "unable to clear the raft log")
}
Expand Down
79 changes: 41 additions & 38 deletions pkg/kv/kvserver/kvstorage/wag_truncator.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,25 +271,29 @@ func (t *WAGTruncator) maybeAdvanceAllowedIndex() {
}
}

// clearReplicaRaftLogAndSideloaded clears raft log entries at or below the given index for
// a destroyed or subsumed replica, and it also deletes the sideloaded files associated with the
// deleted entries.
// clearReplicaRaftLogAndSideloaded clears raft log entries at or below the
// given index for a destroyed or subsumed replica, and it also deletes the
// sideloaded files associated with the deleted entries.
func (t *WAGTruncator) clearReplicaRaftLogAndSideloaded(
ctx context.Context, raft Raft, rangeID roachpb.RangeID, lastIndex kvpb.RaftIndex,
) error {
if logstore.UseRaftLogSingleDelete(t.eng.Separated()) {
if err := clearRaftLogWithSingleDelete(
ctx, raft.RO, raft.WO, rangeID, lastIndex,
prefixBuf := keys.MakeRangeIDPrefixBuf(rangeID)
// We want to delete all raft log entries < hi. Since the raft log doesn't have
// holes, we can get the first index, calculate the log size, and call
// ClearRangeSizeKnown(). If no entries exist in [RaftLogPrefix, hi) (e.g.,
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If no entries exist in [RaftLogPrefix, hi)
@pav-kv Is this possible? Could we destroy an uninitialized replica that has Addr.index = 0?

// this replica never received entries) this operation is a no-op.
hi := lastIndex + 1
lo, ok, err := firstRaftLogIndex(ctx, raft.RO, prefixBuf, hi)
if err != nil {
return errors.Wrapf(err, "finding first raft log index for r%d", rangeID)
}
if ok {
if err := logstore.ClearRangeSizeKnown(
raft.WO, prefixBuf.RaftLogPrefix(), lo, hi, ClearRangeThresholdPointKeys(),
logstore.UseRaftLogSingleDelete(t.eng.Separated()),
); err != nil {
return errors.Wrapf(err, "clearing raft log entries for r%d", rangeID)
}
} else if err := storage.ClearRangeWithHeuristic(
ctx, raft.RO, raft.WO,
keys.RaftLogPrefix(rangeID), /* start */
keys.RaftLogKey(rangeID, lastIndex+1), /* end */
ClearRangeThresholdPointKeys(),
); err != nil {
return errors.Wrapf(err, "clearing raft log entries for r%d", rangeID)
}

// In general, we shouldn't delete sideloaded files before committing the
Expand All @@ -312,39 +316,38 @@ func (t *WAGTruncator) clearReplicaRaftLogAndSideloaded(
return ss.Sync()
}

// clearRaftLogWithSingleDelete clears raft log entries using SingleDelete for
// each point key. Unlike the regular truncation path, this always uses point
// deletions and never falls back to a range tombstone for simplicity.
// TODO(ibrahim): Let this function use the same pointDelThreshold heuristic
// when clearing the raft log.
func clearRaftLogWithSingleDelete(
ctx context.Context,
r storage.Reader,
w storage.Writer,
rangeID roachpb.RangeID,
lastIndex kvpb.RaftIndex,
) error {
start := keys.RaftLogPrefix(rangeID)
end := keys.RaftLogKey(rangeID, lastIndex+1)
// firstRaftLogIndex returns the smallest raft log index that exists in
// [RaftLogPrefix, hi), together with a boolean indicating whether such an
// index was found. If no index was found, the returned index is zero and the
// boolean is false.
func firstRaftLogIndex(
ctx context.Context, r storage.Reader, prefixBuf keys.RangeIDPrefixBuf, hi kvpb.RaftIndex,
) (kvpb.RaftIndex, bool, error) {
start := prefixBuf.RaftLogPrefix().Clone()
end := prefixBuf.RaftLogKey(hi).Clone()
iter, err := r.NewEngineIterator(ctx, storage.IterOptions{
KeyTypes: storage.IterKeyTypePointsOnly,
LowerBound: start,
UpperBound: end,
})
if err != nil {
return err
return 0, false, err
}
defer iter.Close()

ok, err := iter.SeekEngineKeyGE(storage.EngineKey{Key: start})
for ; ok; ok, err = iter.NextEngineKey() {
key, kerr := iter.UnsafeEngineKey()
if kerr != nil {
return kerr
}
if err := w.SingleClearEngineKey(key); err != nil {
return err
}
if err != nil {
return 0, false, err
}
if !ok {
return 0, false, nil // no raft entry was found in the [0, hi) span.
}
key, err := iter.UnsafeEngineKey()
if err != nil {
return 0, false, err
}
lo, err := keys.DecodeRaftLogKeyFromSuffix(key.Key[len(start):])
if err != nil {
return 0, false, err
}
return err
return lo, true, nil
}
41 changes: 39 additions & 2 deletions pkg/kv/kvserver/kvstorage/wag_truncator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,45 @@ func TestTruncateAppliedOnly(t *testing.T) {
}
}

// TestFirstRaftLogIndex exercises the search for the smallest raft log index
// in [RaftLogPrefix, hi). Each case writes a set of raft log entries for a
// single range, calls firstRaftLogIndex with the upper bound hi, and checks
// both the returned index and the "found" flag.
func TestFirstRaftLogIndex(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()
const rangeID = roachpb.RangeID(1)
prefixBuf := keys.MakeRangeIDPrefixBuf(rangeID)

tests := []struct {
// entries lists the raft log indices written before the lookup.
entries []kvpb.RaftIndex
// hi is the upper bound passed to firstRaftLogIndex; the cases below treat
// it as exclusive (an entry at index hi is not found).
hi kvpb.RaftIndex
// wantIdx is the expected index. It is left at its zero value in the
// not-found cases, so those cases also assert that a zero index is returned.
wantIdx kvpb.RaftIndex
// wantOK is the expected value of the "found" flag.
wantOK bool
}{
// No entries written: nothing is found regardless of hi.
{entries: nil, hi: 100, wantOK: false},
{entries: []kvpb.RaftIndex{}, hi: 0, wantOK: false},
{entries: []kvpb.RaftIndex{}, hi: 100, wantOK: false},
// The only entry is at or above hi, so it is outside the searched span.
{entries: []kvpb.RaftIndex{15}, hi: 0, wantOK: false},
{entries: []kvpb.RaftIndex{15}, hi: 15, wantOK: false},
// The entry is just below hi and is found.
{entries: []kvpb.RaftIndex{15}, hi: 16, wantIdx: 15, wantOK: true},
// With two entries, the smaller index is returned whether or not the
// larger one also falls below hi.
{entries: []kvpb.RaftIndex{15, 16}, hi: 16, wantIdx: 15, wantOK: true},
{entries: []kvpb.RaftIndex{15, 16}, hi: 100, wantIdx: 15, wantOK: true},
}
for _, tc := range tests {
// Subtests are run with an empty name.
t.Run("", func(t *testing.T) {
// Each case builds its own engines via makeTestEngines and writes only
// its own entries into them.
e := makeTestEngines()
defer e.Close()
for _, idx := range tc.entries {
e.writeRaftLogEntry(t, rangeID, idx)
}
// The lookup reads from the log engine.
idx, ok, err := firstRaftLogIndex(ctx, e.LogEngine(), prefixBuf, tc.hi)
require.NoError(t, err)
require.Equal(t, tc.wantOK, ok)
require.Equal(t, tc.wantIdx, idx)
})
}
}

// TestTruncateAndClearRaftState verifies that WAG truncation only clears raft
// log entries and sideloaded files up to the destroyed/subsumed replica's last
// index. Entries and files beyond that index may belong to a newer replica and
Expand Down Expand Up @@ -719,8 +758,6 @@ func BenchmarkWAGTruncation(b *testing.B) {
b.StopTimer()
st := cluster.MakeTestingClusterSettings()
wagTruncatorBatchSize.Override(ctx, &st.SV, batchSize)
// Disable WAG retention to allow full WAG truncation.
wagSuffixRetentionCount.Override(ctx, &st.SV, 0)
eng := storage.NewDefaultInMemForTesting()
defer eng.Close()
engines := MakeEngines(eng)
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/logstore/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ go_test(
"//pkg/util/tracing",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_errors//oserror",
"@com_github_cockroachdb_pebble//:pebble",
"@com_github_cockroachdb_pebble//vfs",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
Expand Down
Loading
Loading