Skip to content

Commit

Permalink
chore(insights): better handling of context cancelled (#9172)
Browse files Browse the repository at this point in the history
Signed-off-by: Jakub Dyszkiewicz <[email protected]>
  • Loading branch information
jakubdyszkiewicz authored Feb 8, 2024
1 parent 67ee1be commit 067121c
Showing 1 changed file with 54 additions and 50 deletions.
104 changes: 54 additions & 50 deletions pkg/insights/resyncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ package insights

import (
"context"
"errors"
"fmt"
"strings"
"sync"
"time"

"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"google.golang.org/protobuf/proto"

Expand Down Expand Up @@ -253,52 +253,11 @@ func (r *resyncer) Start(stop <-chan struct{}) error {
// Usually this shouldn't close if there's no closed context
continue
}
if event.flag == 0 {
continue
}
startProcessingTime := r.now()
r.idleTime.Observe(float64(startProcessingTime.Sub(start).Milliseconds()))
r.timeToProcessItem.Observe(float64(startProcessingTime.Sub(event.time).Milliseconds()))
tenantCtx := multitenant.WithTenant(ctx, event.tenantId)
log := kuma_log.AddFieldsFromCtx(log, tenantCtx, r.extensions)
log = log.WithValues("event", event)
dpOverviews, err := r.dpOverviews(tenantCtx, event.mesh)
if err != nil {
log.Error(err, "unable to get DataplaneOverviews to recompute insights")
continue
log := kuma_log.AddFieldsFromCtx(log, tenantCtx, r.extensions).WithValues("event", event)
if err := r.processEvent(tenantCtx, start, event); err != nil && !errors.Is(err, context.Canceled) {
log.Error(err, "could not process an event")
}

externalServices := &core_mesh.ExternalServiceResourceList{}
if err := r.rm.List(tenantCtx, externalServices, store.ListByMesh(event.mesh)); err != nil {
log.Error(err, "unable to get ExternalServices to recompute insights")
continue
}

anyChanged := false
if event.flag&FlagService == FlagService {
err, changed := r.createOrUpdateServiceInsight(tenantCtx, event.mesh, dpOverviews, externalServices.Items)
if err != nil {
log.Error(err, "unable to resync ServiceInsight")
}
if changed {
anyChanged = true
}
}
if event.flag&FlagMesh == FlagMesh {
err, changed := r.createOrUpdateMeshInsight(tenantCtx, event.mesh, dpOverviews, externalServices.Items, event.types)
if err != nil {
log.Error(err, "unable to resync MeshInsight")
}
if changed {
anyChanged = true
}
}
reason := strings.Join(util_maps.SortedKeys(event.reasons), "_and_")
result := ResultNoChanges
if anyChanged {
result = ResultChanged
}
r.itemProcessingTime.WithLabelValues(reason, result).Observe(float64(time.Since(startProcessingTime).Milliseconds()))
}
}
}()
Expand Down Expand Up @@ -333,7 +292,7 @@ func (r *resyncer) Start(stop <-chan struct{}) error {
if steps == r.stepsBeforeFullResync {
steps = 0
tenantIds, err := r.tenantFn.GetIDs(ctx)
if err != nil {
if err != nil && !errors.Is(err, context.Canceled) {
log.Error(err, "could not get tenants")
}
wg := sync.WaitGroup{}
Expand All @@ -346,8 +305,8 @@ func (r *resyncer) Start(stop <-chan struct{}) error {
}
wg.Wait()
}
// We flush the batch
if err := batch.flush(tickCtx, resyncEvents, flushAll); err != nil {
// We flush the batch. We want to check if the original context is cancelled to avoid logging on CP shutdown.
if err := batch.flush(tickCtx, resyncEvents, flushAll); err != nil && !errors.Is(ctx.Err(), context.Canceled) {
log.Error(err, "Flush of batch didn't complete, some insights won't be refreshed until next tick")
}
cancelTimeout()
Expand All @@ -366,7 +325,7 @@ func (r *resyncer) Start(stop <-chan struct{}) error {
}
if resourceChanged, ok := event.(events.ResourceChangedEvent); ok {
supported, err := r.tenantFn.IDSupported(ctx, resourceChanged.TenantID)
if err != nil {
if err != nil && !errors.Is(err, context.Canceled) {
log.Error(err, "could not determine if tenant ID is supported", "tenantID", resourceChanged.TenantID)
continue
}
Expand Down Expand Up @@ -397,6 +356,51 @@ func (r *resyncer) Start(stop <-chan struct{}) error {
}
}

// processEvent recomputes the insights requested by a single resync event.
//
// An event whose flag is zero is a no-op: nothing is fetched and no metric is
// recorded. Otherwise the idle time (from start until processing begins) and
// the queueing delay (from event.time until processing begins) are observed,
// the DataplaneOverviews and ExternalServices of event.mesh are loaded, and
// the ServiceInsight and/or MeshInsight are refreshed depending on which flag
// bits are set.
//
// The first failure is returned wrapped with a description of the failing
// step; in that case the item processing time metric is not observed. The
// caller decides whether the error is worth logging.
func (r *resyncer) processEvent(ctx context.Context, start time.Time, event resyncEvent) error {
	if event.flag == 0 {
		return nil
	}

	processingStart := r.now()
	r.idleTime.Observe(float64(processingStart.Sub(start).Milliseconds()))
	r.timeToProcessItem.Observe(float64(processingStart.Sub(event.time).Milliseconds()))

	// Both insight kinds are computed from the same inputs, so load them once.
	overviews, err := r.dpOverviews(ctx, event.mesh)
	if err != nil {
		return errors.Wrap(err, "unable to get DataplaneOverviews to recompute insights")
	}
	externalServices := &core_mesh.ExternalServiceResourceList{}
	if err := r.rm.List(ctx, externalServices, store.ListByMesh(event.mesh)); err != nil {
		return errors.Wrap(err, "unable to get ExternalServices to recompute insights")
	}

	// The result label starts as "no changes" and is switched as soon as
	// either insight reports that it was modified.
	result := ResultNoChanges

	if event.flag&FlagService == FlagService {
		err, changed := r.createOrUpdateServiceInsight(ctx, event.mesh, overviews, externalServices.Items)
		if err != nil {
			return errors.Wrap(err, "unable to resync ServiceInsight")
		}
		if changed {
			result = ResultChanged
		}
	}

	if event.flag&FlagMesh == FlagMesh {
		err, changed := r.createOrUpdateMeshInsight(ctx, event.mesh, overviews, externalServices.Items, event.types)
		if err != nil {
			return errors.Wrap(err, "unable to resync MeshInsight")
		}
		if changed {
			result = ResultChanged
		}
	}

	// Reasons are sorted so the metric label is stable for a given reason set.
	reason := strings.Join(util_maps.SortedKeys(event.reasons), "_and_")
	elapsedMs := float64(time.Since(processingStart).Milliseconds())
	r.itemProcessingTime.WithLabelValues(reason, result).Observe(elapsedMs)
	return nil
}

func (r *resyncer) dpOverviews(ctx context.Context, mesh string) ([]*core_mesh.DataplaneOverviewResource, error) {
dataplanes := &core_mesh.DataplaneResourceList{}
if err := r.rm.List(ctx, dataplanes, store.ListByMesh(mesh)); err != nil {
Expand All @@ -414,7 +418,7 @@ func (r *resyncer) dpOverviews(ctx context.Context, mesh string) ([]*core_mesh.D
func (r *resyncer) addMeshesToBatch(ctx context.Context, batch *eventBatch, tenantID string, reason string) {
meshList := &core_mesh.MeshResourceList{}
tenantCtx := multitenant.WithTenant(ctx, tenantID)
if err := r.rm.List(tenantCtx, meshList); err != nil {
if err := r.rm.List(tenantCtx, meshList); err != nil && !errors.Is(err, context.Canceled) {
log := kuma_log.AddFieldsFromCtx(log, tenantCtx, r.extensions)
log.Error(err, "failed to get list of meshes")
return
Expand Down

0 comments on commit 067121c

Please sign in to comment.