From 780fbfecb2ac26b4eb8ed46867a8a5118e522b14 Mon Sep 17 00:00:00 2001 From: Thomas Schuetz <38893055+thschue@users.noreply.github.com> Date: Wed, 12 Aug 2020 17:51:53 +0200 Subject: [PATCH] Dynatrace output plugin (#7881) --- .gitignore | 1 + plugins/outputs/all/all.go | 1 + plugins/outputs/dynatrace/README.md | 24 ++ plugins/outputs/dynatrace/dynatrace.go | 274 ++++++++++++++++ plugins/outputs/dynatrace/dynatrace_test.go | 331 ++++++++++++++++++++ 5 files changed, 631 insertions(+) create mode 100644 plugins/outputs/dynatrace/README.md create mode 100644 plugins/outputs/dynatrace/dynatrace.go create mode 100644 plugins/outputs/dynatrace/dynatrace_test.go diff --git a/.gitignore b/.gitignore index a32255fbb4da7..df2b3d06643c5 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ +/.idea /build /telegraf /telegraf.exe diff --git a/plugins/outputs/all/all.go b/plugins/outputs/all/all.go index c7f28bdb91e19..3a2813f4e1995 100644 --- a/plugins/outputs/all/all.go +++ b/plugins/outputs/all/all.go @@ -10,6 +10,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/outputs/cratedb" _ "github.com/influxdata/telegraf/plugins/outputs/datadog" _ "github.com/influxdata/telegraf/plugins/outputs/discard" + _ "github.com/influxdata/telegraf/plugins/outputs/dynatrace" _ "github.com/influxdata/telegraf/plugins/outputs/elasticsearch" _ "github.com/influxdata/telegraf/plugins/outputs/exec" _ "github.com/influxdata/telegraf/plugins/outputs/execd" diff --git a/plugins/outputs/dynatrace/README.md b/plugins/outputs/dynatrace/README.md new file mode 100644 index 0000000000000..203d572679956 --- /dev/null +++ b/plugins/outputs/dynatrace/README.md @@ -0,0 +1,24 @@ +# Dynatrace Output Plugin + +This plugin writes telegraf metrics to a Dynatrace environment. + +An API token is necessary, which can be obtained in your Dynatrace environment. Navigate to **Dynatrace > Settings > Integration > Dynatrace API** and create a new token with +'Data ingest' access scope enabled. + +Telegraf measurements which can't be converted to a float64 are skipped. + +Metrics fields are added to the measurement name by using '.' in the metric name. + +### Configuration + +```toml +[[outputs.dynatrace]] + ## Dynatrace environment URL (e.g.: https://YOUR_DOMAIN/api/v2/metrics/ingest) or use the local ingest endpoint of your OneAgent monitored host (e.g.: http://127.0.0.1:14499/metrics/ingest). + environmentURL = "" + environmentApiToken = "" + ## Optional prefix for metric names (e.g.: "telegraf.") + prefix = "telegraf." + ## Flag for skipping the tls certificate check, just for testing purposes, should be false by default + skipCertificateCheck = false + +``` \ No newline at end of file diff --git a/plugins/outputs/dynatrace/dynatrace.go b/plugins/outputs/dynatrace/dynatrace.go new file mode 100644 index 0000000000000..4796d1d8fc444 --- /dev/null +++ b/plugins/outputs/dynatrace/dynatrace.go @@ -0,0 +1,274 @@ +package dynatrace + +import ( + "bytes" + "fmt" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/plugins/common/tls" + "github.com/influxdata/telegraf/plugins/outputs" + "io/ioutil" + "math" + "net/http" + "regexp" + "sort" + "strconv" + "strings" + "time" +) + +const ( + oneAgentMetricsUrl = "http://127.0.0.1:14499/metrics/ingest" +) + +var ( + reNameAllowedCharList = regexp.MustCompile("[^A-Za-z0-9.]+") + maxDimKeyLen = 100 + maxMetricKeyLen = 250 +) + +// Dynatrace Configuration for the Dynatrace output plugin +type Dynatrace struct { + URL string `toml:"url"` + APIToken string `toml:"api_token"` + InsecureSkipVerify bool `toml:"insecure_skip_verify"` + Prefix string `toml:"prefix"` + Log telegraf.Logger `toml:"-"` + Timeout internal.Duration `toml:"timeout"` + + tls.ClientConfig + + client *http.Client +} + +const sampleConfig = ` + ## For usage with the Dynatrace OneAgent you can omit any configuration, + ## the only requirement is that the OneAgent is running on the same host. + ## Only setup environment url and token if you want to monitor a Host without the OneAgent present. + ## + ## Your Dynatrace environment URL. + ## For Dynatrace OneAgent you can leave this empty or set it to "http://127.0.0.1:14499/metrics/ingest" (default) + ## For Dynatrace SaaS environments the URL scheme is "https://{your-environment-id}.live.dynatrace.com/api/v2/metrics/ingest" + ## For Dynatrace Managed environments the URL scheme is "https://{your-domain}/e/{your-environment-id}/api/v2/metrics/ingest" + url = "" + + ## Your Dynatrace API token. + ## Create an API token within your Dynatrace environment, by navigating to Settings > Integration > Dynatrace API + ## The API token needs data ingest scope permission. When using OneAgent, no API token is required. + api_token = "" + + ## Optional prefix for metric names (e.g.: "telegraf.") + prefix = "telegraf." + + ## Optional TLS Config + # tls_ca = "/etc/telegraf/ca.pem" + # tls_cert = "/etc/telegraf/cert.pem" + # tls_key = "/etc/telegraf/key.pem" + + ## Optional flag for ignoring tls certificate check + # insecure_skip_verify = false + + + ## Connection timeout, defaults to "5s" if not set. + timeout = "5s" +` + +// Connect Connects the Dynatrace output plugin to the Telegraf stream +func (d *Dynatrace) Connect() error { + return nil +} + +// Close Closes the Dynatrace output plugin +func (d *Dynatrace) Close() error { + d.client = nil + return nil +} + +// SampleConfig Returns a sample configuration for the Dynatrace output plugin +func (d *Dynatrace) SampleConfig() string { + return sampleConfig +} + +// Description returns the description for the Dynatrace output plugin +func (d *Dynatrace) Description() string { + return "Send telegraf metrics to a Dynatrace environment" +} + +// Normalizes a metric keys or metric dimension identifiers +// according to Dynatrace format. +func (d *Dynatrace) normalize(s string, max int) (string, error) { + s = reNameAllowedCharList.ReplaceAllString(s, "_") + + // Strip Digits and underscores if they are at the beginning of the string + normalizedString := strings.TrimLeft(s, "_0123456789") + + for strings.HasPrefix(normalizedString, "_") { + normalizedString = normalizedString[1:] + } + + if len(normalizedString) > max { + normalizedString = normalizedString[:max] + } + + for strings.HasSuffix(normalizedString, "_") { + normalizedString = normalizedString[:len(normalizedString)-1] + } + + if len(normalizedString) == 0 { + return "", fmt.Errorf("error normalizing the string: %s", s) + } + return normalizedString, nil +} + +func (d *Dynatrace) escape(v string) string { + return strconv.Quote(v) +} + +func (d *Dynatrace) Write(metrics []telegraf.Metric) error { + var buf bytes.Buffer + var tagb bytes.Buffer + if len(metrics) == 0 { + return nil + } + + for _, metric := range metrics { + // first write the tags into a buffer + tagb.Reset() + if len(metric.Tags()) > 0 { + keys := make([]string, 0, len(metric.Tags())) + for k := range metric.Tags() { + keys = append(keys, k) + } + // sort tag keys to expect the same order in ech run + sort.Strings(keys) + + for _, k := range keys { + tagKey, err := d.normalize(k, maxDimKeyLen) + if err != nil { + continue + } + fmt.Fprintf(&tagb, ",%s=%s", strings.ToLower(tagKey), d.escape(metric.Tags()[k])) + + } + } + if len(metric.Fields()) > 0 { + for k, v := range metric.Fields() { + var value string + switch v := v.(type) { + case string: + continue + case float64: + if !math.IsNaN(v) && !math.IsInf(v, 0) { + value = fmt.Sprintf("%f", v) + } else { + continue + } + case uint64: + value = strconv.FormatUint(v, 10) + case int64: + value = strconv.FormatInt(v, 10) + case bool: + if v { + value = "1" + } else { + value = "0" + } + default: + d.Log.Debugf("Dynatrace type not supported! %s", v) + continue + } + + // metric name + metricKey, err := d.normalize(k, maxMetricKeyLen) + if err != nil { + continue + } + + metricID, err := d.normalize(d.Prefix+metric.Name()+"."+metricKey, maxMetricKeyLen) + // write metric name combined with its field + if err != nil { + continue + } + fmt.Fprintf(&buf, "%s", metricID) + // add the tag string + fmt.Fprintf(&buf, "%s", tagb.String()) + + // write measured value + fmt.Fprintf(&buf, " %v\n", value) + } + } + } + + return d.send(buf.Bytes()) +} + +func (d *Dynatrace) send(msg []byte) error { + var err error + req, err := http.NewRequest("POST", d.URL, bytes.NewBuffer(msg)) + if err != nil { + d.Log.Errorf("Dynatrace error: %s", err.Error()) + return fmt.Errorf("Dynatrace error while creating HTTP request:, %s", err.Error()) + } + req.Header.Add("Content-Type", "text/plain; charset=UTF-8") + + if len(d.APIToken) != 0 { + req.Header.Add("Authorization", "Api-Token "+d.APIToken) + } + // add user-agent header to identify metric source + req.Header.Add("User-Agent", "telegraf") + + resp, err := d.client.Do(req) + if err != nil { + d.Log.Errorf("Dynatrace error: %s", err.Error()) + fmt.Println(req) + return fmt.Errorf("Dynatrace error while sending HTTP request:, %s", err.Error()) + } + defer resp.Body.Close() + + // print metric line results as info log + if resp.StatusCode == http.StatusOK || resp.StatusCode == http.StatusAccepted { + bodyBytes, err := ioutil.ReadAll(resp.Body) + if err != nil { + d.Log.Errorf("Dynatrace error reading response") + } + bodyString := string(bodyBytes) + d.Log.Debugf("Dynatrace returned: %s", bodyString) + } else { + return fmt.Errorf("Dynatrace request failed with response code:, %d", resp.StatusCode) + } + + return nil +} + +func (d *Dynatrace) Init() error { + if len(d.URL) == 0 { + d.Log.Infof("Dynatrace URL is empty, defaulting to OneAgent metrics interface") + d.URL = oneAgentMetricsUrl + } + if d.URL != oneAgentMetricsUrl && len(d.APIToken) == 0 { + d.Log.Errorf("Dynatrace api_token is a required field for Dynatrace output") + return fmt.Errorf("api_token is a required field for Dynatrace output") + } + + tlsCfg, err := d.ClientConfig.TLSConfig() + if err != nil { + return err + } + + d.client = &http.Client{ + Transport: &http.Transport{ + Proxy: http.ProxyFromEnvironment, + TLSClientConfig: tlsCfg, + }, + Timeout: d.Timeout.Duration, + } + return nil +} + +func init() { + outputs.Add("dynatrace", func() telegraf.Output { + return &Dynatrace{ + Timeout: internal.Duration{Duration: time.Second * 5}, + } + }) +} diff --git a/plugins/outputs/dynatrace/dynatrace_test.go b/plugins/outputs/dynatrace/dynatrace_test.go new file mode 100644 index 0000000000000..cf6549c72ff11 --- /dev/null +++ b/plugins/outputs/dynatrace/dynatrace_test.go @@ -0,0 +1,331 @@ +package dynatrace + +import ( + "encoding/json" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/metric" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" + "io/ioutil" + "net/http" + "net/http/httptest" + "testing" + "time" +) + +func TestNilMetrics(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(`{"linesOk":10,"linesInvalid":0,"error":null}`) + })) + defer ts.Close() + + d := &Dynatrace{ + Timeout: internal.Duration{Duration: time.Second * 5}, + } + + d.URL = ts.URL + d.APIToken = "123" + d.Log = testutil.Logger{} + err := d.Init() + require.NoError(t, err) + + err = d.Connect() + require.NoError(t, err) + + err = d.Write(nil) + require.NoError(t, err) +} + +func TestEmptyMetricsSlice(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(`{"linesOk":10,"linesInvalid":0,"error":null}`) + })) + defer ts.Close() + + d := &Dynatrace{} + + d.URL = ts.URL + d.APIToken = "123" + d.Log = testutil.Logger{} + + err := d.Init() + require.NoError(t, err) + + err = d.Connect() + require.NoError(t, err) + empty := []telegraf.Metric{} + err = d.Write(empty) + require.NoError(t, err) +} + +func TestMockURL(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(`{"linesOk":10,"linesInvalid":0,"error":null}`) + })) + defer ts.Close() + + d := &Dynatrace{} + + d.URL = ts.URL + d.APIToken = "123" + d.Log = testutil.Logger{} + + err := d.Init() + require.NoError(t, err) + err = d.Connect() + require.NoError(t, err) + err = d.Write(testutil.MockMetrics()) + require.NoError(t, err) +} + +func TestMissingURL(t *testing.T) { + d := &Dynatrace{} + + d.Log = testutil.Logger{} + err := d.Init() + require.Equal(t, oneAgentMetricsUrl, d.URL) + err = d.Connect() + require.Equal(t, oneAgentMetricsUrl, d.URL) + require.NoError(t, err) +} + +func TestMissingAPITokenMissingURL(t *testing.T) { + d := &Dynatrace{} + + d.Log = testutil.Logger{} + err := d.Init() + require.Equal(t, oneAgentMetricsUrl, d.URL) + err = d.Connect() + require.Equal(t, oneAgentMetricsUrl, d.URL) + require.NoError(t, err) +} + +func TestMissingAPIToken(t *testing.T) { + d := &Dynatrace{} + + d.URL = "test" + d.Log = testutil.Logger{} + err := d.Init() + require.Error(t, err) +} + +func TestSendMetric(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // check the encoded result + bodyBytes, err := ioutil.ReadAll(r.Body) + if err != nil { + require.NoError(t, err) + } + bodyString := string(bodyBytes) + expected := "mymeasurement.myfield,host=\"192.168.0.1\",nix=\"nix\" 3.140000\nmymeasurement.value,host=\"192.168.0.1\" 3.140000\n" + if bodyString != expected { + t.Errorf("Metric encoding failed. expected: %s but got: %s", expected, bodyString) + } + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(`{"linesOk":10,"linesInvalid":0,"error":null}`) + })) + defer ts.Close() + + d := &Dynatrace{} + + d.URL = ts.URL + d.APIToken = "123" + d.Log = testutil.Logger{} + err := d.Init() + require.NoError(t, err) + err = d.Connect() + require.NoError(t, err) + + // Init metrics + + m1, _ := metric.New( + "mymeasurement", + map[string]string{"host": "192.168.0.1", "nix": "nix"}, + map[string]interface{}{"myfield": float64(3.14)}, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), + ) + + m2, _ := metric.New( + "mymeasurement", + map[string]string{"host": "192.168.0.1"}, + map[string]interface{}{"value": float64(3.14)}, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), + ) + + metrics := []telegraf.Metric{m1, m2} + + err = d.Write(metrics) + require.NoError(t, err) +} + +func TestSendSingleMetricWithUnorderedTags(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // check the encoded result + bodyBytes, err := ioutil.ReadAll(r.Body) + if err != nil { + require.NoError(t, err) + } + bodyString := string(bodyBytes) + expected := "mymeasurement.myfield,a=\"test\",b=\"test\",c=\"test\" 3.140000\n" + if bodyString != expected { + t.Errorf("Metric encoding failed. expected: %s but got: %s", expected, bodyString) + } + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(`{"linesOk":1,"linesInvalid":0,"error":null}`) + })) + defer ts.Close() + + d := &Dynatrace{} + + d.URL = ts.URL + d.APIToken = "123" + d.Log = testutil.Logger{} + err := d.Init() + require.NoError(t, err) + err = d.Connect() + require.NoError(t, err) + + // Init metrics + + m1, _ := metric.New( + "mymeasurement", + map[string]string{"a": "test", "c": "test", "b": "test"}, + map[string]interface{}{"myfield": float64(3.14)}, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), + ) + + metrics := []telegraf.Metric{m1} + + err = d.Write(metrics) + require.NoError(t, err) +} + +func TestSendMetricWithoutTags(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + // check the encoded result + bodyBytes, err := ioutil.ReadAll(r.Body) + if err != nil { + require.NoError(t, err) + } + bodyString := string(bodyBytes) + expected := "mymeasurement.myfield 3.140000\n" + if bodyString != expected { + t.Errorf("Metric encoding failed. expected: %s but got: %s", expected, bodyString) + } + json.NewEncoder(w).Encode(`{"linesOk":1,"linesInvalid":0,"error":null}`) + })) + defer ts.Close() + + d := &Dynatrace{} + + d.URL = ts.URL + d.APIToken = "123" + d.Log = testutil.Logger{} + err := d.Init() + require.NoError(t, err) + err = d.Connect() + require.NoError(t, err) + + // Init metrics + + m1, _ := metric.New( + "mymeasurement", + map[string]string{}, + map[string]interface{}{"myfield": float64(3.14)}, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), + ) + + metrics := []telegraf.Metric{m1} + + err = d.Write(metrics) + require.NoError(t, err) +} + +func TestSendMetricWithUpperCaseTagKeys(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + // check the encoded result + bodyBytes, err := ioutil.ReadAll(r.Body) + if err != nil { + require.NoError(t, err) + } + bodyString := string(bodyBytes) + expected := "mymeasurement.myfield,aaa=\"test\",b_b=\"test\",ccc=\"test\" 3.140000\n" + if bodyString != expected { + t.Errorf("Metric encoding failed. expected: %s but got: %s", expected, bodyString) + } + json.NewEncoder(w).Encode(`{"linesOk":1,"linesInvalid":0,"error":null}`) + })) + defer ts.Close() + + d := &Dynatrace{} + + d.URL = ts.URL + d.APIToken = "123" + d.Log = testutil.Logger{} + err := d.Init() + require.NoError(t, err) + err = d.Connect() + require.NoError(t, err) + + // Init metrics + + m1, _ := metric.New( + "mymeasurement", + map[string]string{"AAA": "test", "CcC": "test", "B B": "test"}, + map[string]interface{}{"myfield": float64(3.14)}, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), + ) + + metrics := []telegraf.Metric{m1} + + err = d.Write(metrics) + require.NoError(t, err) +} + +func TestSendBooleanMetricWithoutTags(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + // check the encoded result + bodyBytes, err := ioutil.ReadAll(r.Body) + if err != nil { + require.NoError(t, err) + } + bodyString := string(bodyBytes) + expected := "mymeasurement.myfield 1\n" + if bodyString != expected { + t.Errorf("Metric encoding failed. expected: %s but got: %s", expected, bodyString) + } + json.NewEncoder(w).Encode(`{"linesOk":1,"linesInvalid":0,"error":null}`) + })) + defer ts.Close() + + d := &Dynatrace{} + + d.URL = ts.URL + d.APIToken = "123" + d.Log = testutil.Logger{} + err := d.Init() + require.NoError(t, err) + err = d.Connect() + require.NoError(t, err) + + // Init metrics + + m1, _ := metric.New( + "mymeasurement", + map[string]string{}, + map[string]interface{}{"myfield": bool(true)}, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), + ) + + metrics := []telegraf.Metric{m1} + + err = d.Write(metrics) + require.NoError(t, err) +}