Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/influxdata/kapacitor/services/snmptrap"
"github.com/influxdata/kapacitor/services/telegram"
"github.com/influxdata/kapacitor/services/victorops"
"github.com/influxdata/kapacitor/services/webexteams"
"github.com/influxdata/kapacitor/tick/ast"
"github.com/influxdata/kapacitor/tick/stateful"
"github.com/pkg/errors"
Expand Down Expand Up @@ -332,6 +333,25 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, d NodeDiagnostic) (a
n.IsStateChangesOnly = true
}

for _, teams := range n.WebexTeamsHandlers {
d.UDFLog("webex teams handlers")
d.UDFLog("teams room-id: " + teams.RoomID)
c := webexteams.HandlerConfig{
ToPersonID: teams.ToPersonID,
RoomID: teams.RoomID,
ToPersonEmail: teams.ToPersonEmail,
Markdown: teams.Markdown,
Token: teams.Token,
}
h := et.tm.WebexTeamsService.Handler(c, ctx...)
an.handlers = append(an.handlers, h)
}
if et.tm.WebexTeamsService != nil &&
et.tm.WebexTeamsService.Global() &&
et.tm.WebexTeamsService.StateChangesOnly() {
n.IsStateChangesOnly = true
}

for _, k := range n.KafkaHandlers {
c := kafka.HandlerConfig{
Cluster: k.Cluster,
Expand Down
21 changes: 21 additions & 0 deletions pipeline/alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,10 @@ type AlertNodeData struct {
// Send alert to Kafka topic
// tick:ignore
KafkaHandlers []*KafkaHandler `tick:"Kafka" json:"kafka"`

//Send alert to Webex Teams
// tick:ignore
WebexTeamsHandlers []*WebexTeamsHandler `tick:"WebexTeams" json:"webexTeams"`
}

func newAlertNode(wants EdgeType) *AlertNode {
Expand Down Expand Up @@ -1151,6 +1155,23 @@ type HipChatHandler struct {
Token string `json:"token"`
}

type WebexTeamsHandler struct {
*AlertNodeData `json:"-"`
RoomID string `json:"roomId"`
ToPersonID string `json:"toPersonId"`
ToPersonEmail string `json:"toPersonEmail"`
Token string `json:"token"`
Markdown bool `json:"markdown"`
}

func (n *AlertNodeData) WebexTeams() *WebexTeamsHandler {
webexteams := &WebexTeamsHandler{
AlertNodeData: n,
}
n.WebexTeamsHandlers = append(n.WebexTeamsHandlers, webexteams)
return webexteams
}

// Send the alert to Alerta.
//
// Example:
Expand Down
3 changes: 2 additions & 1 deletion pipeline/alert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ func TestAlertNode_MarshalJSON(t *testing.T) {
"talk": null,
"mqtt": null,
"snmpTrap": null,
"kafka": null
"kafka": null,
"webexTeams": null
}`,
},
}
Expand Down
3 changes: 2 additions & 1 deletion pipeline/json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,8 @@ func TestPipeline_MarshalJSON(t *testing.T) {
"talk": null,
"mqtt": null,
"snmpTrap": null,
"kafka": null
"kafka": null,
"webexTeams": null
},
{
"typeOf": "httpOut",
Expand Down
9 changes: 9 additions & 0 deletions pipeline/tick/alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,15 @@ func (n *AlertNode) Build(a *pipeline.AlertNode) (ast.Node, error) {
Dot("token", h.Token)
}

for _, h := range a.WebexTeamsHandlers {
n.Dot("webexTeams").
Dot("roomId", h.RoomID).
Dot("toPersonId", h.ToPersonID).
Dot("toPersonEmail", h.ToPersonEmail).
Dot("markdown", h.Markdown).
Dot("token", h.Token)
}

for _, h := range a.KafkaHandlers {
n.Dot("kafka").
Dot("cluster", h.Cluster).
Expand Down
7 changes: 7 additions & 0 deletions server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ import (
"github.com/influxdata/kapacitor/services/udf"
"github.com/influxdata/kapacitor/services/udp"
"github.com/influxdata/kapacitor/services/victorops"
"github.com/influxdata/kapacitor/services/webexteams"
"github.com/influxdata/kapacitor/tlsconfig"
"github.com/pkg/errors"

Expand Down Expand Up @@ -102,6 +103,7 @@ type Config struct {
Talk talk.Config `toml:"talk" override:"talk"`
Telegram telegram.Config `toml:"telegram" override:"telegram"`
VictorOps victorops.Config `toml:"victorops" override:"victorops"`
WebexTeams webexteams.Config `toml:"webexteams" override:"webexteams"`

// Discovery for scraping
Scraper []scraper.Config `toml:"scraper" override:"scraper,element-key=name"`
Expand Down Expand Up @@ -171,6 +173,7 @@ func NewConfig() *Config {
c.SNMPTrap = snmptrap.NewConfig()
c.Telegram = telegram.NewConfig()
c.VictorOps = victorops.NewConfig()
c.WebexTeams = webexteams.NewConfig()

c.Reporting = reporting.NewConfig()
c.Stats = stats.NewConfig()
Expand Down Expand Up @@ -321,6 +324,10 @@ func (c *Config) Validate() error {
return errors.Wrap(err, "victorops")
}

if err := c.WebexTeams.Validate(); err != nil {
return errors.Wrap(err, "webexteams")
}

if err := c.UDF.Validate(); err != nil {
return errors.Wrap(err, "udf")
}
Expand Down
14 changes: 14 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ import (
"github.com/influxdata/kapacitor/services/udf"
"github.com/influxdata/kapacitor/services/udp"
"github.com/influxdata/kapacitor/services/victorops"
"github.com/influxdata/kapacitor/services/webexteams"
"github.com/influxdata/kapacitor/uuid"
"github.com/influxdata/kapacitor/waiter"
"github.com/pkg/errors"
Expand Down Expand Up @@ -263,6 +264,9 @@ func New(c *Config, buildInfo BuildInfo, diagService *diagnostic.Service) (*Serv
s.appendTalkService()
s.appendVictorOpsService()

// append Webex Teams Service
s.appendWebexTeamsService()

// Append alert service
s.appendAlertService()

Expand Down Expand Up @@ -744,6 +748,16 @@ func (s *Server) appendHipChatService() {
s.AppendService("hipchat", srv)
}

func (s *Server) appendWebexTeamsService() {
c := s.config.WebexTeams
d := s.DiagService.NewWebexTeamsHandler()
srv := webexteams.NewService(c, d)
s.TaskMaster.WebexTeamsService = srv
s.AlertService.WebexTeamsService = srv
s.SetDynamicService("webexteams", srv)
s.AppendService("webexteams", srv)
}

func (s *Server) appendKafkaService() {
c := s.config.Kafka
d := s.DiagService.NewKafkaHandler()
Expand Down
14 changes: 14 additions & 0 deletions services/alert/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/influxdata/kapacitor/services/storage"
"github.com/influxdata/kapacitor/services/telegram"
"github.com/influxdata/kapacitor/services/victorops"
"github.com/influxdata/kapacitor/services/webexteams"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
)
Expand Down Expand Up @@ -84,6 +85,10 @@ type Service struct {
DefaultHandlerConfig() alerta.HandlerConfig
Handler(alerta.HandlerConfig, ...keyvalue.T) (alert.Handler, error)
}
WebexTeamsService interface {
DefaultHandlerConfig() webexteams.HandlerConfig
Handler(webexteams.HandlerConfig, ...keyvalue.T) alert.Handler
}
HipChatService interface {
Handler(hipchat.HandlerConfig, ...keyvalue.T) alert.Handler
}
Expand Down Expand Up @@ -796,6 +801,15 @@ func (s *Service) createHandlerFromSpec(spec HandlerSpec) (handler, error) {
}
h = s.HipChatService.Handler(c, ctx...)
h = newExternalHandler(h)
case "webexteams":
fmt.Println("case is webexteams")
c := webexteams.HandlerConfig{}
err = decodeOptions(spec.Options, &c)
if err != nil {
return handler{}, err
}
h = s.WebexTeamsService.Handler(c, ctx...)
h = newExternalHandler(h)
case "kafka":
c := kafka.HandlerConfig{}
err = decodeOptions(spec.Options, &c)
Expand Down
17 changes: 17 additions & 0 deletions services/diagnostic/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/influxdata/kapacitor/services/telegram"
"github.com/influxdata/kapacitor/services/udp"
"github.com/influxdata/kapacitor/services/victorops"
"github.com/influxdata/kapacitor/services/webexteams"
"github.com/influxdata/kapacitor/udf"
"github.com/influxdata/kapacitor/uuid"
plog "github.com/prometheus/common/log"
Expand Down Expand Up @@ -177,6 +178,22 @@ func (h *AlertServiceHandler) Error(msg string, err error, ctx ...keyvalue.T) {
Err(h.L, msg, err, ctx)
}

// Webex Teams Handler
type WebexTeamsHandler struct {
l Logger
}

func (h *WebexTeamsHandler) WithContext(ctx ...keyvalue.T) webexteams.Diagnostic {
fields := logFieldsFromContext(ctx)
return &WebexTeamsHandler{
l: h.l.With(fields...),
}
}

func (h *WebexTeamsHandler) Error(msg string, err error) {
h.l.Error(msg, Error(err))
}

// Kapcitor Handler

type KapacitorHandler struct {
Expand Down
6 changes: 6 additions & 0 deletions services/diagnostic/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,12 @@ func (s *Service) NewHipChatHandler() *HipChatHandler {
}
}

func (s *Service) NewWebexTeamsHandler() *WebexTeamsHandler {
return &WebexTeamsHandler{
l: s.Logger.With(String("service", "webexteams")),
}
}

func (s *Service) NewKafkaHandler() *KafkaHandler {
return &KafkaHandler{
l: s.Logger.With(String("service", "kafka")),
Expand Down
60 changes: 60 additions & 0 deletions services/webexteams/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package webexteams

import (
"errors"
"net/url"
)

const (
// DefaultURL is the url for sending messages to the Webex Teams platform
DefaultURL = "https://api.ciscospark.com/v1/messages"
)

// Config contains basic configuration for enabling the Webex Teams Alert Plugin
type Config struct {
// Whether Telegram integration is enabled.
Enabled bool `toml:"enabled" override:"enabled"`

// The Telegram Bot URL, should not need to be changed.
URL string `toml:"url" override:"url"`
// The Webex Teams Access Token
Token string `toml:"token" override:"token,redact"`

// The default Room, can be overridden per alert.
RoomID string `toml:"room-id" override:"room-id"`

ToPersonID string `toml:"to-person-id" override:"to-person-id"`
ToPersonEmail string `toml:"to-person-email" override:"to-person-email"`

// Whether all alerts should automatically post to Webex Teams
Global bool `toml:"global" override:"global"`
// Whether all alerts should automatically use stateChangesOnly mode.
// Only applies if global is also set.
StateChangesOnly bool `toml:"state-changes-only" override:"state-changes-only"`
}

// NewConfig returns Config with the default Webex Teams URL
func NewConfig() Config {
return Config{
URL: DefaultURL,
}
}

// Validate ensures that the bare minimun configuration is supplied
func (c Config) Validate() error {
if c.Enabled {
if c.Token == "" {
return errors.New(InvalidTokenErr)
}
if c.RoomID == "" && c.ToPersonID == "" && c.ToPersonEmail == "" {
return errors.New(MissingDestinationErr)
}
if c.URL == "" {
return errors.New(InvalidURLErr)
}
if _, err := url.Parse(c.URL); err != nil {
return errors.New(InvalidURLErr)
}
}
return nil
}
12 changes: 12 additions & 0 deletions services/webexteams/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package webexteams

const (
// InvalidTokenErr is for missing, expired, or invalid access tokens
InvalidTokenErr = "webexteams: invalid access token"
// InvalidURLErr is for a bad URL
InvalidURLErr = "webexteams: invalid url"
// ServiceUnavailableErr is for when an alert is triggered but the Webex Teams Config has not been enabled
ServiceUnavailableErr = "webexteams: service unavailable"
// MissingDestinationErr is for a bad configuration, missing roomId, personId, or toPersonEmail
MissingDestinationErr = "webexteams: a destination is required to send a webex teams message (roomId, personId, toPersonEmail)"
)
Loading