Skip to content

Commit

Permalink
datapath,endpoint: explicitly remove TC filters during endpoint teardown
Browse files Browse the repository at this point in the history
[ upstream commit 6633ca8 ]
[ backporter notes: Fixed conflicts in pkg/endpoint/endpoint.go due to
  the fact that v1.15 uses `!option.Config.DryMode` rather than
  `!e.isProperty(PropertyFakeEndpoint)` ]

Prior to this commit, we left it up to the kernel to clean up tc attachments
when the CNI finally removes the veth when a Pod goes away. This leaves a window
of time where an endpoint's tc programs can potentially be invoked after
the endpoint's internal tail call maps have already been cleared and the
endpoint has been removed from the endpoint map and ipcache, resulting in
undefined behaviour.

This patch clearly defines the endpoint teardown sequence as follows:
- remove (endpoint) routes
- set the interface down
- detach tc(x) hooks
- remove endpoint from endpoint map
- remove endpoint policy program(s)
- delete conntrack map pins
- remove policy prog array map pin
- remove internal tail call map pin
- remove custom calls map pin

This puts the agent more in control of the teardown sequence and will allow us
to reason better about failures related to missed tail calls and other flakes.

Signed-off-by: Timo Beckers <[email protected]>
Signed-off-by: Sebastian Wicki <[email protected]>
  • Loading branch information
ti-mo authored and gandro committed Nov 7, 2024
1 parent 78684dc commit a6fab19
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 48 deletions.
26 changes: 21 additions & 5 deletions pkg/datapath/loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -657,13 +657,29 @@ func (l *Loader) Unload(ep datapath.Endpoint) {
}
}

log := log.WithField(logfields.EndpointID, ep.StringID())

// Remove legacy tc attachments.
if err := removeTCFilters(ep.InterfaceName(), netlink.HANDLE_MIN_INGRESS); err != nil {
log.WithError(err).Errorf("Removing ingress filter from interface %s", ep.InterfaceName())
}
if err := removeTCFilters(ep.InterfaceName(), netlink.HANDLE_MIN_EGRESS); err != nil {
log.WithError(err).Errorf("Removing egress filter from interface %s", ep.InterfaceName())
}

// If Cilium and the kernel support tcx to attach TC programs to the
// endpoint's veth device, its bpf_link object is pinned to a per-endpoint
// bpffs directory. When the endpoint gets deleted, removing the whole
// directory cleans up any pinned maps and links.
bpffsPath := bpffsEndpointDir(bpf.CiliumPath(), ep)
if err := bpf.Remove(bpffsPath); err != nil {
log.WithError(err).WithField(logfields.EndpointID, ep.StringID())
// bpffs directory. When the endpoint gets deleted, we can remove the whole
// directory to clean up any leftover pinned links and maps.

// Remove the links directory first to avoid removing program arrays before
// the entrypoints are detached.
if err := bpf.Remove(bpffsEndpointLinksDir(bpf.CiliumPath(), ep)); err != nil {
log.WithError(err)
}
// Finally, remove the endpoint's top-level directory.
if err := bpf.Remove(bpffsEndpointDir(bpf.CiliumPath(), ep)); err != nil {
log.WithError(err)
}
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/datapath/loader/netlink.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,10 @@ func attachTCProgram(link netlink.Link, prog *ebpf.Program, progName, bpffsDir s
// Direction is passed as netlink.HANDLE_MIN_{INGRESS,EGRESS} via tcDir.
func removeTCFilters(ifName string, tcDir uint32) error {
link, err := netlink.LinkByName(ifName)
if errors.As(err, &netlink.LinkNotFoundError{}) {
// No interface, no filters to remove.
return nil
}
if err != nil {
return err
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/datapath/loader/netlink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,12 @@ func TestAttachRemoveTCProgram(t *testing.T) {
})
}

func TestRemoveTCFiltersError(t *testing.T) {
testutils.PrivilegedTest(t)

require.NoError(t, removeTCFilters("missing", directionToParent(dirIngress)))
}

func TestSetupIPIPDevices(t *testing.T) {
testutils.PrivilegedTest(t)

Expand Down
66 changes: 39 additions & 27 deletions pkg/endpoint/bpf.go
Original file line number Diff line number Diff line change
Expand Up @@ -964,51 +964,63 @@ func (e *Endpoint) InitMap() error {
return policymap.Create(e.policyMapPath())
}

// deleteMaps releases references to all BPF maps associated with this
// endpoint.
//
// For each error that occurs while releasing these references, an error is
// added to the resulting error slice which is returned.
// deleteMaps deletes the endpoint's entry from the global
// cilium_(egress)call_policy maps and removes endpoint-specific cilium_calls_,
// cilium_policy_ and cilium_ct{4,6}_ map pins.
//
// Returns nil on success.
// Call this after the endpoint's tc hook has been detached.
func (e *Endpoint) deleteMaps() []error {
var errors []error

maps := map[string]string{
"policy": e.policyMapPath(),
"calls": e.callsMapPath(),
// Remove the endpoint from cilium_lxc. After this point, ip->epID lookups
// will fail, causing packets to/from the Pod to be dropped in many cases,
// stopping packet evaluation.
if err := lxcmap.DeleteElement(e); err != nil {
errors = append(errors, err...)
}
if !e.isHost {
maps["custom"] = e.customCallsMapPath()

// Remove the policy tail call entry for the endpoint. This will disable
// policy evaluation for the endpoint and will result in missing tail calls if
// e.g. bpf_host or bpf_overlay call into the endpoint's policy program.
if err := policymap.RemoveGlobalMapping(uint32(e.ID), option.Config.EnableEnvoyConfig); err != nil {
errors = append(errors, fmt.Errorf("removing endpoint program from global policy map: %w", err))
}
for name, path := range maps {
if err := os.RemoveAll(path); err != nil {
errors = append(errors, fmt.Errorf("unable to remove %s map file %s: %w", name, path, err))

// Remove rate limit from bandwidth manager map.
if e.bps != 0 {
if err := e.owner.Datapath().BandwidthManager().DeleteEndpointBandwidthLimit(e.ID); err != nil {
errors = append(errors, fmt.Errorf("removing endpoint from bandwidth manager map: %w", err))
}
}

if e.ConntrackLocalLocked() {
// Remove local connection tracking maps
// Remove endpoint-specific CT map pins.
for _, m := range ctmap.LocalMaps(e, option.Config.EnableIPv4, option.Config.EnableIPv6) {
ctPath, err := m.Path()
if err == nil {
err = os.RemoveAll(ctPath)
}
if err != nil {
errors = append(errors, fmt.Errorf("unable to remove CT map %s: %w", ctPath, err))
errors = append(errors, fmt.Errorf("getting path for CT map pin %s: %w", m.Name(), err))
continue
}
if err := os.RemoveAll(ctPath); err != nil {
errors = append(errors, fmt.Errorf("removing CT map pin %s: %w", ctPath, err))
}
}
}

// Remove handle_policy() tail call entry for EP
if err := policymap.RemoveGlobalMapping(uint32(e.ID), option.Config.EnableEnvoyConfig); err != nil {
errors = append(errors, fmt.Errorf("unable to remove endpoint from global policy map: %w", err))
// Remove program array pins as the last step. This permanently invalidates
// the endpoint programs' state, because removing a program array map pin
// removes the map's entries even if the map is still referenced by any live
// bpf programs, potentially resulting in missed tail calls if any packets are
// still in flight.
if err := os.RemoveAll(e.policyMapPath()); err != nil {
errors = append(errors, fmt.Errorf("removing policy map pin for endpoint %s: %w", e.StringID(), err))
}

// Remove rate-limit from bandwidth manager map.
if e.bps != 0 {
if err := e.owner.Datapath().BandwidthManager().DeleteEndpointBandwidthLimit(e.ID); err != nil {
errors = append(errors, fmt.Errorf("unable to remote endpoint from bandwidth manager map: %w", err))
if err := os.RemoveAll(e.callsMapPath()); err != nil {
errors = append(errors, fmt.Errorf("removing calls map pin for endpoint %s: %w", e.StringID(), err))
}
if !e.isHost {
if err := os.RemoveAll(e.customCallsMapPath()); err != nil {
errors = append(errors, fmt.Errorf("removing custom calls map pin for endpoint %s: %w", e.StringID(), err))
}
}

Expand Down
50 changes: 34 additions & 16 deletions pkg/endpoint/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"sync/atomic"

"github.com/sirupsen/logrus"
"github.com/vishvananda/netlink"
k8sTypes "k8s.io/apimachinery/pkg/types"

"github.com/cilium/cilium/api/v1/models"
Expand Down Expand Up @@ -47,7 +48,6 @@ import (
"github.com/cilium/cilium/pkg/logging/logfields"
"github.com/cilium/cilium/pkg/mac"
"github.com/cilium/cilium/pkg/maps/ctmap"
"github.com/cilium/cilium/pkg/maps/lxcmap"
"github.com/cilium/cilium/pkg/maps/policymap"
"github.com/cilium/cilium/pkg/metrics"
"github.com/cilium/cilium/pkg/monitor/notifications"
Expand Down Expand Up @@ -1223,10 +1223,6 @@ type DeleteConfig struct {
func (e *Endpoint) leaveLocked(proxyWaitGroup *completion.WaitGroup, conf DeleteConfig) []error {
errs := []error{}

if !option.Config.DryMode {
e.owner.Datapath().Loader().Unload(e.createEpInfoCache(""))
}

// Remove policy references from shared policy structures
e.desiredPolicy.Detach()
e.realizedPolicy.Detach()
Expand Down Expand Up @@ -2449,17 +2445,6 @@ func (e *Endpoint) Delete(conf DeleteConfig) []error {
}
e.setState(StateDisconnecting, "Deleting endpoint")

// If dry mode is enabled, no changes to BPF maps are performed
if !option.Config.DryMode {
if errs2 := lxcmap.DeleteElement(e); errs2 != nil {
errs = append(errs, errs2...)
}

if errs2 := e.deleteMaps(); errs2 != nil {
errs = append(errs, errs2...)
}
}

if option.Config.IPAM == ipamOption.IPAMENI || option.Config.IPAM == ipamOption.IPAMAzure || option.Config.IPAM == ipamOption.IPAMAlibabaCloud {
e.getLogger().WithFields(logrus.Fields{
"ep": e.GetID(),
Expand Down Expand Up @@ -2493,6 +2478,24 @@ func (e *Endpoint) Delete(conf DeleteConfig) []error {
}
}

// If dry mode is enabled, no changes to system state are made.
if !option.Config.DryMode {
// Set the Endpoint's interface down to prevent it from passing any traffic
// after its tc filters are removed.
if err := e.setDown(); err != nil {
errs = append(errs, err)
}

// Detach the endpoint program from any tc(x) hooks.
e.owner.Datapath().Loader().Unload(e.createEpInfoCache(""))

// Delete the endpoint's entries from the global cilium_(egress)call_policy
// maps and remove per-endpoint cilium_calls_ and cilium_policy_ map pins.
if err := e.deleteMaps(); err != nil {
errs = append(errs, err...)
}
}

completionCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
proxyWaitGroup := completion.NewWaitGroup(completionCtx)

Expand All @@ -2508,6 +2511,21 @@ func (e *Endpoint) Delete(conf DeleteConfig) []error {
return errs
}

// setDown sets the Endpoint's underlying interface down. If the interface
// cannot be retrieved, returns nil.
func (e *Endpoint) setDown() error {
link, err := netlink.LinkByName(e.HostInterface())
if errors.As(err, &netlink.LinkNotFoundError{}) {
// No interface, nothing to do.
return nil
}
if err != nil {
return fmt.Errorf("setting interface %s down: %w", e.HostInterface(), err)
}

return netlink.LinkSetDown(link)
}

// WaitForFirstRegeneration waits for specific conditions before returning:
// * if the endpoint has a sidecar proxy, it waits for the endpoint's BPF
// program to be generated for the first time.
Expand Down

0 comments on commit a6fab19

Please sign in to comment.