Skip to content

Commit 939f633

Browse files
tac-emil-andresenrtkkroland
authored andcommitted
Add metrics to auto scale based on indexing pressure (prometheus-community#904)
* Add metrics indexing_pressure.memory.limit_in_bytes and indexing_pressure.memory.current.current.all_in_bytes to allow auto-scaling based on how close the cluster nodes are to dropping indexing requests due to the indxing request memory buffer reaching capacity. Signed-off-by: emilandresentac <[email protected]> * Reduce labels per metric for indexing pressure metrics to cluster, node, and name to save on storage space. Signed-off-by: emilandresentac <[email protected]> --------- Signed-off-by: emilandresentac <[email protected]>
1 parent 6320fc0 commit 939f633

File tree

2 files changed

+90
-17
lines changed

2 files changed

+90
-17
lines changed

collector/nodes.go

+61
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ var (
9696
defaultRoleLabels = []string{"cluster", "host", "name"}
9797
defaultThreadPoolLabels = append(defaultNodeLabels, "type")
9898
defaultBreakerLabels = append(defaultNodeLabels, "breaker")
99+
defaultIndexingPressureLabels = []string{"cluster", "host", "name", "indexing_pressure"}
99100
defaultFilesystemDataLabels = append(defaultNodeLabels, "mount", "path")
100101
defaultFilesystemIODeviceLabels = append(defaultNodeLabels, "device")
101102
defaultCacheLabels = append(defaultNodeLabels, "cache")
@@ -150,6 +151,13 @@ type breakerMetric struct {
150151
Labels func(cluster string, node NodeStatsNodeResponse, breaker string) []string
151152
}
152153

154+
type indexingPressureMetric struct {
155+
Type prometheus.ValueType
156+
Desc *prometheus.Desc
157+
Value func(indexingPressureStats NodeStatsIndexingPressureResponse) float64
158+
Labels func(cluster string, node NodeStatsNodeResponse, indexingPressure string) []string
159+
}
160+
153161
type threadPoolMetric struct {
154162
Type prometheus.ValueType
155163
Desc *prometheus.Desc
@@ -185,6 +193,7 @@ type Nodes struct {
185193
nodeMetrics []*nodeMetric
186194
gcCollectionMetrics []*gcCollectionMetric
187195
breakerMetrics []*breakerMetric
196+
indexingPressureMetrics []*indexingPressureMetric
188197
threadPoolMetrics []*threadPoolMetric
189198
filesystemDataMetrics []*filesystemDataMetric
190199
filesystemIODeviceMetrics []*filesystemIODeviceMetric
@@ -1607,6 +1616,46 @@ func NewNodes(logger log.Logger, client *http.Client, url *url.URL, all bool, no
16071616
},
16081617
},
16091618
},
1619+
indexingPressureMetrics: []*indexingPressureMetric{
1620+
{
1621+
Type: prometheus.GaugeValue,
1622+
Desc: prometheus.NewDesc(
1623+
prometheus.BuildFQName(namespace, "indexing_pressure", "current_all_in_bytes"),
1624+
"Memory consumed, in bytes, by indexing requests in the coordinating, primary, or replica stage.",
1625+
defaultIndexingPressureLabels, nil,
1626+
),
1627+
Value: func(indexingPressureMem NodeStatsIndexingPressureResponse) float64 {
1628+
return float64(indexingPressureMem.Current.AllInBytes)
1629+
},
1630+
Labels: func(cluster string, node NodeStatsNodeResponse, indexingPressure string) []string {
1631+
return []string{
1632+
cluster,
1633+
node.Host,
1634+
node.Name,
1635+
indexingPressure,
1636+
}
1637+
},
1638+
},
1639+
{
1640+
Type: prometheus.GaugeValue,
1641+
Desc: prometheus.NewDesc(
1642+
prometheus.BuildFQName(namespace, "indexing_pressure", "limit_in_bytes"),
1643+
"Configured memory limit, in bytes, for the indexing requests",
1644+
defaultIndexingPressureLabels, nil,
1645+
),
1646+
Value: func(indexingPressureStats NodeStatsIndexingPressureResponse) float64 {
1647+
return float64(indexingPressureStats.LimitInBytes)
1648+
},
1649+
Labels: func(cluster string, node NodeStatsNodeResponse, indexingPressure string) []string {
1650+
return []string{
1651+
cluster,
1652+
node.Host,
1653+
node.Name,
1654+
indexingPressure,
1655+
}
1656+
},
1657+
},
1658+
},
16101659
threadPoolMetrics: []*threadPoolMetric{
16111660
{
16121661
Type: prometheus.CounterValue,
@@ -1919,6 +1968,18 @@ func (c *Nodes) Collect(ch chan<- prometheus.Metric) {
19191968
}
19201969
}
19211970

1971+
// Indexing Pressure stats
1972+
for indexingPressure, ipstats := range node.IndexingPressure {
1973+
for _, metric := range c.indexingPressureMetrics {
1974+
ch <- prometheus.MustNewConstMetric(
1975+
metric.Desc,
1976+
metric.Type,
1977+
metric.Value(ipstats),
1978+
metric.Labels(nodeStatsResp.ClusterName, node, indexingPressure)...,
1979+
)
1980+
}
1981+
}
1982+
19221983
// Thread Pool stats
19231984
for pool, pstats := range node.ThreadPool {
19241985
for _, metric := range c.threadPoolMetrics {

collector/nodes_response.go

+29-17
Original file line numberDiff line numberDiff line change
@@ -23,23 +23,24 @@ type nodeStatsResponse struct {
2323

2424
// NodeStatsNodeResponse defines node stats information structure for nodes
2525
type NodeStatsNodeResponse struct {
26-
Name string `json:"name"`
27-
Host string `json:"host"`
28-
Timestamp int64 `json:"timestamp"`
29-
TransportAddress string `json:"transport_address"`
30-
Hostname string `json:"hostname"`
31-
Roles []string `json:"roles"`
32-
Attributes map[string]string `json:"attributes"`
33-
Indices NodeStatsIndicesResponse `json:"indices"`
34-
OS NodeStatsOSResponse `json:"os"`
35-
Network NodeStatsNetworkResponse `json:"network"`
36-
FS NodeStatsFSResponse `json:"fs"`
37-
ThreadPool map[string]NodeStatsThreadPoolPoolResponse `json:"thread_pool"`
38-
JVM NodeStatsJVMResponse `json:"jvm"`
39-
Breakers map[string]NodeStatsBreakersResponse `json:"breakers"`
40-
HTTP map[string]interface{} `json:"http"`
41-
Transport NodeStatsTransportResponse `json:"transport"`
42-
Process NodeStatsProcessResponse `json:"process"`
26+
Name string `json:"name"`
27+
Host string `json:"host"`
28+
Timestamp int64 `json:"timestamp"`
29+
TransportAddress string `json:"transport_address"`
30+
Hostname string `json:"hostname"`
31+
Roles []string `json:"roles"`
32+
Attributes map[string]string `json:"attributes"`
33+
Indices NodeStatsIndicesResponse `json:"indices"`
34+
OS NodeStatsOSResponse `json:"os"`
35+
Network NodeStatsNetworkResponse `json:"network"`
36+
FS NodeStatsFSResponse `json:"fs"`
37+
ThreadPool map[string]NodeStatsThreadPoolPoolResponse `json:"thread_pool"`
38+
JVM NodeStatsJVMResponse `json:"jvm"`
39+
Breakers map[string]NodeStatsBreakersResponse `json:"breakers"`
40+
HTTP map[string]interface{} `json:"http"`
41+
Transport NodeStatsTransportResponse `json:"transport"`
42+
Process NodeStatsProcessResponse `json:"process"`
43+
IndexingPressure map[string]NodeStatsIndexingPressureResponse `json:"indexing_pressure"`
4344
}
4445

4546
// NodeStatsBreakersResponse is a representation of a statistics about the field data circuit breaker
@@ -50,6 +51,17 @@ type NodeStatsBreakersResponse struct {
5051
Tripped int64 `json:"tripped"`
5152
}
5253

54+
// NodeStatsIndexingPressureResponse is a representation of a elasticsearch indexing pressure
55+
type NodeStatsIndexingPressureResponse struct {
56+
Current NodeStatsIndexingPressureCurrentResponse `json:"current"`
57+
LimitInBytes int64 `json:"limit_in_bytes"`
58+
}
59+
60+
// NodeStatsIndexingPressureMemoryCurrentResponse is a representation of a elasticsearch indexing pressure current memory usage
61+
type NodeStatsIndexingPressureCurrentResponse struct {
62+
AllInBytes int64 `json:"all_in_bytes"`
63+
}
64+
5365
// NodeStatsJVMResponse is a representation of a JVM stats, memory pool information, garbage collection, buffer pools, number of loaded/unloaded classes
5466
type NodeStatsJVMResponse struct {
5567
BufferPools map[string]NodeStatsJVMBufferPoolResponse `json:"buffer_pools"`

0 commit comments

Comments
 (0)