Skip to content

Commit 4b416f3

Browse files
authored
do not program status file until endpoint has status "up" (#8692)
1 parent ff1ce46 commit 4b416f3

File tree

3 files changed

+164
-14
lines changed

3 files changed

+164
-14
lines changed

felix/fv/pod_setup_wait_test.go

+2
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,8 @@ var _ = infrastructure.DatastoreDescribe("_BPF-SAFE_ Pod setup status wait", []a
131131
By("creating a workload before Felix starts")
132132
wl := workload.New(tc.Felixes[0], "workload-endpoint-status-tests-0", "default", "10.65.0.10", "8080", "tcp")
133133
wl.ConfigureInInfra(infra)
134+
err := wl.Start()
135+
Expect(err).NotTo(HaveOccurred(), "Couldn't start a test workload")
134136

135137
By("determining the filename Felix will look for")
136138
wKey, err := names.V3WorkloadEndpointToWorkloadEndpointKey(wl.WorkloadEndpoint)

felix/statusrep/status_file_reporter.go

+63-8
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,9 @@ import (
3232
)
3333

3434
const (
35-
dirStatus = "endpoint-status"
35+
dirStatus = "endpoint-status"
36+
statusUp = "up"
37+
statusDown = "down"
3638
)
3739

3840
// EndpointStatusFileReporter writes a file to the FS
@@ -55,6 +57,8 @@ type EndpointStatusFileReporter struct {
5557
reapplyInterval time.Duration
5658

5759
hostname string
60+
61+
filesys
5862
}
5963

6064
// Backoff wraps a timer-based-retry type which can be stepped.
@@ -68,6 +72,35 @@ type backoffManager struct {
6872
newBackoffFunc func() Backoff
6973
}
7074

75+
type filesys interface {
76+
Create(name string) (*os.File, error)
77+
Remove(name string) error
78+
Mkdir(name string, perm os.FileMode) error
79+
ReadDir(name string) ([]os.DirEntry, error)
80+
}
81+
82+
type defaultFilesys struct{}
83+
84+
// Create wraps os.Create.
85+
func (fs *defaultFilesys) Create(name string) (*os.File, error) {
86+
return os.Create(name)
87+
}
88+
89+
// Remove wraps os.Remove.
90+
func (fs *defaultFilesys) Remove(name string) error {
91+
return os.Remove(name)
92+
}
93+
94+
// Mkdir wraps os.Mkdir.
95+
func (fs *defaultFilesys) Mkdir(name string, perm os.FileMode) error {
96+
return os.Mkdir(name, perm)
97+
}
98+
99+
// ReadDir wraps os.ReadDir.
100+
func (fs *defaultFilesys) ReadDir(name string) ([]os.DirEntry, error) {
101+
return os.ReadDir(name)
102+
}
103+
71104
// newBackoffManager creates a backoffManager which uses the
72105
// passed func to create backoffs.
73106
func newBackoffManager(newBackoffFunc func() Backoff) backoffManager {
@@ -99,6 +132,7 @@ func NewEndpointStatusFileReporter(
99132
bom: newBackoffManager(newDefaultBackoff),
100133
hostname: "",
101134
reapplyInterval: 10 * time.Second,
135+
filesys: &defaultFilesys{},
102136
}
103137

104138
for _, o := range opts {
@@ -124,6 +158,13 @@ func WithHostname(hostname string) FileReporterOption {
124158
}
125159
}
126160

161+
// WithFilesys allows shimming into filesystem calls.
162+
func WithFilesys(f filesys) FileReporterOption {
163+
return func(fr *EndpointStatusFileReporter) {
164+
fr.filesys = f
165+
}
166+
}
167+
127168
// SyncForever blocks until ctx is cancelled.
128169
// Continuously pulls status-updates from updates C,
129170
// and reconciles the filesystem with internal state.
@@ -245,7 +286,20 @@ func (fr *EndpointStatusFileReporter) handleEndpointUpdate(e interface{}) {
245286
}
246287
key := names.WorkloadEndpointIDToWorkloadEndpointKey(m.Id, fr.hostname)
247288
fn := names.WorkloadEndpointKeyToStatusFilename(key)
248-
fr.statusDirDeltaTracker.Desired().Add(fn)
289+
290+
if m.Status.Status == statusDown {
291+
logrus.WithField("update", e).Debug("Skipping WorkloadEndpointStatusUpdate with down status")
292+
fr.statusDirDeltaTracker.Desired().Delete(fn)
293+
return
294+
} else if m.Status.Status == statusUp {
295+
// Explicitly checking the opposite case here (rather than fallthrough)
296+
// in-case of a terrible failure where status is neither "up" nor "down".
297+
logrus.WithField("update", e).Debug("Handling WorkloadEndpointUpdate with up status")
298+
fr.statusDirDeltaTracker.Desired().Add(fn)
299+
} else {
300+
logrus.WithField("update", e).Warn("Skipping update with unrecognized status")
301+
}
302+
249303
case *proto.WorkloadEndpointStatusRemove:
250304
if m.Id == nil {
251305
logrus.WithField("update", m).Warn("Couldn't handle nil WorkloadEndpointStatusRemove")
@@ -259,10 +313,11 @@ func (fr *EndpointStatusFileReporter) handleEndpointUpdate(e interface{}) {
259313
}
260314
}
261315

316+
// A sub-call of SyncForever.
262317
// Overwrites our user-space representation of the kernel with a fresh snapshot.
263318
func (fr *EndpointStatusFileReporter) resyncDataplaneWithKernel() error {
264319
// Load any pre-existing committed dataplane entries.
265-
entries, err := ensureStatusDir(fr.endpointStatusDirPrefix)
320+
entries, err := fr.ensureStatusDir(fr.endpointStatusDirPrefix)
266321
if err != nil {
267322
return err
268323
}
@@ -310,7 +365,7 @@ func (fr *EndpointStatusFileReporter) writeStatusFile(name string) error {
310365
// Write file to dir.
311366
logrus.WithField("filename", name).Debug("Writing endpoint-status file to status-dir")
312367
filename := filepath.Join(fr.endpointStatusDirPrefix, dirStatus, name)
313-
f, err := os.Create(filename)
368+
f, err := fr.filesys.Create(filename)
314369
if err != nil {
315370
return err
316371
}
@@ -319,19 +374,19 @@ func (fr *EndpointStatusFileReporter) writeStatusFile(name string) error {
319374

320375
func (fr *EndpointStatusFileReporter) deleteStatusFile(name string) error {
321376
filename := filepath.Join(fr.endpointStatusDirPrefix, dirStatus, name)
322-
return os.Remove(filename)
377+
return fr.filesys.Remove(filename)
323378
}
324379

325380
// ensureStatusDir ensures there is a directory named "endpoint-status", within
326381
// the parent dir specified by prefix. Attempts to create the dir if it doesn't exist.
327382
// Returns all entries within the dir if any exist.
328-
func ensureStatusDir(prefix string) (entries []fs.DirEntry, err error) {
383+
func (fr *EndpointStatusFileReporter) ensureStatusDir(prefix string) (entries []fs.DirEntry, err error) {
329384
filename := filepath.Join(prefix, dirStatus)
330385

331-
entries, err = os.ReadDir(filename)
386+
entries, err = fr.filesys.ReadDir(filename)
332387
if err != nil && errors.Is(err, fs.ErrNotExist) {
333388
// Discard ErrNotExist and return the result of attempting to create it.
334-
return entries, os.Mkdir(filename, fs.FileMode(0655))
389+
return entries, fr.filesys.Mkdir(filename, fs.FileMode(0655))
335390
}
336391

337392
return entries, err

felix/statusrep/status_file_reporter_test.go

+99-6
Original file line numberDiff line numberDiff line change
@@ -12,17 +12,20 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
package statusrep_test
15+
package statusrep
1616

1717
import (
1818
"context"
19+
"os"
20+
"path/filepath"
1921
"time"
2022

2123
. "github.com/onsi/ginkgo"
2224
. "github.com/onsi/gomega"
25+
"github.com/sirupsen/logrus"
2326

2427
"github.com/projectcalico/calico/felix/proto"
25-
"github.com/projectcalico/calico/felix/statusrep"
28+
"github.com/projectcalico/calico/libcalico-go/lib/names"
2629
)
2730

2831
type mockBackoff struct {
@@ -39,7 +42,43 @@ func (b *mockBackoff) Step() time.Duration {
3942
return b.StepDuration
4043
}
4144

45+
type mockFilesys struct {
46+
createCB func(string)
47+
removeCB func(string)
48+
mkdirCB func(name string)
49+
readdirCB func(name string)
50+
}
51+
52+
func (f *mockFilesys) Create(name string) (*os.File, error) {
53+
if f.createCB != nil {
54+
f.createCB(name)
55+
}
56+
return os.Create(name)
57+
}
58+
59+
func (f *mockFilesys) Remove(name string) error {
60+
if f.removeCB != nil {
61+
f.removeCB(name)
62+
}
63+
return os.Remove(name)
64+
}
65+
66+
func (f *mockFilesys) Mkdir(name string, perm os.FileMode) error {
67+
if f.mkdirCB != nil {
68+
f.mkdirCB(name)
69+
}
70+
return os.Mkdir(name, perm)
71+
}
72+
73+
func (f *mockFilesys) ReadDir(name string) ([]os.DirEntry, error) {
74+
if f.readdirCB != nil {
75+
f.readdirCB(name)
76+
}
77+
return os.ReadDir(name)
78+
}
79+
4280
var _ = Describe("Endpoint Policy Status Reports [file-reporting]", func() {
81+
logrus.SetLevel(logrus.DebugLevel)
4382
var endpointUpdatesC chan interface{}
4483
var ctx context.Context
4584
var cancel context.CancelFunc
@@ -56,18 +95,24 @@ var _ = Describe("Endpoint Policy Status Reports [file-reporting]", func() {
5695
})
5796

5897
It("should create a new directory and retry if it fails", func() {
59-
var fileReporter *statusrep.EndpointStatusFileReporter
98+
var fileReporter *EndpointStatusFileReporter
6099

61100
backoffCalledC := make(chan struct{})
62-
newMockBackoff := func() statusrep.Backoff {
101+
newMockBackoff := func() Backoff {
63102
return &mockBackoff{
64103
C: backoffCalledC,
65-
StepDuration: 1 * time.Nanosecond,
104+
StepDuration: 1 * time.Second,
66105
}
67106
}
107+
readdirC := make(chan string, 100)
108+
mockfs := mockFilesys{
109+
readdirCB: func(name string) {
110+
readdirC <- name
111+
},
112+
}
68113

69114
// Use a path we think the reporter cannot write to.
70-
fileReporter = statusrep.NewEndpointStatusFileReporter(endpointUpdatesC, "/root/", statusrep.WithNewBackoffFunc(newMockBackoff))
115+
fileReporter = NewEndpointStatusFileReporter(endpointUpdatesC, "/root/", WithNewBackoffFunc(newMockBackoff), WithFilesys(&mockfs))
71116

72117
By("Starting a fileReporter which cannot create the necessary directory")
73118

@@ -79,6 +124,54 @@ var _ = Describe("Endpoint Policy Status Reports [file-reporting]", func() {
79124
}()
80125

81126
Eventually(endpointUpdatesC, "10s").Should(BeSent(&proto.DataplaneInSync{}))
127+
128+
Eventually(readdirC, "10s").Should(Receive(Equal("/root/endpoint-status")), "Expected reporter to try reading a forbidden directory")
82129
Eventually(backoffCalledC, "10s").Should(BeClosed(), "Backoff wasn't called by the reporter (is the file-reporting unexpectedly succeeding?).")
83130
})
131+
132+
It("should only add a desired file to the delta-tracker when update has status up", func() {
133+
fileCreatedC := make(chan string, 100)
134+
mockfs := mockFilesys{
135+
createCB: func(name string) {
136+
fileCreatedC <- name
137+
},
138+
}
139+
reporter := NewEndpointStatusFileReporter(endpointUpdatesC, "/tmp", WithHostname("host"), WithFilesys(&mockfs))
140+
141+
go func() {
142+
defer close(doneC)
143+
reporter.SyncForever(ctx)
144+
}()
145+
146+
By("Sending a status-down update to the reporter")
147+
148+
wepID := &proto.WorkloadEndpointID{
149+
OrchestratorId: "abc",
150+
WorkloadId: "default/pod1",
151+
EndpointId: "eth0",
152+
}
153+
key := names.WorkloadEndpointIDToWorkloadEndpointKey(wepID, "host")
154+
mapKey := names.WorkloadEndpointKeyToStatusFilename(key)
155+
filename := filepath.Join("/tmp/endpoint-status", mapKey)
156+
157+
Eventually(endpointUpdatesC, "10s").Should(BeSent(&proto.WorkloadEndpointStatusUpdate{
158+
Id: wepID,
159+
Status: &proto.EndpointStatus{
160+
Status: "down",
161+
},
162+
}))
163+
Eventually(endpointUpdatesC, "10s").Should(BeSent(&proto.DataplaneInSync{}))
164+
165+
Consistently(fileCreatedC, "3s").ShouldNot(Receive(), "Tracker wrote a file for an endpoint before status was up")
166+
167+
By("Sending a status-up update to the reporter")
168+
Eventually(endpointUpdatesC, "10s").Should(BeSent(&proto.WorkloadEndpointStatusUpdate{
169+
Id: wepID,
170+
Status: &proto.EndpointStatus{
171+
Status: "up",
172+
},
173+
}))
174+
175+
Eventually(fileCreatedC, "10s").Should(Receive(Equal(filename)), "Tracker did not add desired file for endpoint with status up")
176+
})
84177
})

0 commit comments

Comments
 (0)