Skip to content

Commit a199863

Browse files
committed
notifications: IGL Changes For Rules
The rules and rule version is now part of the Event. Also rename the Client method receiver variable.
1 parent 2335460 commit a199863

File tree

4 files changed

+51
-48
lines changed

4 files changed

+51
-48
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ require (
77
github.com/goccy/go-yaml v1.13.0
88
github.com/google/go-cmp v0.7.0
99
github.com/google/uuid v1.6.0
10-
github.com/icinga/icinga-go-library v0.7.3-0.20250904130608-5032573a325e
10+
github.com/icinga/icinga-go-library v0.7.3-0.20250909100113-20db23663e45
1111
github.com/jessevdk/go-flags v1.6.1
1212
github.com/jmoiron/sqlx v1.4.0
1313
github.com/mattn/go-sqlite3 v1.14.32

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,8 @@ github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
3737
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
3838
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
3939
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
40-
github.com/icinga/icinga-go-library v0.7.3-0.20250904130608-5032573a325e h1:yZPWPPCHKozWRm9VedDHd8igJh9uI4U2CJdgl1On+/4=
41-
github.com/icinga/icinga-go-library v0.7.3-0.20250904130608-5032573a325e/go.mod h1:exEJdfik2GPYrvZM6Gn4BXIBLIGg6OrCCMnILT+mTUs=
40+
github.com/icinga/icinga-go-library v0.7.3-0.20250909100113-20db23663e45 h1:Wz6ttTYgYB7y8FH7snBSnnllLuzhE0QSp6m3P9b/QfM=
41+
github.com/icinga/icinga-go-library v0.7.3-0.20250909100113-20db23663e45/go.mod h1:uCENf5EVhNVvXTvhB+jXiwRB2NdLlz8cymseOM4qmI0=
4242
github.com/jessevdk/go-flags v1.6.1 h1:Cvu5U8UGrLay1rZfv/zP7iLpSHGUZ/Ou68T0iX1bBK4=
4343
github.com/jessevdk/go-flags v1.6.1/go.mod h1:Mk8T1hIAWpOiJiHa9rJASDK2UGWji0EuPGBnNLMooyc=
4444
github.com/jmoiron/sqlx v1.4.0 h1:1PLqN7S1UYp5t4SrVVnt4nUVNemrDAtxlulVe+Qgm3o=

pkg/notifications/notifications.go

Lines changed: 46 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -132,10 +132,10 @@ func NewNotificationsClient(
132132
// > select * from host where id = :host_id and environment_id = :environment_id and name like 'prefix_%'
133133
//
134134
// The :host_id and :environment_id parameters will be bound to the entity's ID and EnvironmentId fields, respectively.
135-
func (s *Client) evaluateRulesForObject(ctx context.Context, entity database.Entity) ([]int64, error) {
136-
outRuleIds := make([]int64, 0, len(s.rules.Rules))
135+
func (client *Client) evaluateRulesForObject(ctx context.Context, entity database.Entity) ([]int64, error) {
136+
outRuleIds := make([]int64, 0, len(client.rules.Rules))
137137

138-
for rule := range s.rules.Iter() {
138+
for rule := range client.rules.Iter() {
139139
if rule.ObjectFilterExpr == "" {
140140
outRuleIds = append(outRuleIds, rule.Id)
141141
continue
@@ -148,7 +148,7 @@ func (s *Client) evaluateRulesForObject(ctx context.Context, entity database.Ent
148148
if err != nil {
149149
return false, errors.Wrapf(err, "cannot unescape rule %d object filter expression %q", rule.Id, rule.ObjectFilterExpr)
150150
}
151-
rows, err := s.db.NamedQueryContext(ctx, s.db.Rebind(query), entity)
151+
rows, err := client.db.NamedQueryContext(ctx, client.db.Rebind(query), entity)
152152
if err != nil {
153153
return false, err
154154
}
@@ -175,7 +175,7 @@ func (s *Client) evaluateRulesForObject(ctx context.Context, entity database.Ent
175175
// This function is used by all event builders to create a common event structure that includes the host and service
176176
// names, the absolute URL to the Icinga Web 2 Icinga DB page for the host or service, and the tags for the event.
177177
// Any event type-specific information (like severity, message, etc.) is added by the specific event builders.
178-
func (s *Client) buildCommonEvent(rlr *redisLookupResult) (*event.Event, error) {
178+
func (client *Client) buildCommonEvent(rlr *redisLookupResult) (*event.Event, error) {
179179
var (
180180
objectName string
181181
objectUrl *url.URL
@@ -185,7 +185,7 @@ func (s *Client) buildCommonEvent(rlr *redisLookupResult) (*event.Event, error)
185185
if rlr.ServiceName != "" {
186186
objectName = rlr.HostName + "!" + rlr.ServiceName
187187

188-
objectUrl = s.notificationsClient.JoinIcingaWeb2Path("/icingadb/service")
188+
objectUrl = client.notificationsClient.JoinIcingaWeb2Path("/icingadb/service")
189189
objectUrl.RawQuery = "name=" + utils.RawUrlEncode(rlr.ServiceName) + "&host.name=" + utils.RawUrlEncode(rlr.HostName)
190190

191191
objectTags = map[string]string{
@@ -195,7 +195,7 @@ func (s *Client) buildCommonEvent(rlr *redisLookupResult) (*event.Event, error)
195195
} else {
196196
objectName = rlr.HostName
197197

198-
objectUrl = s.notificationsClient.JoinIcingaWeb2Path("/icingadb/host")
198+
objectUrl = client.notificationsClient.JoinIcingaWeb2Path("/icingadb/host")
199199
objectUrl.RawQuery = "name=" + utils.RawUrlEncode(rlr.HostName)
200200

201201
objectTags = map[string]string{
@@ -214,13 +214,13 @@ func (s *Client) buildCommonEvent(rlr *redisLookupResult) (*event.Event, error)
214214
//
215215
// The resulted event will have all the necessary information for a state change event, and must
216216
// not be further modified by the caller.
217-
func (s *Client) buildStateHistoryEvent(ctx context.Context, h *v1history.StateHistory) (*event.Event, error) {
218-
res, err := s.fetchHostServiceName(ctx, h.HostId, h.ServiceId)
217+
func (client *Client) buildStateHistoryEvent(ctx context.Context, h *v1history.StateHistory) (*event.Event, error) {
218+
res, err := client.fetchHostServiceName(ctx, h.HostId, h.ServiceId)
219219
if err != nil {
220220
return nil, err
221221
}
222222

223-
ev, err := s.buildCommonEvent(res)
223+
ev, err := client.buildCommonEvent(res)
224224
if err != nil {
225225
return nil, errors.Wrapf(err, "cannot build event for %q,%q", res.HostName, res.ServiceName)
226226
}
@@ -262,13 +262,13 @@ func (s *Client) buildStateHistoryEvent(ctx context.Context, h *v1history.StateH
262262
}
263263

264264
// buildDowntimeHistoryMetaEvent from a downtime history entry.
265-
func (s *Client) buildDowntimeHistoryMetaEvent(ctx context.Context, h *v1history.DowntimeHistoryMeta) (*event.Event, error) {
266-
res, err := s.fetchHostServiceName(ctx, h.HostId, h.ServiceId)
265+
func (client *Client) buildDowntimeHistoryMetaEvent(ctx context.Context, h *v1history.DowntimeHistoryMeta) (*event.Event, error) {
266+
res, err := client.fetchHostServiceName(ctx, h.HostId, h.ServiceId)
267267
if err != nil {
268268
return nil, err
269269
}
270270

271-
ev, err := s.buildCommonEvent(res)
271+
ev, err := client.buildCommonEvent(res)
272272
if err != nil {
273273
return nil, errors.Wrapf(err, "cannot build event for %q,%q", res.HostName, res.ServiceName)
274274
}
@@ -302,13 +302,13 @@ func (s *Client) buildDowntimeHistoryMetaEvent(ctx context.Context, h *v1history
302302
}
303303

304304
// buildFlappingHistoryEvent from a flapping history entry.
305-
func (s *Client) buildFlappingHistoryEvent(ctx context.Context, h *v1history.FlappingHistory) (*event.Event, error) {
306-
res, err := s.fetchHostServiceName(ctx, h.HostId, h.ServiceId)
305+
func (client *Client) buildFlappingHistoryEvent(ctx context.Context, h *v1history.FlappingHistory) (*event.Event, error) {
306+
res, err := client.fetchHostServiceName(ctx, h.HostId, h.ServiceId)
307307
if err != nil {
308308
return nil, err
309309
}
310310

311-
ev, err := s.buildCommonEvent(res)
311+
ev, err := client.buildCommonEvent(res)
312312
if err != nil {
313313
return nil, errors.Wrapf(err, "cannot build event for %q,%q", res.HostName, res.ServiceName)
314314
}
@@ -333,13 +333,13 @@ func (s *Client) buildFlappingHistoryEvent(ctx context.Context, h *v1history.Fla
333333
}
334334

335335
// buildAcknowledgementHistoryEvent from an acknowledgment history entry.
336-
func (s *Client) buildAcknowledgementHistoryEvent(ctx context.Context, h *v1history.AcknowledgementHistory) (*event.Event, error) {
337-
res, err := s.fetchHostServiceName(ctx, h.HostId, h.ServiceId)
336+
func (client *Client) buildAcknowledgementHistoryEvent(ctx context.Context, h *v1history.AcknowledgementHistory) (*event.Event, error) {
337+
res, err := client.fetchHostServiceName(ctx, h.HostId, h.ServiceId)
338338
if err != nil {
339339
return nil, err
340340
}
341341

342-
ev, err := s.buildCommonEvent(res)
342+
ev, err := client.buildCommonEvent(res)
343343
if err != nil {
344344
return nil, errors.Wrapf(err, "cannot build event for %q,%q", res.HostName, res.ServiceName)
345345
}
@@ -371,17 +371,17 @@ func (s *Client) buildAcknowledgementHistoryEvent(ctx context.Context, h *v1hist
371371
}
372372

373373
// worker is the background worker launched by NewNotificationsClient.
374-
func (s *Client) worker() {
375-
defer s.ctxCancel()
374+
func (client *Client) worker() {
375+
defer client.ctxCancel()
376376

377377
for {
378378
select {
379-
case <-s.ctx.Done():
379+
case <-client.ctx.Done():
380380
return
381381

382-
case sub, more := <-s.inputCh:
382+
case sub, more := <-client.inputCh:
383383
if !more { // Should never happen, but just in case.
384-
s.logger.Debug("Input channel closed, stopping worker")
384+
client.logger.Debug("Input channel closed, stopping worker")
385385
return
386386
}
387387

@@ -393,40 +393,40 @@ func (s *Client) worker() {
393393
// Keep the type switch in sync with syncPipelines from pkg/icingadb/history/sync.go
394394
switch h := sub.entity.(type) {
395395
case *v1history.AcknowledgementHistory:
396-
ev, eventErr = s.buildAcknowledgementHistoryEvent(s.ctx, h)
396+
ev, eventErr = client.buildAcknowledgementHistoryEvent(client.ctx, h)
397397

398398
case *v1history.DowntimeHistoryMeta:
399-
ev, eventErr = s.buildDowntimeHistoryMetaEvent(s.ctx, h)
399+
ev, eventErr = client.buildDowntimeHistoryMetaEvent(client.ctx, h)
400400

401401
case *v1history.FlappingHistory:
402-
ev, eventErr = s.buildFlappingHistoryEvent(s.ctx, h)
402+
ev, eventErr = client.buildFlappingHistoryEvent(client.ctx, h)
403403

404404
case *v1history.StateHistory:
405405
if h.StateType != common.HardState {
406406
continue
407407
}
408-
ev, eventErr = s.buildStateHistoryEvent(s.ctx, h)
408+
ev, eventErr = client.buildStateHistoryEvent(client.ctx, h)
409409

410410
default:
411-
s.logger.Error("Cannot process unsupported type",
411+
client.logger.Error("Cannot process unsupported type",
412412
zap.Object("submission", sub),
413413
zap.String("type", fmt.Sprintf("%T", h)))
414414
continue
415415
}
416416

417417
if eventErr != nil {
418-
s.logger.Errorw("Cannot build event from history entry",
418+
client.logger.Errorw("Cannot build event from history entry",
419419
zap.Object("submission", sub),
420420
zap.String("type", fmt.Sprintf("%T", sub.entity)),
421421
zap.Error(eventErr))
422422
continue
423423
} else if ev == nil {
424424
// This really should not happen.
425-
s.logger.Errorw("No event was fetched, but no error was reported.", zap.Object("submission", sub))
425+
client.logger.Errorw("No event was fetched, but no error was reported.", zap.Object("submission", sub))
426426
continue
427427
}
428428

429-
eventLogger := s.logger.With(zap.Object(
429+
eventLogger := client.logger.With(zap.Object(
430430
"event",
431431
zapcore.ObjectMarshalerFunc(func(encoder zapcore.ObjectEncoder) error {
432432
encoder.AddString("name", ev.Name)
@@ -438,29 +438,32 @@ func (s *Client) worker() {
438438
sub.traces["evaluate_jump_pre"] = time.Now()
439439
reevaluateRules:
440440
sub.traces["evaluate_jump_last"] = time.Now()
441-
eventRuleIds, err := s.evaluateRulesForObject(s.ctx, sub.entity)
441+
eventRuleIds, err := client.evaluateRulesForObject(client.ctx, sub.entity)
442442
if err != nil {
443443
eventLogger.Errorw("Cannot evaluate rules for event",
444444
zap.Object("submission", sub),
445445
zap.Error(err))
446446
continue
447447
}
448448

449+
ev.RulesVersion = client.rules.Version
450+
ev.RuleIds = eventRuleIds
451+
449452
sub.traces["process_last"] = time.Now()
450-
newEventRules, err := s.notificationsClient.ProcessEvent(s.ctx, ev, s.rules.Version, eventRuleIds...)
453+
newEventRules, err := client.notificationsClient.ProcessEvent(client.ctx, ev)
451454
if errors.Is(err, source.ErrRulesOutdated) {
452-
s.rules = newEventRules
455+
client.rules = newEventRules
453456

454457
eventLogger.Infow("Re-evaluating rules for event after fetching new rules",
455458
zap.Object("submission", sub),
456-
zap.String("rules_version", s.rules.Version))
459+
zap.String("rules_version", client.rules.Version))
457460

458461
// Re-evaluate the just fetched rules for the current event.
459462
goto reevaluateRules
460463
} else if err != nil {
461464
eventLogger.Errorw("Cannot submit event to Icinga Notifications",
462465
zap.Object("submission", sub),
463-
zap.String("rules_version", s.rules.Version),
466+
zap.String("rules_version", client.rules.Version),
464467
zap.Any("rules", eventRuleIds),
465468
zap.Error(err))
466469
continue
@@ -478,7 +481,7 @@ func (s *Client) worker() {
478481
//
479482
// Internally, a buffered channel is used for delivery. So this function should not block. Otherwise, it will abort
480483
// after a second and an error is logged.
481-
func (s *Client) Submit(entity database.Entity) {
484+
func (client *Client) Submit(entity database.Entity) {
482485
sub := submission{
483486
entity: entity,
484487
traces: map[string]time.Time{
@@ -487,17 +490,17 @@ func (s *Client) Submit(entity database.Entity) {
487490
}
488491

489492
select {
490-
case <-s.ctx.Done():
491-
s.logger.Errorw("Client context is done, rejecting submission",
493+
case <-client.ctx.Done():
494+
client.logger.Errorw("Client context is done, rejecting submission",
492495
zap.Object("submission", sub),
493-
zap.Error(s.ctx.Err()))
496+
zap.Error(client.ctx.Err()))
494497
return
495498

496-
case s.inputCh <- sub:
499+
case client.inputCh <- sub:
497500
return
498501

499502
case <-time.After(time.Second):
500-
s.logger.Error("Client submission channel is blocking, rejecting submission",
503+
client.logger.Error("Client submission channel is blocking, rejecting submission",
501504
zap.Object("submission", sub))
502505
return
503506
}

pkg/notifications/redis_fetch.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,14 @@ import (
2323
// If this operation couldn't be completed within a reasonable time (a hard coded 5 seconds), it will cancel the
2424
// request and return an error indicating that the operation timed out. In case of the serviceId being set, the
2525
// maximum execution time of the Redis HGet commands is 10s (5s for each HGet call).
26-
func (s *Client) fetchHostServiceName(ctx context.Context, hostId, serviceId types.Binary) (*redisLookupResult, error) {
26+
func (client *Client) fetchHostServiceName(ctx context.Context, hostId, serviceId types.Binary) (*redisLookupResult, error) {
2727
redisHGet := func(typ, field string, out *redisLookupResult) error {
2828
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
2929
defer cancel()
3030

3131
err := retry.WithBackoff(
3232
ctx,
33-
func(ctx context.Context) error { return s.redisClient.HGet(ctx, "icinga:"+typ, field).Scan(out) },
33+
func(ctx context.Context) error { return client.redisClient.HGet(ctx, "icinga:"+typ, field).Scan(out) },
3434
retry.Retryable,
3535
backoff.DefaultBackoff,
3636
retry.Settings{},

0 commit comments

Comments
 (0)