Skip to content

Commit

Permalink
Make kube_dynamic route configuration available to DELTA_GRPC
Browse files Browse the repository at this point in the history
  • Loading branch information
ffilippopoulos committed Jan 14, 2025
1 parent e41d152 commit 8e158e6
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ spec:
config_source:
resource_api_version: V3
api_config_source:
api_type: GRPC
api_type: DELTA_GRPC
transport_api_version: V3
grpc_services:
- envoy_grpc:
Expand Down
6 changes: 4 additions & 2 deletions xds/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,9 +277,10 @@ func servicesToResources(serviceStore XdsServiceStore, authority string) ([]type

// servicesToResourcesWithNames returns maps of VirtualHost and Cluster type
// resources as expected by a Linear cache
func servicesToResourcesWithNames(serviceStore XdsServiceStore, authority string) (map[string]types.Resource, map[string]types.Resource) {
func servicesToResourcesWithNames(serviceStore XdsServiceStore, authority string) (map[string]types.Resource, map[string]types.Resource, map[string]types.Resource) {
cls := make(map[string]types.Resource)
vhds := make(map[string]types.Resource)
rds := make(map[string]types.Resource)
for _, s := range serviceStore.All() {
for _, port := range s.Service.Spec.Ports {
vh := makeVirtualHost(s.Service.Name, s.Service.Namespace, authority, port.Port, s.Retry, s.RingHashPolicies)
Expand All @@ -288,5 +289,6 @@ func servicesToResourcesWithNames(serviceStore XdsServiceStore, authority string
cls[cluster.Name] = patchClusterDeltaEDS(cluster)
}
}
return cls, vhds
rds["kube_dynamic"] = makeKubeDynamicRouteConfig()
return cls, vhds, rds
}
21 changes: 15 additions & 6 deletions xds/snapshotter.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type Snapshotter struct {
endpointsSnapVersion int32 // Endpoints snap version for empty node ID snapshot
deltaCDSCache *cache.LinearCache
deltaEDSCache *cache.LinearCache
deltaRDSCache *cache.LinearCache
deltaVHDSCache *cache.LinearCache
muxCache cache.MuxCache
requestRateLimit *rate.Limiter // maximum number of requests allowed to server
Expand Down Expand Up @@ -136,6 +137,8 @@ func mapDeltaTypeURL(typeURL string) string {
return "deltaClusters"
case resource.EndpointType:
return "deltaEndpoints"
case resource.RouteType:
return "deltaRouteConfigurations"
case resource.VirtualHostType:
return "deltaVirtualHosts"
default:
Expand All @@ -160,6 +163,8 @@ func (s *Snapshotter) getDeltaCacheForType(typeURL string) *cache.LinearCache {
return s.deltaEDSCache
case resource.ClusterType:
return s.deltaCDSCache
case resource.RouteType:
return s.deltaRDSCache
case resource.VirtualHostType:
return s.deltaVHDSCache
default:
Expand All @@ -173,6 +178,7 @@ func NewSnapshotter(authority string, port uint, requestLimit, streamRequestLimi
endpointsCache := cache.NewSnapshotCache(false, cache.IDHash{}, log.EnvoyLogger)
deltaCDSCache := cache.NewLinearCache(resource.ClusterType, cache.WithLogger(log.EnvoyLogger))
deltaEDSCache := cache.NewLinearCache(resource.EndpointType, cache.WithLogger(log.EnvoyLogger))
deltaRDSCache := cache.NewLinearCache(resource.RouteType, cache.WithLogger(log.EnvoyLogger))
deltaVHDSCache := cache.NewLinearCache(resource.VirtualHostType, cache.WithLogger(log.EnvoyLogger))
muxCache := cache.MuxCache{
Classify: func(r *cache.Request) string {
Expand All @@ -182,11 +188,12 @@ func NewSnapshotter(authority string, port uint, requestLimit, streamRequestLimi
return mapDeltaTypeURL(r.TypeUrl)
},
Caches: map[string]cache.Cache{
"services": servicesCache,
"endpoints": endpointsCache,
"deltaClusters": deltaCDSCache,
"deltaEndpoints": deltaEDSCache,
"deltaVirtualHosts": deltaVHDSCache,
"services": servicesCache,
"endpoints": endpointsCache,
"deltaClusters": deltaCDSCache,
"deltaEndpoints": deltaEDSCache,
"deltaRouteConfigurations": deltaRDSCache,
"deltaVirtualHosts": deltaVHDSCache,
},
}
return &Snapshotter{
Expand All @@ -196,6 +203,7 @@ func NewSnapshotter(authority string, port uint, requestLimit, streamRequestLimi
endpointsCache: endpointsCache,
deltaCDSCache: deltaCDSCache,
deltaEDSCache: deltaEDSCache,
deltaRDSCache: deltaRDSCache,
deltaVHDSCache: deltaVHDSCache,
muxCache: muxCache,
requestRateLimit: rate.NewLimiter(rate.Limit(requestLimit), 1),
Expand Down Expand Up @@ -259,9 +267,10 @@ func (s *Snapshotter) SnapServices(serviceStore XdsServiceStore) error {
return fmt.Errorf("Failed to set services snapshot %v", err)
}
// Sync linear caches
deltaCLS, deltaVHDS := servicesToResourcesWithNames(serviceStore, "")
deltaCLS, deltaVHDS, deltaRDS := servicesToResourcesWithNames(serviceStore, "")
s.getDeltaCacheForType(resource.ClusterType).SetResources(deltaCLS)
s.getDeltaCacheForType(resource.VirtualHostType).SetResources(deltaVHDS)
s.getDeltaCacheForType(resource.RouteType).SetResources(deltaRDS)

s.nodes.Range(func(nID, n interface{}) bool {
nodeID := nID.(string)
Expand Down

0 comments on commit 8e158e6

Please sign in to comment.