diff --git a/activator/activator_test.go b/activator/activator_test.go index d4b495a..d40df75 100644 --- a/activator/activator_test.go +++ b/activator/activator_test.go @@ -179,6 +179,7 @@ func TestActivator(t *testing.T) { } assert.NoError(t, s.Reset()) s.Stop(t.Context()) + bpf.Cleanup() }) } } diff --git a/activator/bpf.go b/activator/bpf.go index 8f097d8..7651dfb 100644 --- a/activator/bpf.go +++ b/activator/bpf.go @@ -1,6 +1,7 @@ package activator import ( + "errors" "fmt" "log/slog" "maps" @@ -10,6 +11,7 @@ import ( "strconv" "github.com/cilium/ebpf" + "github.com/cilium/ebpf/link" "github.com/cilium/ebpf/rlimit" "github.com/vishvananda/netlink" "golang.org/x/sys/unix" @@ -30,9 +32,10 @@ const ( type BPF struct { pid int objs *bpfObjects + log *slog.Logger qdiscs []*netlink.GenericQdisc filters []*netlink.BpfFilter - log *slog.Logger + links []link.Link } type BPFConfig struct { @@ -101,73 +104,113 @@ func InitBPF(pid int, log *slog.Logger, probeBinaryName string, opts ...BPFOpts) } func (bpf *BPF) Cleanup() error { + errs := []error{} + for _, link := range bpf.links { + if err := link.Close(); err != nil { + errs = append(errs, fmt.Errorf("closing link: %w", err)) + } + } if err := bpf.objs.Close(); err != nil { - return fmt.Errorf("unable to close bpf objects: %w", err) + errs = append(errs, fmt.Errorf("unable to close bpf objects: %w", err)) } - for _, qdisc := range bpf.qdiscs { if err := netlink.QdiscDel(qdisc); !os.IsNotExist(err) { - return fmt.Errorf("unable to delete qdisc: %w", err) + errs = append(errs, fmt.Errorf("unable to delete qdisc: %w", err)) } } for _, filter := range bpf.filters { if err := netlink.FilterDel(filter); !os.IsNotExist(err) { - return fmt.Errorf("unable to delete filter: %w", err) + errs = append(errs, fmt.Errorf("unable to delete filter: %w", err)) } } bpf.log.Info("deleting", "path", PinPath(bpf.pid)) - return os.RemoveAll(PinPath(bpf.pid)) + errs = append(errs, os.RemoveAll(PinPath(bpf.pid))) + return errors.Join(errs...) } func (bpf *BPF) AttachRedirector(ifaces ...string) error { - for _, iface := range ifaces { - devID, err := net.InterfaceByName(iface) + for _, ifaceName := range ifaces { + iface, err := net.InterfaceByName(ifaceName) if err != nil { return fmt.Errorf("could not get interface ID: %w", err) } - - qdisc := &netlink.GenericQdisc{ - QdiscAttrs: netlink.QdiscAttrs{ - LinkIndex: devID.Index, - Handle: netlink.MakeHandle(0xffff, 0), - Parent: netlink.HANDLE_CLSACT, - }, - QdiscType: "clsact", - } - - if err := netlink.QdiscReplace(qdisc); err != nil { - return fmt.Errorf("could not replace qdisc: %w", err) + // try TCX first and if that is unsupported by the kernel fallback to + // the old qdisc. + if err := bpf.attachTCX(iface); err == nil { + bpf.log.Info("attached redirector using TCX", "iface", ifaceName) + continue } - bpf.qdiscs = append(bpf.qdiscs, qdisc) - - ingress := netlink.BpfFilter{ - FilterAttrs: netlink.FilterAttrs{ - LinkIndex: devID.Index, - Parent: netlink.HANDLE_MIN_INGRESS, - Handle: 1, - Protocol: unix.ETH_P_ALL, - }, - Fd: bpf.objs.TcRedirectIngress.FD(), - Name: bpf.objs.TcRedirectIngress.String(), - DirectAction: true, + if err := bpf.attachQdisc(iface); err == nil { + bpf.log.Warn("attached redirector using Qdisc as TCX failed", "iface", ifaceName) + return err } - egress := ingress - egress.Parent = netlink.HANDLE_MIN_EGRESS - egress.Fd = bpf.objs.TcRedirectEgress.FD() - egress.Name = bpf.objs.TcRedirectEgress.String() + } + return nil +} - if err := netlink.FilterReplace(&ingress); err != nil { - return fmt.Errorf("failed to replace tc filter: %w", err) - } - bpf.filters = append(bpf.filters, &ingress) +func (bpf *BPF) attachTCX(iface *net.Interface) error { + ingress, err := link.AttachTCX(link.TCXOptions{ + Interface: iface.Index, + Program: bpf.objs.TcRedirectIngress, + Attach: ebpf.AttachTCXIngress, + }) + if err != nil { + return fmt.Errorf("could not attach ingress TCX: %w", err) + } + bpf.links = append(bpf.links, ingress) + egress, err := link.AttachTCX(link.TCXOptions{ + Interface: iface.Index, + Program: bpf.objs.TcRedirectEgress, + Attach: ebpf.AttachTCXEgress, + }) + if err != nil { + return fmt.Errorf("could not attach egress TCX: %w", err) + } + bpf.links = append(bpf.links, egress) + return nil +} - if err := netlink.FilterReplace(&egress); err != nil { - return fmt.Errorf("failed to replace tc filter: %w", err) - } - bpf.filters = append(bpf.filters, &egress) +func (bpf *BPF) attachQdisc(iface *net.Interface) error { + qdisc := &netlink.GenericQdisc{ + QdiscAttrs: netlink.QdiscAttrs{ + LinkIndex: iface.Index, + Handle: netlink.MakeHandle(0xffff, 0), + Parent: netlink.HANDLE_CLSACT, + }, + QdiscType: "clsact", + } + + if err := netlink.QdiscReplace(qdisc); err != nil { + return fmt.Errorf("could not replace qdisc: %w", err) } + bpf.qdiscs = append(bpf.qdiscs, qdisc) + ingress := netlink.BpfFilter{ + FilterAttrs: netlink.FilterAttrs{ + LinkIndex: iface.Index, + Parent: netlink.HANDLE_MIN_INGRESS, + Handle: 1, + Protocol: unix.ETH_P_ALL, + }, + Fd: bpf.objs.TcRedirectIngress.FD(), + Name: bpf.objs.TcRedirectIngress.String(), + DirectAction: true, + } + egress := ingress + egress.Parent = netlink.HANDLE_MIN_EGRESS + egress.Fd = bpf.objs.TcRedirectEgress.FD() + egress.Name = bpf.objs.TcRedirectEgress.String() + + if err := netlink.FilterReplace(&ingress); err != nil { + return fmt.Errorf("failed to replace tc filter: %w", err) + } + bpf.filters = append(bpf.filters, &ingress) + + if err := netlink.FilterReplace(&egress); err != nil { + return fmt.Errorf("failed to replace tc filter: %w", err) + } + bpf.filters = append(bpf.filters, &egress) return nil } diff --git a/e2e/setup_test.go b/e2e/setup_test.go index 7d73487..3fb7c9f 100644 --- a/e2e/setup_test.go +++ b/e2e/setup_test.go @@ -694,13 +694,13 @@ func endpointsReady(endpoints []discoveryv1.Endpoint) bool { func cordonNode(t testing.TB, ctx context.Context, client client.Client, name string) (uncordon func()) { node := &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: name}} - retry.RetryOnConflict(retry.DefaultRetry, func() error { + assert.NoError(t, retry.RetryOnConflict(retry.DefaultRetry, func() error { if err := client.Get(ctx, objectName(node), node); err != nil { return err } node.Spec.Unschedulable = true return client.Update(ctx, node) - }) + })) return func() { uncordonNode(t, ctx, client, name) } @@ -708,13 +708,13 @@ func cordonNode(t testing.TB, ctx context.Context, client client.Client, name st func uncordonNode(t testing.TB, ctx context.Context, client client.Client, name string) { node := &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: name}} - retry.RetryOnConflict(retry.DefaultRetry, func() error { + assert.NoError(t, retry.RetryOnConflict(retry.DefaultRetry, func() error { if err := client.Get(ctx, objectName(node), node); err != nil { return err } node.Spec.Unschedulable = false return client.Update(ctx, node) - }) + })) } func cordonOtherNodes(t testing.TB, ctx context.Context, client client.Client, name string) (uncordon func()) { @@ -725,9 +725,13 @@ func cordonOtherNodes(t testing.TB, ctx context.Context, client client.Client, n if node.Name == name { continue } - require.NoError(t, client.Get(ctx, objectName(&node), &node)) - node.Spec.Unschedulable = true - require.NoError(t, client.Update(ctx, &node)) + assert.NoError(t, retry.RetryOnConflict(retry.DefaultBackoff, func() error { + if err := client.Get(ctx, objectName(&node), &node); err != nil { + return err + } + node.Spec.Unschedulable = true + return client.Update(ctx, &node) + })) uncordonFuncs = append(uncordonFuncs, func() { uncordonNode(t, ctx, client, node.Name) })