Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add storage_id tag to StorageAdapter metrics #8786

Merged
merged 1 commit into from
Mar 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions pkg/block/azure/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func (a *Adapter) log(ctx context.Context) logging.Logger {

func (a *Adapter) Put(ctx context.Context, obj block.ObjectPointer, sizeBytes int64, reader io.Reader, opts block.PutOpts) (*block.PutResponse, error) {
var err error
defer reportMetrics("Put", time.Now(), &sizeBytes, &err)
defer reportMetrics("Put", obj.StorageID, time.Now(), &sizeBytes, &err)
qualifiedKey, err := resolveBlobURLInfo(obj)
if err != nil {
return nil, err
Expand All @@ -216,7 +216,7 @@ func (a *Adapter) Put(ctx context.Context, obj block.ObjectPointer, sizeBytes in

func (a *Adapter) Get(ctx context.Context, obj block.ObjectPointer) (io.ReadCloser, error) {
var err error
defer reportMetrics("Get", time.Now(), nil, &err)
defer reportMetrics("Get", obj.StorageID, time.Now(), nil, &err)

return a.Download(ctx, obj, 0, blockblob.CountToEnd)
}
Expand Down Expand Up @@ -319,7 +319,7 @@ func (a *Adapter) getPreSignedURL(ctx context.Context, obj block.ObjectPointer,

func (a *Adapter) GetRange(ctx context.Context, obj block.ObjectPointer, startPosition int64, endPosition int64) (io.ReadCloser, error) {
var err error
defer reportMetrics("GetRange", time.Now(), nil, &err)
defer reportMetrics("GetRange", obj.StorageID, time.Now(), nil, &err)

return a.Download(ctx, obj, startPosition, endPosition-startPosition+1)
}
Expand Down Expand Up @@ -354,7 +354,7 @@ func (a *Adapter) Download(ctx context.Context, obj block.ObjectPointer, offset,

func (a *Adapter) Exists(ctx context.Context, obj block.ObjectPointer) (bool, error) {
var err error
defer reportMetrics("Exists", time.Now(), nil, &err)
defer reportMetrics("Exists", obj.StorageID, time.Now(), nil, &err)

qualifiedKey, err := resolveBlobURLInfo(obj)
if err != nil {
Expand All @@ -380,7 +380,7 @@ func (a *Adapter) Exists(ctx context.Context, obj block.ObjectPointer) (bool, er

func (a *Adapter) GetProperties(ctx context.Context, obj block.ObjectPointer) (block.Properties, error) {
var err error
defer reportMetrics("GetProperties", time.Now(), nil, &err)
defer reportMetrics("GetProperties", obj.StorageID, time.Now(), nil, &err)

qualifiedKey, err := resolveBlobURLInfo(obj)
if err != nil {
Expand All @@ -402,7 +402,7 @@ func (a *Adapter) GetProperties(ctx context.Context, obj block.ObjectPointer) (b

func (a *Adapter) Remove(ctx context.Context, obj block.ObjectPointer) error {
var err error
defer reportMetrics("Remove", time.Now(), nil, &err)
defer reportMetrics("Remove", obj.StorageID, time.Now(), nil, &err)
qualifiedKey, err := resolveBlobURLInfo(obj)
if err != nil {
return err
Expand All @@ -419,7 +419,7 @@ func (a *Adapter) Remove(ctx context.Context, obj block.ObjectPointer) error {

func (a *Adapter) Copy(ctx context.Context, sourceObj, destinationObj block.ObjectPointer) error {
var err error
defer reportMetrics("Copy", time.Now(), nil, &err)
defer reportMetrics("Copy", sourceObj.StorageID, time.Now(), nil, &err)

qualifiedDestinationKey, err := resolveBlobURLInfo(destinationObj)
if err != nil {
Expand Down Expand Up @@ -511,7 +511,7 @@ func (a *Adapter) Copy(ctx context.Context, sourceObj, destinationObj block.Obje
func (a *Adapter) CreateMultiPartUpload(_ context.Context, obj block.ObjectPointer, _ *http.Request, _ block.CreateMultiPartUploadOpts) (*block.CreateMultiPartUploadResponse, error) {
// Azure has no create multipart upload
var err error
defer reportMetrics("CreateMultiPartUpload", time.Now(), nil, &err)
defer reportMetrics("CreateMultiPartUpload", obj.StorageID, time.Now(), nil, &err)

qualifiedKey, err := resolveBlobURLInfo(obj)
if err != nil {
Expand All @@ -525,7 +525,7 @@ func (a *Adapter) CreateMultiPartUpload(_ context.Context, obj block.ObjectPoint

func (a *Adapter) UploadPart(ctx context.Context, obj block.ObjectPointer, _ int64, reader io.Reader, _ string, _ int) (*block.UploadPartResponse, error) {
var err error
defer reportMetrics("UploadPart", time.Now(), nil, &err)
defer reportMetrics("UploadPart", obj.StorageID, time.Now(), nil, &err)

qualifiedKey, err := resolveBlobURLInfo(obj)
if err != nil {
Expand Down Expand Up @@ -553,14 +553,14 @@ func (a *Adapter) UploadPart(ctx context.Context, obj block.ObjectPointer, _ int

func (a *Adapter) UploadCopyPart(ctx context.Context, sourceObj, destinationObj block.ObjectPointer, _ string, _ int) (*block.UploadPartResponse, error) {
var err error
defer reportMetrics("UploadPart", time.Now(), nil, &err)
defer reportMetrics("UploadPart", sourceObj.StorageID, time.Now(), nil, &err)

return a.copyPartRange(ctx, sourceObj, destinationObj, 0, blockblob.CountToEnd)
}

func (a *Adapter) UploadCopyPartRange(ctx context.Context, sourceObj, destinationObj block.ObjectPointer, _ string, _ int, startPosition, endPosition int64) (*block.UploadPartResponse, error) {
var err error
defer reportMetrics("UploadPart", time.Now(), nil, &err)
defer reportMetrics("UploadPart", sourceObj.StorageID, time.Now(), nil, &err)
return a.copyPartRange(ctx, sourceObj, destinationObj, startPosition, endPosition-startPosition+1)
}

Expand Down Expand Up @@ -593,7 +593,7 @@ func (a *Adapter) BlockstoreMetadata(_ context.Context) (*block.BlockstoreMetada

func (a *Adapter) CompleteMultiPartUpload(ctx context.Context, obj block.ObjectPointer, _ string, multipartList *block.MultipartUploadCompletion) (*block.CompleteMultiPartUploadResponse, error) {
var err error
defer reportMetrics("CompleteMultiPartUpload", time.Now(), nil, &err)
defer reportMetrics("CompleteMultiPartUpload", obj.StorageID, time.Now(), nil, &err)
qualifiedKey, err := resolveBlobURLInfo(obj)
if err != nil {
return nil, err
Expand Down
10 changes: 5 additions & 5 deletions pkg/block/azure/stats.go → pkg/block/azure/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,19 @@ var durationHistograms = promauto.NewHistogramVec(
Name: "azure_operation_duration_seconds",
Help: "durations of outgoing azure operations",
},
[]string{"operation", "error"})
[]string{"operation", "storage_id", "error"})

var requestSizeHistograms = promauto.NewHistogramVec(
prometheus.HistogramOpts{
Name: "azure_operation_size_bytes",
Help: "handled sizes of outgoing azure operations",
Buckets: prometheus.ExponentialBuckets(1, 10, 10), //nolint: mnd
}, []string{"operation", "error"})
}, []string{"operation", "storage_id", "error"})

func reportMetrics(operation string, start time.Time, sizeBytes *int64, err *error) {
func reportMetrics(operation, storageID string, start time.Time, sizeBytes *int64, err *error) {
isErrStr := strconv.FormatBool(*err != nil)
durationHistograms.WithLabelValues(operation, isErrStr).Observe(time.Since(start).Seconds())
durationHistograms.WithLabelValues(operation, storageID, isErrStr).Observe(time.Since(start).Seconds())
if sizeBytes != nil {
requestSizeHistograms.WithLabelValues(operation, isErrStr).Observe(float64(*sizeBytes))
requestSizeHistograms.WithLabelValues(operation, storageID, isErrStr).Observe(float64(*sizeBytes))
}
}
28 changes: 14 additions & 14 deletions pkg/block/gs/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func (o *storageObjectHandle) newComposer(a *Adapter, srcs ...*storage.ObjectHan

func (a *Adapter) Put(ctx context.Context, obj block.ObjectPointer, sizeBytes int64, reader io.Reader, _ block.PutOpts) (*block.PutResponse, error) {
var err error
defer reportMetrics("Put", time.Now(), &sizeBytes, &err)
defer reportMetrics("Put", obj.StorageID, time.Now(), &sizeBytes, &err)
bucket, key, err := a.extractParamsFromObj(obj)
if err != nil {
return nil, err
Expand All @@ -175,7 +175,7 @@ func (a *Adapter) Put(ctx context.Context, obj block.ObjectPointer, sizeBytes in

func (a *Adapter) Get(ctx context.Context, obj block.ObjectPointer) (io.ReadCloser, error) {
var err error
defer reportMetrics("Get", time.Now(), nil, &err)
defer reportMetrics("Get", obj.StorageID, time.Now(), nil, &err)
bucket, key, err := a.extractParamsFromObj(obj)
if err != nil {
return nil, err
Expand Down Expand Up @@ -213,7 +213,7 @@ func (a *Adapter) GetPreSignedURL(ctx context.Context, obj block.ObjectPointer,
}

var err error
defer reportMetrics("GetPreSignedURL", time.Now(), nil, &err)
defer reportMetrics("GetPreSignedURL", obj.StorageID, time.Now(), nil, &err)

bucket, key, err := a.extractParamsFromObj(obj)
if err != nil {
Expand Down Expand Up @@ -248,7 +248,7 @@ func isErrNotFound(err error) bool {

func (a *Adapter) Exists(ctx context.Context, obj block.ObjectPointer) (bool, error) {
var err error
defer reportMetrics("Exists", time.Now(), nil, &err)
defer reportMetrics("Exists", obj.StorageID, time.Now(), nil, &err)
bucket, key, err := a.extractParamsFromObj(obj)
if err != nil {
return false, err
Expand All @@ -265,7 +265,7 @@ func (a *Adapter) Exists(ctx context.Context, obj block.ObjectPointer) (bool, er

func (a *Adapter) GetRange(ctx context.Context, obj block.ObjectPointer, startPosition int64, endPosition int64) (io.ReadCloser, error) {
var err error
defer reportMetrics("GetRange", time.Now(), nil, &err)
defer reportMetrics("GetRange", obj.StorageID, time.Now(), nil, &err)
bucket, key, err := a.extractParamsFromObj(obj)
if err != nil {
return nil, err
Expand All @@ -285,7 +285,7 @@ func (a *Adapter) GetRange(ctx context.Context, obj block.ObjectPointer, startPo

func (a *Adapter) GetProperties(ctx context.Context, obj block.ObjectPointer) (block.Properties, error) {
var err error
defer reportMetrics("GetProperties", time.Now(), nil, &err)
defer reportMetrics("GetProperties", obj.StorageID, time.Now(), nil, &err)
var props block.Properties
bucket, key, err := a.extractParamsFromObj(obj)
if err != nil {
Expand All @@ -300,7 +300,7 @@ func (a *Adapter) GetProperties(ctx context.Context, obj block.ObjectPointer) (b

func (a *Adapter) Remove(ctx context.Context, obj block.ObjectPointer) error {
var err error
defer reportMetrics("Remove", time.Now(), nil, &err)
defer reportMetrics("Remove", obj.StorageID, time.Now(), nil, &err)
bucket, key, err := a.extractParamsFromObj(obj)
if err != nil {
return err
Expand All @@ -314,7 +314,7 @@ func (a *Adapter) Remove(ctx context.Context, obj block.ObjectPointer) error {

func (a *Adapter) Copy(ctx context.Context, sourceObj, destinationObj block.ObjectPointer) error {
var err error
defer reportMetrics("Copy", time.Now(), nil, &err)
defer reportMetrics("Copy", sourceObj.StorageID, time.Now(), nil, &err)
dstBucket, dstKey, err := a.extractParamsFromObj(destinationObj)
if err != nil {
return fmt.Errorf("resolve destination: %w", err)
Expand All @@ -337,7 +337,7 @@ func (a *Adapter) Copy(ctx context.Context, sourceObj, destinationObj block.Obje

func (a *Adapter) CreateMultiPartUpload(ctx context.Context, obj block.ObjectPointer, _ *http.Request, _ block.CreateMultiPartUploadOpts) (*block.CreateMultiPartUploadResponse, error) {
var err error
defer reportMetrics("CreateMultiPartUpload", time.Now(), nil, &err)
defer reportMetrics("CreateMultiPartUpload", obj.StorageID, time.Now(), nil, &err)
bucket, uploadID, err := a.extractParamsFromObj(obj)
if err != nil {
return nil, err
Expand Down Expand Up @@ -368,7 +368,7 @@ func (a *Adapter) CreateMultiPartUpload(ctx context.Context, obj block.ObjectPoi

func (a *Adapter) UploadPart(ctx context.Context, obj block.ObjectPointer, sizeBytes int64, reader io.Reader, uploadID string, partNumber int) (*block.UploadPartResponse, error) {
var err error
defer reportMetrics("UploadPart", time.Now(), &sizeBytes, &err)
defer reportMetrics("UploadPart", obj.StorageID, time.Now(), &sizeBytes, &err)
bucket, _, err := a.extractParamsFromObj(obj)
if err != nil {
return nil, err
Expand All @@ -395,7 +395,7 @@ func (a *Adapter) UploadPart(ctx context.Context, obj block.ObjectPointer, sizeB

func (a *Adapter) UploadCopyPart(ctx context.Context, sourceObj, destinationObj block.ObjectPointer, uploadID string, partNumber int) (*block.UploadPartResponse, error) {
var err error
defer reportMetrics("UploadCopyPart", time.Now(), nil, &err)
defer reportMetrics("UploadCopyPart", sourceObj.StorageID, time.Now(), nil, &err)
bucket, _, err := a.extractParamsFromObj(destinationObj)
if err != nil {
return nil, err
Expand All @@ -422,7 +422,7 @@ func (a *Adapter) UploadCopyPart(ctx context.Context, sourceObj, destinationObj

func (a *Adapter) UploadCopyPartRange(ctx context.Context, sourceObj, destinationObj block.ObjectPointer, uploadID string, partNumber int, startPosition, endPosition int64) (*block.UploadPartResponse, error) {
var err error
defer reportMetrics("UploadCopyPartRange", time.Now(), nil, &err)
defer reportMetrics("UploadCopyPartRange", sourceObj.StorageID, time.Now(), nil, &err)
bucket, _, err := a.extractParamsFromObj(destinationObj)
if err != nil {
return nil, err
Expand Down Expand Up @@ -460,7 +460,7 @@ func (a *Adapter) UploadCopyPartRange(ctx context.Context, sourceObj, destinatio

func (a *Adapter) AbortMultiPartUpload(ctx context.Context, obj block.ObjectPointer, uploadID string) error {
var err error
defer reportMetrics("AbortMultiPartUpload", time.Now(), nil, &err)
defer reportMetrics("AbortMultiPartUpload", obj.StorageID, time.Now(), nil, &err)
bucketName, _, err := a.extractParamsFromObj(obj)
if err != nil {
return err
Expand Down Expand Up @@ -489,7 +489,7 @@ func (a *Adapter) AbortMultiPartUpload(ctx context.Context, obj block.ObjectPoin

func (a *Adapter) CompleteMultiPartUpload(ctx context.Context, obj block.ObjectPointer, uploadID string, multipartList *block.MultipartUploadCompletion) (*block.CompleteMultiPartUploadResponse, error) {
var err error
defer reportMetrics("CompleteMultiPartUpload", time.Now(), nil, &err)
defer reportMetrics("CompleteMultiPartUpload", obj.StorageID, time.Now(), nil, &err)
bucketName, key, err := a.extractParamsFromObj(obj)
if err != nil {
return nil, err
Expand Down
10 changes: 5 additions & 5 deletions pkg/block/gs/stats.go → pkg/block/gs/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,19 @@ var durationHistograms = promauto.NewHistogramVec(
Name: "gs_operation_duration_seconds",
Help: "durations of outgoing gs operations",
},
[]string{"operation", "error"})
[]string{"operation", "storage_id", "error"})

var requestSizeHistograms = promauto.NewHistogramVec(
prometheus.HistogramOpts{
Name: "gs_operation_size_bytes",
Help: "handled sizes of outgoing gs operations",
Buckets: prometheus.ExponentialBuckets(1, 10, 10), //nolint: mnd
}, []string{"operation", "error"})
}, []string{"operation", "storage_id", "error"})

func reportMetrics(operation string, start time.Time, sizeBytes *int64, err *error) {
func reportMetrics(operation, storageID string, start time.Time, sizeBytes *int64, err *error) {
isErrStr := strconv.FormatBool(*err != nil)
durationHistograms.WithLabelValues(operation, isErrStr).Observe(time.Since(start).Seconds())
durationHistograms.WithLabelValues(operation, storageID, isErrStr).Observe(time.Since(start).Seconds())
if sizeBytes != nil {
requestSizeHistograms.WithLabelValues(operation, isErrStr).Observe(float64(*sizeBytes))
requestSizeHistograms.WithLabelValues(operation, storageID, isErrStr).Observe(float64(*sizeBytes))
}
}
Loading
Loading