Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -3476,14 +3476,15 @@ func (d *DB) compactAndWrite(
writerOpts.Compression = block.FastestCompression
}
vSep := valueSeparation
switch spanPolicy.ValueStoragePolicy {
case ValueStorageLowReadLatency:
switch spanPolicy.ValueStoragePolicy.PolicyAdjustment {
case NoValueSeparation:
vSep = compact.NeverSeparateValues{}
case ValueStorageLatencyTolerant:
case Override:
// This span of keyspace is more tolerant of latency, so set a more
// aggressive value separation policy for this output.
vSep.SetNextOutputConfig(compact.ValueSeparationOutputConfig{
MinimumSize: latencyTolerantMinimumSize,
MinimumSize: spanPolicy.ValueStoragePolicy.MinimumSize,
MinimumMVCCGarbageSize: spanPolicy.ValueStoragePolicy.MinimumMVCCGarbageSize,
})
}
objMeta, tw, err := d.newCompactionOutputTable(jobID, c, writerOpts)
Expand Down
19 changes: 15 additions & 4 deletions data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1783,21 +1783,27 @@ func parseDBOptionsArgs(opts *Options, args []datadriven.CmdArg) error {
}
policy := SpanPolicy{
DisableValueSeparationBySuffix: true,
ValueStoragePolicy: ValueStorageLowReadLatency,
ValueStoragePolicy: ValueStoragePolicy{
PolicyAdjustment: NoValueSeparation,
},
}
spanPolicies = append(spanPolicies, SpanAndPolicy{
KeyRange: span,
Policy: policy,
})
case "latency-tolerant-span":
case "override-span":
if len(cmdArg.Vals) != 2 {
return errors.New("latency-tolerant-span expects 2 arguments: <start-key> <end-key>")
return errors.New("override-span expects 2 arguments: <start-key> <end-key>")
}
span := KeyRange{
Start: []byte(cmdArg.Vals[0]),
End: []byte(cmdArg.Vals[1]),
}
policy := SpanPolicy{ValueStoragePolicy: ValueStorageLatencyTolerant}
policy := SpanPolicy{ValueStoragePolicy: ValueStoragePolicy{
PolicyAdjustment: Override,
MinimumSize: latencyTolerantMinimumSize,
MinimumMVCCGarbageSize: 1,
}}
spanPolicies = append(spanPolicies, SpanAndPolicy{
KeyRange: span,
Policy: policy,
Expand Down Expand Up @@ -1871,6 +1877,11 @@ func parseDBOptionsArgs(opts *Options, args []datadriven.CmdArg) error {
if err != nil {
return err
}
case "min-mvcc-garbage-size":
policy.MinimumMVCCGarbageSize, err = strconv.Atoi(value)
if err != nil {
return err
}
default:
return errors.Newf("unrecognized value-separation argument %q", name)
}
Expand Down
7 changes: 4 additions & 3 deletions event_listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -633,9 +633,10 @@ func TestBlobCorruptionEvent(t *testing.T) {
}
opts.Experimental.ValueSeparationPolicy = func() ValueSeparationPolicy {
return ValueSeparationPolicy{
Enabled: true,
MinimumSize: 1,
MaxBlobReferenceDepth: 10,
Enabled: true,
MinimumSize: 1,
MinimumMVCCGarbageSize: 1,
MaxBlobReferenceDepth: 10,
}
}
d, err := Open("", opts)
Expand Down
5 changes: 4 additions & 1 deletion internal/compact/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ type ValueSeparationOutputConfig struct {
// MinimumSize is the minimum size of a value that will be separated into a
// blob file.
MinimumSize int
// MinimumMVCCGarbageSize is the minimum size of a value that will be
// separated into a blob file if the value is likely MVCC garbage.
MinimumMVCCGarbageSize int
}

// ValueSeparation defines an interface for writing some values to separate blob
Expand Down Expand Up @@ -344,7 +347,7 @@ func (r *Runner) writeKeysToTable(
}

valueLen := kv.V.Len()
isLikelyMVCCGarbage := sstable.IsLikelyMVCCGarbage(kv.K.UserKey, prevKeyKind, kv.K.Kind(), valueLen, prefixEqual)
isLikelyMVCCGarbage := sstable.IsLikelyMVCCGarbage(kv.K.UserKey, prevKeyKind, kv.K.Kind(), prefixEqual)
// Add the value to the sstable, possibly separating its value into a
// blob file. The ValueSeparation implementation is responsible for
// writing the KV to the sstable.
Expand Down
2 changes: 2 additions & 0 deletions metamorphic/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,7 @@ func defaultOptions(kf KeyFormat) *pebble.Options {
return pebble.ValueSeparationPolicy{
Enabled: true,
MinimumSize: 5,
MinimumMVCCGarbageSize: 1,
MaxBlobReferenceDepth: 3,
RewriteMinimumAge: 50 * time.Millisecond,
GarbageRatioLowPriority: 0.10, // 10% garbage
Expand Down Expand Up @@ -926,6 +927,7 @@ func RandomOptions(rng *rand.Rand, kf KeyFormat, cfg RandomOptionsCfg) *TestOpti
policy := pebble.ValueSeparationPolicy{
Enabled: true,
MinimumSize: 1 + rng.IntN(maxValueSize),
MinimumMVCCGarbageSize: 1 + rng.IntN(10), // [1, 11)
MaxBlobReferenceDepth: 2 + rng.IntN(9), // 2-10
RewriteMinimumAge: time.Duration(rng.IntN(90)+10) * time.Millisecond, // [10ms, 100ms)
GarbageRatioLowPriority: lowPri,
Expand Down
7 changes: 4 additions & 3 deletions metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,9 +254,10 @@ func TestMetrics(t *testing.T) {
opts.Experimental.EnableValueBlocks = func() bool { return true }
opts.Experimental.ValueSeparationPolicy = func() ValueSeparationPolicy {
return ValueSeparationPolicy{
Enabled: true,
MinimumSize: 3,
MaxBlobReferenceDepth: 5,
Enabled: true,
MinimumSize: 3,
MinimumMVCCGarbageSize: 1,
MaxBlobReferenceDepth: 5,
}
}
opts.TargetFileSizes[0] = 50
Expand Down
62 changes: 46 additions & 16 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -1238,6 +1238,13 @@ type ValueSeparationPolicy struct {
//
// MinimumSize must be > 0.
MinimumSize int
// MinimumMVCCGarbageSize specifies the minimum size of a value that can be
// separated into a blob file if said value is likely MVCC garbage. This
// applies only to SpanPolicies that permit separation of MVCC garbage,
// which is also the default.
//
// MinimumMVCCGarbageSize must be > 0.
MinimumMVCCGarbageSize int
// MaxBlobReferenceDepth limits the number of potentially overlapping (in
// the keyspace) blob files that can be referenced by a single sstable. If a
// compaction may produce an output sstable referencing more than this many
Expand Down Expand Up @@ -1311,36 +1318,51 @@ func (p SpanPolicy) String() string {
if p.DisableValueSeparationBySuffix {
sb.WriteString("disable-value-separation-by-suffix,")
}
switch p.ValueStoragePolicy {
case ValueStorageLowReadLatency:
sb.WriteString("low-read-latency,")
case ValueStorageLatencyTolerant:
sb.WriteString("latency-tolerant,")
switch p.ValueStoragePolicy.PolicyAdjustment {
case NoValueSeparation:
sb.WriteString("no-value-separation,")
case Override:
sb.WriteString("override,")
}
return strings.TrimSuffix(sb.String(), ",")
}

// ValueStoragePolicy is a hint used to determine where to store the values for
// KVs.
type ValueStoragePolicy uint8
// ValueStoragePolicy is used to determine where to store the values for
// KVs. If the PolicyAdjustment specified is Override, the remaining fields
// are used to override the global configuration for value separation.
type ValueStoragePolicy struct {
// PolicyAdjustment specifies the policy adjustment to apply.
PolicyAdjustment ValueStoragePolicyAdjustment
// Remaining fields are ignored, unless the PolicyAdjustment is Override.

// MinimumSize is the minimum size of the value.
MinimumSize int
// MinimumMVCCGarbageSize is the minimum size of the value that is likely
// MVCC garbage.
MinimumMVCCGarbageSize int
}

// ValueStoragePolicyAdjustment is a hint used to determine where to store the
// values for KVs.
type ValueStoragePolicyAdjustment uint8

const (
// ValueStorageDefault is the default value; Pebble will respect global
// configuration for value blocks and value separation.
ValueStorageDefault ValueStoragePolicy = iota
// UseDefault is the default value; Pebble will respect global
// configuration for value separation.
UseDefault ValueStoragePolicyAdjustment = iota

// ValueStorageLowReadLatency indicates Pebble should prefer storing values
// NoValueSeparation indicates Pebble should prefer storing values
// in-place.
ValueStorageLowReadLatency
NoValueSeparation

// ValueStorageLatencyTolerant indicates value retrieval can tolerate
// Override indicates value retrieval can tolerate
// additional latency, so Pebble should aggressively prefer storing values
// separately if it can reduce write amplification.
//
// If the global Options' enable value separation, Pebble may choose to
// separate values under the LatencyTolerant policy even if they do not meet
// separate values under the Override policy even if they do not meet
// the minimum size threshold of the global Options' ValueSeparationPolicy.
ValueStorageLatencyTolerant
Override
)

// SpanPolicyFunc is used to determine the SpanPolicy for a key region.
Expand Down Expand Up @@ -1855,6 +1877,7 @@ func (o *Options) String() string {
fmt.Fprintln(&buf, "[Value Separation]")
fmt.Fprintf(&buf, " enabled=%t\n", policy.Enabled)
fmt.Fprintf(&buf, " minimum_size=%d\n", policy.MinimumSize)
fmt.Fprintf(&buf, " minimum_mvcc_garbage_size=%d\n", policy.MinimumMVCCGarbageSize)
fmt.Fprintf(&buf, " max_blob_reference_depth=%d\n", policy.MaxBlobReferenceDepth)
fmt.Fprintf(&buf, " rewrite_minimum_age=%s\n", policy.RewriteMinimumAge)
fmt.Fprintf(&buf, " garbage_ratio_low_priority=%.2f\n", policy.GarbageRatioLowPriority)
Expand Down Expand Up @@ -2300,6 +2323,10 @@ func (o *Options) Parse(s string, hooks *ParseHooks) error {
var minimumSize int
minimumSize, err = strconv.Atoi(value)
valSepPolicy.MinimumSize = minimumSize
case "minimum_mvcc_garbage_size":
var minimumMVCCGarbageSize int
minimumMVCCGarbageSize, err = strconv.Atoi(value)
valSepPolicy.MinimumMVCCGarbageSize = minimumMVCCGarbageSize
case "max_blob_reference_depth":
valSepPolicy.MaxBlobReferenceDepth, err = strconv.Atoi(value)
case "rewrite_minimum_age":
Expand Down Expand Up @@ -2571,6 +2598,9 @@ func (o *Options) Validate() error {
if policy.MinimumSize <= 0 {
fmt.Fprintf(&buf, "ValueSeparationPolicy.MinimumSize (%d) must be > 0\n", policy.MinimumSize)
}
if policy.MinimumMVCCGarbageSize <= 0 {
fmt.Fprintf(&buf, "ValueSeparationPolicy.MinimumMVCCGarbageSize (%d) must be > 0\n", policy.MinimumMVCCGarbageSize)
}
if policy.MaxBlobReferenceDepth <= 0 {
fmt.Fprintf(&buf, "ValueSeparationPolicy.MaxBlobReferenceDepth (%d) must be > 0\n", policy.MaxBlobReferenceDepth)
}
Expand Down
16 changes: 8 additions & 8 deletions options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,10 @@ func (o *Options) randomizeForTesting(t testing.TB) {
if o.FormatMajorVersion >= FormatValueSeparation && o.Experimental.ValueSeparationPolicy == nil && rand.Int64N(4) > 0 {
lowPri := 0.1 + rand.Float64()*0.9 // [0.1, 1.0)
policy := ValueSeparationPolicy{
Enabled: true,
MinimumSize: 1 << rand.IntN(10), // [1, 512]
MaxBlobReferenceDepth: rand.IntN(10) + 1, // [1, 10)
Enabled: true,
MinimumSize: 1 << rand.IntN(10), // [1, 512]
MinimumMVCCGarbageSize: 1 + rand.IntN(10), // [1, 10]
MaxBlobReferenceDepth: 1 + rand.IntN(10), // [1, 10]
// Constrain the rewrite minimum age to [0, 15s).
RewriteMinimumAge: time.Duration(rand.IntN(15)) * time.Second,
GarbageRatioLowPriority: lowPri,
Expand Down Expand Up @@ -259,7 +260,6 @@ func TestOptionsCheckCompatibility(t *testing.T) {

// Check that an OPTIONS file that configured an explicit WALDir that will
// no longer be used errors if it's not also present in WALRecoveryDirs.
//require.Equal(t, ErrMissingWALRecoveryDir{Dir: "external-wal-dir"},
err := DefaultOptions().CheckCompatibility(storeDir, `
[Options]
wal_dir=external-wal-dir
Expand Down Expand Up @@ -648,10 +648,10 @@ func TestStaticSpanPolicyFunc(t *testing.T) {
sap.KeyRange.End = []byte(p.Next())
p.Expect(":")
switch tok := p.Next(); tok {
case "lowlatency":
sap.Policy.ValueStoragePolicy = ValueStorageLowReadLatency
case "latencytolerant":
sap.Policy.ValueStoragePolicy = ValueStorageLatencyTolerant
case "novalueseparation":
sap.Policy.ValueStoragePolicy.PolicyAdjustment = NoValueSeparation
case "override":
sap.Policy.ValueStoragePolicy.PolicyAdjustment = Override
default:
t.Fatalf("unknown policy: %s", tok)
}
Expand Down
9 changes: 5 additions & 4 deletions replay/replay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,10 +350,11 @@ func collectCorpus(t *testing.T, fs *vfs.MemFS, name string) {
}
opts.Experimental.ValueSeparationPolicy = func() pebble.ValueSeparationPolicy {
return pebble.ValueSeparationPolicy{
Enabled: true,
MinimumSize: 3,
MaxBlobReferenceDepth: 5,
RewriteMinimumAge: 15 * time.Minute,
Enabled: true,
MinimumSize: 3,
MinimumMVCCGarbageSize: 1,
MaxBlobReferenceDepth: 5,
RewriteMinimumAge: 15 * time.Minute,
}
}
setDefaultExperimentalOpts(opts)
Expand Down
5 changes: 3 additions & 2 deletions replay/testdata/replay_val_sep
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ tree
0 LOCK
152 MANIFEST-000010
250 MANIFEST-000013
2947 OPTIONS-000003
2977 OPTIONS-000003
0 marker.format-version.000011.024
0 marker.manifest.000003.MANIFEST-000013
simple_val_sep/
Expand All @@ -32,7 +32,7 @@ tree
11 000011.log
707 000012.sst
187 MANIFEST-000013
2947 OPTIONS-000003
2977 OPTIONS-000003
0 marker.format-version.000001.024
0 marker.manifest.000001.MANIFEST-000013

Expand Down Expand Up @@ -93,6 +93,7 @@ cat build/OPTIONS-000003
[Value Separation]
enabled=true
minimum_size=3
minimum_mvcc_garbage_size=1
max_blob_reference_depth=5
rewrite_minimum_age=15m0s
garbage_ratio_low_priority=0.00
Expand Down
24 changes: 10 additions & 14 deletions sstable/colblk_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -643,9 +643,17 @@ func (w *RawColumnWriter) evaluatePoint(
// We require:
// . Value blocks to be enabled.
// . IsLikelyMVCCGarbage to be true; see comment for MVCC garbage criteria.
// . The value to be sufficiently large. (Currently we simply require a
// non-zero length, so all non-empty values are eligible for storage
// out-of-band in a value block.)
//
// Use of 0 here is somewhat arbitrary. Given the minimum 3 byte encoding of
// valueHandle, this should be > 3. But tiny values are common in test and
// unlikely in production, so we use 0 here for better test coverage.
useValueBlock := !w.opts.DisableValueBlocks &&
w.valueBlock != nil &&
IsLikelyMVCCGarbage(key.UserKey, prevKeyKind, keyKind, valueLen, prefixEqual)
valueLen > 0 &&
IsLikelyMVCCGarbage(key.UserKey, prevKeyKind, keyKind, prefixEqual)
if !useValueBlock {
return eval, nil
}
Expand Down Expand Up @@ -1285,26 +1293,14 @@ func (w *RawColumnWriter) SetValueSeparationProps(kind ValueSeparationKind, minV
//
// . The previous key to be a SET/SETWITHDEL.
// . The current key to be a SET/SETWITHDEL.
// . The value to be sufficiently large. (Currently we simply require a
// non-zero length, so all non-empty values are eligible for storage
// out-of-band in a value block.)
// . The current key to have the same prefix as the previous key.
//
// Use of 0 here is somewhat arbitrary. Given the minimum 3 byte encoding of
// valueHandle, this should be > 3. But tiny values are common in test and
// unlikely in production, so we use 0 here for better test coverage.
func IsLikelyMVCCGarbage(
k []byte,
prevKeyKind, keyKind base.InternalKeyKind,
valueLen int,
prefixEqual func(k []byte) bool,
k []byte, prevKeyKind, keyKind base.InternalKeyKind, prefixEqual func(k []byte) bool,
) bool {
const tinyValueThreshold = 0
isSetStarKind := func(k base.InternalKeyKind) bool {
return k == InternalKeyKindSet || k == InternalKeyKindSetWithDelete
}
return isSetStarKind(prevKeyKind) &&
isSetStarKind(keyKind) &&
valueLen > tinyValueThreshold &&
prefixEqual(k)
}
2 changes: 1 addition & 1 deletion testdata/checkpoint
Original file line number Diff line number Diff line change
Expand Up @@ -895,7 +895,7 @@ L6:
# Create a new database with value separation enabled to test that blob files
# are included in checkpoints.

open valsepdb value-separation=(enabled, min-size=1, max-ref-depth=5, rw-min-age=0s, garbage-ratios=1.0:1.0)
open valsepdb value-separation=(enabled, min-size=1, max-ref-depth=5, rw-min-age=0s, garbage-ratios=1.0:1.0, min-mvcc-garbage-size=1)
----
mkdir-all: valsepdb 0755
open-dir: .
Expand Down
2 changes: 1 addition & 1 deletion testdata/compaction/backing_value_size
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Test a blob file rewrite compaction with virtual sstable references that
# track backing value sizes do not trigger unnecessary blob file rewrites.

define value-separation=(enabled=true, min-size=1, max-ref-depth=10, garbage-ratios=0.01:0.2)
define value-separation=(enabled=true, min-size=1, max-ref-depth=10, garbage-ratios=0.01:0.2, min-mvcc-garbage-size=1)
----

batch
Expand Down
2 changes: 1 addition & 1 deletion testdata/compaction/mvcc_garbage_blob
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Set the minimum size for a separated value to 5.

define value-separation=(enabled, min-size=5, max-ref-depth=3, garbage-ratios=1.0:1.0)
define value-separation=(enabled, min-size=5, max-ref-depth=3, garbage-ratios=1.0:1.0, min-mvcc-garbage-size=1)
----

batch
Expand Down
Loading