Skip to content

Commit 4b91e08

Browse files
committed
[feat aga] Implement resource monitoring for referenced resources
1 parent ef89515 commit 4b91e08

File tree

9 files changed

+1697
-5
lines changed

9 files changed

+1697
-5
lines changed
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
package eventhandlers
2+
3+
import (
4+
"context"
5+
"github.com/go-logr/logr"
6+
corev1 "k8s.io/api/core/v1"
7+
networking "k8s.io/api/networking/v1"
8+
"k8s.io/apimachinery/pkg/types"
9+
"k8s.io/client-go/util/workqueue"
10+
"sigs.k8s.io/aws-load-balancer-controller/pkg/aga"
11+
"sigs.k8s.io/controller-runtime/pkg/client"
12+
"sigs.k8s.io/controller-runtime/pkg/event"
13+
"sigs.k8s.io/controller-runtime/pkg/handler"
14+
"sigs.k8s.io/controller-runtime/pkg/reconcile"
15+
gwv1 "sigs.k8s.io/gateway-api/apis/v1"
16+
)
17+
18+
// NewEnqueueRequestsForResourceEvent creates a new handler for generic resource events
19+
func NewEnqueueRequestsForResourceEvent(
20+
resourceType aga.ResourceType,
21+
referenceTracker *aga.ReferenceTracker,
22+
logger logr.Logger,
23+
) handler.EventHandler {
24+
return &enqueueRequestsForResourceEvent{
25+
resourceType: resourceType,
26+
referenceTracker: referenceTracker,
27+
logger: logger,
28+
}
29+
}
30+
31+
// enqueueRequestsForResourceEvent handles resource events and enqueues reconcile requests for GlobalAccelerators
32+
// that reference the resource
33+
type enqueueRequestsForResourceEvent struct {
34+
resourceType aga.ResourceType
35+
referenceTracker *aga.ReferenceTracker
36+
logger logr.Logger
37+
}
38+
39+
// The following methods implement handler.TypedEventHandler interface
40+
41+
// Create handles Create events with the typed API
42+
func (h *enqueueRequestsForResourceEvent) Create(ctx context.Context, evt event.TypedCreateEvent[client.Object], queue workqueue.TypedRateLimitingInterface[reconcile.Request]) {
43+
h.handleResource(ctx, evt.Object, "created", queue)
44+
}
45+
46+
// Update handles Update events with the typed API
47+
func (h *enqueueRequestsForResourceEvent) Update(ctx context.Context, evt event.TypedUpdateEvent[client.Object], queue workqueue.TypedRateLimitingInterface[reconcile.Request]) {
48+
h.handleResource(ctx, evt.ObjectNew, "updated", queue)
49+
}
50+
51+
// Delete handles Delete events with the typed API
52+
func (h *enqueueRequestsForResourceEvent) Delete(ctx context.Context, evt event.TypedDeleteEvent[client.Object], queue workqueue.TypedRateLimitingInterface[reconcile.Request]) {
53+
h.handleResource(ctx, evt.Object, "deleted", queue)
54+
}
55+
56+
// Generic handles Generic events with the typed API
57+
func (h *enqueueRequestsForResourceEvent) Generic(ctx context.Context, evt event.TypedGenericEvent[client.Object], queue workqueue.TypedRateLimitingInterface[reconcile.Request]) {
58+
h.handleResource(ctx, evt.Object, "generic event", queue)
59+
}
60+
61+
// handleTypedResource handles resource events for the typed interface
62+
func (h *enqueueRequestsForResourceEvent) handleResource(_ context.Context, obj interface{}, eventType string, queue workqueue.TypedRateLimitingInterface[reconcile.Request]) {
63+
var namespace, name string
64+
65+
// Extract namespace and name based on the object type
66+
switch res := obj.(type) {
67+
case *corev1.Service:
68+
namespace = res.Namespace
69+
name = res.Name
70+
case *networking.Ingress:
71+
namespace = res.Namespace
72+
name = res.Name
73+
case *gwv1.Gateway:
74+
namespace = res.Namespace
75+
name = res.Name
76+
default:
77+
h.logger.Error(nil, "Unknown resource type", "type", h.resourceType)
78+
return
79+
}
80+
81+
resourceKey := aga.ResourceKey{
82+
Type: h.resourceType,
83+
Name: types.NamespacedName{
84+
Namespace: namespace,
85+
Name: name,
86+
},
87+
}
88+
89+
// If this resource is not referenced by any GA, no need to queue reconciles
90+
if !h.referenceTracker.IsResourceReferenced(resourceKey) {
91+
return
92+
}
93+
94+
// Get all GAs that reference this resource
95+
gaRefs := h.referenceTracker.GetGAsForResource(resourceKey)
96+
97+
// Queue reconcile for affected GAs
98+
for _, gaRef := range gaRefs {
99+
h.logger.V(1).Info("Enqueueing GA for reconcile due to resource event",
100+
"resourceType", h.resourceType,
101+
"resourceName", resourceKey.Name,
102+
"eventType", eventType,
103+
"ga", gaRef)
104+
105+
queue.Add(reconcile.Request{NamespacedName: gaRef})
106+
}
107+
}

controllers/aga/globalaccelerator_controller.go

Lines changed: 151 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,12 @@ import (
3535
ctrl "sigs.k8s.io/controller-runtime"
3636
"sigs.k8s.io/controller-runtime/pkg/client"
3737
"sigs.k8s.io/controller-runtime/pkg/controller"
38+
"sigs.k8s.io/controller-runtime/pkg/event"
3839
"sigs.k8s.io/controller-runtime/pkg/reconcile"
40+
"sigs.k8s.io/controller-runtime/pkg/source"
3941

4042
agaapi "sigs.k8s.io/aws-load-balancer-controller/apis/aga/v1beta1"
43+
"sigs.k8s.io/aws-load-balancer-controller/controllers/aga/eventhandlers"
4144
"sigs.k8s.io/aws-load-balancer-controller/pkg/aga"
4245
"sigs.k8s.io/aws-load-balancer-controller/pkg/config"
4346
"sigs.k8s.io/aws-load-balancer-controller/pkg/deploy"
@@ -50,6 +53,7 @@ import (
5053
"sigs.k8s.io/aws-load-balancer-controller/pkg/model/core"
5154
"sigs.k8s.io/aws-load-balancer-controller/pkg/runtime"
5255
agastatus "sigs.k8s.io/aws-load-balancer-controller/pkg/status/aga"
56+
gwclientset "sigs.k8s.io/gateway-api/pkg/client/clientset/versioned"
5357
)
5458

5559
const (
@@ -64,6 +68,9 @@ const (
6468
requeueMessage = "Monitoring provisioning state"
6569
statusUpdateRequeueTime = 1 * time.Minute
6670

71+
// Status reason constants
72+
EndpointLoadFailed = "EndpointLoadFailed"
73+
6774
// Metric stage constants
6875
MetricStageFetchGlobalAccelerator = "fetch_globalAccelerator"
6976
MetricStageAddFinalizers = "add_finalizers"
@@ -108,6 +115,17 @@ func NewGlobalAcceleratorReconciler(k8sClient client.Client, eventRecorder recor
108115
// Create status updater
109116
statusUpdater := agastatus.NewStatusUpdater(k8sClient, logger)
110117

118+
// Create reference tracker for endpoint tracking
119+
referenceTracker := aga.NewReferenceTracker(logger.WithName("reference-tracker"))
120+
121+
// Create DNS resolver
122+
dnsToLoadBalancerResolver, err := aga.NewDNSToLoadBalancerResolver(cloud.ELBV2())
123+
if err != nil {
124+
logger.Error(err, "Failed to create DNS resolver")
125+
}
126+
127+
// Create unified endpoint loader
128+
endpointLoader := aga.NewEndpointLoader(k8sClient, dnsToLoadBalancerResolver, logger.WithName("endpoint-loader"))
111129
return &globalAcceleratorReconciler{
112130
k8sClient: k8sClient,
113131
eventRecorder: eventRecorder,
@@ -120,6 +138,13 @@ func NewGlobalAcceleratorReconciler(k8sClient client.Client, eventRecorder recor
120138
metricsCollector: metricsCollector,
121139
reconcileTracker: reconcileCounters.IncrementAGA,
122140

141+
// Components for endpoint reference tracking
142+
referenceTracker: referenceTracker,
143+
dnsToLoadBalancerResolver: dnsToLoadBalancerResolver,
144+
145+
// Unified endpoint loader
146+
endpointLoader: endpointLoader,
147+
123148
maxConcurrentReconciles: config.GlobalAcceleratorMaxConcurrentReconciles,
124149
maxExponentialBackoffDelay: config.GlobalAcceleratorMaxExponentialBackoffDelay,
125150
}
@@ -138,6 +163,21 @@ type globalAcceleratorReconciler struct {
138163
metricsCollector lbcmetrics.MetricCollector
139164
reconcileTracker func(namespaceName ktypes.NamespacedName)
140165

166+
// Components for endpoint reference tracking
167+
referenceTracker *aga.ReferenceTracker
168+
dnsToLoadBalancerResolver *aga.DNSToLoadBalancerResolver
169+
170+
// Unified endpoint loader
171+
endpointLoader aga.EndpointLoader
172+
173+
// Resources manager for dedicated endpoint resource watchers
174+
endpointResourcesManager aga.EndpointResourcesManager
175+
176+
// Event channels for dedicated watchers
177+
serviceEventChan chan event.GenericEvent
178+
ingressEventChan chan event.GenericEvent
179+
gatewayEventChan chan event.GenericEvent
180+
141181
maxConcurrentReconciles int
142182
maxExponentialBackoffDelay time.Duration
143183
}
@@ -194,6 +234,13 @@ func (r *globalAcceleratorReconciler) reconcileGlobalAccelerator(ctx context.Con
194234

195235
func (r *globalAcceleratorReconciler) cleanupGlobalAccelerator(ctx context.Context, ga *agaapi.GlobalAccelerator) error {
196236
if k8s.HasFinalizer(ga, shared_constants.GlobalAcceleratorFinalizer) {
237+
// Clean up references in the reference tracker
238+
gaKey := k8s.NamespacedName(ga)
239+
r.referenceTracker.RemoveGA(gaKey)
240+
241+
// Clean up resource watches
242+
r.endpointResourcesManager.RemoveGA(gaKey)
243+
197244
// TODO: Implement cleanup logic for AWS Global Accelerator resources (Only cleaning up accelerator for now)
198245
if err := r.cleanupGlobalAcceleratorResources(ctx, ga); err != nil {
199246
r.eventRecorder.Event(ga, corev1.EventTypeWarning, k8s.GlobalAcceleratorEventReasonFailedCleanup, fmt.Sprintf("Failed cleanup due to %v", err))
@@ -224,6 +271,29 @@ func (r *globalAcceleratorReconciler) buildModel(ctx context.Context, ga *agaapi
224271

225272
func (r *globalAcceleratorReconciler) reconcileGlobalAcceleratorResources(ctx context.Context, ga *agaapi.GlobalAccelerator) error {
226273
r.logger.Info("Reconciling GlobalAccelerator resources", "globalAccelerator", k8s.NamespacedName(ga))
274+
275+
// Get all endpoints from GA
276+
endpoints := aga.GetAllEndpointsFromGA(ga)
277+
278+
// Track referenced endpoints
279+
r.referenceTracker.UpdateReferencesForGA(ga, endpoints)
280+
281+
// Update resource watches with the endpointResourcesManager
282+
r.endpointResourcesManager.MonitorEndpointResources(ga, endpoints)
283+
284+
// Validate and load endpoint status using the endpoint loader
285+
_, fatalErrors := r.endpointLoader.LoadEndpoints(ctx, ga, endpoints)
286+
if len(fatalErrors) > 0 {
287+
err := fmt.Errorf("failed to load endpoints: %v", fatalErrors[0])
288+
r.logger.Error(err, "Fatal error loading endpoints")
289+
290+
// Handle other endpoint loading errors
291+
if statusErr := r.statusUpdater.UpdateStatusFailure(ctx, ga, EndpointLoadFailed, err.Error()); statusErr != nil {
292+
r.logger.Error(statusErr, "Failed to update GlobalAccelerator status after endpoint load failure")
293+
}
294+
return err
295+
}
296+
227297
var stack core.Stack
228298
var accelerator *agamodel.Accelerator
229299
var err error
@@ -335,21 +405,97 @@ func (r *globalAcceleratorReconciler) SetupWithManager(ctx context.Context, mgr
335405
return nil
336406
}
337407

338-
if err := r.setupIndexes(ctx, mgr.GetFieldIndexer()); err != nil {
408+
// Create event channels for dedicated watchers
409+
r.serviceEventChan = make(chan event.GenericEvent)
410+
r.ingressEventChan = make(chan event.GenericEvent)
411+
r.gatewayEventChan = make(chan event.GenericEvent)
412+
413+
// Initialize Gateway API client using the same config
414+
gwClient, err := gwclientset.NewForConfig(mgr.GetConfig())
415+
if err != nil {
416+
r.logger.Error(err, "Failed to create Gateway API client")
339417
return err
340418
}
341419

342-
// TODO: Add event handlers for Services, Ingresses, and Gateways
343-
// that are referenced by GlobalAccelerator endpoints
420+
// Initialize the endpoint resources manager with clients
421+
r.endpointResourcesManager = aga.NewEndpointResourcesManager(
422+
clientSet,
423+
gwClient,
424+
r.serviceEventChan,
425+
r.ingressEventChan,
426+
r.gatewayEventChan,
427+
r.logger.WithName("endpoint-resources-manager"),
428+
)
429+
430+
if err := r.setupIndexes(ctx, mgr.GetFieldIndexer()); err != nil {
431+
return err
432+
}
344433

345-
return ctrl.NewControllerManagedBy(mgr).
434+
// Set up the controller builder
435+
ctrl, err := ctrl.NewControllerManagedBy(mgr).
346436
For(&agaapi.GlobalAccelerator{}).
347437
Named(controllerName).
348438
WithOptions(controller.Options{
349439
MaxConcurrentReconciles: r.maxConcurrentReconciles,
350440
RateLimiter: workqueue.NewTypedItemExponentialFailureRateLimiter[reconcile.Request](5*time.Second, r.maxExponentialBackoffDelay),
351441
}).
352-
Complete(r)
442+
Build(r)
443+
444+
if err != nil {
445+
return err
446+
}
447+
448+
// Setup watches for resource events
449+
if err := r.setupGlobalAcceleratorWatches(ctrl); err != nil {
450+
return err
451+
}
452+
453+
return nil
454+
}
455+
456+
// setupGlobalAcceleratorWatches sets up watches for resources that can trigger reconciliation of GlobalAccelerator objects
457+
func (r *globalAcceleratorReconciler) setupGlobalAcceleratorWatches(c controller.Controller) error {
458+
loggerPrefix := r.logger.WithName("eventHandlers")
459+
460+
// Create handlers for our dedicated watchers
461+
serviceHandler := eventhandlers.NewEnqueueRequestsForResourceEvent(
462+
aga.ServiceResourceType,
463+
r.referenceTracker,
464+
loggerPrefix.WithName("service-handler"),
465+
)
466+
467+
ingressHandler := eventhandlers.NewEnqueueRequestsForResourceEvent(
468+
aga.IngressResourceType,
469+
r.referenceTracker,
470+
loggerPrefix.WithName("ingress-handler"),
471+
)
472+
473+
gatewayHandler := eventhandlers.NewEnqueueRequestsForResourceEvent(
474+
aga.GatewayResourceType,
475+
r.referenceTracker,
476+
loggerPrefix.WithName("gateway-handler"),
477+
)
478+
479+
// Add watches using the channel sources with event handlers
480+
if err := c.Watch(source.Channel(r.serviceEventChan, serviceHandler)); err != nil {
481+
return err
482+
}
483+
484+
if err := c.Watch(source.Channel(r.ingressEventChan, ingressHandler)); err != nil {
485+
return err
486+
}
487+
488+
// Check if Gateway API client is initialized before setting up Gateway watch
489+
// This ensures we only set up the Gateway watch if Gateway CRDs are present
490+
if r.endpointResourcesManager != nil && r.endpointResourcesManager.HasGatewaySupport() {
491+
if err := c.Watch(source.Channel(r.gatewayEventChan, gatewayHandler)); err != nil {
492+
return err
493+
}
494+
} else {
495+
r.logger.Info("Gateway API CRDs not found, skipping Gateway event watch setup")
496+
}
497+
498+
return nil
353499
}
354500

355501
func (r *globalAcceleratorReconciler) setupIndexes(ctx context.Context, fieldIndexer client.FieldIndexer) error {

0 commit comments

Comments
 (0)