diff --git a/envoy-sidecar/deploy/kustomize/kyverno/mutate/mutate-semaphore-xds-envoy-sidecar.yaml b/envoy-sidecar/deploy/kustomize/kyverno/mutate/mutate-semaphore-xds-envoy-sidecar.yaml index d41c7bb..19bb052 100644 --- a/envoy-sidecar/deploy/kustomize/kyverno/mutate/mutate-semaphore-xds-envoy-sidecar.yaml +++ b/envoy-sidecar/deploy/kustomize/kyverno/mutate/mutate-semaphore-xds-envoy-sidecar.yaml @@ -62,7 +62,7 @@ spec: config_source: resource_api_version: V3 api_config_source: - api_type: GRPC + api_type: DELTA_GRPC transport_api_version: V3 grpc_services: - envoy_grpc: diff --git a/xds/service.go b/xds/service.go index 24e621d..31b5e76 100644 --- a/xds/service.go +++ b/xds/service.go @@ -277,9 +277,10 @@ func servicesToResources(serviceStore XdsServiceStore, authority string) ([]type // servicesToResourcesWithNames returns maps of VirtualHost and Cluster type // resources as expected by a Linear cache -func servicesToResourcesWithNames(serviceStore XdsServiceStore, authority string) (map[string]types.Resource, map[string]types.Resource) { +func servicesToResourcesWithNames(serviceStore XdsServiceStore, authority string) (map[string]types.Resource, map[string]types.Resource, map[string]types.Resource) { cls := make(map[string]types.Resource) vhds := make(map[string]types.Resource) + rds := make(map[string]types.Resource) for _, s := range serviceStore.All() { for _, port := range s.Service.Spec.Ports { vh := makeVirtualHost(s.Service.Name, s.Service.Namespace, authority, port.Port, s.Retry, s.RingHashPolicies) @@ -288,5 +289,6 @@ func servicesToResourcesWithNames(serviceStore XdsServiceStore, authority string cls[cluster.Name] = patchClusterDeltaEDS(cluster) } } - return cls, vhds + rds["kube_dynamic"] = makeKubeDynamicRouteConfig() + return cls, vhds, rds } diff --git a/xds/snapshotter.go b/xds/snapshotter.go index 897527a..11c1fdd 100644 --- a/xds/snapshotter.go +++ b/xds/snapshotter.go @@ -55,6 +55,7 @@ type Snapshotter struct { endpointsSnapVersion int32 // Endpoints snap version for empty node ID snapshot deltaCDSCache *cache.LinearCache deltaEDSCache *cache.LinearCache + deltaRDSCache *cache.LinearCache deltaVHDSCache *cache.LinearCache muxCache cache.MuxCache requestRateLimit *rate.Limiter // maximum number of requests allowed to server @@ -136,6 +137,8 @@ func mapDeltaTypeURL(typeURL string) string { return "deltaClusters" case resource.EndpointType: return "deltaEndpoints" + case resource.RouteType: + return "deltaRouteConfigurations" case resource.VirtualHostType: return "deltaVirtualHosts" default: @@ -160,6 +163,8 @@ func (s *Snapshotter) getDeltaCacheForType(typeURL string) *cache.LinearCache { return s.deltaEDSCache case resource.ClusterType: return s.deltaCDSCache + case resource.RouteType: + return s.deltaRDSCache case resource.VirtualHostType: return s.deltaVHDSCache default: @@ -173,6 +178,7 @@ func NewSnapshotter(authority string, port uint, requestLimit, streamRequestLimi endpointsCache := cache.NewSnapshotCache(false, cache.IDHash{}, log.EnvoyLogger) deltaCDSCache := cache.NewLinearCache(resource.ClusterType, cache.WithLogger(log.EnvoyLogger)) deltaEDSCache := cache.NewLinearCache(resource.EndpointType, cache.WithLogger(log.EnvoyLogger)) + deltaRDSCache := cache.NewLinearCache(resource.RouteType, cache.WithLogger(log.EnvoyLogger)) deltaVHDSCache := cache.NewLinearCache(resource.VirtualHostType, cache.WithLogger(log.EnvoyLogger)) muxCache := cache.MuxCache{ Classify: func(r *cache.Request) string { @@ -182,11 +188,12 @@ func NewSnapshotter(authority string, port uint, requestLimit, streamRequestLimi return mapDeltaTypeURL(r.TypeUrl) }, Caches: map[string]cache.Cache{ - "services": servicesCache, - "endpoints": endpointsCache, - "deltaClusters": deltaCDSCache, - "deltaEndpoints": deltaEDSCache, - "deltaVirtualHosts": deltaVHDSCache, + "services": servicesCache, + "endpoints": endpointsCache, + "deltaClusters": deltaCDSCache, + "deltaEndpoints": deltaEDSCache, + "deltaRouteConfigurations": deltaRDSCache, + "deltaVirtualHosts": deltaVHDSCache, }, } return &Snapshotter{ @@ -196,6 +203,7 @@ func NewSnapshotter(authority string, port uint, requestLimit, streamRequestLimi endpointsCache: endpointsCache, deltaCDSCache: deltaCDSCache, deltaEDSCache: deltaEDSCache, + deltaRDSCache: deltaRDSCache, deltaVHDSCache: deltaVHDSCache, muxCache: muxCache, requestRateLimit: rate.NewLimiter(rate.Limit(requestLimit), 1), @@ -259,9 +267,10 @@ func (s *Snapshotter) SnapServices(serviceStore XdsServiceStore) error { return fmt.Errorf("Failed to set services snapshot %v", err) } // Sync linear caches - deltaCLS, deltaVHDS := servicesToResourcesWithNames(serviceStore, "") + deltaCLS, deltaVHDS, deltaRDS := servicesToResourcesWithNames(serviceStore, "") s.getDeltaCacheForType(resource.ClusterType).SetResources(deltaCLS) s.getDeltaCacheForType(resource.VirtualHostType).SetResources(deltaVHDS) + s.getDeltaCacheForType(resource.RouteType).SetResources(deltaRDS) s.nodes.Range(func(nID, n interface{}) bool { nodeID := nID.(string)