Skip to content

Commit

Permalink
feat: use indexed storage for endpointslices instead of manually filt…
Browse files Browse the repository at this point in the history
…ering them for each service
  • Loading branch information
defaulterrr committed Apr 30, 2024
1 parent 300f772 commit dff0055
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 67 deletions.
81 changes: 51 additions & 30 deletions internal/ingress/controller/store/endpointslice.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,51 +18,72 @@ package store

import (
"fmt"
"strings"

discoveryv1 "k8s.io/api/discovery/v1"
apiNames "k8s.io/apiserver/pkg/storage/names"
"k8s.io/client-go/tools/cache"
)

type getEppsForServiceFunc = func(key string) ([]*discoveryv1.EndpointSlice, error)

// EndpointSliceLister makes a Store that lists Endpoints.
type EndpointSliceLister struct {
cache.Store
endpointSliceIndex getEppsForServiceFunc
}

// MatchByKey returns the EndpointsSlices of the Service matching key in the local Endpoint Store.
func (s *EndpointSliceLister) MatchByKey(key string) ([]*discoveryv1.EndpointSlice, error) {
var eps []*discoveryv1.EndpointSlice
keyNsLen := strings.Index(key, "/")
if keyNsLen < -1 {
keyNsLen = 0
} else {
// count '/' char
keyNsLen++
epss, err := s.endpointSliceIndex(key)
if err != nil {
return nil, err
}
// filter endpointSlices owned by svc
for _, listKey := range s.ListKeys() {
if len(key) < (apiNames.MaxGeneratedNameLength+keyNsLen) && !strings.HasPrefix(listKey, key) {
continue
}
// generated endpointslices names has truncated svc name as prefix when svc name is too long, we compare only non truncated part
// https://github.com/kubernetes/ingress-nginx/issues/9240
if len(key) >= (apiNames.MaxGeneratedNameLength+keyNsLen) && !strings.HasPrefix(listKey, key[:apiNames.MaxGeneratedNameLength+keyNsLen-1]) {
continue

if len(epss) == 0 {
return nil, NotExistsError(key)
}
return epss, nil
}

func epssIndexer() cache.Indexers {
return cache.Indexers{
discoveryv1.LabelServiceName: func(obj interface{}) ([]string, error) {
eps, ok := obj.(*discoveryv1.EndpointSlice)
if !ok {
// Skip object as it is not an endpointslice
return nil, nil
}

parentService, ok := eps.Labels[discoveryv1.LabelServiceName]
if !ok {
// There is no parent service and thus we cannot match this endpointslice to any service
// As far as i'm aware, this is only possible if you create epps objects by hand
return nil, nil
}

key := fmt.Sprintf("%s/%s", eps.Namespace, parentService)

return []string{key}, nil
},
}
}

func eppsForServiceFuncFromIndexer(indexer cache.Indexer) getEppsForServiceFunc {
return func(key string) ([]*discoveryv1.EndpointSlice, error) {
objs, err := indexer.ByIndex(discoveryv1.LabelServiceName, key)
if err != nil {
return nil, err
}
epss, exists, err := s.GetByKey(listKey)
if exists && err == nil {
// check for svc owner label
if svcName, ok := epss.(*discoveryv1.EndpointSlice).ObjectMeta.GetLabels()[discoveryv1.LabelServiceName]; ok {
namespace := epss.(*discoveryv1.EndpointSlice).ObjectMeta.GetNamespace()
if key == fmt.Sprintf("%s/%s", namespace, svcName) {
eps = append(eps, epss.(*discoveryv1.EndpointSlice))
}

epss := make([]*discoveryv1.EndpointSlice, 0, len(objs))
for _, obj := range objs {
eps, ok := obj.(*discoveryv1.EndpointSlice)
if !ok {
continue
}

epss = append(epss, eps)
}

return epss, nil
}
if len(eps) == 0 {
return nil, NotExistsError(key)
}
return eps, nil
}
88 changes: 51 additions & 37 deletions internal/ingress/controller/store/endpointslice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,21 @@ import (
"k8s.io/client-go/tools/cache"
)

func newEndpointSliceLister(t *testing.T) *EndpointSliceLister {
func newEndpointSliceLister(t *testing.T) (*EndpointSliceLister, cache.Indexer) {
t.Helper()

return &EndpointSliceLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)}
store := cache.NewStore(cache.MetaNamespaceKeyFunc)
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, epssIndexer())

return &EndpointSliceLister{
Store: store,
endpointSliceIndex: eppsForServiceFuncFromIndexer(indexer),
}, indexer
}

func TestEndpointSliceLister(t *testing.T) {
t.Run("the key does not exist", func(t *testing.T) {
el := newEndpointSliceLister(t)
el, _ := newEndpointSliceLister(t)

key := "namespace/svcname"
_, err := el.MatchByKey(key)
Expand All @@ -47,58 +53,60 @@ func TestEndpointSliceLister(t *testing.T) {
}
})
t.Run("the key exists", func(t *testing.T) {
el := newEndpointSliceLister(t)
el, indexer := newEndpointSliceLister(t)

key := "namespace/svcname"
endpointSlice := &discoveryv1.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{
Namespace: "namespace",
Name: "anothername-foo",
Labels: map[string]string{
discoveryv1.LabelServiceName: "svcname",
epss := []*discoveryv1.EndpointSlice{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "namespace",
Name: "anothername-foo",
Labels: map[string]string{
discoveryv1.LabelServiceName: "svcname",
},
},
},
}
if err := el.Add(endpointSlice); err != nil {
t.Errorf("unexpected error %v", err)
}
endpointSlice = &discoveryv1.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{
Namespace: "namespace",
Name: "svcname-bar",
Labels: map[string]string{
discoveryv1.LabelServiceName: "othersvc",
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "namespace",
Name: "svcname-bar",
Labels: map[string]string{
discoveryv1.LabelServiceName: "othersvc",
},
},
},
}
if err := el.Add(endpointSlice); err != nil {
t.Errorf("unexpected error %v", err)
}
endpointSlice = &discoveryv1.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{
Namespace: "namespace",
Name: "svcname-buz",
Labels: map[string]string{
discoveryv1.LabelServiceName: "svcname",
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "namespace",
Name: "svcname-buz",
Labels: map[string]string{
discoveryv1.LabelServiceName: "svcname2",
},
},
},
}
if err := el.Add(endpointSlice); err != nil {
t.Errorf("unexpected error %v", err)
for _, eps := range epss {
if err := el.Add(eps); err != nil {
t.Errorf("unexpected error %v", err)
}
if err := indexer.Add(eps); err != nil {
t.Errorf("unexpected error %v", err)
}
}

eps, err := el.MatchByKey(key)
if err != nil {
t.Errorf("unexpeted error %v", err)
}
if err == nil && len(eps) != 1 {
t.Errorf("expected one slice %v, error, got %d slices", endpointSlice, len(eps))
t.Errorf("expected one slice %v, error, got %d slices, keys stored in indexer: %v, eps returned by storer: %v", epss[0], len(eps), indexer.ListKeys(), eps)
}
if len(eps) > 0 && eps[0].GetName() != endpointSlice.GetName() {
t.Errorf("expected %v, error, got %v", endpointSlice.GetName(), eps[0].GetName())
if len(eps) > 0 && eps[0].GetName() != epss[0].GetName() {
t.Errorf("expected %v, error, got %v", epss[0].GetName(), eps[0].GetName())
}
})
t.Run("svc long name", func(t *testing.T) {
el := newEndpointSliceLister(t)
el, indexer := newEndpointSliceLister(t)
ns := "namespace"
ns2 := "another-ns"
svcName := "test-backend-http-test-http-test-http-test-http-test-http-truncated"
Expand All @@ -116,6 +124,9 @@ func TestEndpointSliceLister(t *testing.T) {
if err := el.Add(endpointSlice); err != nil {
t.Errorf("unexpected error %v", err)
}
if err := indexer.Add(endpointSlice); err != nil {
t.Errorf("unexpected error %v", err)
}
endpointSlice2 := &discoveryv1.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{
Namespace: ns2,
Expand All @@ -128,9 +139,12 @@ func TestEndpointSliceLister(t *testing.T) {
if err := el.Add(endpointSlice2); err != nil {
t.Errorf("unexpected error %v", err)
}
if err := indexer.Add(endpointSlice); err != nil {
t.Errorf("unexpected error %v", err)
}
eps, err := el.MatchByKey(key)
if err != nil {
t.Errorf("unexpeted error %v", err)
t.Errorf("unexpected error %v", err)
}
if len(eps) != 1 {
t.Errorf("expected one slice %v, error, got %d slices", endpointSlice, len(eps))
Expand Down
8 changes: 8 additions & 0 deletions internal/ingress/controller/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,15 @@ func New(
}

store.informers.EndpointSlice = infFactory.Discovery().V1().EndpointSlices().Informer()
// Add new endpointslices indexer to markup epps upfront for fast pinpoint retrieval
if err := store.informers.EndpointSlice.AddIndexers(epssIndexer()); err != nil {
// This error only occurs due to errors in code, this panic is not possible in runtime
// if the underlying code is correct. Typically, this error signals conflicts in indexer
panic(fmt.Sprintf("failed to add new index for endpointslices: %v", err))
}

store.listers.EndpointSlice.Store = store.informers.EndpointSlice.GetStore()
store.listers.EndpointSlice.endpointSliceIndex = eppsForServiceFuncFromIndexer(store.informers.EndpointSlice.GetIndexer())

store.informers.Secret = infFactorySecrets.Core().V1().Secrets().Informer()
store.listers.Secret.Store = store.informers.Secret.GetStore()
Expand Down

0 comments on commit dff0055

Please sign in to comment.