Commit d3c6964
Add metric for number of series with discarded samples (#6996)
* Add metric for number of series with discarded samples
* Add cleanup for metric
* Update changelog
* Rename metric to include per labelset
* Rename variables and help text
* Test fix
* Separate and fix per labelset tracker
* Add per labelset tests
* Update changelog and remove unneeded async calls

Signed-off-by: Essam Eldaly <[email protected]>
1 parent 4fee6f1 commit d3c6964

File tree

7 files changed: +546 −0 lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions

@@ -77,6 +77,7 @@
 * [ENHANCEMENT] Upgrade build image and Go version to 1.24.6. #6970 #6976
 * [ENHANCEMENT] Implement versioned transactions for writes to DynamoDB ring. #6986
 * [ENHANCEMENT] Add source metadata to requests (api vs ruler) #6947
+* [ENHANCEMENT] Add new metrics `cortex_discarded_series` and `cortex_discarded_series_per_labelset` to track the number of series that have a discarded sample. #6995
 * [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring #6517
 * [BUGFIX] Ingester: Fix labelset data race condition. #6573
 * [BUGFIX] Compactor: Cleaner should not put deletion marker for blocks with no-compact marker. #6576
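
The registration of these two gauges lives in a validation-metrics file that is part of this commit but not rendered on this page. As a sketch only — the per-labelset label names match the test file further down, while the labels on the plain `cortex_discarded_series` gauge are an assumption — the declarations could look roughly like:

	package validation // assumed location; the commit's actual file is not shown here

	import "github.com/prometheus/client_golang/prometheus"

	// Sketch, not the committed code. Label names on discardedSeries are inferred.
	var (
		discardedSeries = prometheus.NewGaugeVec(prometheus.GaugeOpts{
			Name: "cortex_discarded_series",
			Help: "The number of series that include discarded samples.",
		}, []string{"reason", "user"})

		discardedSeriesPerLabelset = prometheus.NewGaugeVec(prometheus.GaugeOpts{
			Name: "cortex_discarded_series_per_labelset",
			Help: "The number of series that include discarded samples for each labelset.",
		}, []string{"reason", "user", "labelset"})
	)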

pkg/ingester/ingester.go

Lines changed: 10 additions & 0 deletions
@@ -1259,22 +1259,27 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
 		switch cause := errors.Cause(err); {
 		case errors.Is(cause, storage.ErrOutOfBounds):
 			sampleOutOfBoundsCount++
+			i.validateMetrics.DiscardedSeriesTracker.Track(sampleOutOfBounds, userID, copiedLabels.Hash())
 			updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(timestampMs), lbls) })
 
 		case errors.Is(cause, storage.ErrOutOfOrderSample):
 			sampleOutOfOrderCount++
+			i.validateMetrics.DiscardedSeriesTracker.Track(sampleOutOfOrder, userID, copiedLabels.Hash())
 			updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(timestampMs), lbls) })
 
 		case errors.Is(cause, storage.ErrDuplicateSampleForTimestamp):
 			newValueForTimestampCount++
+			i.validateMetrics.DiscardedSeriesTracker.Track(newValueForTimestamp, userID, copiedLabels.Hash())
 			updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(timestampMs), lbls) })
 
 		case errors.Is(cause, storage.ErrTooOldSample):
 			sampleTooOldCount++
+			i.validateMetrics.DiscardedSeriesTracker.Track(sampleTooOld, userID, copiedLabels.Hash())
 			updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(timestampMs), lbls) })
 
 		case errors.Is(cause, errMaxSeriesPerUserLimitExceeded):
 			perUserSeriesLimitCount++
+			i.validateMetrics.DiscardedSeriesTracker.Track(perUserSeriesLimit, userID, copiedLabels.Hash())
 			updateFirstPartial(func() error {
 				return makeLimitError(perUserSeriesLimit, i.limiter.FormatError(userID, cause, copiedLabels))
 			})

@@ -1287,12 +1292,17 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
 
 		case errors.Is(cause, errMaxSeriesPerMetricLimitExceeded):
 			perMetricSeriesLimitCount++
+			i.validateMetrics.DiscardedSeriesTracker.Track(perMetricSeriesLimit, userID, copiedLabels.Hash())
 			updateFirstPartial(func() error {
 				return makeMetricLimitError(perMetricSeriesLimit, copiedLabels, i.limiter.FormatError(userID, cause, copiedLabels))
 			})
 
 		case errors.As(cause, &errMaxSeriesPerLabelSetLimitExceeded{}):
 			perLabelSetSeriesLimitCount++
+			i.validateMetrics.DiscardedSeriesTracker.Track(perLabelsetSeriesLimit, userID, copiedLabels.Hash())
+			for _, matchedLabelset := range matchedLabelSetLimits {
+				i.validateMetrics.DiscardedSeriesPerLabelsetTracker.Track(userID, copiedLabels.Hash(), matchedLabelset.Hash, matchedLabelset.Id)
+			}
 			// We only track per labelset discarded samples for throttling by labelset limit.
 			reasonCounter.increment(matchedLabelSetLimits, perLabelsetSeriesLimit)
 			updateFirstPartial(func() error {
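
The `DiscardedSeriesTracker` called in every case above is defined elsewhere in this commit; its file is not rendered on this page. Judging purely from the call sites, it deduplicates series hashes per (reason, user). A minimal sketch of that contract — all names, fields, and structure assumed, not the committed implementation — might be:

	package discardedseries

	import "sync"

	// Hypothetical sketch inferred from the call sites above.
	type DiscardedSeriesTracker struct {
		mtx sync.Mutex
		// reason -> user -> set of series hashes seen since the last flush
		seen map[string]map[string]map[uint64]struct{}
	}

	// Track records that a sample for the given series hash was discarded
	// for reason on behalf of user; a series counts once per flush interval.
	func (t *DiscardedSeriesTracker) Track(reason, user string, seriesHash uint64) {
		t.mtx.Lock()
		defer t.mtx.Unlock()
		users, ok := t.seen[reason]
		if !ok {
			users = make(map[string]map[uint64]struct{})
			t.seen[reason] = users
		}
		hashes, ok := users[user]
		if !ok {
			hashes = make(map[uint64]struct{})
			users[user] = hashes
		}
		hashes[seriesHash] = struct{}{}
	}

Tracking by `copiedLabels.Hash()` is what turns the existing per-sample discard counters into a per-series metric: many rejected samples of one series collapse into a single set entry.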
Lines changed: 141 additions & 0 deletions
@@ -0,0 +1,141 @@
package discardedseries

import (
	"sync"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// TODO: if we change the per-labelset series limit from one reason to many, we can remove the hardcoded reason and add an extra reason map
const (
	perLabelsetSeriesLimit = "per_labelset_series_limit"
)

type labelsetCounterStruct struct {
	*sync.RWMutex
	labelsetSeriesMap map[uint64]*seriesCounterStruct
}

type DiscardedSeriesPerLabelsetTracker struct {
	*sync.RWMutex
	userLabelsetMap                 map[string]*labelsetCounterStruct
	discardedSeriesPerLabelsetGauge *prometheus.GaugeVec
}

func NewDiscardedSeriesPerLabelsetTracker(discardedSeriesPerLabelsetGauge *prometheus.GaugeVec) *DiscardedSeriesPerLabelsetTracker {
	tracker := &DiscardedSeriesPerLabelsetTracker{
		RWMutex:                         &sync.RWMutex{},
		userLabelsetMap:                 make(map[string]*labelsetCounterStruct),
		discardedSeriesPerLabelsetGauge: discardedSeriesPerLabelsetGauge,
	}
	return tracker
}

// Track records that the series with the given hash had a sample discarded
// for user under the matched labelset limit. Each map level uses
// double-checked locking: read-lock for the lookup, then write-lock and
// re-check before inserting.
func (t *DiscardedSeriesPerLabelsetTracker) Track(user string, series uint64, matchedLabelsetHash uint64, matchedLabelsetId string) {
	t.RLock()
	labelsetCounter, ok := t.userLabelsetMap[user]
	t.RUnlock()
	if !ok {
		t.Lock()
		labelsetCounter, ok = t.userLabelsetMap[user]
		if !ok {
			labelsetCounter = &labelsetCounterStruct{
				RWMutex:           &sync.RWMutex{},
				labelsetSeriesMap: make(map[uint64]*seriesCounterStruct),
			}
			t.userLabelsetMap[user] = labelsetCounter
		}
		t.Unlock()
	}

	labelsetCounter.RLock()
	seriesCounter, ok := labelsetCounter.labelsetSeriesMap[matchedLabelsetHash]
	labelsetCounter.RUnlock()
	if !ok {
		labelsetCounter.Lock()
		seriesCounter, ok = labelsetCounter.labelsetSeriesMap[matchedLabelsetHash]
		if !ok {
			seriesCounter = &seriesCounterStruct{
				RWMutex:        &sync.RWMutex{},
				seriesCountMap: make(map[uint64]struct{}),
				labelsetId:     matchedLabelsetId,
			}
			labelsetCounter.labelsetSeriesMap[matchedLabelsetHash] = seriesCounter
		}
		labelsetCounter.Unlock()
	}

	seriesCounter.RLock()
	_, ok = seriesCounter.seriesCountMap[series]
	seriesCounter.RUnlock()
	if !ok {
		seriesCounter.Lock()
		_, ok = seriesCounter.seriesCountMap[series]
		if !ok {
			seriesCounter.seriesCountMap[series] = struct{}{}
		}
		seriesCounter.Unlock()
	}
}

// UpdateMetrics vends the number of distinct series tracked per labelset to
// the gauge, clears the per-interval sets, and garbage-collects labelsets
// (and then users) that saw no discarded series this interval.
func (t *DiscardedSeriesPerLabelsetTracker) UpdateMetrics() {
	usersToDelete := make([]string, 0)
	t.RLock()
	for user, labelsetCounter := range t.userLabelsetMap {
		// Reset per user so stale hashes collected for one tenant cannot
		// trigger deletions for another.
		labelsetsToDelete := make([]uint64, 0)
		labelsetCounter.RLock()
		if len(labelsetCounter.labelsetSeriesMap) == 0 {
			usersToDelete = append(usersToDelete, user)
		}
		for labelsetHash, seriesCounter := range labelsetCounter.labelsetSeriesMap {
			seriesCounter.Lock()
			count := len(seriesCounter.seriesCountMap)
			t.discardedSeriesPerLabelsetGauge.WithLabelValues(perLabelsetSeriesLimit, user, seriesCounter.labelsetId).Set(float64(count))
			clear(seriesCounter.seriesCountMap)
			if count == 0 {
				labelsetsToDelete = append(labelsetsToDelete, labelsetHash)
			}
			seriesCounter.Unlock()
		}
		labelsetCounter.RUnlock()
		if len(labelsetsToDelete) > 0 {
			labelsetCounter.Lock()
			for _, labelsetHash := range labelsetsToDelete {
				if _, ok := labelsetCounter.labelsetSeriesMap[labelsetHash]; ok {
					labelsetId := labelsetCounter.labelsetSeriesMap[labelsetHash].labelsetId
					t.discardedSeriesPerLabelsetGauge.DeleteLabelValues(perLabelsetSeriesLimit, user, labelsetId)
					delete(labelsetCounter.labelsetSeriesMap, labelsetHash)
				}
			}
			labelsetCounter.Unlock()
		}
	}
	t.RUnlock()
	if len(usersToDelete) > 0 {
		t.Lock()
		for _, user := range usersToDelete {
			delete(t.userLabelsetMap, user)
		}
		t.Unlock()
	}
}

// StartVendDiscardedSeriesMetricGoroutine periodically flushes tracked
// counts into the gauge.
func (t *DiscardedSeriesPerLabelsetTracker) StartVendDiscardedSeriesMetricGoroutine() {
	go func() {
		ticker := time.NewTicker(vendMetricsInterval)
		for range ticker.C {
			t.UpdateMetrics()
		}
	}()
}

// only used in testing
func (t *DiscardedSeriesPerLabelsetTracker) getSeriesCount(user string, labelsetLimitHash uint64) int {
	if labelsetCounter, ok := t.userLabelsetMap[user]; ok {
		if seriesCounter, ok := labelsetCounter.labelsetSeriesMap[labelsetLimitHash]; ok {
			return len(seriesCounter.seriesCountMap)
		}
	}
	return 0
}
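
This file references `seriesCounterStruct` and `vendMetricsInterval` without defining them; they presumably live in a sibling file of the same package that this page does not render. A definition consistent with every use above (embedded lock, a set of series hashes, and the labelset ID vended as the gauge's `labelset` label) would be roughly:

	// Assumed sibling-file definitions, reconstructed from usage in the
	// tracker above; the names come from this file, but the interval value
	// is a guess.
	type seriesCounterStruct struct {
		*sync.RWMutex
		seriesCountMap map[uint64]struct{} // series hashes seen since the last flush
		labelsetId     string              // value for the gauge's "labelset" label
	}

	const vendMetricsInterval = 1 * time.Minute // flush cadence; actual value not shown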
Lines changed: 118 additions & 0 deletions
@@ -0,0 +1,118 @@
package discardedseries

import (
	"testing"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/stretchr/testify/require"
)

func TestPerLabelsetDiscardedSeriesTracker(t *testing.T) {
	gauge := prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Name: "cortex_discarded_series_per_labelset",
			Help: "The number of series that include discarded samples for each labelset.",
		},
		[]string{"reason", "user", "labelset"},
	)

	tracker := NewDiscardedSeriesPerLabelsetTracker(gauge)
	user1 := "user1"
	user2 := "user2"
	series1 := labels.FromStrings("__name__", "1")
	series2 := labels.FromStrings("__name__", "2")
	labelset1 := uint64(10)
	labelset2 := uint64(20)
	labelset3 := uint64(30)
	labelsetId1 := "ten"
	labelsetId2 := "twenty"
	labelsetId3 := "thirty"

	tracker.Track(user1, series1.Hash(), labelset1, labelsetId1)
	tracker.Track(user1, series1.Hash(), labelset2, labelsetId2)

	tracker.Track(user2, series1.Hash(), labelset1, labelsetId1)
	tracker.Track(user2, series1.Hash(), labelset1, labelsetId1)
	tracker.Track(user2, series1.Hash(), labelset1, labelsetId1)
	tracker.Track(user2, series2.Hash(), labelset1, labelsetId1)

	require.Equal(t, tracker.getSeriesCount(user1, labelset1), 1)
	require.Equal(t, tracker.getSeriesCount(user1, labelset2), 1)
	require.Equal(t, tracker.getSeriesCount(user1, labelset3), 0)

	comparePerLabelsetSeriesVendedCount(t, gauge, user1, labelsetId1, 0)
	comparePerLabelsetSeriesVendedCount(t, gauge, user1, labelsetId2, 0)
	comparePerLabelsetSeriesVendedCount(t, gauge, user1, labelsetId3, 0)

	require.Equal(t, tracker.getSeriesCount(user2, labelset1), 2)
	require.Equal(t, tracker.getSeriesCount(user2, labelset2), 0)
	require.Equal(t, tracker.getSeriesCount(user2, labelset3), 0)

	comparePerLabelsetSeriesVendedCount(t, gauge, user2, labelsetId1, 0)
	comparePerLabelsetSeriesVendedCount(t, gauge, user2, labelsetId2, 0)
	comparePerLabelsetSeriesVendedCount(t, gauge, user2, labelsetId3, 0)

	tracker.UpdateMetrics()

	tracker.Track(user1, series1.Hash(), labelset1, labelsetId1)
	tracker.Track(user1, series1.Hash(), labelset1, labelsetId1)

	require.Equal(t, tracker.getSeriesCount(user1, labelset1), 1)
	require.Equal(t, tracker.getSeriesCount(user1, labelset2), 0)
	require.Equal(t, tracker.getSeriesCount(user1, labelset3), 0)

	comparePerLabelsetSeriesVendedCount(t, gauge, user1, labelsetId1, 1)
	comparePerLabelsetSeriesVendedCount(t, gauge, user1, labelsetId2, 1)
	comparePerLabelsetSeriesVendedCount(t, gauge, user1, labelsetId3, 0)

	require.Equal(t, tracker.getSeriesCount(user2, labelset1), 0)
	require.Equal(t, tracker.getSeriesCount(user2, labelset2), 0)
	require.Equal(t, tracker.getSeriesCount(user2, labelset3), 0)

	comparePerLabelsetSeriesVendedCount(t, gauge, user2, labelsetId1, 2)
	comparePerLabelsetSeriesVendedCount(t, gauge, user2, labelsetId2, 0)
	comparePerLabelsetSeriesVendedCount(t, gauge, user2, labelsetId3, 0)

	tracker.UpdateMetrics()

	require.Equal(t, tracker.getSeriesCount(user1, labelset1), 0)
	require.Equal(t, tracker.getSeriesCount(user1, labelset2), 0)
	require.Equal(t, tracker.getSeriesCount(user1, labelset3), 0)

	comparePerLabelsetSeriesVendedCount(t, gauge, user1, labelsetId1, 1)
	comparePerLabelsetSeriesVendedCount(t, gauge, user1, labelsetId2, 0)
	comparePerLabelsetSeriesVendedCount(t, gauge, user1, labelsetId3, 0)

	require.Equal(t, tracker.getSeriesCount(user2, labelset1), 0)
	require.Equal(t, tracker.getSeriesCount(user2, labelset2), 0)
	require.Equal(t, tracker.getSeriesCount(user2, labelset3), 0)

	comparePerLabelsetSeriesVendedCount(t, gauge, user2, labelsetId1, 0)
	comparePerLabelsetSeriesVendedCount(t, gauge, user2, labelsetId2, 0)
	comparePerLabelsetSeriesVendedCount(t, gauge, user2, labelsetId3, 0)

	tracker.UpdateMetrics()

	require.Equal(t, tracker.getSeriesCount(user1, labelset1), 0)
	require.Equal(t, tracker.getSeriesCount(user1, labelset2), 0)
	require.Equal(t, tracker.getSeriesCount(user1, labelset3), 0)

	comparePerLabelsetSeriesVendedCount(t, gauge, user1, labelsetId1, 0)
	comparePerLabelsetSeriesVendedCount(t, gauge, user1, labelsetId2, 0)
	comparePerLabelsetSeriesVendedCount(t, gauge, user1, labelsetId3, 0)

	require.Equal(t, tracker.getSeriesCount(user2, labelset1), 0)
	require.Equal(t, tracker.getSeriesCount(user2, labelset2), 0)
	require.Equal(t, tracker.getSeriesCount(user2, labelset3), 0)

	comparePerLabelsetSeriesVendedCount(t, gauge, user2, labelsetId1, 0)
	comparePerLabelsetSeriesVendedCount(t, gauge, user2, labelsetId2, 0)
	comparePerLabelsetSeriesVendedCount(t, gauge, user2, labelsetId3, 0)
}

func comparePerLabelsetSeriesVendedCount(t *testing.T, gaugeVec *prometheus.GaugeVec, user string, labelsetLimitId string, val int) {
	gauge, _ := gaugeVec.GetMetricWithLabelValues("per_labelset_series_limit", user, labelsetLimitId)
	require.Equal(t, testutil.ToFloat64(gauge), float64(val))
}
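
For orientation, here is how the pieces might be wired together in practice. This is a usage sketch, not code from the commit; it assumes it runs inside the `discardedseries` package and uses only the constructors and methods shown above:

	package discardedseries

	import (
		"github.com/prometheus/client_golang/prometheus"
		"github.com/prometheus/prometheus/model/labels"
	)

	// Hypothetical wiring: construct the tracker around a registered gauge,
	// start the background flusher, and call Track from the write path
	// whenever a labelset limit rejects a sample.
	func exampleWiring() {
		gauge := prometheus.NewGaugeVec(
			prometheus.GaugeOpts{
				Name: "cortex_discarded_series_per_labelset",
				Help: "The number of series that include discarded samples for each labelset.",
			},
			[]string{"reason", "user", "labelset"},
		)
		prometheus.MustRegister(gauge)

		tracker := NewDiscardedSeriesPerLabelsetTracker(gauge)
		tracker.StartVendDiscardedSeriesMetricGoroutine() // periodic flush to the gauge

		series := labels.FromStrings("__name__", "http_requests_total", "job", "api")
		// The labelset hash and ID would come from the matched limit in the ingester.
		tracker.Track("tenant-1", series.Hash(), 42, `{job="api"}`)
	}

Each flush sets the gauge to the number of distinct series seen during the interval and then clears the set, so the metric reads as "series with discarded samples per interval" rather than a monotonically increasing count.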
