diff --git a/internal/models/running_output.go b/internal/models/running_output.go index 752cf34ef127d..c48bccd3c018e 100644 --- a/internal/models/running_output.go +++ b/internal/models/running_output.go @@ -210,6 +210,7 @@ func (ro *RunningOutput) WriteBatch() error { return nil } +// Close closes the output func (r *RunningOutput) Close() { err := r.Output.Close() if err != nil { diff --git a/plugins/inputs/ecs/client.go b/plugins/inputs/ecs/client.go index d1d92f097c921..93074ad79c878 100644 --- a/plugins/inputs/ecs/client.go +++ b/plugins/inputs/ecs/client.go @@ -1,6 +1,9 @@ package ecs import ( + "fmt" + "io" + "io/ioutil" "net/http" "net/url" "time" @@ -50,10 +53,16 @@ func (c *EcsClient) Task() (*Task, error) { req, _ := http.NewRequest("GET", c.taskURL, nil) resp, err := c.client.Do(req) - if err != nil { return nil, err } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + // ignore the err here; LimitReader returns io.EOF and we're not interested in read errors. + body, _ := ioutil.ReadAll(io.LimitReader(resp.Body, 200)) + return nil, fmt.Errorf("%s returned HTTP status %s: %q", c.taskURL, resp.Status, body) + } task, err := unmarshalTask(resp.Body) if err != nil { @@ -71,11 +80,18 @@ func (c *EcsClient) ContainerStats() (map[string]types.StatsJSON, error) { req, _ := http.NewRequest("GET", c.statsURL, nil) resp, err := c.client.Do(req) - if err != nil { return map[string]types.StatsJSON{}, err } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + // ignore the err here; LimitReader returns io.EOF and we're not interested in read errors. + body, _ := ioutil.ReadAll(io.LimitReader(resp.Body, 200)) + return nil, fmt.Errorf("%s returned HTTP status %s: %q", c.statsURL, resp.Status, body) + } + statsMap, err := unmarshalStats(resp.Body) if err != nil { return map[string]types.StatsJSON{}, err diff --git a/plugins/inputs/ecs/client_test.go b/plugins/inputs/ecs/client_test.go index d6fbd1165d49b..6532e5d51d0b0 100644 --- a/plugins/inputs/ecs/client_test.go +++ b/plugins/inputs/ecs/client_test.go @@ -107,7 +107,8 @@ func TestEcsClient_Task(t *testing.T) { client: mockDo{ do: func(req *http.Request) (*http.Response, error) { return &http.Response{ - Body: ioutil.NopCloser(rc), + StatusCode: http.StatusOK, + Body: ioutil.NopCloser(rc), }, nil }, }, @@ -123,11 +124,24 @@ func TestEcsClient_Task(t *testing.T) { wantErr: true, }, { - name: "malformed resp", + name: "malformed 500 resp", client: mockDo{ do: func(req *http.Request) (*http.Response, error) { return &http.Response{ - Body: ioutil.NopCloser(bytes.NewReader([]byte("foo"))), + StatusCode: http.StatusInternalServerError, + Body: ioutil.NopCloser(bytes.NewReader([]byte("foo"))), + }, nil + }, + }, + wantErr: true, + }, + { + name: "malformed 200 resp", + client: mockDo{ + do: func(req *http.Request) (*http.Response, error) { + return &http.Response{ + StatusCode: http.StatusOK, + Body: ioutil.NopCloser(bytes.NewReader([]byte("foo"))), }, nil }, }, @@ -164,7 +178,8 @@ func TestEcsClient_ContainerStats(t *testing.T) { client: mockDo{ do: func(req *http.Request) (*http.Response, error) { return &http.Response{ - Body: ioutil.NopCloser(rc), + StatusCode: http.StatusOK, + Body: ioutil.NopCloser(rc), }, nil }, }, @@ -181,17 +196,31 @@ func TestEcsClient_ContainerStats(t *testing.T) { wantErr: true, }, { - name: "malformed resp", + name: "malformed 200 resp", client: mockDo{ do: func(req *http.Request) (*http.Response, error) { return &http.Response{ - Body: ioutil.NopCloser(bytes.NewReader([]byte("foo"))), + StatusCode: http.StatusOK, + Body: ioutil.NopCloser(bytes.NewReader([]byte("foo"))), }, nil }, }, want: map[string]types.StatsJSON{}, wantErr: true, }, + { + name: "malformed 500 resp", + client: mockDo{ + do: func(req *http.Request) (*http.Response, error) { + return &http.Response{ + StatusCode: http.StatusInternalServerError, + Body: ioutil.NopCloser(bytes.NewReader([]byte("foo"))), + }, nil + }, + }, + want: nil, + wantErr: true, + }, } for _, tt := range tests { diff --git a/plugins/inputs/fibaro/fibaro.go b/plugins/inputs/fibaro/fibaro.go index 187b74a504306..492feaf036e45 100644 --- a/plugins/inputs/fibaro/fibaro.go +++ b/plugins/inputs/fibaro/fibaro.go @@ -97,6 +97,7 @@ func (f *Fibaro) getJSON(path string, dataStruct interface{}) error { if err != nil { return err } + defer resp.Body.Close() if resp.StatusCode != http.StatusOK { err = fmt.Errorf("Response from url \"%s\" has status code %d (%s), expected %d (%s)", @@ -108,8 +109,6 @@ func (f *Fibaro) getJSON(path string, dataStruct interface{}) error { return err } - defer resp.Body.Close() - dec := json.NewDecoder(resp.Body) err = dec.Decode(&dataStruct) if err != nil { diff --git a/plugins/inputs/haproxy/haproxy.go b/plugins/inputs/haproxy/haproxy.go index 9c22acad9ef1b..7179540d72e21 100644 --- a/plugins/inputs/haproxy/haproxy.go +++ b/plugins/inputs/haproxy/haproxy.go @@ -181,6 +181,7 @@ func (g *haproxy) gatherServer(addr string, acc telegraf.Accumulator) error { if err != nil { return fmt.Errorf("Unable to connect to haproxy server '%s': %s", addr, err) } + defer res.Body.Close() if res.StatusCode != 200 { return fmt.Errorf("Unable to get valid stat result from '%s', http response code : %d", addr, res.StatusCode) diff --git a/plugins/inputs/kibana/kibana.go b/plugins/inputs/kibana/kibana.go index 4b7e3c5c5e27a..8589224517332 100644 --- a/plugins/inputs/kibana/kibana.go +++ b/plugins/inputs/kibana/kibana.go @@ -3,6 +3,8 @@ package kibana import ( "encoding/json" "fmt" + "io" + "io/ioutil" "net/http" "strconv" "strings" @@ -250,6 +252,12 @@ func (k *Kibana) gatherJsonData(url string, v interface{}) (host string, err err defer response.Body.Close() + if response.StatusCode != http.StatusOK { + // ignore the err here; LimitReader returns io.EOF and we're not interested in read errors. + body, _ := ioutil.ReadAll(io.LimitReader(response.Body, 200)) + return request.Host, fmt.Errorf("%s returned HTTP status %s: %q", url, response.Status, body) + } + if err = json.NewDecoder(response.Body).Decode(v); err != nil { return request.Host, err } diff --git a/plugins/inputs/logstash/logstash.go b/plugins/inputs/logstash/logstash.go index b97600700c3ee..1abcfa3a322e5 100644 --- a/plugins/inputs/logstash/logstash.go +++ b/plugins/inputs/logstash/logstash.go @@ -3,6 +3,8 @@ package logstash import ( "encoding/json" "fmt" + "io" + "io/ioutil" "net/http" "net/url" "strings" @@ -197,6 +199,11 @@ func (logstash *Logstash) gatherJsonData(url string, value interface{}) error { } defer response.Body.Close() + if response.StatusCode != http.StatusOK { + // ignore the err here; LimitReader returns io.EOF and we're not interested in read errors. + body, _ := ioutil.ReadAll(io.LimitReader(response.Body, 200)) + return fmt.Errorf("%s returned HTTP status %s: %q", url, response.Status, body) + } err = json.NewDecoder(response.Body).Decode(value) if err != nil { diff --git a/plugins/inputs/mailchimp/chimp_api.go b/plugins/inputs/mailchimp/chimp_api.go index 6e4ec2d4f120e..066ffb4e7fe8d 100644 --- a/plugins/inputs/mailchimp/chimp_api.go +++ b/plugins/inputs/mailchimp/chimp_api.go @@ -4,6 +4,7 @@ import ( "bytes" "encoding/json" "fmt" + "io" "io/ioutil" "log" "net/http" @@ -143,6 +144,12 @@ func runChimp(api *ChimpAPI, params ReportsParams) ([]byte, error) { } defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + // ignore the err here; LimitReader returns io.EOF and we're not interested in read errors. + body, _ := ioutil.ReadAll(io.LimitReader(resp.Body, 200)) + return nil, fmt.Errorf("%s returned HTTP status %s: %q", api.url.String(), resp.Status, body) + } + body, err := ioutil.ReadAll(resp.Body) if err != nil { return nil, err diff --git a/plugins/inputs/nginx_upstream_check/nginx_upstream_check.go b/plugins/inputs/nginx_upstream_check/nginx_upstream_check.go index 1293f946ee841..8e662849fb7a8 100644 --- a/plugins/inputs/nginx_upstream_check/nginx_upstream_check.go +++ b/plugins/inputs/nginx_upstream_check/nginx_upstream_check.go @@ -2,14 +2,18 @@ package nginx_upstream_check import ( "encoding/json" - "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/internal" - "github.com/influxdata/telegraf/internal/tls" - "github.com/influxdata/telegraf/plugins/inputs" + "fmt" + "io" + "io/ioutil" "net/http" "net/url" "strconv" "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/internal/tls" + "github.com/influxdata/telegraf/plugins/inputs" ) const sampleConfig = ` @@ -148,6 +152,11 @@ func (check *NginxUpstreamCheck) gatherJsonData(url string, value interface{}) e } defer response.Body.Close() + if response.StatusCode != http.StatusOK { + // ignore the err here; LimitReader returns io.EOF and we're not interested in read errors. + body, _ := ioutil.ReadAll(io.LimitReader(response.Body, 200)) + return fmt.Errorf("%s returned HTTP status %s: %q", url, response.Status, body) + } err = json.NewDecoder(response.Body).Decode(value) if err != nil { diff --git a/plugins/inputs/nsq_consumer/nsq_consumer_test.go b/plugins/inputs/nsq_consumer/nsq_consumer_test.go index 1e8264d065fd8..e07b125ccdb8f 100644 --- a/plugins/inputs/nsq_consumer/nsq_consumer_test.go +++ b/plugins/inputs/nsq_consumer/nsq_consumer_test.go @@ -51,8 +51,6 @@ func TestReadsMetricsFromNSQ(t *testing.T) { assert.Equal(t, 0, len(acc.Metrics), "There should not be any points") if err := consumer.Start(&acc); err != nil { t.Fatal(err.Error()) - } else { - defer consumer.Stop() } waitForPoint(&acc, t) diff --git a/plugins/inputs/salesforce/salesforce.go b/plugins/inputs/salesforce/salesforce.go index 096550db5982f..ad40ec5668d51 100644 --- a/plugins/inputs/salesforce/salesforce.go +++ b/plugins/inputs/salesforce/salesforce.go @@ -5,6 +5,7 @@ import ( "encoding/xml" "errors" "fmt" + "io" "io/ioutil" "net/http" "net/url" @@ -200,6 +201,11 @@ func (s *Salesforce) login() error { return err } defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + // ignore the err here; LimitReader returns io.EOF and we're not interested in read errors. + body, _ := ioutil.ReadAll(io.LimitReader(resp.Body, 200)) + return fmt.Errorf("%s returned HTTP status %s: %q", loginEndpoint, resp.Status, body) + } respBody, err := ioutil.ReadAll(resp.Body) if err != nil {