Skip to content

Commit 9f38c16

Browse files
authored
Merge pull request #4466 from zac-nixon/AGAController
Merge AGAController branch into main
2 parents 95b07ef + fa43ed5 commit 9f38c16

File tree

89 files changed

+11176
-329
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

89 files changed

+11176
-329
lines changed

apis/aga/v1beta1/globalaccelerator_types.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ const (
4848
)
4949

5050
// PortRange defines the port range for Global Accelerator listeners.
51+
// +kubebuilder:validation:XValidation:rule="self.fromPort <= self.toPort",message="FromPort must be less than or equal to ToPort"
5152
type PortRange struct {
5253
// FromPort is the first port in the range of ports, inclusive.
5354
// +kubebuilder:validation:Minimum=1

config/crd/aga/aga-crds.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,9 @@ spec:
264264
- fromPort
265265
- toPort
266266
type: object
267+
x-kubernetes-validations:
268+
- message: FromPort must be less than or equal to ToPort
269+
rule: self.fromPort <= self.toPort
267270
maxItems: 10
268271
minItems: 1
269272
type: array

config/crd/aga/aga.k8s.aws_globalaccelerators.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,9 @@ spec:
264264
- fromPort
265265
- toPort
266266
type: object
267+
x-kubernetes-validations:
268+
- message: FromPort must be less than or equal to ToPort
269+
rule: self.fromPort <= self.toPort
267270
maxItems: 10
268271
minItems: 1
269272
type: array

config/webhook/globalaccelerator_validator_patch.yaml

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# This patch adds the GlobalAccelerator validator webhook configuration to the webhook configurations
2+
apiVersion: admissionregistration.k8s.io/v1
3+
kind: ValidatingWebhookConfiguration
4+
metadata:
5+
name: webhook-configuration
6+
webhooks:
7+
- name: vglobalaccelerator.aga.k8s.aws
8+
rules:
9+
- apiGroups:
10+
- "aga.k8s.aws"
11+
apiVersions:
12+
- v1beta1
13+
operations:
14+
- CREATE
15+
- UPDATE
16+
resources:
17+
- globalaccelerators
18+
scope: "Namespaced"

config/webhook/kustomization.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,4 @@ patchesStrategicMerge:
99
- pod_mutator_patch.yaml
1010
- service_mutator_patch.yaml
1111
- ingressclassparams_validator_patch.yaml
12+
- globalaccelerator_validator_patch.yaml

config/webhook/manifests.yaml

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,27 @@ kind: ValidatingWebhookConfiguration
125125
metadata:
126126
name: webhook
127127
webhooks:
128+
- admissionReviewVersions:
129+
- v1beta1
130+
clientConfig:
131+
service:
132+
name: webhook-service
133+
namespace: system
134+
path: /validate-aga-k8s-aws-v1beta1-globalaccelerator
135+
failurePolicy: Fail
136+
matchPolicy: Equivalent
137+
name: vglobalaccelerator.aga.k8s.aws
138+
rules:
139+
- apiGroups:
140+
- aga.k8s.aws
141+
apiVersions:
142+
- v1beta1
143+
operations:
144+
- CREATE
145+
- UPDATE
146+
resources:
147+
- globalaccelerators
148+
sideEffects: None
128149
- admissionReviewVersions:
129150
- v1beta1
130151
clientConfig:

controllers/aga/globalaccelerator_controller.go

Lines changed: 114 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,18 @@ package controllers
1919
import (
2020
"context"
2121
"fmt"
22+
"time"
23+
24+
"github.com/aws/aws-sdk-go-v2/service/globalaccelerator/types"
2225
"github.com/go-logr/logr"
26+
"github.com/pkg/errors"
2327
corev1 "k8s.io/api/core/v1"
24-
"k8s.io/apimachinery/pkg/types"
28+
ktypes "k8s.io/apimachinery/pkg/types"
2529
"k8s.io/client-go/kubernetes"
2630
"k8s.io/client-go/tools/record"
31+
"k8s.io/client-go/util/workqueue"
32+
"sigs.k8s.io/aws-load-balancer-controller/pkg/aws/services"
33+
agadeploy "sigs.k8s.io/aws-load-balancer-controller/pkg/deploy/aga"
2734
"sigs.k8s.io/aws-load-balancer-controller/pkg/shared_constants"
2835
ctrl "sigs.k8s.io/controller-runtime"
2936
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -42,6 +49,7 @@ import (
4249
agamodel "sigs.k8s.io/aws-load-balancer-controller/pkg/model/aga"
4350
"sigs.k8s.io/aws-load-balancer-controller/pkg/model/core"
4451
"sigs.k8s.io/aws-load-balancer-controller/pkg/runtime"
52+
agastatus "sigs.k8s.io/aws-load-balancer-controller/pkg/status/aga"
4553
)
4654

4755
const (
@@ -52,24 +60,30 @@ const (
5260
agaResourcesGroupVersion = "aga.k8s.aws/v1beta1"
5361
globalAcceleratorKind = "GlobalAccelerator"
5462

63+
// Requeue constants for provisioning state monitoring
64+
requeueMessage = "Monitoring provisioning state"
65+
statusUpdateRequeueTime = 1 * time.Minute
66+
5567
// Metric stage constants
5668
MetricStageFetchGlobalAccelerator = "fetch_globalAccelerator"
5769
MetricStageAddFinalizers = "add_finalizers"
5870
MetricStageBuildModel = "build_model"
71+
MetricStageDeployStack = "deploy_stack"
5972
MetricStageReconcileGlobalAccelerator = "reconcile_globalaccelerator"
6073

6174
// Metric error constants
6275
MetricErrorAddFinalizers = "add_finalizers_error"
6376
MetricErrorRemoveFinalizers = "remove_finalizers_error"
6477
MetricErrorBuildModel = "build_model_error"
78+
MetricErrorDeployStack = "deploy_stack_error"
6579
MetricErrorReconcileGlobalAccelerator = "reconcile_globalaccelerator_error"
6680
)
6781

6882
// NewGlobalAcceleratorReconciler constructs new globalAcceleratorReconciler
69-
func NewGlobalAcceleratorReconciler(k8sClient client.Client, eventRecorder record.EventRecorder, finalizerManager k8s.FinalizerManager, config config.ControllerConfig, logger logr.Logger, metricsCollector lbcmetrics.MetricCollector, reconcileCounters *metricsutil.ReconcileCounters) *globalAcceleratorReconciler {
83+
func NewGlobalAcceleratorReconciler(k8sClient client.Client, eventRecorder record.EventRecorder, finalizerManager k8s.FinalizerManager, config config.ControllerConfig, cloud services.Cloud, logger logr.Logger, metricsCollector lbcmetrics.MetricCollector, reconcileCounters *metricsutil.ReconcileCounters) *globalAcceleratorReconciler {
7084

7185
// Create tracking provider
72-
trackingProvider := tracking.NewDefaultProvider(agaTagPrefix, config.ClusterName)
86+
trackingProvider := tracking.NewDefaultProvider(agaTagPrefix, config.ClusterName, tracking.WithRegion(config.AWSConfig.Region))
7387

7488
// Create model builder
7589
agaModelBuilder := aga.NewDefaultModelBuilder(
@@ -78,6 +92,7 @@ func NewGlobalAcceleratorReconciler(k8sClient client.Client, eventRecorder recor
7892
trackingProvider,
7993
config.FeatureGates,
8094
config.ClusterName,
95+
config.AWSConfig.Region,
8196
config.DefaultTags,
8297
config.ExternalManagedTags,
8398
logger.WithName("aga-model-builder"),
@@ -87,17 +102,26 @@ func NewGlobalAcceleratorReconciler(k8sClient client.Client, eventRecorder recor
87102
// Create stack marshaller
88103
stackMarshaller := deploy.NewDefaultStackMarshaller()
89104

105+
// Create AGA stack deployer
106+
stackDeployer := agadeploy.NewDefaultStackDeployer(cloud, config, trackingProvider, logger.WithName("aga-stack-deployer"), metricsCollector, controllerName)
107+
108+
// Create status updater
109+
statusUpdater := agastatus.NewStatusUpdater(k8sClient, logger)
110+
90111
return &globalAcceleratorReconciler{
91112
k8sClient: k8sClient,
92113
eventRecorder: eventRecorder,
93114
finalizerManager: finalizerManager,
94115
logger: logger,
95116
modelBuilder: agaModelBuilder,
96117
stackMarshaller: stackMarshaller,
118+
stackDeployer: stackDeployer,
119+
statusUpdater: statusUpdater,
97120
metricsCollector: metricsCollector,
98121
reconcileTracker: reconcileCounters.IncrementAGA,
99122

100-
maxConcurrentReconciles: config.GlobalAcceleratorMaxConcurrentReconciles,
123+
maxConcurrentReconciles: config.GlobalAcceleratorMaxConcurrentReconciles,
124+
maxExponentialBackoffDelay: config.GlobalAcceleratorMaxExponentialBackoffDelay,
101125
}
102126
}
103127

@@ -108,11 +132,14 @@ type globalAcceleratorReconciler struct {
108132
finalizerManager k8s.FinalizerManager
109133
modelBuilder aga.ModelBuilder
110134
stackMarshaller deploy.StackMarshaller
135+
stackDeployer agadeploy.StackDeployer
136+
statusUpdater agastatus.StatusUpdater
111137
logger logr.Logger
112138
metricsCollector lbcmetrics.MetricCollector
113-
reconcileTracker func(namespaceName types.NamespacedName)
139+
reconcileTracker func(namespaceName ktypes.NamespacedName)
114140

115-
maxConcurrentReconciles int
141+
maxConcurrentReconciles int
142+
maxExponentialBackoffDelay time.Duration
116143
}
117144

118145
// +kubebuilder:rbac:groups=aga.k8s.aws,resources=globalaccelerators,verbs=get;list;watch;patch
@@ -155,26 +182,19 @@ func (r *globalAcceleratorReconciler) reconcileGlobalAccelerator(ctx context.Con
155182
return ctrlerrors.NewErrorWithMetrics(controllerName, MetricErrorAddFinalizers, err, r.metricsCollector)
156183
}
157184

158-
// TODO: Implement GlobalAccelerator resource management
159-
// This would include:
160-
// 1. Creating/updating AWS Global Accelerator
161-
// 2. Managing listeners and endpoint groups
162-
// 3. Handling endpoint discovery from Services/Ingresses/Gateways
163185
reconcileResourceFn := func() {
164186
err = r.reconcileGlobalAcceleratorResources(ctx, ga)
165187
}
166188
r.metricsCollector.ObserveControllerReconcileLatency(controllerName, MetricStageReconcileGlobalAccelerator, reconcileResourceFn)
167189
if err != nil {
168190
return ctrlerrors.NewErrorWithMetrics(controllerName, MetricErrorReconcileGlobalAccelerator, err, r.metricsCollector)
169191
}
170-
171-
r.eventRecorder.Event(ga, corev1.EventTypeNormal, k8s.GlobalAcceleratorEventReasonSuccessfullyReconciled, "Successfully reconciled")
172192
return nil
173193
}
174194

175195
func (r *globalAcceleratorReconciler) cleanupGlobalAccelerator(ctx context.Context, ga *agaapi.GlobalAccelerator) error {
176196
if k8s.HasFinalizer(ga, shared_constants.GlobalAcceleratorFinalizer) {
177-
// TODO: Implement cleanup logic for AWS Global Accelerator resources
197+
// TODO: Implement cleanup logic for AWS Global Accelerator resources (Only cleaning up accelerator for now)
178198
if err := r.cleanupGlobalAcceleratorResources(ctx, ga); err != nil {
179199
r.eventRecorder.Event(ga, corev1.EventTypeWarning, k8s.GlobalAcceleratorEventReasonFailedCleanup, fmt.Sprintf("Failed cleanup due to %v", err))
180200
return err
@@ -203,7 +223,7 @@ func (r *globalAcceleratorReconciler) buildModel(ctx context.Context, ga *agaapi
203223
}
204224

205225
func (r *globalAcceleratorReconciler) reconcileGlobalAcceleratorResources(ctx context.Context, ga *agaapi.GlobalAccelerator) error {
206-
r.logger.Info("Reconciling GlobalAccelerator resources", "name", ga.Name, "namespace", ga.Namespace)
226+
r.logger.Info("Reconciling GlobalAccelerator resources", "globalAccelerator", k8s.NamespacedName(ga))
207227
var stack core.Stack
208228
var accelerator *agamodel.Accelerator
209229
var err error
@@ -212,25 +232,93 @@ func (r *globalAcceleratorReconciler) reconcileGlobalAcceleratorResources(ctx co
212232
}
213233
r.metricsCollector.ObserveControllerReconcileLatency(controllerName, MetricStageBuildModel, buildModelFn)
214234
if err != nil {
235+
// Update status to indicate model building failure
236+
if statusErr := r.statusUpdater.UpdateStatusFailure(ctx, ga, agadeploy.ModelBuildFailed, fmt.Sprintf("Failed to build model: %v", err)); statusErr != nil {
237+
r.logger.Error(statusErr, "Failed to update GlobalAccelerator status after model build failure")
238+
}
215239
return ctrlerrors.NewErrorWithMetrics(controllerName, MetricErrorBuildModel, err, r.metricsCollector)
216240
}
217241

218-
// Log the built model for debugging
219-
r.logger.Info("Built model successfully", "accelerator", accelerator.ID(), "stackID", stack.StackID())
242+
// Deploy the stack to create/update AWS Global Accelerator resources
243+
deployStackFn := func() {
244+
err = r.stackDeployer.Deploy(ctx, stack, r.metricsCollector, controllerName)
245+
}
246+
r.metricsCollector.ObserveControllerReconcileLatency(controllerName, MetricStageDeployStack, deployStackFn)
247+
if err != nil {
248+
r.eventRecorder.Event(ga, corev1.EventTypeWarning, k8s.GlobalAcceleratorEventReasonFailedDeploy, fmt.Sprintf("Failed to deploy stack due to %v", err))
220249

221-
// TODO: Implement the deploy phase
222-
// This would include:
223-
// 1. Deploy the stack to create/update AWS Global Accelerator resources
224-
// 2. Update the GlobalAccelerator status with the created resources
225-
// 3. Handle any deployment errors and update status accordingly
250+
// Update status to indicate deployment failure
251+
if statusErr := r.statusUpdater.UpdateStatusFailure(ctx, ga, agadeploy.DeploymentFailed, fmt.Sprintf("Failed to deploy stack: %v", err)); statusErr != nil {
252+
r.logger.Error(statusErr, "Failed to update GlobalAccelerator status after deployment failure")
253+
}
254+
255+
return ctrlerrors.NewErrorWithMetrics(controllerName, MetricErrorDeployStack, err, r.metricsCollector)
256+
}
257+
258+
r.logger.Info("Successfully deployed GlobalAccelerator stack", "stackID", stack.StackID())
259+
260+
// Update GlobalAccelerator status after successful deployment
261+
requeueNeeded, err := r.statusUpdater.UpdateStatusSuccess(ctx, ga, accelerator)
262+
if err != nil {
263+
r.eventRecorder.Event(ga, corev1.EventTypeWarning, k8s.GlobalAcceleratorEventReasonFailedUpdateStatus, fmt.Sprintf("Failed update status due to %v", err))
264+
return err
265+
}
266+
if requeueNeeded {
267+
return ctrlerrors.NewRequeueNeededAfter(requeueMessage, statusUpdateRequeueTime)
268+
}
269+
270+
r.eventRecorder.Event(ga, corev1.EventTypeNormal, k8s.GlobalAcceleratorEventReasonSuccessfullyReconciled, "Successfully reconciled")
226271

227272
return nil
228273
}
229274

230275
func (r *globalAcceleratorReconciler) cleanupGlobalAcceleratorResources(ctx context.Context, ga *agaapi.GlobalAccelerator) error {
231-
// TODO: Implement the actual AWS Global Accelerator resource cleanup
232-
// This is a placeholder implementation
233-
r.logger.Info("Cleaning up GlobalAccelerator resources", "name", ga.Name, "namespace", ga.Namespace)
276+
r.logger.Info("Cleaning up GlobalAccelerator resources", "globalAccelerator", k8s.NamespacedName(ga))
277+
278+
// The AcceleratorManager deletes an accelerator's listeners before deleting the accelerator itself.
279+
// TODO: This will be enhanced to delete endpoint groups and endpoints
280+
// before deleting listeners and accelerator (when those features are implemented)
281+
// 1. Find the accelerator ARN from the CRD status
282+
if ga.Status.AcceleratorARN == nil {
283+
r.logger.Info("No accelerator ARN found in status, nothing to clean up", "globalAccelerator", k8s.NamespacedName(ga))
284+
return nil
285+
}
286+
287+
acceleratorARN := *ga.Status.AcceleratorARN
288+
if acceleratorARN == "" {
289+
r.logger.Info("Empty accelerator ARN in status, nothing to clean up", "globalAccelerator", k8s.NamespacedName(ga))
290+
return nil
291+
}
292+
293+
// 2. Delete the accelerator using the accelerator manager
294+
acceleratorManager := r.stackDeployer.GetAcceleratorManager()
295+
r.logger.Info("Deleting accelerator", "acceleratorARN", acceleratorARN, "globalAccelerator", k8s.NamespacedName(ga))
296+
297+
// Initialize reference to existing accelerator for deletion
298+
acceleratorWithTags := agadeploy.AcceleratorWithTags{
299+
Accelerator: &types.Accelerator{
300+
AcceleratorArn: &acceleratorARN,
301+
},
302+
Tags: nil,
303+
}
304+
305+
if err := acceleratorManager.Delete(ctx, acceleratorWithTags); err != nil {
306+
// Check if it's an AcceleratorNotDisabledError
307+
var notDisabledErr *agadeploy.AcceleratorNotDisabledError
308+
if errors.As(err, &notDisabledErr) {
309+
// Update status to indicate we're waiting for the accelerator to be disabled
310+
if updateErr := r.statusUpdater.UpdateStatusDeletion(ctx, ga); updateErr != nil {
311+
r.logger.Error(updateErr, "Failed to update status during accelerator deletion")
312+
}
313+
return ctrlerrors.NewRequeueNeeded("Waiting for accelerator to be disabled")
314+
}
315+
316+
// Any other error
317+
r.logger.Error(err, "Failed to delete accelerator", "acceleratorARN", acceleratorARN, "globalAccelerator", k8s.NamespacedName(ga))
318+
return fmt.Errorf("failed to delete accelerator %s: %w", acceleratorARN, err)
319+
}
320+
321+
r.logger.Info("Successfully cleaned up all GlobalAccelerator resources", "globalAccelerator", k8s.NamespacedName(ga))
234322
return nil
235323
}
236324

@@ -259,6 +347,7 @@ func (r *globalAcceleratorReconciler) SetupWithManager(ctx context.Context, mgr
259347
Named(controllerName).
260348
WithOptions(controller.Options{
261349
MaxConcurrentReconciles: r.maxConcurrentReconciles,
350+
RateLimiter: workqueue.NewTypedItemExponentialFailureRateLimiter[reconcile.Request](5*time.Second, r.maxExponentialBackoffDelay),
262351
}).
263352
Complete(r)
264353
}

0 commit comments

Comments
 (0)