From 763ed5ffd17335202c091a3fd13cbad95d48fdcb Mon Sep 17 00:00:00 2001 From: kruskall <99559985+kruskall@users.noreply.github.com> Date: Thu, 16 Nov 2023 09:36:11 +0100 Subject: [PATCH] perf: set max conns idle per host as docappender max request (#12035) * perf: set max conns idle per host as docappender max request By default the http transport will use 2 as the max idle connections per host. The docappender client will send request a single host and will have at most MaxRequest appenders sharing the same client. To improve performance and connection reuse we set the number of max idle conns per host to MaxRequest * fix: use default value of 10 sync default value with docappender default * fix: only overwrite maxidleconnsperhost if maxrequests is not zero * fix: set max idle conns per host before creating the es client --------- Co-authored-by: Silvia Mitter --- internal/beater/beater.go | 5 ++++ internal/elasticsearch/config.go | 34 ++++++++++++++------------- internal/elasticsearch/config_test.go | 9 +++---- 3 files changed, 28 insertions(+), 20 deletions(-) diff --git a/internal/beater/beater.go b/internal/beater/beater.go index f6c7f5e4d53..aee736e3fa1 100644 --- a/internal/beater/beater.go +++ b/internal/beater/beater.go @@ -700,10 +700,15 @@ func (s *Runner) newFinalBatchProcessor( } esConfig.FlushInterval = time.Second esConfig.Config = elasticsearch.DefaultConfig() + esConfig.MaxIdleConnsPerHost = 10 if err := s.elasticsearchOutputConfig.Unpack(&esConfig); err != nil { return nil, nil, err } + if esConfig.MaxRequests != 0 { + esConfig.MaxIdleConnsPerHost = esConfig.MaxRequests + } + var flushBytes int if esConfig.FlushBytes != "" { b, err := humanize.ParseBytes(esConfig.FlushBytes) diff --git a/internal/elasticsearch/config.go b/internal/elasticsearch/config.go index 3c493e75a20..8b9e6a423eb 100644 --- a/internal/elasticsearch/config.go +++ b/internal/elasticsearch/config.go @@ -44,18 +44,19 @@ var ( // Config holds all configurable fields that are used to create a Client type Config struct { - Hosts Hosts `config:"hosts" validate:"required"` - Protocol string `config:"protocol"` - Path string `config:"path"` - ProxyURL string `config:"proxy_url"` - ProxyDisable bool `config:"proxy_disable"` - Timeout time.Duration `config:"timeout"` - TLS *tlscommon.Config `config:"ssl"` - Username string `config:"username"` - Password string `config:"password"` - APIKey string `config:"api_key"` - Headers map[string]string `config:"headers"` - MaxRetries int `config:"max_retries"` + Hosts Hosts `config:"hosts" validate:"required"` + Protocol string `config:"protocol"` + Path string `config:"path"` + ProxyURL string `config:"proxy_url"` + ProxyDisable bool `config:"proxy_disable"` + Timeout time.Duration `config:"timeout"` + TLS *tlscommon.Config `config:"ssl"` + Username string `config:"username"` + Password string `config:"password"` + APIKey string `config:"api_key"` + Headers map[string]string `config:"headers"` + MaxRetries int `config:"max_retries"` + MaxIdleConnsPerHost int `config:",ignore"` // CompressionLevel holds the gzip compression level used when bulk indexing // with go-docappender; it is otherwise ignored. @@ -145,9 +146,10 @@ func NewHTTPTransport(cfg *Config) (*http.Transport, error) { dialer := transport.NetDialer(cfg.Timeout) tlsDialer := transport.TLSDialer(dialer, tlsConfig, cfg.Timeout) return &http.Transport{ - Proxy: proxy, - Dial: dialer.Dial, - DialTLS: tlsDialer.Dial, - TLSClientConfig: tlsConfig.ToConfig(), + Proxy: proxy, + Dial: dialer.Dial, + DialTLS: tlsDialer.Dial, + TLSClientConfig: tlsConfig.ToConfig(), + MaxIdleConnsPerHost: cfg.MaxIdleConnsPerHost, }, nil } diff --git a/internal/elasticsearch/config_test.go b/internal/elasticsearch/config_test.go index 2676d31a6b9..3593bf02336 100644 --- a/internal/elasticsearch/config_test.go +++ b/internal/elasticsearch/config_test.go @@ -146,10 +146,11 @@ func TestBeatsConfigSynced(t *testing.T) { // TODO(simitt): take a closer look at ES ouput changes in libbeat // introduced with https://github.com/elastic/beats/pull/25219 localStructExceptions := map[string]interface{}{ - "ssl": nil, - "timeout": nil, - "proxy_disable": nil, - "proxy_url": nil, + "ssl": nil, + "timeout": nil, + "proxy_disable": nil, + "proxy_url": nil, + "maxidleconnsperhost": nil, } for name, localStructField := range localStructFields { if _, ok := localStructExceptions[name]; ok {