37 commits
13fac38
Initial Icinga Notifications Source
oxzi Jul 29, 2025
13c6708
Use the newly introduced notifications event utils from `igl`
yhabteab Aug 6, 2025
520a256
Don't limit queries referencing `{host,service}_id` & Env ID params
yhabteab Aug 6, 2025
8f04886
Reevaluate rules immediately after refetching them
yhabteab Aug 6, 2025
08214d9
Retrieve host and service names from Redis
yhabteab Aug 6, 2025
bc040f3
Drop superfluous `rulesMutex`
yhabteab Aug 7, 2025
44fddfe
Notifications: Address Code Review
oxzi Sep 5, 2025
d326925
notifications: IGL Changes For Rules
oxzi Sep 9, 2025
7026446
history: Retry failing callback submissions
oxzi Sep 25, 2025
fca427f
notifications: Send relative Icinga Web 2 URLs
oxzi Sep 29, 2025
f0184ce
notifications: Don't abort for faulty object rules
oxzi Sep 30, 2025
c98ac5b
notifications: Reflect RulesInfo IGL update
oxzi Oct 2, 2025
a65bb52
Document Notification Source
oxzi Oct 2, 2025
3eb2661
Configurable callback sync telemetry stat name
oxzi Oct 21, 2025
36f6db8
Minor Tweaks for Icinga Notifications Integration
oxzi Oct 21, 2025
07e76e0
notifications: Fetch customvars from Redis
oxzi Oct 22, 2025
f8456cf
notifications: Evaluate Icinga DB Web Rule Filter
oxzi Oct 27, 2025
5c9a4f4
history: StreamSorter for Notifications Callback
oxzi Oct 31, 2025
2c13f4a
notifications: Custom Vars From SQL, Output Format
oxzi Oct 31, 2025
582caaa
config.example.yml: Reformat notifications-source
oxzi Oct 31, 2025
5f13ca2
history: SyncCallbackConf For Common Callback Conf
oxzi Oct 31, 2025
faeb9ff
notifications: Simplify Icinga DB Web Rule Evaluation
oxzi Oct 31, 2025
b3cab9e
telemetry: Undo Stats rework
oxzi Oct 31, 2025
bef44bc
history.parseRedisStreamId: Remove regex
oxzi Nov 3, 2025
aec7aa3
history.StreamSorter: Various Fixes
oxzi Nov 5, 2025
d41c17d
history.StreamSorter: Cleanup Output Channel
oxzi Nov 5, 2025
5d23985
telemetry: Remove leftover Stats.Callback
oxzi Nov 5, 2025
b60bac3
notifications: Fix flat customvars
oxzi Nov 5, 2025
287717a
history.streamSorterSubmissions: Use Pointer
oxzi Nov 5, 2025
f7126f2
StreamSorter: improve output channel close behavior and simplify impl…
julianbrost Nov 6, 2025
e0e04c6
history.StreamSorter: Few comments, No Data Races
oxzi Nov 6, 2025
c328b1a
notifications.Client: Allow Parameters of any Type
oxzi Nov 6, 2025
1a88189
notifications: Import StreamSorter Logic
oxzi Nov 7, 2025
019d956
notifications: TypeAcknowledgementCleared Message
oxzi Nov 10, 2025
6d63b3b
notifications: Mute and Unmute Events
oxzi Nov 10, 2025
ea6cd1d
notifications: Speed up StreamSorter Tests
oxzi Nov 10, 2025
231eb1a
doc: Update for Notifications
oxzi Nov 11, 2025
30 changes: 25 additions & 5 deletions cmd/icingadb/main.go
@@ -15,6 +15,7 @@ import (
v1 "github.com/icinga/icingadb/pkg/icingadb/v1"
"github.com/icinga/icingadb/pkg/icingaredis"
"github.com/icinga/icingadb/pkg/icingaredis/telemetry"
"github.com/icinga/icingadb/pkg/notifications"
"github.com/okzk/sdnotify"
"github.com/pkg/errors"
"go.uber.org/zap"
@@ -168,13 +169,32 @@ func run() int {
sig := make(chan os.Signal, 1)
signal.Notify(sig, os.Interrupt, syscall.SIGTERM, syscall.SIGHUP)

-	go func() {
-		logger.Info("Starting history sync")
+	{
+		var extraStages map[string]history.StageFunc
+		if cfg := cmd.Config.NotificationsSource; cfg.ApiBaseUrl != "" {
+			logger.Info("Starting Icinga Notifications source")
+
+			notificationsSource, err := notifications.NewNotificationsClient(
+				ctx,
+				db,
+				rc,
+				logs.GetChildLogger("notifications-source"),
+				cfg)
+			if err != nil {
+				logger.Fatalw("Can't create Icinga Notifications client from config", zap.Error(err))
+			}
 
-		if err := hs.Sync(ctx); err != nil && !utils.IsContextCanceled(err) {
-			logger.Fatalf("%+v", err)
+			extraStages = notificationsSource.SyncExtraStages()
 		}
-	}()
+
+		go func() {
+			logger.Info("Starting history sync")
+
+			if err := hs.Sync(ctx, extraStages); err != nil && !utils.IsContextCanceled(err) {
+				logger.Fatalf("%+v", err)
+			}
+		}()
+	}

// Main loop
for {
12 changes: 12 additions & 0 deletions config.example.yml
@@ -139,3 +139,15 @@ redis:
# flapping:
# notification:
# state:

# Icinga DB can act as an event source for Icinga Notifications. If the following block is not empty, Icinga DB will
# submit events to the Icinga Notifications API.
#notifications-source:
# URL to the API root.
# api-base-url: http://localhost:5680

# Username to authenticate against the Icinga Notifications API.
# username: icingadb

# Password for the defined user.
# password: insecureinsecure
22 changes: 21 additions & 1 deletion doc/03-Configuration.md
@@ -146,7 +146,7 @@ ICINGADB_LOGGING_OPTIONS=database:error,high-availability:debug
| runtime-updates | Runtime updates of config objects after the initial config synchronization. |
| telemetry | Reporting of Icinga DB status to Icinga 2 via Redis® (for monitoring purposes). |

-## Retention
+## Retention Configuration

By default, no historical data is deleted, which means that the longer the data is retained,
the more disk space is required to store it. History retention is an optional feature that allows you to
@@ -174,6 +174,26 @@ ICINGADB_RETENTION_OPTIONS=comment:356
| count | **Optional.** Number of old historical data a single query can delete in a `"DELETE FROM ... LIMIT count"` manner. Defaults to `5000`. |
| options | **Optional.** Map of history category to number of days to retain its data. Available categories are `acknowledgement`, `comment`, `downtime`, `flapping`, `notification` and `state`. |

## Notifications Source Configuration

!!! warning

The Icinga Notifications integration is a feature preview that you can try out.
However, incompatible changes may happen, so be sure to check the changelog for future updates.
Please do not use it in production.

Icinga DB can act as an event source for [Icinga Notifications](https://icinga.com/docs/icinga-notifications/).
If configured, Icinga DB will submit events to the Icinga Notifications API.

For YAML configuration, the options are part of the `notifications-source` dictionary.
For environment variables, each option is prefixed with `ICINGADB_NOTIFICATIONS_SOURCE_`.

| Option | Description |
|--------------|-----------------------------------------------------------------------------------|
| api-base-url | **Optional.** Icinga Notifications API base URL, such as `http://localhost:5680`. |
| username | **Optional.** Icinga Notifications API user for this source. |
| password | **Optional.** Icinga Notifications API user password. |
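
For illustration, here is what a complete configuration of this source could look like. The YAML values below are taken from `config.example.yml` in this pull request; the environment variable names are an assumption derived from the stated `ICINGADB_NOTIFICATIONS_SOURCE_` prefix and the usual option-name mapping, not documented output:

```yaml
notifications-source:
  api-base-url: http://localhost:5680
  username: icingadb
  password: insecureinsecure
```

```
ICINGADB_NOTIFICATIONS_SOURCE_API_BASE_URL=http://localhost:5680
ICINGADB_NOTIFICATIONS_SOURCE_USERNAME=icingadb
ICINGADB_NOTIFICATIONS_SOURCE_PASSWORD=insecureinsecure
```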

## Appendix

### Duration String
10 changes: 6 additions & 4 deletions internal/config/config.go
@@ -4,6 +4,7 @@ import (
"github.com/creasty/defaults"
"github.com/icinga/icinga-go-library/database"
"github.com/icinga/icinga-go-library/logging"
"github.com/icinga/icinga-go-library/notifications/source"
"github.com/icinga/icinga-go-library/redis"
"github.com/icinga/icingadb/pkg/icingadb/history"
"github.com/pkg/errors"
@@ -15,10 +16,11 @@ const DefaultConfigPath = "/etc/icingadb/config.yml"

// Config defines Icinga DB config.
type Config struct {
-	Database  database.Config `yaml:"database" envPrefix:"DATABASE_"`
-	Redis     redis.Config    `yaml:"redis" envPrefix:"REDIS_"`
-	Logging   logging.Config  `yaml:"logging" envPrefix:"LOGGING_"`
-	Retention RetentionConfig `yaml:"retention" envPrefix:"RETENTION_"`
+	Database            database.Config `yaml:"database" envPrefix:"DATABASE_"`
+	Redis               redis.Config    `yaml:"redis" envPrefix:"REDIS_"`
+	Logging             logging.Config  `yaml:"logging" envPrefix:"LOGGING_"`
+	Retention           RetentionConfig `yaml:"retention" envPrefix:"RETENTION_"`
+	NotificationsSource source.Config   `yaml:"notifications-source" envPrefix:"NOTIFICATIONS_SOURCE_"`
}

func (c *Config) SetDefaults() {
107 changes: 80 additions & 27 deletions pkg/icingadb/history/sync.go
@@ -17,6 +17,7 @@ import (
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
"reflect"
"slices"
"sync"
)

@@ -37,7 +38,10 @@ func NewSync(db *database.DB, redis *redis.Client, logger *logging.Logger) *Sync
}

// Sync synchronizes Redis history streams from s.redis to s.db and deletes the original data on success.
-func (s Sync) Sync(ctx context.Context) error {
+//
+// The optional extraStages parameter allows specifying one extra stage per pipeline, identified by its key. This
+// stage is executed after every other stage, but before the entry gets deleted from Redis.
+func (s Sync) Sync(ctx context.Context, extraStages map[string]StageFunc) error {
g, ctx := errgroup.WithContext(ctx)

for key, pipeline := range syncPipelines {
@@ -62,6 +66,18 @@ func (s Sync) Sync(ctx context.Context) error {
// it has processed it, even if the stage itself does not do anything with this specific entry. It should only
// forward the entry after it has completed its own sync so that later stages can rely on previous stages being
// executed successfully.
//
// If an extra stage exists for this key, it will be appended to the pipeline. Thus, it is executed after every
// other pipeline action, but before deleteFromRedis.

// Shadowed variable to allow appending custom callbacks.
pipeline := pipeline
if extraStages != nil {
extraStage, ok := extraStages[key]
if ok {
pipeline = append(slices.Clip(pipeline), extraStage)
}
}

ch := make([]chan redis.XMessage, len(pipeline)+1)
for i := range ch {
@@ -152,26 +168,25 @@ func (s Sync) deleteFromRedis(ctx context.Context, key string, input <-chan redi
}

 			counter.Add(uint64(len(ids)))
-			telemetry.Stats.History.Add(uint64(len(ids)))
case <-ctx.Done():
return ctx.Err()
}
}
}

-// stageFunc is a function type that represents a sync pipeline stage. It is called with a context (it should stop
+// StageFunc is a function type that represents a sync pipeline stage. It is called with a context (it should stop
 // once that context is canceled), the Sync instance (for access to Redis, SQL database, logging), the key (information
 // about which pipeline this function is running in, i.e. "notification"), an in channel for the stage to read history
 // events from and an out channel to forward history entries to after processing them successfully. A stage function
 // is supposed to forward each message from in to out, even if the event is not relevant for the current stage. On
 // error conditions, the message must not be forwarded to the next stage so that the event is not deleted from Redis
 // and can be processed at a later time.
-type stageFunc func(ctx context.Context, s Sync, key string, in <-chan redis.XMessage, out chan<- redis.XMessage) error
+type StageFunc func(ctx context.Context, s Sync, key string, in <-chan redis.XMessage, out chan<- redis.XMessage) error

-// writeOneEntityStage creates a stageFunc from a pointer to a struct implementing the v1.UpserterEntity interface.
+// writeOneEntityStage creates a StageFunc from a pointer to a struct implementing the v1.UpserterEntity interface.
 // For each history event it receives, it parses that event into a new instance of that entity type and writes it to
 // the database. It writes exactly one entity to the database for each history event.
-func writeOneEntityStage(structPtr any) stageFunc {
+func writeOneEntityStage(structPtr any) StageFunc {
structifier := structify.MakeMapStructifier(
reflect.TypeOf(structPtr).Elem(),
"json",
@@ -190,9 +205,9 @@ func writeOneEntityStage(structPtr any) stageFunc {
})
}

-// writeMultiEntityStage creates a stageFunc from a function that takes a history event as an input and returns a
+// writeMultiEntityStage creates a StageFunc from a function that takes a history event as an input and returns a
 // (potentially empty) slice of v1.UpserterEntity instances that it then inserts into the database.
-func writeMultiEntityStage(entryToEntities func(entry redis.XMessage) ([]v1.UpserterEntity, error)) stageFunc {
+func writeMultiEntityStage(entryToEntities func(entry redis.XMessage) ([]v1.UpserterEntity, error)) StageFunc {
return func(ctx context.Context, s Sync, key string, in <-chan redis.XMessage, out chan<- redis.XMessage) error {
type State struct {
Message redis.XMessage // Original event from Redis.
@@ -304,7 +319,7 @@ func writeMultiEntityStage(entryToEntities func(entry redis.XMessage) ([]v1.Upse
}
}

-// userNotificationStage is a specialized stageFunc that populates the user_notification_history table. It is executed
+// userNotificationStage is a specialized StageFunc that populates the user_notification_history table. It is executed
// on the notification history stream and uses the users_notified_ids attribute to create an entry in the
// user_notification_history relation table for each user ID.
func userNotificationStage(ctx context.Context, s Sync, key string, in <-chan redis.XMessage, out chan<- redis.XMessage) error {
@@ -361,32 +376,70 @@
})(ctx, s, key, in, out)
}

-var syncPipelines = map[string][]stageFunc{
-	"notification": {
-		writeOneEntityStage((*v1.NotificationHistory)(nil)), // notification_history
-		userNotificationStage,                               // user_notification_history (depends on notification_history)
-		writeOneEntityStage((*v1.HistoryNotification)(nil)), // history (depends on notification_history)
-	},
+// countElementStage increments the [Stats.History] counter.
+//
+// This StageFunc should be listed last in each syncPipelines entry. Thus, it is executed before the final
+// Sync.deleteFromRedis call in Sync.Sync, but also before optional extra stages, which may block.
+func countElementStage(ctx context.Context, _ Sync, _ string, in <-chan redis.XMessage, out chan<- redis.XMessage) error {
+	defer close(out)
+
+	for {
+		select {
+		case msg, ok := <-in:
+			if !ok {
+				return nil
+			}
+
+			telemetry.Stats.History.Add(1)
+			out <- msg
+
+		case <-ctx.Done():
+			return ctx.Err()
+		}
+	}
+}
+
+const (
+	SyncPipelineAcknowledgement = "acknowledgement"
+	SyncPipelineComment         = "comment"
+	SyncPipelineDowntime        = "downtime"
+	SyncPipelineFlapping        = "flapping"
+	SyncPipelineNotification    = "notification"
+	SyncPipelineState           = "state"
+)
+
+var syncPipelines = map[string][]StageFunc{
+	SyncPipelineAcknowledgement: {
+		writeOneEntityStage((*v1.AcknowledgementHistory)(nil)), // acknowledgement_history
+		writeOneEntityStage((*v1.HistoryAck)(nil)),             // history (depends on acknowledgement_history)
+		countElementStage,
+	},
-	"state": {
-		writeOneEntityStage((*v1.StateHistory)(nil)),   // state_history
-		writeOneEntityStage((*v1.HistoryState)(nil)),   // history (depends on state_history)
-		writeMultiEntityStage(stateHistoryToSlaEntity), // sla_history_state
-	},
+	SyncPipelineComment: {
+		writeOneEntityStage((*v1.CommentHistory)(nil)), // comment_history
+		writeOneEntityStage((*v1.HistoryComment)(nil)), // history (depends on comment_history)
+		countElementStage,
+	},
-	"downtime": {
+	SyncPipelineDowntime: {
 		writeOneEntityStage((*v1.DowntimeHistory)(nil)),    // downtime_history
 		writeOneEntityStage((*v1.HistoryDowntime)(nil)),    // history (depends on downtime_history)
 		writeOneEntityStage((*v1.SlaHistoryDowntime)(nil)), // sla_history_downtime
+		countElementStage,
 	},
-	"comment": {
-		writeOneEntityStage((*v1.CommentHistory)(nil)), // comment_history
-		writeOneEntityStage((*v1.HistoryComment)(nil)), // history (depends on comment_history)
-	},
-	"flapping": {
+	SyncPipelineFlapping: {
 		writeOneEntityStage((*v1.FlappingHistory)(nil)), // flapping_history
 		writeOneEntityStage((*v1.HistoryFlapping)(nil)), // history (depends on flapping_history)
+		countElementStage,
 	},
-	"acknowledgement": {
-		writeOneEntityStage((*v1.AcknowledgementHistory)(nil)), // acknowledgement_history
-		writeOneEntityStage((*v1.HistoryAck)(nil)),             // history (depends on acknowledgement_history)
-	},
+	SyncPipelineNotification: {
+		writeOneEntityStage((*v1.NotificationHistory)(nil)), // notification_history
+		userNotificationStage,                               // user_notification_history (depends on notification_history)
+		writeOneEntityStage((*v1.HistoryNotification)(nil)), // history (depends on notification_history)
+		countElementStage,
+	},
+	SyncPipelineState: {
+		writeOneEntityStage((*v1.StateHistory)(nil)),   // state_history
+		writeOneEntityStage((*v1.HistoryState)(nil)),   // history (depends on state_history)
+		writeMultiEntityStage(stateHistoryToSlaEntity), // sla_history_state
+		countElementStage,
+	},
 }
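
For readers wiring this up outside the diff: the extra-stage mechanism accepts any function matching the now-exported `StageFunc`. The following is a minimal, hypothetical sketch (not part of this pull request) of such a stage; the import paths match the ones used in this diff, while `forwardingStage` itself is an assumption for illustration:

```go
package example

import (
	"context"

	"github.com/icinga/icinga-go-library/redis"
	"github.com/icinga/icingadb/pkg/icingadb/history"
)

// forwardingStage is a hypothetical extra stage. Like any history.StageFunc,
// it must forward a message from in to out only after its own processing
// succeeded; on failure it must not forward, so the entry stays in Redis
// and is retried later instead of being deleted.
func forwardingStage(ctx context.Context, _ history.Sync, _ string, in <-chan redis.XMessage, out chan<- redis.XMessage) error {
	defer close(out)

	for {
		select {
		case msg, ok := <-in:
			if !ok {
				return nil // upstream stage closed the channel, pipeline drained
			}

			// ... per-entry work would happen here, e.g. submitting an event ...

			out <- msg // forward only after successful processing

		case <-ctx.Done():
			return ctx.Err()
		}
	}
}
```

Handed to the sync as `hs.Sync(ctx, map[string]history.StageFunc{history.SyncPipelineNotification: forwardingStage})`, it would run after the regular notification stages, including countElementStage, but before deleteFromRedis.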
9 changes: 9 additions & 0 deletions pkg/icingadb/v1/history/downtime.go
@@ -88,6 +88,15 @@ func (*HistoryDowntime) TableName() string {
return "history"
}

// DowntimeHistoryMeta is a combined HistoryMeta struct of DowntimeHistoryEntity and DowntimeHistory.
//
// It is used in the notifications package and became necessary as values of both structs were required.
type DowntimeHistoryMeta struct {
DowntimeHistoryEntity `json:",inline"`
DowntimeHistory `json:",inline"`
HistoryMeta `json:",inline"`
}

type SlaHistoryDowntime struct {
DowntimeHistoryEntity `json:",inline"`
HistoryTableMeta `json:",inline"`
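A quick illustration of what the embedding in `DowntimeHistoryMeta` buys (a hypothetical sketch, not code from this pull request; `v1history` is an assumed import alias for this package): Go promotes the fields of embedded structs, so a single value carries identity, payload, and history metadata at once.

```go
// Hypothetical sketch: field promotion through struct embedding.
func inspectDowntimeMeta(meta v1history.DowntimeHistoryMeta) {
	_ = meta.DowntimeHistoryEntity // identity part of the downtime event
	_ = meta.DowntimeHistory       // downtime payload columns
	_ = meta.HistoryMeta           // common history metadata
}
```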
18 changes: 12 additions & 6 deletions pkg/icingaredis/telemetry/stats.go
@@ -14,17 +14,23 @@ import (

var Stats struct {
// Config & co. are to be increased by the T sync once for every T object synced.
-	Config, State, History, Overdue, HistoryCleanup com.Counter
+	Config           com.Counter
+	State            com.Counter
+	History          com.Counter
+	Overdue          com.Counter
+	HistoryCleanup   com.Counter
+	NotificationSync com.Counter
}

// WriteStats periodically forwards Stats to Redis for being monitored by Icinga 2.
func WriteStats(ctx context.Context, client *redis.Client, logger *logging.Logger) {
counters := map[string]*com.Counter{
"config_sync": &Stats.Config,
"state_sync": &Stats.State,
"history_sync": &Stats.History,
"overdue_sync": &Stats.Overdue,
"history_cleanup": &Stats.HistoryCleanup,
"config_sync": &Stats.Config,
"state_sync": &Stats.State,
"history_sync": &Stats.History,
"overdue_sync": &Stats.Overdue,
"history_cleanup": &Stats.HistoryCleanup,
"notification_sync": &Stats.NotificationSync,
}

periodic.Start(ctx, time.Second, func(_ periodic.Tick) {
Expand Down