diff --git a/carbon/app.go b/carbon/app.go index 341345d6..ab9739ba 100644 --- a/carbon/app.go +++ b/carbon/app.go @@ -19,6 +19,7 @@ import ( "github.com/lomik/zapwriter" ) +// App is an application object used in main type App struct { sync.RWMutex Config *Config @@ -29,7 +30,7 @@ type App struct { Pickle receiver.Receiver Grpc receiver.Receiver Prometheus receiver.Receiver - TelegrafHttpJson receiver.Receiver + TelegrafHTTPJSON receiver.Receiver Collector *Collector // (!!!) Should be re-created on every change config/modules writeChan chan *RowBinary.WriteBuffer exit chan bool @@ -129,9 +130,9 @@ func (app *App) stopListeners() { logger.Debug("finished", zap.String("module", "prometheus")) } - if app.TelegrafHttpJson != nil { - app.TelegrafHttpJson.Stop() - app.TelegrafHttpJson = nil + if app.TelegrafHTTPJSON != nil { + app.TelegrafHTTPJSON.Stop() + app.TelegrafHTTPJSON = nil logger.Debug("finished", zap.String("module", "telegraf_http_json")) } } @@ -214,7 +215,9 @@ func (app *App) Start() (err error) { uploaders, nil, ) - app.Writer.Start() + if err := app.Writer.Start(); err != nil { + return err + } /* WRITER end */ /* UPLOADER start */ @@ -241,21 +244,23 @@ func (app *App) Start() (err error) { } for _, uploader := range app.Uploaders { - uploader.Start() + if err := uploader.Start(); err != nil { + return err + } } /* UPLOADER end */ /* RECEIVER start */ - if conf.Tcp.Enabled { + if conf.TCP.Enabled { app.TCP, err = receiver.New( - "tcp://"+conf.Tcp.Listen, + "tcp://"+conf.TCP.Listen, app.Config.TagDesc, receiver.ParseThreads(runtime.GOMAXPROCS(-1)*2), receiver.WriteChan(app.writeChan), - receiver.DropFuture(uint32(conf.Tcp.DropFuture.Value().Seconds())), - receiver.DropPast(uint32(conf.Tcp.DropPast.Value().Seconds())), - receiver.DropLongerThan(conf.Tcp.DropLongerThan), - receiver.ReadTimeout(uint32(conf.Tcp.ReadTimeout.Value().Seconds())), + receiver.DropFuture(uint32(conf.TCP.DropFuture.Value().Seconds())), + receiver.DropPast(uint32(conf.TCP.DropPast.Value().Seconds())), + receiver.DropLongerThan(conf.TCP.DropLongerThan), + receiver.ReadTimeout(uint32(conf.TCP.ReadTimeout.Value().Seconds())), ) if err != nil { @@ -265,15 +270,15 @@ func (app *App) Start() (err error) { http.HandleFunc("/debug/receive/tcp/dropped/", app.TCP.DroppedHandler) } - if conf.Udp.Enabled { + if conf.UDP.Enabled { app.UDP, err = receiver.New( - "udp://"+conf.Udp.Listen, + "udp://"+conf.UDP.Listen, app.Config.TagDesc, receiver.ParseThreads(runtime.GOMAXPROCS(-1)*2), receiver.WriteChan(app.writeChan), - receiver.DropFuture(uint32(conf.Udp.DropFuture.Value().Seconds())), - receiver.DropPast(uint32(conf.Udp.DropPast.Value().Seconds())), - receiver.DropLongerThan(conf.Udp.DropLongerThan), + receiver.DropFuture(uint32(conf.UDP.DropFuture.Value().Seconds())), + receiver.DropPast(uint32(conf.UDP.DropPast.Value().Seconds())), + receiver.DropLongerThan(conf.UDP.DropLongerThan), ) if err != nil { @@ -335,22 +340,22 @@ func (app *App) Start() (err error) { http.HandleFunc("/debug/receive/prometheus/dropped/", app.Prometheus.DroppedHandler) } - if conf.TelegrafHttpJson.Enabled { - app.TelegrafHttpJson, err = receiver.New( - "telegraf+http+json://"+conf.TelegrafHttpJson.Listen, + if conf.TelegrafHTTPJSON.Enabled { + app.TelegrafHTTPJSON, err = receiver.New( + "telegraf+http+json://"+conf.TelegrafHTTPJSON.Listen, app.Config.TagDesc, receiver.WriteChan(app.writeChan), - receiver.DropFuture(uint32(conf.TelegrafHttpJson.DropFuture.Value().Seconds())), - receiver.DropPast(uint32(conf.TelegrafHttpJson.DropPast.Value().Seconds())), - receiver.DropLongerThan(conf.TelegrafHttpJson.DropLongerThan), - receiver.ConcatChar(conf.TelegrafHttpJson.Concat), + receiver.DropFuture(uint32(conf.TelegrafHTTPJSON.DropFuture.Value().Seconds())), + receiver.DropPast(uint32(conf.TelegrafHTTPJSON.DropPast.Value().Seconds())), + receiver.DropLongerThan(conf.TelegrafHTTPJSON.DropLongerThan), + receiver.ConcatChar(conf.TelegrafHTTPJSON.Concat), ) if err != nil { return } - http.HandleFunc("/debug/receive/telegraf_http_json/dropped/", app.TelegrafHttpJson.DroppedHandler) + http.HandleFunc("/debug/receive/telegraf_http_json/dropped/", app.TelegrafHTTPJSON.DroppedHandler) } /* RECEIVER end */ diff --git a/carbon/collector.go b/carbon/collector.go index 3b637461..96b2d5e0 100644 --- a/carbon/collector.go +++ b/carbon/collector.go @@ -101,8 +101,8 @@ func NewCollector(app *App) *Collector { c.stats = append(c.stats, moduleCallback("prometheus", app.Prometheus)) } - if app.TelegrafHttpJson != nil { - c.stats = append(c.stats, moduleCallback("telegraf_http_json", app.TelegrafHttpJson)) + if app.TelegrafHTTPJSON != nil { + c.stats = append(c.stats, moduleCallback("telegraf_http_json", app.TelegrafHTTPJSON)) } for n, u := range app.Uploaders { @@ -226,7 +226,7 @@ func (c *Collector) local(ctx context.Context) { func (c *Collector) chunked(ctx context.Context, chunkSize int, callback func([]byte)) { for { points := c.readData(ctx) - if points == nil || len(points) == 0 { + if len(points) == 0 { // exit closed return } diff --git a/carbon/config.go b/carbon/config.go index d56156c4..15645e32 100644 --- a/carbon/config.go +++ b/carbon/config.go @@ -15,6 +15,7 @@ import ( ) const ( + // MetricEndpointLocal used to send metrics in the carbon-clickhouse itself MetricEndpointLocal = "local" ) @@ -26,8 +27,9 @@ type commonConfig struct { Enabled bool `toml:"enabled"` } -type clickhouseConfig struct { - Url string `toml:"url"` +// ClickhouseConfig is TODO: use one ClickhouseConfig in all uploaders +type ClickhouseConfig struct { + URL string `toml:"url"` } type udpConfig struct { @@ -72,7 +74,7 @@ type promConfig struct { DropLongerThan uint16 `toml:"drop-longer-than"` } -type telegrafHttpJsonConfig struct { +type telegrafHTTPJSONConfig struct { Listen string `toml:"listen"` Enabled bool `toml:"enabled"` DropFuture *config.Duration `toml:"drop-future"` @@ -100,12 +102,12 @@ type Config struct { Common commonConfig `toml:"common"` Data dataConfig `toml:"data"` Upload map[string]*uploader.Config `toml:"upload"` - Udp udpConfig `toml:"udp"` - Tcp tcpConfig `toml:"tcp"` + UDP udpConfig `toml:"udp"` + TCP tcpConfig `toml:"tcp"` Pickle pickleConfig `toml:"pickle"` Grpc grpcConfig `toml:"grpc"` Prometheus promConfig `toml:"prometheus"` - TelegrafHttpJson telegrafHttpJsonConfig `toml:"telegraf_http_json"` + TelegrafHTTPJSON telegrafHTTPJSONConfig `toml:"telegraf_http_json"` Pprof pprofConfig `toml:"pprof"` Logging []zapwriter.Config `toml:"logging"` TagDesc tags.TagConfig `toml:"convert_to_tagged"` @@ -133,7 +135,7 @@ func NewConfig() *Config { CompAlgo: &config.Compression{CompAlgo: config.CompAlgoNone}, CompLevel: 0, }, - Udp: udpConfig{ + UDP: udpConfig{ Listen: ":2003", Enabled: true, LogIncomplete: false, @@ -141,7 +143,7 @@ func NewConfig() *Config { DropPast: &config.Duration{}, DropLongerThan: 0, }, - Tcp: tcpConfig{ + TCP: tcpConfig{ Listen: ":2003", Enabled: true, DropFuture: &config.Duration{}, @@ -172,7 +174,7 @@ func NewConfig() *Config { DropPast: &config.Duration{}, DropLongerThan: 0, }, - TelegrafHttpJson: telegrafHttpJsonConfig{ + TelegrafHTTPJSON: telegrafHTTPJSONConfig{ Listen: ":2007", Enabled: false, DropFuture: &config.Duration{}, @@ -192,13 +194,14 @@ func NewConfig() *Config { return cfg } +// NewLoggingConfig returns the zapwriter.Config with logging into "/var/log/carbon-clickhouse/carbon-clickhouse.log" func NewLoggingConfig() zapwriter.Config { cfg := zapwriter.NewConfig() cfg.File = "/var/log/carbon-clickhouse/carbon-clickhouse.log" return cfg } -// PrintConfig ... +// PrintDefaultConfig ... func PrintDefaultConfig() error { cfg := NewConfig() buf := new(bytes.Buffer) @@ -212,7 +215,7 @@ func PrintDefaultConfig() error { } cfg.Upload = map[string]*uploader.Config{ - "graphite": &uploader.Config{ + "graphite": { Type: "points", Timeout: &config.Duration{ Duration: time.Minute, @@ -221,7 +224,7 @@ func PrintDefaultConfig() error { TableName: "graphite", URL: "http://localhost:8123/", }, - "graphite_tree": &uploader.Config{ + "graphite_tree": { Type: "tree", Timeout: &config.Duration{ Duration: time.Minute, diff --git a/receiver/base.go b/receiver/base.go index 595b745e..005c8676 100644 --- a/receiver/base.go +++ b/receiver/base.go @@ -28,18 +28,18 @@ type Base struct { pastDropped uint64 // atomic tooLongDropped uint64 // atomic } - droppedList [droppedListSize]string - droppedListNext int - droppedListMu sync.Mutex - parseThreads int - dropFutureSeconds uint32 - dropPastSeconds uint32 - dropTooLongLimit uint16 + droppedList [droppedListSize]string + droppedListNext int + droppedListMu sync.Mutex + parseThreads int + dropFutureSeconds uint32 + dropPastSeconds uint32 + dropTooLongLimit uint16 readTimeoutSeconds uint32 - writeChan chan *RowBinary.WriteBuffer - logger *zap.Logger - Tags tags.TagConfig - concatCharacter string + writeChan chan *RowBinary.WriteBuffer + logger *zap.Logger + Tags tags.TagConfig + concatCharacter string } func NewBase(logger *zap.Logger, config tags.TagConfig) Base { @@ -56,6 +56,12 @@ func sendInt64Gauge(send func(metric string, value float64), metric string, valu send(metric, float64(atomic.LoadInt64(value))) } +func (base *Base) applyOptions(opts ...Option) { + for _, applyOption := range opts { + applyOption(base) + } +} + func (base *Base) isDrop(nowTime uint32, metricTime uint32) bool { if base.dropFutureSeconds != 0 && (metricTime > (nowTime + base.dropFutureSeconds)) { atomic.AddUint64(&base.stat.futureDropped, 1) diff --git a/receiver/receiver.go b/receiver/receiver.go index fcab776b..2855467c 100644 --- a/receiver/receiver.go +++ b/receiver/receiver.go @@ -97,11 +97,7 @@ func New(dsn string, config tags.TagConfig, opts ...Option) (Receiver, error) { return nil, err } - base := NewBase(zapwriter.Logger(strings.Replace(u.Scheme, "+", "_", -1)), config) - - for _, optApply := range opts { - optApply(&base) - } + logger := zapwriter.Logger(strings.Replace(u.Scheme, "+", "_", -1)) if u.Scheme == "tcp" { addr, err := net.ResolveTCPAddr("tcp", u.Host) @@ -110,9 +106,10 @@ func New(dsn string, config tags.TagConfig, opts ...Option) (Receiver, error) { } r := &TCP{ - Base: base, + Base: NewBase(logger, config), parseChan: make(chan *Buffer), } + r.applyOptions(opts...) if err = r.Listen(addr); err != nil { return nil, err @@ -128,9 +125,10 @@ func New(dsn string, config tags.TagConfig, opts ...Option) (Receiver, error) { } r := &Pickle{ - Base: base, + Base: NewBase(logger, config), parseChan: make(chan []byte), } + r.applyOptions(opts...) if err = r.Listen(addr); err != nil { return nil, err @@ -146,9 +144,10 @@ func New(dsn string, config tags.TagConfig, opts ...Option) (Receiver, error) { } r := &UDP{ - Base: base, + Base: NewBase(logger, config), parseChan: make(chan *Buffer), } + r.applyOptions(opts...) if err = r.Listen(addr); err != nil { return nil, err @@ -164,8 +163,9 @@ func New(dsn string, config tags.TagConfig, opts ...Option) (Receiver, error) { } r := &GRPC{ - Base: base, + Base: NewBase(logger, config), } + r.applyOptions(opts...) if err = r.Listen(addr); err != nil { return nil, err @@ -181,8 +181,9 @@ func New(dsn string, config tags.TagConfig, opts ...Option) (Receiver, error) { } r := &PrometheusRemoteWrite{ - Base: base, + Base: NewBase(logger, config), } + r.applyOptions(opts...) if err = r.Listen(addr); err != nil { return nil, err @@ -197,9 +198,10 @@ func New(dsn string, config tags.TagConfig, opts ...Option) (Receiver, error) { return nil, err } - r := &TelegrafHttpJson{ - Base: base, + r := &TelegrafHTTPJSON{ + Base: NewBase(logger, config), } + r.applyOptions(opts...) if err = r.Listen(addr); err != nil { return nil, err diff --git a/receiver/telegraf_http_json.go b/receiver/telegraf_http_json.go index 803efe7f..1b5c0242 100644 --- a/receiver/telegraf_http_json.go +++ b/receiver/telegraf_http_json.go @@ -17,18 +17,18 @@ import ( "go.uber.org/zap" ) -type TelegrafHttpMetric struct { +type TelegrafHTTPMetric struct { Name string `json:"name"` Timestamp int64 `json:"timestamp"` Fields map[string]interface{} `json:"fields"` Tags map[string]string `json:"tags"` } -type TelegrafHttpPayload struct { - Metrics []TelegrafHttpMetric `json:"metrics"` +type TelegrafHTTPPayload struct { + Metrics []TelegrafHTTPMetric `json:"metrics"` } -type TelegrafHttpJson struct { +type TelegrafHTTPJSON struct { Base listener *net.TCPListener } @@ -73,14 +73,14 @@ func TelegrafEncodeTags(tags map[string]string) string { return res.String() } -func (rcv *TelegrafHttpJson) ServeHTTP(w http.ResponseWriter, r *http.Request) { +func (rcv *TelegrafHTTPJSON) ServeHTTP(w http.ResponseWriter, r *http.Request) { body, err := ioutil.ReadAll(r.Body) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } - var data TelegrafHttpPayload + var data TelegrafHTTPPayload err = json.Unmarshal(body, &data) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) @@ -144,19 +144,19 @@ metricsLoop: } // Addr returns binded socket address. For bind port 0 in tests -func (rcv *TelegrafHttpJson) Addr() net.Addr { +func (rcv *TelegrafHTTPJSON) Addr() net.Addr { if rcv.listener == nil { return nil } return rcv.listener.Addr() } -func (rcv *TelegrafHttpJson) Stat(send func(metric string, value float64)) { +func (rcv *TelegrafHTTPJSON) Stat(send func(metric string, value float64)) { rcv.SendStat(send, "samplesReceived", "errors", "futureDropped", "pastDropped", "tooLongDropped") } // Listen bind port. Receive messages and send to out channel -func (rcv *TelegrafHttpJson) Listen(addr *net.TCPAddr) error { +func (rcv *TelegrafHTTPJSON) Listen(addr *net.TCPAddr) error { return rcv.StartFunc(func() error { tcpListener, err := net.ListenTCP("tcp", addr)