Skip to content

Commit

Permalink
Revert "PR influxdata#59, implementation of multiple outputs"
Browse files Browse the repository at this point in the history
This reverts commit 48a0755, reversing
changes made to 924700f.
  • Loading branch information
sparrc committed Aug 11, 2015
1 parent 48a0755 commit b312e48
Show file tree
Hide file tree
Showing 9 changed files with 73 additions and 218 deletions.
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
pkg/
tivan
.vagrant
telegraf
98 changes: 37 additions & 61 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,10 @@ import (
"sync"
"time"

"github.com/influxdb/telegraf/outputs"
"github.com/influxdb/influxdb/client"
"github.com/influxdb/telegraf/plugins"
)

type runningOutput struct {
name string
output outputs.Output
}

type runningPlugin struct {
name string
plugin plugins.Plugin
Expand All @@ -37,8 +32,9 @@ type Agent struct {

Config *Config

outputs []*runningOutput
plugins []*runningPlugin

conn *client.Client
}

// NewAgent returns an Agent struct based off the given Config
Expand Down Expand Up @@ -70,36 +66,25 @@ func NewAgent(config *Config) (*Agent, error) {

// Connect connects to the agent's config URL
func (a *Agent) Connect() error {
for _, o := range a.outputs {
err := o.output.Connect(a.Hostname)
if err != nil {
return err
}
}
return nil
}

func (a *Agent) LoadOutputs() ([]string, error) {
var names []string
config := a.Config

for _, name := range a.Config.OutputsDeclared() {
creator, ok := outputs.Outputs[name]
if !ok {
return nil, fmt.Errorf("Undefined but requested output: %s", name)
}

output := creator()
u, err := url.Parse(config.URL)
if err != nil {
return err
}

err := a.Config.ApplyOutput(name, output)
if err != nil {
return nil, err
}
c, err := client.NewClient(client.Config{
URL: *u,
Username: config.Username,
Password: config.Password,
UserAgent: config.UserAgent,
Timeout: config.Timeout.Duration,
})

a.outputs = append(a.outputs, &runningOutput{name, output})
names = append(names, name)
if err != nil {
return err
}

<<<<<<< HEAD
_, err = c.Query(client.Query{
Command: fmt.Sprintf("CREATE DATABASE telegraf"),
})
Expand All @@ -109,11 +94,8 @@ func (a *Agent) LoadOutputs() ([]string, error) {
}

a.conn = c
=======
sort.Strings(names)
>>>>>>> jipperinbham-outputs-phase1

return names, nil
return nil
}

// LoadPlugins loads the agent's plugins
Expand Down Expand Up @@ -171,14 +153,17 @@ func (a *Agent) crankParallel() error {

close(points)

var bp BatchPoints
bp.Time = time.Now()
var acc BatchPoints
acc.Tags = a.Config.Tags
acc.Time = time.Now()
acc.Database = a.Config.Database

for sub := range points {
bp.Points = append(bp.Points, sub.Points...)
acc.Points = append(acc.Points, sub.Points...)
}

return a.flush(&bp)
_, err := a.conn.Write(acc.BatchPoints)
return err
}

func (a *Agent) crank() error {
Expand All @@ -195,9 +180,12 @@ func (a *Agent) crank() error {
}
}

acc.Tags = a.Config.Tags
acc.Time = time.Now()
acc.Database = a.Config.Database

return a.flush(&acc)
_, err := a.conn.Write(acc.BatchPoints)
return err
}

func (a *Agent) crankSeparate(shutdown chan struct{}, plugin *runningPlugin) error {
Expand All @@ -219,10 +207,7 @@ func (a *Agent) crankSeparate(shutdown chan struct{}, plugin *runningPlugin) err
acc.Time = time.Now()
acc.Database = a.Config.Database

err = a.flush(&acc)
if err != nil {
return err
}
a.conn.Write(acc.BatchPoints)

select {
case <-shutdown:
Expand All @@ -233,22 +218,6 @@ func (a *Agent) crankSeparate(shutdown chan struct{}, plugin *runningPlugin) err
}
}

func (a *Agent) flush(bp *BatchPoints) error {
var wg sync.WaitGroup
var outerr error
for _, o := range a.outputs {
wg.Add(1)
go func(ro *runningOutput) {
defer wg.Done()
outerr = ro.output.Write(bp.BatchPoints)
}(o)
}

wg.Wait()

return outerr
}

// TestAllPlugins verifies that we can 'Gather' from all plugins with the
// default configuration
func (a *Agent) TestAllPlugins() error {
Expand Down Expand Up @@ -307,6 +276,13 @@ func (a *Agent) Test() error {

// Run runs the agent daemon, gathering every Interval
func (a *Agent) Run(shutdown chan struct{}) error {
if a.conn == nil {
err := a.Connect()
if err != nil {
return err
}
}

var wg sync.WaitGroup

for _, plugin := range a.plugins {
Expand Down
14 changes: 6 additions & 8 deletions cmd/telegraf/telegraf.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"strings"

"github.com/influxdb/telegraf"
_ "github.com/influxdb/telegraf/outputs/all"
_ "github.com/influxdb/telegraf/plugins/all"
)

Expand Down Expand Up @@ -62,11 +61,6 @@ func main() {
ag.Debug = true
}

outputs, err := ag.LoadOutputs()
if err != nil {
log.Fatal(err)
}

plugins, err := ag.LoadPlugins()
if err != nil {
log.Fatal(err)
Expand Down Expand Up @@ -106,15 +100,19 @@ func main() {
close(shutdown)
}()

log.Print("Telegraf Agent running")
log.Printf("Loaded outputs: %s", strings.Join(outputs, " "))
log.Print("InfluxDB Agent running")
log.Printf("Loaded plugins: %s", strings.Join(plugins, " "))
if ag.Debug {
log.Printf("Debug: enabled")
log.Printf("Agent Config: Interval:%s, Debug:%#v, Hostname:%#v\n",
ag.Interval, ag.Debug, ag.Hostname)
}

if config.URL != "" {
log.Printf("Sending metrics to: %s", config.URL)
log.Printf("Tags enabled: %v", config.ListTags())
}

if *fPidfile != "" {
f, err := os.Create(*fPidfile)
if err != nil {
Expand Down
79 changes: 22 additions & 57 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,28 +30,26 @@ func (d *Duration) UnmarshalTOML(b []byte) error {
return nil
}

// Config specifies the outputs that telegraf
// Config specifies the URL/user/password for the database that telegraf
// will be logging to, as well as all the plugins that the user has
// specified
type Config struct {
URL string
Username string
Password string
Database string
UserAgent string
Timeout Duration
Tags map[string]string

agent *ast.Table
plugins map[string]*ast.Table
outputs map[string]*ast.Table
}

// Plugins returns the configured plugins as a map of name -> plugin toml
func (c *Config) Plugins() map[string]*ast.Table {
return c.plugins
}
type TagFilter struct {
Name string
Filter []string
}

// Outputs returns the configured outputs as a map of name -> output toml
func (c *Config) Outputs() map[string]*ast.Table {
return c.outputs
}

// The name of a tag, and the values on which to filter
type TagFilter struct {
Expand All @@ -66,9 +64,6 @@ type ConfiguredPlugin struct {

Drop []string
Pass []string
TagDrop []TagFilter

TagPass []TagFilter

TagDrop []TagFilter
TagPass []TagFilter
Expand Down Expand Up @@ -111,10 +106,6 @@ func (cp *ConfiguredPlugin) ShouldPass(measurement string, tags map[string]strin
return false
}

<<<<<<< HEAD
=======

>>>>>>> jipperinbham-outputs-phase1
if cp.TagDrop != nil {
for _, pat := range cp.TagDrop {
if tagval, ok := tags[pat.Name]; ok {
Expand All @@ -128,18 +119,7 @@ func (cp *ConfiguredPlugin) ShouldPass(measurement string, tags map[string]strin
return true
}

<<<<<<< HEAD
return true
=======
return nil
}

// ApplyOutput loads the toml config into the given interface
func (c *Config) ApplyOutput(name string, v interface{}) error {
if c.outputs[name] != nil {
return toml.UnmarshalTable(c.outputs[name], v)
}
>>>>>>> jipperinbham-outputs-phase1
}

// ApplyAgent loads the toml config into the given interface
Expand Down Expand Up @@ -245,24 +225,15 @@ func (c *Config) ApplyPlugin(name string, v interface{}) (*ConfiguredPlugin, err

// PluginsDeclared returns the name of all plugins declared in the config.
func (c *Config) PluginsDeclared() []string {
return declared(c.plugins)
}

// OutputsDeclared returns the name of all outputs declared in the config.
func (c *Config) OutputsDeclared() []string {
return declared(c.outputs)
}

func declared(endpoints map[string]*ast.Table) []string {
var names []string
var plugins []string

for name, _ := range endpoints {
names = append(names, name)
for name := range c.plugins {
plugins = append(plugins, name)
}

sort.Strings(names)
sort.Strings(plugins)

return names
return plugins
}

// DefaultConfig returns an empty default configuration
Expand All @@ -286,7 +257,6 @@ func LoadConfig(path string) (*Config, error) {

c := &Config{
plugins: make(map[string]*ast.Table),
outputs: make(map[string]*ast.Table),
}

for name, val := range tbl.Fields {
Expand All @@ -296,16 +266,13 @@ func LoadConfig(path string) (*Config, error) {
}

switch name {
case "influxdb":
err := toml.UnmarshalTable(subtbl, c)
if err != nil {
return nil, err
}
case "agent":
c.agent = subtbl
case "outputs":
for outputName, outputVal := range subtbl.Fields {
outputSubtbl, ok := outputVal.(*ast.Table)
if !ok {
return nil, errInvalidConfig
}
c.outputs[outputName] = outputSubtbl
}
default:
c.plugins[name] = subtbl
}
Expand Down Expand Up @@ -360,11 +327,8 @@ var header = `# Telegraf configuration
# NOTE: The configuration has a few required parameters. They are marked
# with 'required'. Be sure to edit those to make this configuration work.
# OUTPUTS
[outputs]
# Configuration for influxdb server to send metrics to
[outputs.influxdb]
[influxdb]
# The full HTTP endpoint URL for your InfluxDB instance
url = "http://localhost:8086" # required.
Expand All @@ -381,11 +345,12 @@ database = "telegraf" # required.
# Set the user agent for the POSTs (can be useful for log differentiation)
# user_agent = "telegraf"
# tags = { "dc": "us-east-1" }
# Tags can also be specified via a normal map, but only one form at a time:
# [influxdb.tags]
# tags = { "dc" = "us-east-1" }
# dc = "us-east-1"
# Configuration for telegraf itself
# [agent]
Expand Down
Loading

0 comments on commit b312e48

Please sign in to comment.