diff --git a/alert.go b/alert.go index 3a979a387..c9936d8b7 100644 --- a/alert.go +++ b/alert.go @@ -32,6 +32,7 @@ import ( "github.com/influxdata/kapacitor/services/snmptrap" "github.com/influxdata/kapacitor/services/telegram" "github.com/influxdata/kapacitor/services/victorops" + "github.com/influxdata/kapacitor/services/webexteams" "github.com/influxdata/kapacitor/tick/ast" "github.com/influxdata/kapacitor/tick/stateful" "github.com/pkg/errors" @@ -332,6 +333,25 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, d NodeDiagnostic) (a n.IsStateChangesOnly = true } + for _, teams := range n.WebexTeamsHandlers { + d.UDFLog("webex teams handlers") + d.UDFLog("teams room-id: " + teams.RoomID) + c := webexteams.HandlerConfig{ + ToPersonID: teams.ToPersonID, + RoomID: teams.RoomID, + ToPersonEmail: teams.ToPersonEmail, + Markdown: teams.Markdown, + Token: teams.Token, + } + h := et.tm.WebexTeamsService.Handler(c, ctx...) + an.handlers = append(an.handlers, h) + } + if et.tm.WebexTeamsService != nil && + et.tm.WebexTeamsService.Global() && + et.tm.WebexTeamsService.StateChangesOnly() { + n.IsStateChangesOnly = true + } + for _, k := range n.KafkaHandlers { c := kafka.HandlerConfig{ Cluster: k.Cluster, diff --git a/pipeline/alert.go b/pipeline/alert.go index dff6ade41..123741fbb 100644 --- a/pipeline/alert.go +++ b/pipeline/alert.go @@ -385,6 +385,10 @@ type AlertNodeData struct { // Send alert to Kafka topic // tick:ignore KafkaHandlers []*KafkaHandler `tick:"Kafka" json:"kafka"` + + //Send alert to Webex Teams + // tick:ignore + WebexTeamsHandlers []*WebexTeamsHandler `tick:"WebexTeams" json:"webexTeams"` } func newAlertNode(wants EdgeType) *AlertNode { @@ -1151,6 +1155,23 @@ type HipChatHandler struct { Token string `json:"token"` } +type WebexTeamsHandler struct { + *AlertNodeData `json:"-"` + RoomID string `json:"roomId"` + ToPersonID string `json:"toPersonId"` + ToPersonEmail string `json:"toPersonEmail"` + Token string `json:"token"` + Markdown bool `json:"markdown"` +} + +func (n *AlertNodeData) WebexTeams() *WebexTeamsHandler { + webexteams := &WebexTeamsHandler{ + AlertNodeData: n, + } + n.WebexTeamsHandlers = append(n.WebexTeamsHandlers, webexteams) + return webexteams +} + // Send the alert to Alerta. // // Example: diff --git a/pipeline/alert_test.go b/pipeline/alert_test.go index b56a3dda6..7c6119beb 100644 --- a/pipeline/alert_test.go +++ b/pipeline/alert_test.go @@ -80,7 +80,8 @@ func TestAlertNode_MarshalJSON(t *testing.T) { "talk": null, "mqtt": null, "snmpTrap": null, - "kafka": null + "kafka": null, + "webexTeams": null }`, }, } diff --git a/pipeline/json_test.go b/pipeline/json_test.go index 5d55f5357..7f6fab3ef 100644 --- a/pipeline/json_test.go +++ b/pipeline/json_test.go @@ -259,7 +259,8 @@ func TestPipeline_MarshalJSON(t *testing.T) { "talk": null, "mqtt": null, "snmpTrap": null, - "kafka": null + "kafka": null, + "webexTeams": null }, { "typeOf": "httpOut", diff --git a/pipeline/tick/alert.go b/pipeline/tick/alert.go index bef88cebd..5257d8d33 100644 --- a/pipeline/tick/alert.go +++ b/pipeline/tick/alert.go @@ -179,6 +179,15 @@ func (n *AlertNode) Build(a *pipeline.AlertNode) (ast.Node, error) { Dot("token", h.Token) } + for _, h := range a.WebexTeamsHandlers { + n.Dot("webexTeams"). + Dot("roomId", h.RoomID). + Dot("toPersonId", h.ToPersonID). + Dot("toPersonEmail", h.ToPersonEmail). + Dot("markdown", h.Markdown). + Dot("token", h.Token) + } + for _, h := range a.KafkaHandlers { n.Dot("kafka"). Dot("cluster", h.Cluster). diff --git a/server/config.go b/server/config.go index 36a2f6a58..86ff46d62 100644 --- a/server/config.go +++ b/server/config.go @@ -57,6 +57,7 @@ import ( "github.com/influxdata/kapacitor/services/udf" "github.com/influxdata/kapacitor/services/udp" "github.com/influxdata/kapacitor/services/victorops" + "github.com/influxdata/kapacitor/services/webexteams" "github.com/influxdata/kapacitor/tlsconfig" "github.com/pkg/errors" @@ -102,6 +103,7 @@ type Config struct { Talk talk.Config `toml:"talk" override:"talk"` Telegram telegram.Config `toml:"telegram" override:"telegram"` VictorOps victorops.Config `toml:"victorops" override:"victorops"` + WebexTeams webexteams.Config `toml:"webexteams" override:"webexteams"` // Discovery for scraping Scraper []scraper.Config `toml:"scraper" override:"scraper,element-key=name"` @@ -171,6 +173,7 @@ func NewConfig() *Config { c.SNMPTrap = snmptrap.NewConfig() c.Telegram = telegram.NewConfig() c.VictorOps = victorops.NewConfig() + c.WebexTeams = webexteams.NewConfig() c.Reporting = reporting.NewConfig() c.Stats = stats.NewConfig() @@ -321,6 +324,10 @@ func (c *Config) Validate() error { return errors.Wrap(err, "victorops") } + if err := c.WebexTeams.Validate(); err != nil { + return errors.Wrap(err, "webexteams") + } + if err := c.UDF.Validate(); err != nil { return errors.Wrap(err, "udf") } diff --git a/server/server.go b/server/server.go index b5879d3e3..cb5adb4f0 100644 --- a/server/server.go +++ b/server/server.go @@ -70,6 +70,7 @@ import ( "github.com/influxdata/kapacitor/services/udf" "github.com/influxdata/kapacitor/services/udp" "github.com/influxdata/kapacitor/services/victorops" + "github.com/influxdata/kapacitor/services/webexteams" "github.com/influxdata/kapacitor/uuid" "github.com/influxdata/kapacitor/waiter" "github.com/pkg/errors" @@ -263,6 +264,9 @@ func New(c *Config, buildInfo BuildInfo, diagService *diagnostic.Service) (*Serv s.appendTalkService() s.appendVictorOpsService() + // append Webex Teams Service + s.appendWebexTeamsService() + // Append alert service s.appendAlertService() @@ -744,6 +748,16 @@ func (s *Server) appendHipChatService() { s.AppendService("hipchat", srv) } +func (s *Server) appendWebexTeamsService() { + c := s.config.WebexTeams + d := s.DiagService.NewWebexTeamsHandler() + srv := webexteams.NewService(c, d) + s.TaskMaster.WebexTeamsService = srv + s.AlertService.WebexTeamsService = srv + s.SetDynamicService("webexteams", srv) + s.AppendService("webexteams", srv) +} + func (s *Server) appendKafkaService() { c := s.config.Kafka d := s.DiagService.NewKafkaHandler() diff --git a/services/alert/service.go b/services/alert/service.go index 0159db710..74b66624c 100644 --- a/services/alert/service.go +++ b/services/alert/service.go @@ -31,6 +31,7 @@ import ( "github.com/influxdata/kapacitor/services/storage" "github.com/influxdata/kapacitor/services/telegram" "github.com/influxdata/kapacitor/services/victorops" + "github.com/influxdata/kapacitor/services/webexteams" "github.com/mitchellh/mapstructure" "github.com/pkg/errors" ) @@ -84,6 +85,10 @@ type Service struct { DefaultHandlerConfig() alerta.HandlerConfig Handler(alerta.HandlerConfig, ...keyvalue.T) (alert.Handler, error) } + WebexTeamsService interface { + DefaultHandlerConfig() webexteams.HandlerConfig + Handler(webexteams.HandlerConfig, ...keyvalue.T) alert.Handler + } HipChatService interface { Handler(hipchat.HandlerConfig, ...keyvalue.T) alert.Handler } @@ -796,6 +801,15 @@ func (s *Service) createHandlerFromSpec(spec HandlerSpec) (handler, error) { } h = s.HipChatService.Handler(c, ctx...) h = newExternalHandler(h) + case "webexteams": + fmt.Println("case is webexteams") + c := webexteams.HandlerConfig{} + err = decodeOptions(spec.Options, &c) + if err != nil { + return handler{}, err + } + h = s.WebexTeamsService.Handler(c, ctx...) + h = newExternalHandler(h) case "kafka": c := kafka.HandlerConfig{} err = decodeOptions(spec.Options, &c) diff --git a/services/diagnostic/handlers.go b/services/diagnostic/handlers.go index 06dce758a..c1cf10f70 100644 --- a/services/diagnostic/handlers.go +++ b/services/diagnostic/handlers.go @@ -39,6 +39,7 @@ import ( "github.com/influxdata/kapacitor/services/telegram" "github.com/influxdata/kapacitor/services/udp" "github.com/influxdata/kapacitor/services/victorops" + "github.com/influxdata/kapacitor/services/webexteams" "github.com/influxdata/kapacitor/udf" "github.com/influxdata/kapacitor/uuid" plog "github.com/prometheus/common/log" @@ -177,6 +178,22 @@ func (h *AlertServiceHandler) Error(msg string, err error, ctx ...keyvalue.T) { Err(h.L, msg, err, ctx) } +// Webex Teams Handler +type WebexTeamsHandler struct { + l Logger +} + +func (h *WebexTeamsHandler) WithContext(ctx ...keyvalue.T) webexteams.Diagnostic { + fields := logFieldsFromContext(ctx) + return &WebexTeamsHandler{ + l: h.l.With(fields...), + } +} + +func (h *WebexTeamsHandler) Error(msg string, err error) { + h.l.Error(msg, Error(err)) +} + // Kapcitor Handler type KapacitorHandler struct { diff --git a/services/diagnostic/service.go b/services/diagnostic/service.go index 750de1566..b1e2a6de8 100644 --- a/services/diagnostic/service.go +++ b/services/diagnostic/service.go @@ -196,6 +196,12 @@ func (s *Service) NewHipChatHandler() *HipChatHandler { } } +func (s *Service) NewWebexTeamsHandler() *WebexTeamsHandler { + return &WebexTeamsHandler{ + l: s.Logger.With(String("service", "webexteams")), + } +} + func (s *Service) NewKafkaHandler() *KafkaHandler { return &KafkaHandler{ l: s.Logger.With(String("service", "kafka")), diff --git a/services/webexteams/config.go b/services/webexteams/config.go new file mode 100644 index 000000000..877947e92 --- /dev/null +++ b/services/webexteams/config.go @@ -0,0 +1,60 @@ +package webexteams + +import ( + "errors" + "net/url" +) + +const ( + // DefaultURL is the url for sending messages to the Webex Teams platform + DefaultURL = "https://api.ciscospark.com/v1/messages" +) + +// Config contains basic configuration for enabling the Webex Teams Alert Plugin +type Config struct { + // Whether Telegram integration is enabled. + Enabled bool `toml:"enabled" override:"enabled"` + + // The Telegram Bot URL, should not need to be changed. + URL string `toml:"url" override:"url"` + // The Webex Teams Access Token + Token string `toml:"token" override:"token,redact"` + + // The default Room, can be overridden per alert. + RoomID string `toml:"room-id" override:"room-id"` + + ToPersonID string `toml:"to-person-id" override:"to-person-id"` + ToPersonEmail string `toml:"to-person-email" override:"to-person-email"` + + // Whether all alerts should automatically post to Webex Teams + Global bool `toml:"global" override:"global"` + // Whether all alerts should automatically use stateChangesOnly mode. + // Only applies if global is also set. + StateChangesOnly bool `toml:"state-changes-only" override:"state-changes-only"` +} + +// NewConfig returns Config with the default Webex Teams URL +func NewConfig() Config { + return Config{ + URL: DefaultURL, + } +} + +// Validate ensures that the bare minimun configuration is supplied +func (c Config) Validate() error { + if c.Enabled { + if c.Token == "" { + return errors.New(InvalidTokenErr) + } + if c.RoomID == "" && c.ToPersonID == "" && c.ToPersonEmail == "" { + return errors.New(MissingDestinationErr) + } + if c.URL == "" { + return errors.New(InvalidURLErr) + } + if _, err := url.Parse(c.URL); err != nil { + return errors.New(InvalidURLErr) + } + } + return nil +} diff --git a/services/webexteams/errors.go b/services/webexteams/errors.go new file mode 100644 index 000000000..fbba45f01 --- /dev/null +++ b/services/webexteams/errors.go @@ -0,0 +1,12 @@ +package webexteams + +const ( + // InvalidTokenErr is for missing, expired, or invalid access tokens + InvalidTokenErr = "webexteams: invalid access token" + // InvalidURLErr is for a bad URL + InvalidURLErr = "webexteams: invalid url" + // ServiceUnavailableErr is for when an alert is triggered but the Webex Teams Config has not been enabled + ServiceUnavailableErr = "webexteams: service unavailable" + // MissingDestinationErr is for a bad configuration, missing roomId, personId, or toPersonEmail + MissingDestinationErr = "webexteams: a destination is required to send a webex teams message (roomId, personId, toPersonEmail)" +) diff --git a/services/webexteams/service.go b/services/webexteams/service.go new file mode 100644 index 000000000..cf33d6385 --- /dev/null +++ b/services/webexteams/service.go @@ -0,0 +1,192 @@ +package webexteams + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "net/http" + "sync/atomic" + + "github.com/influxdata/kapacitor/alert" + "github.com/influxdata/kapacitor/keyvalue" +) + +// Diagnostic is an interface +type Diagnostic interface { + WithContext(ctx ...keyvalue.T) Diagnostic + Error(msg string, err error) +} + +// Service is a struct +type Service struct { + configValue atomic.Value + diag Diagnostic +} + +// NewService returns a service +func NewService(c Config, d Diagnostic) *Service { + s := &Service{ + diag: d, + } + s.configValue.Store(c) + return s +} + +// Open peform any init +func (s *Service) Open() error { + return nil +} + +// Close preform any actions prior to close +func (s *Service) Close() error { + return nil +} + +// Update performs any actions required to update the config +func (s *Service) Update(newConfig []interface{}) error { + if l := len(newConfig); l != 1 { + return fmt.Errorf("expected only one new config object, got %d", l) + } + c, ok := newConfig[0].(Config) + if !ok { + return fmt.Errorf("expected config object to be of type %T, got %T", c, newConfig[0]) + } + s.configValue.Store(c) + return nil +} + +// config loads the config struct stored in the configValue field. +func (s *Service) config() Config { + return s.configValue.Load().(Config) +} + +// Alert sends a message to ONE of the roomID, personID, or personEmail. +func (s *Service) Alert(roomID, personID, personEmail, token, text string, markdown bool) error { + c := s.config() + if !c.Enabled { + return errors.New(ServiceUnavailableErr) + } + postData := map[string]interface{}{} + // decide the message destination + + // check for incoming values FIRST + // if none of these conditions pass + // fall back to the kapacitor configuration + if personID != "" { + postData["toPersonId"] = personID + } else if personEmail != "" { + postData["toPersonEmail"] = personEmail + } else if roomID != "" { + postData["roomId"] = roomID + } else if c.ToPersonID != "" { + postData["toPersonId"] = c.ToPersonID + } else if c.ToPersonEmail != "" { + postData["toPersonEmail"] = c.ToPersonEmail + } else if c.RoomID != "" { + postData["roomId"] = c.RoomID + } + if markdown == true { + postData["markdown"] = true + } + authToken := "" + if token != "" { + authToken = token + } else if c.Token != "" { + authToken = c.Token + } + postData["text"] = text + data, err := json.Marshal(&postData) + if err != nil { + return err + } + r, err := http.NewRequest(http.MethodPost, c.URL, bytes.NewReader(data)) + if err != nil { + return err + } + r.Header.Add("Authorization", "Bearer "+authToken) + r.Header.Add("Content-Type", "application/json") + response, err := http.DefaultClient.Do(r) + if err != nil { + return err + } + defer response.Body.Close() + if response.StatusCode != http.StatusOK { + return fmt.Errorf("unexpected response code %d from Webex Teams service", response.StatusCode) + } + return nil +} + +// HandlerConfig is the Webex Teams Configuration for the Handler +type HandlerConfig struct { + RoomID string `mapstructure:"room-id"` + ToPersonID string `mapstructure:"to-person-id"` + ToPersonEmail string `mapstructure:"to-person-email"` + Token string `mapstructure:"token"` + Markdown bool `mapstructure:"markdown"` +} + +// handler provides the implementation of the alert.Handler interface for the Foo service. +type handler struct { + s *Service + c HandlerConfig + diag Diagnostic +} + +// DefaultHandlerConfig returns a HandlerConfig struct with defaults applied. +func (s *Service) DefaultHandlerConfig() HandlerConfig { + // return a handler config populated with the default room from the service config. + c := s.config() + return HandlerConfig{ + RoomID: c.RoomID, + } +} + +// Handler creates a handler from the config. +func (s *Service) Handler(c HandlerConfig, ctx ...keyvalue.T) alert.Handler { + return &handler{ + s: s, + c: c, + diag: s.diag.WithContext(ctx...), + } +} + +// StateChangesOnly configures whether or not alerts should be generated only when state changs +func (s *Service) StateChangesOnly() bool { + fmt.Println("state changes only", s.config().StateChangesOnly) + return s.config().StateChangesOnly +} + +// StateChangesOnly configures whether or not alerts should be generated only when state changs +func (s *Service) Global() bool { + c := s.config() + return c.Global +} + +// Handle takes an event and posts its message to the Foo service chat room. +func (h *handler) Handle(event alert.Event) { + if err := h.s.Alert(h.c.RoomID, h.c.ToPersonID, h.c.ToPersonEmail, h.c.Token, event.State.Message, h.c.Markdown); err != nil { + h.diag.Error("E! failed to handle event", err) + } +} + +type testOptions struct { + RoomID string `json:"roomId"` + Message string `json:"message"` +} + +func (s *Service) TestOptions() interface{} { + c := s.config() + return &testOptions{ + RoomID: c.RoomID, + Message: "test webex message", + } +} + +func (s *Service) Test(o interface{}) error { + options, ok := o.(*testOptions) + if !ok { + return fmt.Errorf("unexpected options type %T", options) + } + return s.Alert(options.RoomID, "", "", "", options.Message, false) +} diff --git a/services/webexteams/webexteamstest/webexteamstest.go b/services/webexteams/webexteamstest/webexteamstest.go new file mode 100644 index 000000000..1025d98d5 --- /dev/null +++ b/services/webexteams/webexteamstest/webexteamstest.go @@ -0,0 +1,59 @@ +package webexteamstest + +import ( + "encoding/json" + "net/http" + "net/http/httptest" + "sync" +) + +type Server struct { + mu sync.Mutex + ts *httptest.Server + URL string + requests []Request + closed bool +} + +func NewServer() *Server { + s := new(Server) + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + hr := Request{ + URL: r.URL.String(), + } + dec := json.NewDecoder(r.Body) + dec.Decode(&hr.PostData) + s.mu.Lock() + s.requests = append(s.requests, hr) + s.mu.Unlock() + })) + s.ts = ts + s.URL = ts.URL + return s +} + +func (s *Server) Requests() []Request { + s.mu.Lock() + defer s.mu.Unlock() + return s.requests +} +func (s *Server) Close() { + if s.closed { + return + } + s.closed = true + s.ts.Close() +} + +type Request struct { + URL string + PostData PostData +} + +type PostData struct { + ToPersonID string `json:"toPersonId"` + ToPersonEmail string `json:"toPersonEmail"` + RoomID string `json:"roomId"` + Markdown string `json:"markdown"` + Text string `json:"text"` +} diff --git a/task_master.go b/task_master.go index cb4ea67bd..ec65c48f4 100644 --- a/task_master.go +++ b/task_master.go @@ -39,6 +39,7 @@ import ( swarm "github.com/influxdata/kapacitor/services/swarm/client" "github.com/influxdata/kapacitor/services/telegram" "github.com/influxdata/kapacitor/services/victorops" + "github.com/influxdata/kapacitor/services/webexteams" "github.com/influxdata/kapacitor/tick" "github.com/influxdata/kapacitor/tick/stateful" "github.com/influxdata/kapacitor/timer" @@ -167,6 +168,12 @@ type TaskMaster struct { StateChangesOnly() bool Handler(hipchat.HandlerConfig, ...keyvalue.T) alert.Handler } + WebexTeamsService interface { + Global() bool + StateChangesOnly() bool + DefaultHandlerConfig() webexteams.HandlerConfig + Handler(webexteams.HandlerConfig, ...keyvalue.T) alert.Handler + } KafkaService interface { Handler(kafka.HandlerConfig, ...keyvalue.T) (alert.Handler, error) }