Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions pkg/kv/kvserver/logstore/logstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ func logAppend(
value.InitChecksum(key)
var err error
if kvpb.RaftIndex(ent.Index) > prev.LastIndex {
_, err = storage.MVCCBlindPut(ctx, rw, key, hlc.Timestamp{}, *value, opts)
err = storage.MVCCBlindPutInline(rw, key, *value, diff)
} else {
_, err = storage.MVCCPut(ctx, rw, key, hlc.Timestamp{}, *value, opts)
}
Expand All @@ -447,8 +447,7 @@ func logAppend(
for i := newLastIndex + 1; i <= prev.LastIndex; i++ {
// Note that the caller is in charge of deleting any sideloaded payloads
// (which they must only do *after* the batch has committed).
_, _, err := storage.MVCCDelete(ctx, rw, keys.RaftLogKeyFromPrefix(raftLogPrefix, i),
hlc.Timestamp{}, opts)
err := storage.MVCCDeleteInline(ctx, rw, diff, keys.RaftLogKeyFromPrefix(raftLogPrefix, i))
if err != nil {
return RaftState{}, err
}
Expand Down Expand Up @@ -513,9 +512,7 @@ func Compact(
}
value.InitChecksum(key)

if _, err := storage.MVCCBlindPut(
ctx, writer, key, hlc.Timestamp{}, value, storage.MVCCWriteOptions{},
); err != nil {
if err := storage.MVCCBlindPutInline(writer, key, value, nil); err != nil {
return errors.Wrap(err, "unable to write RaftTruncatedState")
}
return nil
Expand Down
20 changes: 2 additions & 18 deletions pkg/kv/kvserver/logstore/stateloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,15 +124,7 @@ func (sl StateLoader) SetRaftTruncatedState(
if (*truncState == kvserverpb.RaftTruncatedState{}) {
return errors.New("cannot persist empty RaftTruncatedState")
}
// "Blind" because opts.Stats == nil and timestamp.IsEmpty().
return storage.MVCCBlindPutProto(
ctx,
writer,
sl.RaftTruncatedStateKey(),
hlc.Timestamp{}, /* timestamp */
truncState,
storage.MVCCWriteOptions{}, /* txn */
)
return storage.MVCCBlindPutInlineProto(writer, sl.RaftTruncatedStateKey(), truncState, nil)
}

// ClearRaftTruncatedState clears the RaftTruncatedState.
Expand All @@ -158,15 +150,7 @@ func (sl StateLoader) LoadHardState(
func (sl StateLoader) SetHardState(
ctx context.Context, writer storage.Writer, hs raftpb.HardState,
) error {
// "Blind" because opts.Stats == nil and timestamp.IsEmpty().
return storage.MVCCBlindPutProto(
ctx,
writer,
sl.RaftHardStateKey(),
hlc.Timestamp{}, /* timestamp */
&hs,
storage.MVCCWriteOptions{}, /* opts */
)
return storage.MVCCBlindPutInlineProto(writer, sl.RaftHardStateKey(), &hs, nil)
}

// SynthesizeHardState synthesizes an on-disk HardState from the given input,
Expand Down
69 changes: 69 additions & 0 deletions pkg/storage/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -1982,6 +1982,75 @@ func MVCCBlindPut(
return mvccPutUsingIter(ctx, writer, nil, nil, key, timestamp, value, nil, opts)
}

// MVCCBlindPutInline writes an inline (unversioned) value directly, without
// reading the existing value or checking for conflicts. This is a fast path for
// non-transactional blind writes of inline values, such as raft log appends.
func MVCCBlindPutInline(
writer Writer, key roachpb.Key, value roachpb.Value, ms *enginepb.MVCCStats,
) error {
metaKey := MakeMVCCMetadataKey(key)
buf := newPutBuffer()
defer buf.release()
buf.newMeta = enginepb.MVCCMetadata{RawBytes: value.RawBytes}
metaKeySize, metaValSize, err := buf.putInlineMeta(writer, metaKey, &buf.newMeta)
if err != nil {
return err
}
if ms != nil {
updateStatsForInline(ms, key, 0, 0, metaKeySize, metaValSize)
}
return nil
}

// MVCCBlindPutInlineProto is like MVCCBlindPutInline but accepts a protobuf
// message which is serialized into a roachpb.Value.
func MVCCBlindPutInlineProto(
writer Writer, key roachpb.Key, msg protoutil.Message, ms *enginepb.MVCCStats,
) error {
var value roachpb.Value
if err := value.SetProto(msg); err != nil {
return err
}
value.InitChecksum(key)
return MVCCBlindPutInline(writer, key, value, ms)
}

// MVCCDeleteInline deletes an inline (unversioned) value. Unlike a blind put,
// this reads the existing value to determine its size for MVCC stats
// accounting. This is a fast path for non-transactional inline deletes, such
// as raft log truncation.
func MVCCDeleteInline(
ctx context.Context, rw ReadWriter, ms *enginepb.MVCCStats, key roachpb.Key,
) error {
metaKey := MakeMVCCMetadataKey(key)
iter, err := rw.NewMVCCIterator(ctx, MVCCKeyIterKind, IterOptions{Prefix: true})
if err != nil {
return err
}
defer iter.Close()

// Read the existing inline metadata to determine its size for stats.
var origMetaKeySize, origMetaValSize int64
iter.SeekGE(metaKey)
if ok, err := iter.Valid(); err != nil {
return err
} else if ok && iter.UnsafeKey().Key.Equal(metaKey.Key) {
origMetaKeySize = int64(iter.UnsafeKey().EncodedSize())
origMetaValSize = int64(iter.ValueLen())
}

if err := rw.ClearUnversioned(metaKey.Key, ClearOptions{
ValueSizeKnown: true,
ValueSize: uint32(origMetaValSize),
}); err != nil {
return err
}
if ms != nil {
updateStatsForInline(ms, key, origMetaKeySize, origMetaValSize, 0, 0)
}
return nil
}

// MVCCDelete marks the key deleted so that it will not be returned in
// future get responses.
//
Expand Down
Loading