Skip to content

Commit 257349a

Browse files
authored
Merge pull request #120 from chenchun/fix-api
Fix release IP API race condition
2 parents 170c23f + ad5ceaf commit 257349a

File tree

6 files changed

+120
-137
lines changed

6 files changed

+120
-137
lines changed

pkg/ipam/api/api.go

+57-60
Original file line numberDiff line numberDiff line change
@@ -25,26 +25,31 @@ import (
2525
"time"
2626

2727
"github.com/emicklei/go-restful"
28+
"k8s.io/apimachinery/pkg/api/errors"
2829
"k8s.io/client-go/listers/core/v1"
2930
glog "k8s.io/klog"
3031
"tkestack.io/galaxy/pkg/api/galaxy/constant"
3132
"tkestack.io/galaxy/pkg/ipam/floatingip"
33+
"tkestack.io/galaxy/pkg/ipam/schedulerplugin"
3234
"tkestack.io/galaxy/pkg/ipam/schedulerplugin/util"
3335
"tkestack.io/galaxy/pkg/utils/httputil"
3436
pageutil "tkestack.io/galaxy/pkg/utils/page"
3537
)
3638

3739
// Controller is the API controller
3840
type Controller struct {
39-
ipam floatingip.IPAM
40-
podLister v1.PodLister
41+
ipam floatingip.IPAM
42+
releaseFunc func(r *schedulerplugin.ReleaseRequest) error
43+
podLister v1.PodLister
4144
}
4245

4346
// NewController construct a controller object
44-
func NewController(ipam floatingip.IPAM, lister v1.PodLister) *Controller {
47+
func NewController(
48+
ipam floatingip.IPAM, lister v1.PodLister, releaseFunc func(r *schedulerplugin.ReleaseRequest) error) *Controller {
4549
return &Controller{
46-
ipam: ipam,
47-
podLister: lister,
50+
ipam: ipam,
51+
podLister: lister,
52+
releaseFunc: releaseFunc,
4853
}
4954
}
5055

@@ -118,37 +123,35 @@ func (c *Controller) ListIPs(req *restful.Request, resp *restful.Response) {
118123
sort.Sort(bySortParam{array: fips, lessFunc: sortFunc(sortParam)})
119124
start, end, pagin := pageutil.Pagination(page, size, len(fips))
120125
pagedFips := fips[start:end]
121-
if err := fillReleasableAndStatus(c.podLister, pagedFips); err != nil {
122-
httputil.InternalError(resp, err)
123-
return
126+
for i := range pagedFips {
127+
releasable, status := c.checkReleasableAndStatus(&pagedFips[i])
128+
pagedFips[i].Status = status
129+
pagedFips[i].Releasable = releasable
124130
}
125131
resp.WriteEntity(ListIPResp{Page: *pagin, Content: pagedFips}) // nolint: errcheck
126132
}
127133

128-
// fillReleasableAndStatus fills status and releasable field
129-
func fillReleasableAndStatus(lister v1.PodLister, ips []FloatingIP) error {
130-
for i := range ips {
131-
if ips[i].labels != nil {
132-
if _, ok := ips[i].labels[constant.ReserveFIPLabel]; ok {
133-
ips[i].Releasable = false
134-
continue
135-
}
136-
}
137-
ips[i].Releasable = true
138-
if ips[i].PodName == "" {
139-
continue
140-
}
141-
pod, err := lister.Pods(ips[i].Namespace).Get(ips[i].PodName)
142-
if err != nil || pod == nil {
143-
ips[i].Status = "Deleted"
144-
continue
134+
func (c *Controller) checkReleasableAndStatus(fip *FloatingIP) (releasable bool, status string) {
135+
if fip.labels != nil {
136+
if _, ok := fip.labels[constant.ReserveFIPLabel]; ok {
137+
return
145138
}
146-
ips[i].Status = string(pod.Status.Phase)
147-
// On public cloud, we can't release exist pod's ip, because we need to call unassign ip first
148-
// TODO while on private environment, we can
149-
ips[i].Releasable = false
150139
}
151-
return nil
140+
if fip.PodName == "" {
141+
return
142+
}
143+
pod, err := c.podLister.Pods(fip.Namespace).Get(fip.PodName)
144+
if err == nil {
145+
status = string(pod.Status.Phase)
146+
return
147+
}
148+
if errors.IsNotFound(err) {
149+
releasable = true
150+
status = "Deleted"
151+
} else {
152+
status = "Unknown"
153+
}
154+
return
152155
}
153156

154157
// bySortParam defines sort funcs for FloatingIP array
@@ -225,6 +228,8 @@ type ReleaseIPReq struct {
225228
type ReleaseIPResp struct {
226229
httputil.Resp
227230
Unreleased []string `json:"unreleased,omitempty"`
231+
// Reason is the reason why this ip is not released
232+
Reason []string `json:"reasons,omitempty"`
228233
}
229234

230235
// SwaggerDoc generates swagger doc for release ip response
@@ -242,7 +247,10 @@ func (c *Controller) ReleaseIPs(req *restful.Request, resp *restful.Response) {
242247
httputil.BadRequest(resp, err)
243248
return
244249
}
245-
expectIPtoKey := make(map[string]string)
250+
var (
251+
released, unreleasedIP, reasons []string
252+
unbindRequests []*schedulerplugin.ReleaseRequest
253+
)
246254
for i := range releaseIPReq.IPs {
247255
temp := releaseIPReq.IPs[i]
248256
ip := net.ParseIP(temp.IP)
@@ -259,36 +267,34 @@ func (c *Controller) ReleaseIPs(req *restful.Request, resp *restful.Response) {
259267
httputil.BadRequest(resp, fmt.Errorf("unknown app type %q", temp.AppType))
260268
return
261269
}
270+
releasable, status := c.checkReleasableAndStatus(&temp)
271+
if !releasable {
272+
unreleasedIP = append(unreleasedIP, temp.IP)
273+
reasons = append(reasons, "releasable is false, pod status "+status)
274+
continue
275+
}
262276
keyObj := util.NewKeyObj(appTypePrefix, temp.Namespace, temp.AppName, temp.PodName, temp.PoolName)
263-
expectIPtoKey[temp.IP] = keyObj.KeyInDB
277+
unbindRequests = append(unbindRequests, &schedulerplugin.ReleaseRequest{IP: ip, KeyObj: keyObj})
264278
}
265-
if err := fillReleasableAndStatus(c.podLister, releaseIPReq.IPs); err != nil {
266-
httputil.BadRequest(resp, err)
267-
return
268-
}
269-
for _, ip := range releaseIPReq.IPs {
270-
if !ip.Releasable {
271-
httputil.BadRequest(resp, fmt.Errorf("%s is not releasable", ip.IP))
272-
return
279+
for _, req := range unbindRequests {
280+
if err := c.releaseFunc(req); err != nil {
281+
unreleasedIP = append(unreleasedIP, req.IP.String())
282+
reasons = append(reasons, err.Error())
283+
} else {
284+
released = append(released, req.IP.String())
273285
}
274286
}
275-
_, unreleased, err := batchReleaseIPs(expectIPtoKey, c.ipam)
276-
var unreleasedIP []string
277-
for ip := range unreleased {
278-
unreleasedIP = append(unreleasedIP, ip)
279-
}
287+
glog.Infof("releaseIPs %v", released)
280288
var res *ReleaseIPResp
281-
if err != nil {
282-
res = &ReleaseIPResp{Resp: httputil.NewResp(
283-
http.StatusInternalServerError, fmt.Sprintf("server error: %v", err))}
284-
} else if len(unreleasedIP) > 0 {
289+
if len(unreleasedIP) > 0 {
285290
res = &ReleaseIPResp{Resp: httputil.NewResp(
286-
http.StatusAccepted, fmt.Sprintf("Unreleased ips have been released or allocated to other pods, "+
287-
"or are not within valid range"))}
291+
http.StatusAccepted, fmt.Sprintf("Released %d ips, %d ips failed, please check the reasons "+
292+
"why they failed", len(released), len(unreleasedIP)))}
288293
} else {
289294
res = &ReleaseIPResp{Resp: httputil.NewResp(http.StatusOK, "")}
290295
}
291296
res.Unreleased = unreleasedIP
297+
res.Reason = reasons
292298
resp.WriteHeaderAndEntity(res.Code, res)
293299
}
294300

@@ -328,12 +334,3 @@ func convert(fip *floatingip.FloatingIP) FloatingIP {
328334
UpdateTime: fip.UpdatedAt,
329335
labels: fip.Labels}
330336
}
331-
332-
// batchReleaseIPs release ips from ipams
333-
func batchReleaseIPs(ipToKey map[string]string, ipam floatingip.IPAM) (map[string]string, map[string]string, error) {
334-
released, unreleased, err := ipam.ReleaseIPs(ipToKey)
335-
if len(released) > 0 {
336-
glog.Infof("releaseIPs %v", released)
337-
}
338-
return released, unreleased, err
339-
}

pkg/ipam/api/api_test.go

-69
This file was deleted.

pkg/ipam/schedulerplugin/bind.go

+44
Original file line numberDiff line numberDiff line change
@@ -200,3 +200,47 @@ func (p *FloatingIPPlugin) unbind(pod *corev1.Pod) error {
200200
}
201201
return p.unbindNoneDpPod(keyObj, policy, "during unbinding pod")
202202
}
203+
204+
func (p *FloatingIPPlugin) Release(r *ReleaseRequest) error {
205+
caller := "by " + getCaller()
206+
k := r.KeyObj
207+
defer p.lockPod(k.PodName, k.Namespace)()
208+
// we are holding the pod's lock, query again in case the ip has been reallocated.
209+
fip, err := p.ipam.ByIP(r.IP)
210+
if err != nil {
211+
return err
212+
}
213+
if fip.Key != k.KeyInDB {
214+
// if key changed, abort
215+
if fip.Key == "" {
216+
glog.Infof("attempt to release %s key %s which is already released", r.IP.String(), k.KeyInDB)
217+
return nil
218+
}
219+
return fmt.Errorf("ip allocated to another pod %s", fip.Key)
220+
}
221+
running, reason := p.podRunning(k.PodName, k.Namespace, fip.PodUid)
222+
if running {
223+
return fmt.Errorf("pod (uid %s) is running", fip.PodUid)
224+
}
225+
glog.Infof("%s is not running, %s, %s", k.KeyInDB, reason, caller)
226+
if p.cloudProvider != nil && fip.NodeName != "" {
227+
// For tapp and sts pod, nodeName will be updated to empty after unassigning
228+
glog.Infof("UnAssignIP nodeName %s, ip %s, key %s %s", fip.NodeName, r.IP.String(), k.KeyInDB, caller)
229+
if err := p.cloudProviderUnAssignIP(&rpc.UnAssignIPRequest{
230+
NodeName: fip.NodeName,
231+
IPAddress: fip.IP.String(),
232+
}); err != nil {
233+
return fmt.Errorf("UnAssignIP nodeName %s, ip %s: %v", fip.NodeName, fip.IP.String(), err)
234+
}
235+
// for tapp and sts pod, we need to clean its node attr and uid
236+
if err := p.reserveIP(k.KeyInDB, k.KeyInDB, "after UnAssignIP "+caller); err != nil {
237+
return err
238+
}
239+
}
240+
if err := p.ipam.Release(k.KeyInDB, r.IP); err != nil {
241+
glog.Errorf("release ip %s: %v", caller, err)
242+
return fmt.Errorf("release ip: %v", err)
243+
}
244+
glog.Infof("released floating ip %s from %s %s", r.IP.String(), k.KeyInDB, caller)
245+
return nil
246+
}

pkg/ipam/schedulerplugin/floatingip_plugin.go

+11-7
Original file line numberDiff line numberDiff line change
@@ -235,14 +235,8 @@ func (p *FloatingIPPlugin) lockPod(name, namespace string) func() {
235235
p.podLockPool.LockKey(key)
236236
elapsed := (time.Now().UnixNano() - start.UnixNano()) / 1e6
237237
if elapsed > 500 {
238-
var caller string
239-
pc, _, no, ok := runtime.Caller(1)
240-
details := runtime.FuncForPC(pc)
241-
if ok && details != nil {
242-
caller = fmt.Sprintf("called from %s:%d\n", details.Name(), no)
243-
}
244238
glog.Infof("acquire lock for %s took %d ms, started at %s, %s", key, elapsed,
245-
start.Format("15:04:05.000"), caller)
239+
start.Format("15:04:05.000"), getCaller())
246240
}
247241
return func() {
248242
_ = p.podLockPool.UnlockKey(key)
@@ -283,3 +277,13 @@ func (p *FloatingIPPlugin) supportReserveIPPolicy(obj *util.KeyObj, policy const
283277
}
284278
return nil
285279
}
280+
281+
// getCaller returns the func packageName.funcName of the caller
282+
func getCaller() string {
283+
pc, _, no, ok := runtime.Caller(2)
284+
details := runtime.FuncForPC(pc)
285+
if ok && details != nil {
286+
return fmt.Sprintf("called from %s:%d\n", details.Name(), no)
287+
}
288+
return ""
289+
}

pkg/ipam/schedulerplugin/types.go

+7
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@ package schedulerplugin
1818

1919
import (
2020
"errors"
21+
"net"
2122

2223
"tkestack.io/galaxy/pkg/ipam/floatingip"
24+
"tkestack.io/galaxy/pkg/ipam/schedulerplugin/util"
2325
)
2426

2527
type NotSupportedReleasePolicyError error
@@ -60,3 +62,8 @@ func (conf *Conf) validate() {
6062
conf.FloatingIPKey = "floatingips"
6163
}
6264
}
65+
66+
type ReleaseRequest struct {
67+
KeyObj *util.KeyObj
68+
IP net.IP
69+
}

pkg/ipam/server/server.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@ func (s *Server) startAPIServer() {
236236
Path("/v1").
237237
Consumes(restful.MIME_JSON).
238238
Produces(restful.MIME_JSON)
239-
c := api.NewController(s.plugin.GetIpam(), s.PodLister)
239+
c := api.NewController(s.plugin.GetIpam(), s.PodLister, s.plugin.Release)
240240
ws.Route(ws.GET("/ip").To(c.ListIPs).
241241
Doc("List ips by keyword or params").
242242
Param(ws.QueryParameter("keyword", "keyword").DataType("string")).

0 commit comments

Comments
 (0)