Add Elasticsearch query input plugin (influxdata#3536)
lpic10 authored Jun 21, 2021
1 parent 1453c47 commit 81f882a
Showing 8 changed files with 2,046 additions and 0 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -6,3 +6,4 @@
/vendor
.DS_Store
process.yml
/.vscode
1 change: 1 addition & 0 deletions plugins/inputs/all/all.go
@@ -44,6 +44,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/dpdk"
_ "github.com/influxdata/telegraf/plugins/inputs/ecs"
_ "github.com/influxdata/telegraf/plugins/inputs/elasticsearch"
_ "github.com/influxdata/telegraf/plugins/inputs/elasticsearch_query"
_ "github.com/influxdata/telegraf/plugins/inputs/ethtool"
_ "github.com/influxdata/telegraf/plugins/inputs/eventhub_consumer"
_ "github.com/influxdata/telegraf/plugins/inputs/exec"
158 changes: 158 additions & 0 deletions plugins/inputs/elasticsearch_query/README.md
@@ -0,0 +1,158 @@
# Elasticsearch query input plugin

This plugin queries [Elasticsearch](https://www.elastic.co/) endpoints to obtain metrics from data stored in an Elasticsearch cluster.

The following is supported:

- return the number of hits for a search query
- calculate the avg/max/min/sum for a numeric field, filtered by a query, aggregated per tag (see the sketch below)
- count the number of terms for a particular field
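
As a rough illustration of the second case above, the sketch below builds the corresponding Elasticsearch request with the `olivere/elastic` v5 client that this plugin imports: a filter query plus a date range for the query period, a terms aggregation per tag, and an avg sub-aggregation on the metric field. The endpoint, index, field, and tag names are hypothetical and only show the shape of the request, not the plugin's actual query builder.

```go
package main

import (
	"context"
	"fmt"

	elastic5 "gopkg.in/olivere/elastic.v5"
)

func main() {
	// Connect to a single node; sniffing disabled for this sketch.
	client, err := elastic5.NewClient(
		elastic5.SetURL("http://node1.es.example.com:9200"),
		elastic5.SetSniff(false),
	)
	if err != nil {
		panic(err)
	}

	// filter_query plus query_period: documents from the last minute.
	query := elastic5.NewBoolQuery().
		Filter(elastic5.NewQueryStringQuery("*")).
		Filter(elastic5.NewRangeQuery("@timestamp").Gte("now-1m"))

	// One bucket per URI.keyword, averaging response_time inside each bucket.
	agg := elastic5.NewTermsAggregation().Field("URI.keyword").
		SubAggregation("response_time", elastic5.NewAvgAggregation().Field("response_time"))

	result, err := client.Search().
		Index("my-index-*").
		Query(query).
		Aggregation("URI.keyword", agg).
		Size(0). // only aggregations are needed, not the hits themselves
		Do(context.Background())
	if err != nil {
		panic(err)
	}
	fmt.Println("total hits:", result.Hits.TotalHits)
}
```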

## Elasticsearch support

This plugin is tested against Elasticsearch 5.x and 6.x releases.
It is currently known to break on Elasticsearch 7.x and later versions.

## Configuration

```toml
[[inputs.elasticsearch_query]]
  ## The full HTTP endpoint URL for your Elasticsearch instance
  ## Multiple urls can be specified as part of the same cluster;
  ## only ONE of the urls will be queried each interval.
  urls = [ "http://node1.es.example.com:9200" ] # required.

  ## Elasticsearch client timeout, defaults to "5s".
  # timeout = "5s"

  ## Set to true to ask Elasticsearch for a list of all cluster nodes,
  ## so it is not necessary to list every node in the urls config option.
  # enable_sniffer = false

  ## Set the interval to check if the Elasticsearch nodes are available.
  ## This option is only used if enable_sniffer is also set ("0s" to disable it).
  # health_check_interval = "10s"

  ## HTTP basic authentication details (eg. when using x-pack)
  # username = "telegraf"
  # password = "mypassword"

  ## Optional TLS Config
  # tls_ca = "/etc/telegraf/ca.pem"
  # tls_cert = "/etc/telegraf/cert.pem"
  # tls_key = "/etc/telegraf/key.pem"
  ## Use TLS but skip chain & host verification
  # insecure_skip_verify = false

  [[inputs.elasticsearch_query.aggregation]]
    ## measurement name for the results of the aggregation query
    measurement_name = "measurement"

    ## Elasticsearch indexes to query (accepts wildcards).
    index = "index-*"

    ## The date/time field in the Elasticsearch index (mandatory).
    date_field = "@timestamp"

    ## Time window to query (eg. "1m" to query documents from the last minute).
    ## Normally this should be set to the same value as the collection interval.
    query_period = "1m"

    ## Lucene query to filter the results
    # filter_query = "*"

    ## Fields on which to aggregate values (must be numeric fields)
    # metric_fields = ["metric"]

    ## Aggregation function to use on the metric fields.
    ## Must be set if 'metric_fields' is set.
    ## Valid values are: avg, sum, min, max
    # metric_function = "avg"

    ## Fields to be used as tags.
    ## Must be text, non-analyzed fields. Metric aggregations are performed per tag.
    # tags = ["field.keyword", "field2.keyword"]

    ## Set to true to not ignore documents when the tag(s) above are missing.
    # include_missing_tag = false

    ## String value of the tag when the tag does not exist.
    ## Used when include_missing_tag is true.
    # missing_tag_value = "null"
```

## Examples

Please note that the `[[inputs.elasticsearch_query]]` section is still required for all of the examples below.

### Search the average response time, per URI and per response status code

```toml
[[inputs.elasticsearch_query.aggregation]]
  measurement_name = "http_logs"
  index = "my-index-*"
  filter_query = "*"
  metric_fields = ["response_time"]
  metric_function = "avg"
  tags = ["URI.keyword", "response.keyword"]
  include_missing_tag = true
  missing_tag_value = "null"
  date_field = "@timestamp"
  query_period = "1m"
```

### Search the maximum response time per method and per URI

```toml
[[inputs.elasticsearch_query.aggregation]]
  measurement_name = "http_logs"
  index = "my-index-*"
  filter_query = "*"
  metric_fields = ["response_time"]
  metric_function = "max"
  tags = ["method.keyword","URI.keyword"]
  include_missing_tag = false
  missing_tag_value = "null"
  date_field = "@timestamp"
  query_period = "1m"
```

### Search the number of documents matching a filter query in all indices

```toml
[[inputs.elasticsearch_query.aggregation]]
  measurement_name = "http_logs"
  index = "*"
  filter_query = "product_1 AND HEAD"
  query_period = "1m"
  date_field = "@timestamp"
```

### Search the number of documents matching a filter query, grouped by response status code

```toml
[[inputs.elasticsearch_query.aggregation]]
  measurement_name = "http_logs"
  index = "*"
  filter_query = "downloads"
  tags = ["response.keyword"]
  include_missing_tag = false
  date_field = "@timestamp"
  query_period = "1m"
```

### Required parameters

- `measurement_name`: The target measurement in which the results of the aggregation query will be stored.
- `index`: The index name to query on Elasticsearch
- `query_period`: The time window to query (eg. "1m" to query documents from the last minute). Normally this should be set to the same value as the collection interval.
- `date_field`: The date/time field in the Elasticsearch index

### Optional parameters

- `filter_query`: Lucene query to filter the results (default: "\*")
- `metric_fields`: The list of fields to perform metric aggregation (these must be indexed as numeric fields)
- `metric_function`: The single-value metric aggregation function to be performed on the `metric_fields` defined. Currently supported aggregations are "avg", "min", "max", "sum" (see [https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics.html](https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics.html)).
- `tags`: The list of fields to be used as tags (these must be indexed as non-analyzed fields). A "terms aggregation" will be done per tag defined.
- `include_missing_tag`: Set to true to not ignore documents where the tag(s) specified above do not exist. (If false, documents without the specified tag field will be ignored in `doc_count` and in the metric aggregation.)
- `missing_tag_value`: The value of the tag that will be set for documents in which the tag field does not exist. Only used when `include_missing_tag` is set to `true` (see the sketch below).
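
For context on the last two options: `include_missing_tag` and `missing_tag_value` behave like the `missing` option of an Elasticsearch terms aggregation, which buckets documents lacking the tag field under a fixed key instead of dropping them. The plugin's own query builder lives in a file not shown in this diff, so the snippet below is only a sketch of that behaviour with the `olivere/elastic` v5 client; the field names are hypothetical.

```go
package sketch

import (
	elastic5 "gopkg.in/olivere/elastic.v5"
)

// missingAwareTermsAgg buckets documents that lack the tag field under a
// fixed key instead of dropping them -- the same idea as
// include_missing_tag = true with missing_tag_value = "null".
func missingAwareTermsAgg(tagField, metricField, missingValue string) *elastic5.TermsAggregation {
	return elastic5.NewTermsAggregation().
		Field(tagField).
		Missing(missingValue).
		SubAggregation(metricField, elastic5.NewAvgAggregation().Field(metricField))
}
```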
153 changes: 153 additions & 0 deletions plugins/inputs/elasticsearch_query/aggregation_parser.go
@@ -0,0 +1,153 @@
package elasticsearch_query

import (
	"fmt"

	"github.com/influxdata/telegraf"
	elastic5 "gopkg.in/olivere/elastic.v5"
)

type resultMetric struct {
	name   string
	fields map[string]interface{}
	tags   map[string]string
}

func parseSimpleResult(acc telegraf.Accumulator, measurement string, searchResult *elastic5.SearchResult) {
	fields := make(map[string]interface{})
	tags := make(map[string]string)

	fields["doc_count"] = searchResult.Hits.TotalHits

	acc.AddFields(measurement, fields, tags)
}

func parseAggregationResult(acc telegraf.Accumulator, aggregationQueryList []aggregationQueryData, searchResult *elastic5.SearchResult) error {
	measurements := map[string]map[string]string{}

	// organize the aggregation query data by measurement
	for _, aggregationQuery := range aggregationQueryList {
		if measurements[aggregationQuery.measurement] == nil {
			measurements[aggregationQuery.measurement] = map[string]string{
				aggregationQuery.name: aggregationQuery.function,
			}
		} else {
			t := measurements[aggregationQuery.measurement]
			t[aggregationQuery.name] = aggregationQuery.function
			measurements[aggregationQuery.measurement] = t
		}
	}

	// recurse over query aggregation results per measurement
	for measurement, aggNameFunction := range measurements {
		var m resultMetric

		m.fields = make(map[string]interface{})
		m.tags = make(map[string]string)
		m.name = measurement

		_, err := recurseResponse(acc, aggNameFunction, searchResult.Aggregations, m)
		if err != nil {
			return err
		}
	}
	return nil
}

func recurseResponse(acc telegraf.Accumulator, aggNameFunction map[string]string, bucketResponse elastic5.Aggregations, m resultMetric) (resultMetric, error) {
	var err error

	aggNames := getAggNames(bucketResponse)
	if len(aggNames) == 0 {
		// we've reached a single bucket or response without aggregation, nothing here
		return m, nil
	}

	// metrics aggregations response can contain multiple field values, so we iterate over them
	for _, aggName := range aggNames {
		aggFunction, found := aggNameFunction[aggName]
		if !found {
			return m, fmt.Errorf("child aggregation function '%s' not found %v", aggName, aggNameFunction)
		}

		resp := getResponseAggregation(aggFunction, aggName, bucketResponse)
		if resp == nil {
			return m, fmt.Errorf("child aggregation '%s' not found", aggName)
		}

		switch resp := resp.(type) {
		case *elastic5.AggregationBucketKeyItems:
			// we've found a terms aggregation, iterate over the buckets and try to retrieve the inner aggregation values
			for _, bucket := range resp.Buckets {
				var s string
				var ok bool
				m.fields["doc_count"] = bucket.DocCount
				if s, ok = bucket.Key.(string); !ok {
					return m, fmt.Errorf("bucket key is not a string (%s, %s)", aggName, aggFunction)
				}
				m.tags[aggName] = s

				// we need to recurse down through the buckets, as it may contain another terms aggregation
				m, err = recurseResponse(acc, aggNameFunction, bucket.Aggregations, m)
				if err != nil {
					return m, err
				}

				// if there are fields present after finishing the bucket, it is a complete metric
				// store it and clean the fields to start a new metric
				if len(m.fields) > 0 {
					acc.AddFields(m.name, m.fields, m.tags)
					m.fields = make(map[string]interface{})
				}

				// after finishing the bucket, remove its tag from the tags map
				delete(m.tags, aggName)
			}

		case *elastic5.AggregationValueMetric:
			if resp.Value != nil {
				m.fields[aggName] = *resp.Value
			} else {
				m.fields[aggName] = float64(0)
			}

		default:
			return m, fmt.Errorf("aggregation type %T not supported", resp)
		}
	}

	// if there are fields here it comes from a metrics aggregation without a parent terms aggregation
	if len(m.fields) > 0 {
		acc.AddFields(m.name, m.fields, m.tags)
		m.fields = make(map[string]interface{})
	}
	return m, nil
}

func getResponseAggregation(function string, aggName string, aggs elastic5.Aggregations) (agg interface{}) {
	switch function {
	case "avg":
		agg, _ = aggs.Avg(aggName)
	case "sum":
		agg, _ = aggs.Sum(aggName)
	case "min":
		agg, _ = aggs.Min(aggName)
	case "max":
		agg, _ = aggs.Max(aggName)
	case "terms":
		agg, _ = aggs.Terms(aggName)
	}

	return agg
}

// getAggNames returns the aggregation names from a response aggregation
func getAggNames(agg elastic5.Aggregations) (aggs []string) {
	for k := range agg {
		if (k != "key") && (k != "doc_count") {
			aggs = append(aggs, k)
		}
	}

	return aggs
}
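
For a sense of how this parser is driven end to end, here is a test-style sketch (same package) that runs a terms-plus-avg aggregation search and feeds the result to `parseAggregationResult` through Telegraf's `testutil.Accumulator`. The `aggregationQueryData` values use only the fields referenced in this file (`measurement`, `name`, `function`); the full struct is defined elsewhere in this pull request, and the cluster URL, index, and field names are hypothetical.

```go
package elasticsearch_query

import (
	"context"
	"testing"

	"github.com/influxdata/telegraf/testutil"
	elastic5 "gopkg.in/olivere/elastic.v5"
)

func TestParseAggregationResultSketch(t *testing.T) {
	client, err := elastic5.NewClient(
		elastic5.SetURL("http://localhost:9200"),
		elastic5.SetSniff(false),
	)
	if err != nil {
		t.Skip("no Elasticsearch available:", err)
	}

	// Terms aggregation per URI.keyword with an avg sub-aggregation.
	agg := elastic5.NewTermsAggregation().Field("URI.keyword").
		SubAggregation("response_time", elastic5.NewAvgAggregation().Field("response_time"))

	searchResult, err := client.Search().
		Index("my-index-*").
		Aggregation("URI.keyword", agg).
		Size(0).
		Do(context.Background())
	if err != nil {
		t.Fatal(err)
	}

	// Tell the parser which aggregation names map to which functions.
	queries := []aggregationQueryData{
		{measurement: "http_logs", name: "URI.keyword", function: "terms"},
		{measurement: "http_logs", name: "response_time", function: "avg"},
	}

	var acc testutil.Accumulator
	if err := parseAggregationResult(&acc, queries, searchResult); err != nil {
		t.Fatal(err)
	}
	// Each URI bucket becomes one metric with a doc_count field,
	// a response_time field, and a URI.keyword tag.
}
```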