Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions blob_rewrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,8 +259,8 @@ func (c *blobFileRewriteCompaction) Execute(jobID JobID, d *DB) error {
// We don't know the size of the output blob file--it may have
// been half-written. We use the input blob file size as an
// approximation for deletion pacing.
FileSize: c.input.Physical.Size,
IsLocal: true,
FileSize: c.input.Physical.Size,
Placement: base.Local,
},
})
}
Expand Down
4 changes: 2 additions & 2 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -2677,7 +2677,7 @@ func (d *DB) cleanupVersionEdit(ve *manifest.VersionEdit) {
FileNum: ve.NewBlobFiles[i].Physical.FileNum,
FileSize: ve.NewBlobFiles[i].Physical.Size,
},
isLocal: objstorage.IsLocalBlobFile(d.objProvider, ve.NewBlobFiles[i].Physical.FileNum),
placement: objstorage.Placement(d.objProvider, base.FileTypeBlob, ve.NewBlobFiles[i].Physical.FileNum),
})
}
for i := range ve.NewTables {
Expand Down Expand Up @@ -2706,7 +2706,7 @@ func (d *DB) cleanupVersionEdit(ve *manifest.VersionEdit) {
FileNum: of.DiskFileNum,
FileSize: of.Size,
},
isLocal: objstorage.IsLocalTable(d.objProvider, of.DiskFileNum),
placement: objstorage.Placement(d.objProvider, base.FileTypeTable, of.DiskFileNum),
})
}
d.mu.versions.addObsoleteLocked(obsoleteFiles)
Expand Down
39 changes: 23 additions & 16 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/cockroachdb/pebble/internal/manifest"
"github.com/cockroachdb/pebble/internal/manual"
"github.com/cockroachdb/pebble/internal/problemspans"
"github.com/cockroachdb/pebble/metrics"
"github.com/cockroachdb/pebble/objstorage"
"github.com/cockroachdb/pebble/objstorage/remote"
"github.com/cockroachdb/pebble/rangekey"
Expand Down Expand Up @@ -539,7 +540,8 @@ type DB struct {
}
}

fileSizeAnnotator manifest.TableAnnotator[fileSizeByBacking]
tableDiskUsageAnnotator manifest.TableAnnotator[TableUsageByPlacement]
blobFileDiskUsageAnnotator manifest.BlobFileAnnotator[metrics.CountAndSizeByPlacement]

// problemSpans keeps track of spans of keys within LSM levels where
// compactions have failed; used to avoid retrying these compactions too
Expand Down Expand Up @@ -1880,7 +1882,6 @@ func (d *DB) AsyncFlush() (<-chan struct{}, error) {
func (d *DB) Metrics() *Metrics {
metrics := &Metrics{}
walStats := d.mu.log.manager.Stats()
deletePacerMetrics := d.deletePacer.Metrics()

d.mu.Lock()
vers := d.mu.versions.currentVersion()
Expand Down Expand Up @@ -1940,35 +1941,28 @@ func (d *DB) Metrics() *Metrics {
metrics.Levels[level].CompensatedFillFactor = lm.compensatedFillFactor
}
}
metrics.Table.Zombie.All.Count = uint64(d.mu.versions.zombieTables.Count())
metrics.Table.Zombie.All.Bytes = d.mu.versions.zombieTables.TotalSize()
metrics.Table.Zombie.Local.Count, metrics.Table.Zombie.Local.Bytes = d.mu.versions.zombieTables.LocalStats()
metrics.Table.Physical.Zombie = d.mu.versions.zombieTables.Metrics()
metrics.BlobFiles.Zombie = d.mu.versions.zombieBlobs.Metrics()

// Populate obsolete blob/table metrics from both the not-yet-enqueued lists
// in the versionSet, and what is already in the delete pacer queue.
metrics.Table.Obsolete = deletePacerMetrics.InQueue.Tables
deletePacerMetrics := d.deletePacer.Metrics()
metrics.Table.Physical.Obsolete = deletePacerMetrics.InQueue.Tables
for _, fi := range d.mu.versions.obsoleteTables {
metrics.Table.Obsolete.Inc(fi.FileSize, fi.IsLocal)
metrics.Table.Physical.Obsolete.Inc(fi.FileSize, fi.Placement)
}
metrics.BlobFiles.Obsolete = deletePacerMetrics.InQueue.BlobFiles
for _, fi := range d.mu.versions.obsoleteBlobs {
metrics.BlobFiles.Obsolete.Inc(fi.FileSize, fi.IsLocal)
metrics.BlobFiles.Obsolete.Inc(fi.FileSize, fi.Placement)
}

metrics.private.optionsFileSize = d.optionsFileSize

d.mu.versions.logLock()
metrics.private.manifestFileSize = uint64(d.mu.versions.manifest.Size())
backingCount, backingTotalSize := d.mu.versions.latest.virtualBackings.Stats()
metrics.Table.BackingTable.Count = uint64(backingCount)
metrics.Table.BackingTable.Bytes = backingTotalSize
backingStats := d.mu.versions.latest.virtualBackings.Stats()
blobStats, _ := d.mu.versions.latest.blobFiles.Stats()
d.mu.versions.logUnlock()
metrics.BlobFiles.Live.All.Count = blobStats.Count
metrics.BlobFiles.Live.All.Bytes = blobStats.PhysicalSize
metrics.BlobFiles.ValueSize = blobStats.ValueSize
metrics.BlobFiles.ReferencedValueSize = blobStats.ReferencedValueSize
metrics.BlobFiles.ReferencedBackingValueSize = blobStats.ReferencedBackingValueSize

metrics.LogWriter.FsyncLatency = d.mu.log.metrics.fsyncLatency
if err := metrics.LogWriter.Merge(&d.mu.log.metrics.LogWriterMetrics); err != nil {
Expand All @@ -1984,6 +1978,13 @@ func (d *DB) Metrics() *Metrics {

d.mu.Unlock()

// The table disk usage is due to physical tables plus backings for virtual tables.
tableDiskUsage := d.tableDiskUsageAnnotator.MultiLevelAnnotation(vers.Levels[:])
metrics.Table.Physical.Live.Local = tableDiskUsage.Local.Physical
metrics.Table.Physical.Live.Shared = tableDiskUsage.Shared.Physical
metrics.Table.Physical.Live.External = tableDiskUsage.External.Physical
metrics.Table.Physical.Live.Accumulate(backingStats)

// TODO(jackson): Consider making these metrics optional.
aggProps := tablePropsAnnotator.MultiLevelAnnotation(vers.Levels[:])
metrics.Keys.RangeKeySetsCount = aggProps.NumRangeKeySets
Expand All @@ -1999,6 +2000,12 @@ func (d *DB) Metrics() *Metrics {
metrics.Table.Compression.MergeWith(&aggProps.CompressionMetrics)
}

metrics.BlobFiles.Live = d.blobFileDiskUsageAnnotator.Annotation(&vers.BlobFiles)

metrics.BlobFiles.ValueSize = blobStats.ValueSize
metrics.BlobFiles.ReferencedValueSize = blobStats.ReferencedValueSize
metrics.BlobFiles.ReferencedBackingValueSize = blobStats.ReferencedBackingValueSize

blobCompressionMetrics := blobCompressionStatsAnnotator.Annotation(&vers.BlobFiles)
metrics.BlobFiles.Compression.MergeWith(&blobCompressionMetrics)

Expand Down
140 changes: 93 additions & 47 deletions disk_usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/manifest"
"github.com/cockroachdb/pebble/metrics"
"github.com/cockroachdb/pebble/objstorage"
)

// EstimateDiskUsage returns the estimated filesystem space used in bytes for
Expand Down Expand Up @@ -48,71 +50,115 @@ func (d *DB) EstimateDiskUsageByBackingType(
readState := d.loadReadState()
defer readState.unref()

sizes := d.fileSizeAnnotator.VersionRangeAnnotation(readState.current, bounds)
return sizes.totalSize, sizes.remoteSize, sizes.externalSize, nil
sizes := d.tableDiskUsageAnnotator.VersionRangeAnnotation(readState.current, bounds)
externalSize = sizes.External.TotalBytes()
remoteSize = externalSize + sizes.Shared.TotalBytes()
totalSize = remoteSize + sizes.Local.TotalBytes()
return totalSize, remoteSize, externalSize, nil
}

// fileSizeByBacking contains the estimated file size for LSM data within some
// bounds. It is broken down by backing type. The file size refers to both the
// sstable size and an estimate of the referenced blob sizes.
type fileSizeByBacking struct {
// totalSize is the estimated size of all files for the given bounds.
totalSize uint64
// remoteSize is the estimated size of remote files for the given bounds.
remoteSize uint64
// externalSize is the estimated size of external files for the given bounds.
externalSize uint64
// TableUsageByPlacement contains space usage information for tables, broken
// down by where they are stored.
//
// Depending on context, this can refer to all tables in the LSM, all tables on
// a level, or tables within some specified bounds (in the latter case, for
// tables overlapping the bounds, the usage is a best-effort estimation).
type TableUsageByPlacement struct {
metrics.ByPlacement[TableDiskUsage]
}

func (d *DB) singleFileSizeByBacking(
fileSize uint64, t *manifest.TableMetadata,
) (_ fileSizeByBacking, ok bool) {
res := fileSizeByBacking{
totalSize: fileSize,
}
// Accumulate folds rhs into the receiver, one placement at a time: the Local,
// Shared and External usages of rhs are each accumulated into the matching
// usage of u.
func (u *TableUsageByPlacement) Accumulate(rhs TableUsageByPlacement) {
	// The three placements are independent of each other, so the order in
	// which they are merged does not matter.
	u.External.Accumulate(rhs.External)
	u.Shared.Accumulate(rhs.Shared)
	u.Local.Accumulate(rhs.Local)
}

// TableDiskUsage contains space usage information for a set of sstables.
//
// Physical and virtual tables are tracked separately; TotalBytes reports the
// sum of Physical.Bytes, Virtual.Bytes and ReferencedBytes.
type TableDiskUsage struct {
// Physical contains the count and total size of physical tables in the set.
Physical metrics.CountAndSize

// Virtual contains the count and total estimated referenced bytes of virtual
// tables in the set.
Virtual metrics.CountAndSize

// ReferencedBytes contains the total estimated size of values stored in blob
// files referenced by tables in this set (either physical or virtual).
ReferencedBytes uint64
}

// TotalBytes returns Physical.Bytes + Virtual.Bytes + ReferencedBytes, i.e. the
// combined size of every byte counter tracked by the receiver.
func (u TableDiskUsage) TotalBytes() uint64 {
	total := u.Physical.Bytes
	total += u.Virtual.Bytes
	total += u.ReferencedBytes
	return total
}

// Accumulate folds rhs into the receiver: the Physical and Virtual
// count-and-size pairs of rhs are accumulated into those of u, and
// rhs.ReferencedBytes is added to u.ReferencedBytes.
func (u *TableDiskUsage) Accumulate(rhs TableDiskUsage) {
	// Each statement updates a distinct field of u, so their order is
	// irrelevant.
	u.ReferencedBytes += rhs.ReferencedBytes
	u.Virtual.Accumulate(rhs.Virtual)
	u.Physical.Accumulate(rhs.Physical)
}

objMeta, err := d.objProvider.Lookup(base.FileTypeTable, t.TableBacking.DiskFileNum)
if err != nil {
return res, false
func (d *DB) singleTableDiskUsage(
fileSize uint64, referencedSize uint64, fileNum base.DiskFileNum, isVirtual bool,
) TableUsageByPlacement {
u := TableDiskUsage{
ReferencedBytes: referencedSize,
}
if objMeta.IsRemote() {
res.remoteSize += fileSize
if objMeta.IsExternal() {
res.externalSize += fileSize
}
if isVirtual {
u.Virtual.Inc(fileSize)
} else {
u.Physical.Inc(fileSize)
}
return res, true
placement := objstorage.Placement(d.objProvider, base.FileTypeTable, fileNum)
var res TableUsageByPlacement
res.Set(placement, u)
return res
}

var fileSizeAnnotatorIdx = manifest.NewTableAnnotationIdx()
var tableDiskUsageAnnotatorIdx = manifest.NewTableAnnotationIdx()

// makeFileSizeAnnotator returns an annotator that computes the storage size of
// files. When applicable, this includes both the sstable size and the size of
// any referenced blob files.
func (d *DB) makeFileSizeAnnotator() manifest.TableAnnotator[fileSizeByBacking] {
return manifest.MakeTableAnnotator[fileSizeByBacking](
fileSizeAnnotatorIdx,
manifest.TableAnnotatorFuncs[fileSizeByBacking]{
Merge: func(dst *fileSizeByBacking, src fileSizeByBacking) {
dst.totalSize += src.totalSize
dst.remoteSize += src.remoteSize
dst.externalSize += src.externalSize
},
Table: func(f *manifest.TableMetadata) (v fileSizeByBacking, cacheOK bool) {
return d.singleFileSizeByBacking(f.Size+f.EstimatedReferenceSize(), f)
// makeTableDiskSpaceUsageAnnotator returns an annotator that computes the
// storage size of files. When applicable, this includes both the sstable size
// and the size of any referenced blob files.
func (d *DB) makeTableDiskSpaceUsageAnnotator() manifest.TableAnnotator[TableUsageByPlacement] {
return manifest.MakeTableAnnotator[TableUsageByPlacement](
tableDiskUsageAnnotatorIdx,
manifest.TableAnnotatorFuncs[TableUsageByPlacement]{
Merge: (*TableUsageByPlacement).Accumulate,
Table: func(f *manifest.TableMetadata) (v TableUsageByPlacement, cacheOK bool) {
return d.singleTableDiskUsage(f.Size, f.EstimatedReferenceSize(), f.TableBacking.DiskFileNum, f.Virtual), true
},
PartialOverlap: func(f *manifest.TableMetadata, bounds base.UserKeyBounds) fileSizeByBacking {
PartialOverlap: func(f *manifest.TableMetadata, bounds base.UserKeyBounds) TableUsageByPlacement {
overlappingFileSize, err := d.fileCache.estimateSize(f, bounds.Start, bounds.End.Key)
if err != nil {
return fileSizeByBacking{}
return TableUsageByPlacement{}
}
overlapFraction := float64(overlappingFileSize) / float64(f.Size)
// Scale the blob reference size proportionally to the file
// overlap from the bounds to approximate only the blob
// references that overlap with the requested bounds.
size := overlappingFileSize + uint64(float64(f.EstimatedReferenceSize())*overlapFraction)
res, _ := d.singleFileSizeByBacking(size, f)
return res
referencedSize := uint64(float64(f.EstimatedReferenceSize()) * overlapFraction)
return d.singleTableDiskUsage(overlappingFileSize, referencedSize, f.TableBacking.DiskFileNum, f.Virtual)
},
})
}

var blobFileDiskUsageAnnotatorIdx = manifest.NewBlobAnnotationIdx()

// makeBlobFileDiskSpaceUsageAnnotator returns an annotator that computes the
// disk usage of blob files as a metrics.CountAndSizeByPlacement.
//
// For each blob file, the annotator looks up the file's placement with
// objstorage.Placement and records the file's physical size
// (m.Physical.Size) under that placement. Per-file results are always
// returned with cacheOK set to true, and results are combined with
// (*metrics.CountAndSizeByPlacement).Accumulate.
func (d *DB) makeBlobFileDiskSpaceUsageAnnotator() manifest.BlobFileAnnotator[metrics.CountAndSizeByPlacement] {
	return manifest.MakeBlobFileAnnotator[metrics.CountAndSizeByPlacement](
		blobFileDiskUsageAnnotatorIdx,
		manifest.BlobFileAnnotatorFuncs[metrics.CountAndSizeByPlacement]{
			Merge: (*metrics.CountAndSizeByPlacement).Accumulate,
			BlobFile: func(m manifest.BlobFileMetadata) (res metrics.CountAndSizeByPlacement, cacheOK bool) {
				// res starts as the zero value; only the entry for this
				// file's placement is incremented.
				placement := objstorage.Placement(d.objProvider, base.FileTypeBlob, m.Physical.FileNum)
				res.Ptr(placement).Inc(m.Physical.Size)
				return res, true
			},
		})
}
44 changes: 44 additions & 0 deletions internal/base/placement.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright 2025 The LevelDB-Go and Pebble Authors. All rights reserved. Use
// of this source code is governed by a BSD-style license that can be found in
// the LICENSE file.

package base

import (
"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/internal/invariants"
"github.com/cockroachdb/redact"
)

// Placement identifies where a file/object is stored.
//
// The zero value is deliberately not a valid placement, so that a field which
// was never initialized can be told apart from a real value.
type Placement uint8

// The valid placements. Numbering starts at 1 so that the zero value of
// Placement stays invalid.
const (
	Local Placement = iota + 1
	Shared
	External
)

// String returns the lower-case name of the placement: "local", "shared" or
// "external".
//
// Any other value (including the zero value) is invalid. When
// invariants.Enabled is true an invalid value causes a panic with an assertion
// failure; otherwise "invalid" is returned.
func (p Placement) String() string {
	switch p {
	case Local:
		return "local"
	case Shared:
		return "shared"
	case External:
		return "external"
	default:
		if invariants.Enabled {
			// Format the underlying integer rather than p itself. Placement
			// implements SafeFormat, which calls String; handing the invalid
			// value to the formatter could re-enter this branch instead of
			// producing the message.
			panic(errors.AssertionFailedf("invalid placement type %d", uint8(p)))
		}
		return "invalid"
	}
}

// SafeFormat implements redact.SafeFormatter. It prints the result of String
// wrapped as a redact.SafeString; the verb is ignored.
func (p Placement) SafeFormat(w redact.SafePrinter, _ rune) {
	name := redact.SafeString(p.String())
	w.Print(name)
}
10 changes: 5 additions & 5 deletions internal/deletepacer/delete_pacer.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,8 @@ func (dp *DeletePacer) mainLoop() {
defer dp.mu.Lock()
dp.deleteFn(file.ObsoleteFile, file.JobID)
}()
dp.mu.metrics.InQueue.Dec(file.FileType, file.FileSize, file.IsLocal)
dp.mu.metrics.Deleted.Inc(file.FileType, file.FileSize, file.IsLocal)
dp.mu.metrics.InQueue.Dec(file.FileType, file.FileSize, file.Placement)
dp.mu.metrics.Deleted.Inc(file.FileType, file.FileSize, file.Placement)
dp.mu.deletedCond.Broadcast()
}
}
Expand All @@ -237,7 +237,7 @@ func (dp *DeletePacer) Enqueue(jobID int, files ...ObsoleteFile) {
dp.mu.queuedPacingBytes += b
dp.mu.queuedHistory.Add(now, b)
}
dp.mu.metrics.InQueue.Inc(file.FileType, file.FileSize, file.IsLocal)
dp.mu.metrics.InQueue.Inc(file.FileType, file.FileSize, file.Placement)
dp.mu.queue.PushBack(queueEntry{
ObsoleteFile: file,
JobID: jobID,
Expand Down Expand Up @@ -276,8 +276,8 @@ func (dp *DeletePacer) WaitForTesting() {
dp.mu.Lock()
defer dp.mu.Unlock()

n := dp.mu.metrics.Deleted.Totals().Count + dp.mu.metrics.InQueue.Totals().Count
for dp.mu.metrics.Deleted.Totals().Count < n {
n := dp.mu.metrics.Deleted.Total().Count + dp.mu.metrics.InQueue.Total().Count
for dp.mu.metrics.Deleted.Total().Count < n {
dp.mu.deletedCond.Wait()
}
}
Loading