Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions activator/activator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ func TestActivator(t *testing.T) {
}
assert.NoError(t, s.Reset())
s.Stop(t.Context())
bpf.Cleanup()
})
}
}
Expand Down
131 changes: 87 additions & 44 deletions activator/bpf.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package activator

import (
"errors"
"fmt"
"log/slog"
"maps"
Expand All @@ -10,6 +11,7 @@ import (
"strconv"

"github.com/cilium/ebpf"
"github.com/cilium/ebpf/link"
"github.com/cilium/ebpf/rlimit"
"github.com/vishvananda/netlink"
"golang.org/x/sys/unix"
Expand All @@ -30,9 +32,10 @@ const (
// BPF bundles the eBPF objects of one process together with every
// attachment made for them, so that Cleanup can undo all of it.
type BPF struct {
	// pid selects the pin path (PinPath(pid)) that Cleanup removes.
	pid int
	// objs are the eBPF objects; Cleanup closes them.
	objs *bpfObjects
	// qdiscs and filters record what was created through netlink so Cleanup
	// can delete them again.
	qdiscs  []*netlink.GenericQdisc
	filters []*netlink.BpfFilter
	log     *slog.Logger
	// links records link-based attachments; Cleanup closes each of them.
	links []link.Link
}

type BPFConfig struct {
Expand Down Expand Up @@ -101,73 +104,113 @@ func InitBPF(pid int, log *slog.Logger, probeBinaryName string, opts ...BPFOpts)
}

// Cleanup undoes everything recorded on bpf: it closes all links and the eBPF
// objects, deletes the recorded qdiscs and filters and finally removes the pin
// path of the process.
//
// Cleanup is best-effort: a failing step does not stop the remaining ones.
// All failures are collected and returned as a single joined error, which is
// nil when every step succeeded.
func (bpf *BPF) Cleanup() error {
	var errs []error

	// named l so the imported link package is not shadowed.
	for _, l := range bpf.links {
		if err := l.Close(); err != nil {
			errs = append(errs, fmt.Errorf("closing link: %w", err))
		}
	}
	if err := bpf.objs.Close(); err != nil {
		errs = append(errs, fmt.Errorf("unable to close bpf objects: %w", err))
	}

	// A qdisc or filter that is already gone is not a failure. The nil check
	// is required: os.IsNotExist(nil) is false, so without it every
	// successful delete would be recorded as an error wrapping nil.
	for _, qdisc := range bpf.qdiscs {
		if err := netlink.QdiscDel(qdisc); err != nil && !os.IsNotExist(err) {
			errs = append(errs, fmt.Errorf("unable to delete qdisc: %w", err))
		}
	}
	for _, filter := range bpf.filters {
		if err := netlink.FilterDel(filter); err != nil && !os.IsNotExist(err) {
			errs = append(errs, fmt.Errorf("unable to delete filter: %w", err))
		}
	}

	bpf.log.Info("deleting", "path", PinPath(bpf.pid))
	// errors.Join discards nil entries, so a successful RemoveAll adds nothing.
	errs = append(errs, os.RemoveAll(PinPath(bpf.pid)))
	return errors.Join(errs...)
}

func (bpf *BPF) AttachRedirector(ifaces ...string) error {
for _, iface := range ifaces {
devID, err := net.InterfaceByName(iface)
for _, ifaceName := range ifaces {
iface, err := net.InterfaceByName(ifaceName)
if err != nil {
return fmt.Errorf("could not get interface ID: %w", err)
}

qdisc := &netlink.GenericQdisc{
QdiscAttrs: netlink.QdiscAttrs{
LinkIndex: devID.Index,
Handle: netlink.MakeHandle(0xffff, 0),
Parent: netlink.HANDLE_CLSACT,
},
QdiscType: "clsact",
}

if err := netlink.QdiscReplace(qdisc); err != nil {
return fmt.Errorf("could not replace qdisc: %w", err)
// try TCX first and if that is unsupported by the kernel fallback to
// the old qdisc.
if err := bpf.attachTCX(iface); err == nil {
bpf.log.Info("attached redirector using TCX", "iface", ifaceName)
continue
}
bpf.qdiscs = append(bpf.qdiscs, qdisc)

ingress := netlink.BpfFilter{
FilterAttrs: netlink.FilterAttrs{
LinkIndex: devID.Index,
Parent: netlink.HANDLE_MIN_INGRESS,
Handle: 1,
Protocol: unix.ETH_P_ALL,
},
Fd: bpf.objs.TcRedirectIngress.FD(),
Name: bpf.objs.TcRedirectIngress.String(),
DirectAction: true,
if err := bpf.attachQdisc(iface); err == nil {
bpf.log.Warn("attached redirector using Qdisc as TCX failed", "iface", ifaceName)
return err
}
egress := ingress
egress.Parent = netlink.HANDLE_MIN_EGRESS
egress.Fd = bpf.objs.TcRedirectEgress.FD()
egress.Name = bpf.objs.TcRedirectEgress.String()
}
return nil
}

if err := netlink.FilterReplace(&ingress); err != nil {
return fmt.Errorf("failed to replace tc filter: %w", err)
}
bpf.filters = append(bpf.filters, &ingress)
// attachTCX attaches the ingress and egress redirect programs to iface using
// TCX links.
//
// The attachment is all-or-nothing: when the egress attach fails, the ingress
// link that was already created is closed again and nothing is recorded on
// bpf. This lets the caller fall back to another attach method without the
// ingress program staying attached through TCX as well. On success both links
// are appended to bpf.links so Cleanup can close them.
func (bpf *BPF) attachTCX(iface *net.Interface) error {
	ingress, err := link.AttachTCX(link.TCXOptions{
		Interface: iface.Index,
		Program:   bpf.objs.TcRedirectIngress,
		Attach:    ebpf.AttachTCXIngress,
	})
	if err != nil {
		return fmt.Errorf("could not attach ingress TCX: %w", err)
	}

	egress, err := link.AttachTCX(link.TCXOptions{
		Interface: iface.Index,
		Program:   bpf.objs.TcRedirectEgress,
		Attach:    ebpf.AttachTCXEgress,
	})
	if err != nil {
		err = fmt.Errorf("could not attach egress TCX: %w", err)
		// roll back the half-finished attachment.
		if closeErr := ingress.Close(); closeErr != nil {
			err = errors.Join(err, fmt.Errorf("closing ingress TCX link: %w", closeErr))
		}
		return err
	}

	bpf.links = append(bpf.links, ingress, egress)
	return nil
}

if err := netlink.FilterReplace(&egress); err != nil {
return fmt.Errorf("failed to replace tc filter: %w", err)
}
bpf.filters = append(bpf.filters, &egress)
// attachQdisc attaches the ingress and egress redirect programs to iface
// through a clsact qdisc with one direct-action BPF filter per direction.
//
// The qdisc and each filter are appended to bpf.qdiscs and bpf.filters as soon
// as they have been installed, so Cleanup can delete whatever was created even
// when a later step fails.
func (bpf *BPF) attachQdisc(iface *net.Interface) error {
	clsact := &netlink.GenericQdisc{
		QdiscAttrs: netlink.QdiscAttrs{
			LinkIndex: iface.Index,
			Handle:    netlink.MakeHandle(0xffff, 0),
			Parent:    netlink.HANDLE_CLSACT,
		},
		QdiscType: "clsact",
	}
	if err := netlink.QdiscReplace(clsact); err != nil {
		return fmt.Errorf("could not replace qdisc: %w", err)
	}
	bpf.qdiscs = append(bpf.qdiscs, clsact)

	// both directions share every attribute except the parent handle.
	ingressAttrs := netlink.FilterAttrs{
		LinkIndex: iface.Index,
		Parent:    netlink.HANDLE_MIN_INGRESS,
		Handle:    1,
		Protocol:  unix.ETH_P_ALL,
	}
	egressAttrs := ingressAttrs
	egressAttrs.Parent = netlink.HANDLE_MIN_EGRESS

	pending := []*netlink.BpfFilter{
		{
			FilterAttrs:  ingressAttrs,
			Fd:           bpf.objs.TcRedirectIngress.FD(),
			Name:         bpf.objs.TcRedirectIngress.String(),
			DirectAction: true,
		},
		{
			FilterAttrs:  egressAttrs,
			Fd:           bpf.objs.TcRedirectEgress.FD(),
			Name:         bpf.objs.TcRedirectEgress.String(),
			DirectAction: true,
		},
	}

	// ingress first, then egress; stop at the first failure.
	for _, filter := range pending {
		if err := netlink.FilterReplace(filter); err != nil {
			return fmt.Errorf("failed to replace tc filter: %w", err)
		}
		bpf.filters = append(bpf.filters, filter)
	}
	return nil
}

Expand Down
18 changes: 11 additions & 7 deletions e2e/setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -694,27 +694,27 @@ func endpointsReady(endpoints []discoveryv1.Endpoint) bool {

func cordonNode(t testing.TB, ctx context.Context, client client.Client, name string) (uncordon func()) {
node := &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: name}}
retry.RetryOnConflict(retry.DefaultRetry, func() error {
assert.NoError(t, retry.RetryOnConflict(retry.DefaultRetry, func() error {
if err := client.Get(ctx, objectName(node), node); err != nil {
return err
}
node.Spec.Unschedulable = true
return client.Update(ctx, node)
})
}))
return func() {
uncordonNode(t, ctx, client, name)
}
}

func uncordonNode(t testing.TB, ctx context.Context, client client.Client, name string) {
node := &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: name}}
retry.RetryOnConflict(retry.DefaultRetry, func() error {
assert.NoError(t, retry.RetryOnConflict(retry.DefaultRetry, func() error {
if err := client.Get(ctx, objectName(node), node); err != nil {
return err
}
node.Spec.Unschedulable = false
return client.Update(ctx, node)
})
}))
}

func cordonOtherNodes(t testing.TB, ctx context.Context, client client.Client, name string) (uncordon func()) {
Expand All @@ -725,9 +725,13 @@ func cordonOtherNodes(t testing.TB, ctx context.Context, client client.Client, n
if node.Name == name {
continue
}
require.NoError(t, client.Get(ctx, objectName(&node), &node))
node.Spec.Unschedulable = true
require.NoError(t, client.Update(ctx, &node))
assert.NoError(t, retry.RetryOnConflict(retry.DefaultBackoff, func() error {
if err := client.Get(ctx, objectName(&node), &node); err != nil {
return err
}
node.Spec.Unschedulable = true
return client.Update(ctx, &node)
}))
uncordonFuncs = append(uncordonFuncs, func() {
uncordonNode(t, ctx, client, node.Name)
})
Expand Down