diff --git a/internal/beater/beater.go b/internal/beater/beater.go index f6c7f5e4d53..aee736e3fa1 100644 --- a/internal/beater/beater.go +++ b/internal/beater/beater.go @@ -700,10 +700,15 @@ func (s *Runner) newFinalBatchProcessor( } esConfig.FlushInterval = time.Second esConfig.Config = elasticsearch.DefaultConfig() + esConfig.MaxIdleConnsPerHost = 10 if err := s.elasticsearchOutputConfig.Unpack(&esConfig); err != nil { return nil, nil, err } + if esConfig.MaxRequests != 0 { + esConfig.MaxIdleConnsPerHost = esConfig.MaxRequests + } + var flushBytes int if esConfig.FlushBytes != "" { b, err := humanize.ParseBytes(esConfig.FlushBytes) diff --git a/internal/elasticsearch/config.go b/internal/elasticsearch/config.go index 3c493e75a20..8b9e6a423eb 100644 --- a/internal/elasticsearch/config.go +++ b/internal/elasticsearch/config.go @@ -44,18 +44,19 @@ var ( // Config holds all configurable fields that are used to create a Client type Config struct { - Hosts Hosts `config:"hosts" validate:"required"` - Protocol string `config:"protocol"` - Path string `config:"path"` - ProxyURL string `config:"proxy_url"` - ProxyDisable bool `config:"proxy_disable"` - Timeout time.Duration `config:"timeout"` - TLS *tlscommon.Config `config:"ssl"` - Username string `config:"username"` - Password string `config:"password"` - APIKey string `config:"api_key"` - Headers map[string]string `config:"headers"` - MaxRetries int `config:"max_retries"` + Hosts Hosts `config:"hosts" validate:"required"` + Protocol string `config:"protocol"` + Path string `config:"path"` + ProxyURL string `config:"proxy_url"` + ProxyDisable bool `config:"proxy_disable"` + Timeout time.Duration `config:"timeout"` + TLS *tlscommon.Config `config:"ssl"` + Username string `config:"username"` + Password string `config:"password"` + APIKey string `config:"api_key"` + Headers map[string]string `config:"headers"` + MaxRetries int `config:"max_retries"` + MaxIdleConnsPerHost int `config:",ignore"` // CompressionLevel holds the gzip compression level used when bulk indexing // with go-docappender; it is otherwise ignored. @@ -145,9 +146,10 @@ func NewHTTPTransport(cfg *Config) (*http.Transport, error) { dialer := transport.NetDialer(cfg.Timeout) tlsDialer := transport.TLSDialer(dialer, tlsConfig, cfg.Timeout) return &http.Transport{ - Proxy: proxy, - Dial: dialer.Dial, - DialTLS: tlsDialer.Dial, - TLSClientConfig: tlsConfig.ToConfig(), + Proxy: proxy, + Dial: dialer.Dial, + DialTLS: tlsDialer.Dial, + TLSClientConfig: tlsConfig.ToConfig(), + MaxIdleConnsPerHost: cfg.MaxIdleConnsPerHost, }, nil } diff --git a/internal/elasticsearch/config_test.go b/internal/elasticsearch/config_test.go index 2676d31a6b9..3593bf02336 100644 --- a/internal/elasticsearch/config_test.go +++ b/internal/elasticsearch/config_test.go @@ -146,10 +146,11 @@ func TestBeatsConfigSynced(t *testing.T) { // TODO(simitt): take a closer look at ES ouput changes in libbeat // introduced with https://github.com/elastic/beats/pull/25219 localStructExceptions := map[string]interface{}{ - "ssl": nil, - "timeout": nil, - "proxy_disable": nil, - "proxy_url": nil, + "ssl": nil, + "timeout": nil, + "proxy_disable": nil, + "proxy_url": nil, + "maxidleconnsperhost": nil, } for name, localStructField := range localStructFields { if _, ok := localStructExceptions[name]; ok {