Skip to content

Commit f051cb7

Browse files
author
k8s-merge-robot
committed
Merge pull request kubernetes#14260 from mesosphere/sttts-kubelet-capacity
Auto commit by PR queue bot
2 parents 834ce9d + 1435077 commit f051cb7

File tree

5 files changed

+146
-12
lines changed

5 files changed

+146
-12
lines changed

cluster/mesos/docker/docker-compose.yml

+2
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ mesosslave:
5050
- MESOS_RESOURCES=cpus:4;mem:1280;disk:25600;ports:[8000-21099]
5151
- MESOS_SWITCH_USER=0
5252
- MESOS_CONTAINERIZERS=docker,mesos
53+
- MESOS_ISOLATION=cgroups/cpu,cgroups/mem
5354
- DOCKER_DAEMON_ARGS
5455
links:
5556
- etcd
@@ -144,6 +145,7 @@ scheduler:
144145
--mesos-master=mesosmaster1:5050
145146
--cluster-dns=10.10.10.10
146147
--cluster-domain=cluster.local
148+
--mesos-executor-cpus=1.0
147149
--v=4
148150
environment:
149151
- MESOS_DOCKER_ETCD_TIMEOUT

contrib/mesos/pkg/executor/executor.go

+61
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,11 @@ type kuberTask struct {
9292

9393
type podStatusFunc func() (*api.PodStatus, error)
9494

95+
type NodeInfo struct {
96+
Cores int
97+
Mem int64 // in bytes
98+
}
99+
95100
// KubernetesExecutor is an mesos executor that runs pods
96101
// in a minion machine.
97102
type KubernetesExecutor struct {
@@ -113,6 +118,7 @@ type KubernetesExecutor struct {
113118
staticPodsConfigPath string
114119
podController *framework.Controller
115120
launchGracePeriod time.Duration
121+
nodeInfos chan<- NodeInfo
116122
}
117123

118124
type Config struct {
@@ -127,6 +133,7 @@ type Config struct {
127133
StaticPodsConfigPath string
128134
PodLW cache.ListerWatcher // mandatory, otherwise initialiation will panic
129135
LaunchGracePeriod time.Duration
136+
NodeInfos chan<- NodeInfo
130137
}
131138

132139
func (k *KubernetesExecutor) isConnected() bool {
@@ -152,6 +159,7 @@ func New(config Config) *KubernetesExecutor {
152159
podStatusFunc: config.PodStatusFunc,
153160
staticPodsConfigPath: config.StaticPodsConfigPath,
154161
launchGracePeriod: config.LaunchGracePeriod,
162+
nodeInfos: config.NodeInfos,
155163
}
156164

157165
// watch pods from the given pod ListWatch
@@ -236,6 +244,10 @@ func (k *KubernetesExecutor) Registered(driver bindings.ExecutorDriver,
236244
Pods: []*api.Pod{},
237245
Op: kubetypes.SET,
238246
})
247+
248+
if slaveInfo != nil && k.nodeInfos != nil {
249+
k.nodeInfos <- nodeInfo(slaveInfo, executorInfo) // leave it behind the upper lock to avoid panics
250+
}
239251
}
240252

241253
// Reregistered is called when the executor is successfully re-registered with the slave.
@@ -255,6 +267,16 @@ func (k *KubernetesExecutor) Reregistered(driver bindings.ExecutorDriver, slaveI
255267
log.Errorf("cannot update node labels: %v", err)
256268
}
257269
}
270+
271+
if slaveInfo != nil && k.nodeInfos != nil {
272+
// make sure nodeInfos is not nil and send new NodeInfo
273+
k.lock.Lock()
274+
defer k.lock.Unlock()
275+
if k.isDone() {
276+
return
277+
}
278+
k.nodeInfos <- nodeInfo(slaveInfo, nil)
279+
}
258280
}
259281

260282
// initializeStaticPodsSource unzips the data slice into the static-pods directory
@@ -796,6 +818,7 @@ func (k *KubernetesExecutor) doShutdown(driver bindings.ExecutorDriver) {
796818
// signal to all listeners that this KubeletExecutor is done!
797819
close(k.terminate)
798820
close(k.updateChan)
821+
close(k.nodeInfos)
799822

800823
if k.shutdownAlert != nil {
801824
func() {
@@ -926,3 +949,41 @@ func differentTime(a, b *unversionedapi.Time) bool {
926949
func differentPeriod(a, b *int64) bool {
927950
return (a == nil) != (b == nil) || (a != nil && b != nil && *a != *b)
928951
}
952+
953+
func nodeInfo(si *mesos.SlaveInfo, ei *mesos.ExecutorInfo) NodeInfo {
954+
var executorCPU, executorMem float64
955+
956+
// get executor resources
957+
if ei != nil {
958+
for _, r := range ei.GetResources() {
959+
if r == nil || r.GetType() != mesos.Value_SCALAR {
960+
continue
961+
}
962+
switch r.GetName() {
963+
case "cpus":
964+
executorCPU = r.GetScalar().GetValue()
965+
case "mem":
966+
executorMem = r.GetScalar().GetValue()
967+
}
968+
}
969+
}
970+
971+
// get resource capacity of the node
972+
ni := NodeInfo{}
973+
for _, r := range si.GetResources() {
974+
if r == nil || r.GetType() != mesos.Value_SCALAR {
975+
continue
976+
}
977+
978+
switch r.GetName() {
979+
case "cpus":
980+
// We intentionally take the floor of executorCPU because cores are integers
981+
// and we would loose a complete cpu here if the value is <1.
982+
// TODO(sttts): switch to float64 when "Machine Allocables" are implemented
983+
ni.Cores = int(r.GetScalar().GetValue() - float64(int(executorCPU)))
984+
case "mem":
985+
ni.Mem = int64(r.GetScalar().GetValue()-executorMem) * 1024 * 1024
986+
}
987+
}
988+
return ni
989+
}

contrib/mesos/pkg/executor/executor_test.go

+12-8
Original file line numberDiff line numberDiff line change
@@ -135,8 +135,9 @@ func TestExecutorLaunchAndKillTask(t *testing.T) {
135135
mockDriver := &MockExecutorDriver{}
136136
updates := make(chan kubetypes.PodUpdate, 1024)
137137
config := Config{
138-
Docker: dockertools.ConnectToDockerOrDie("fake://"),
139-
Updates: updates,
138+
Docker: dockertools.ConnectToDockerOrDie("fake://"),
139+
Updates: updates,
140+
NodeInfos: make(chan NodeInfo, 1),
140141
APIClient: client.NewOrDie(&client.Config{
141142
Host: testApiServer.server.URL,
142143
Version: testapi.Default.Version(),
@@ -297,8 +298,9 @@ func TestExecutorStaticPods(t *testing.T) {
297298

298299
mockDriver := &MockExecutorDriver{}
299300
config := Config{
300-
Docker: dockertools.ConnectToDockerOrDie("fake://"),
301-
Updates: make(chan kubetypes.PodUpdate, 1), // allow kube-executor source to proceed past init
301+
Docker: dockertools.ConnectToDockerOrDie("fake://"),
302+
Updates: make(chan kubetypes.PodUpdate, 1), // allow kube-executor source to proceed past init
303+
NodeInfos: make(chan NodeInfo, 1),
302304
APIClient: client.NewOrDie(&client.Config{
303305
Host: testApiServer.server.URL,
304306
Version: testapi.Default.Version(),
@@ -379,8 +381,9 @@ func TestExecutorFrameworkMessage(t *testing.T) {
379381
mockDriver := &MockExecutorDriver{}
380382
kubeletFinished := make(chan struct{})
381383
config := Config{
382-
Docker: dockertools.ConnectToDockerOrDie("fake://"),
383-
Updates: make(chan kubetypes.PodUpdate, 1024),
384+
Docker: dockertools.ConnectToDockerOrDie("fake://"),
385+
Updates: make(chan kubetypes.PodUpdate, 1024),
386+
NodeInfos: make(chan NodeInfo, 1),
384387
APIClient: client.NewOrDie(&client.Config{
385388
Host: testApiServer.server.URL,
386389
Version: testapi.Default.Version(),
@@ -558,8 +561,9 @@ func TestExecutorShutdown(t *testing.T) {
558561
var exitCalled int32 = 0
559562
updates := make(chan kubetypes.PodUpdate, 1024)
560563
config := Config{
561-
Docker: dockertools.ConnectToDockerOrDie("fake://"),
562-
Updates: updates,
564+
Docker: dockertools.ConnectToDockerOrDie("fake://"),
565+
Updates: updates,
566+
NodeInfos: make(chan NodeInfo, 1),
563567
ShutdownAlert: func() {
564568
close(kubeletFinished)
565569
},
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
Copyright 2015 The Kubernetes Authors All rights reserved.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package service
18+
19+
import (
20+
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
21+
22+
cadvisorApi "github.com/google/cadvisor/info/v1"
23+
)
24+
25+
type MesosCadvisor struct {
26+
cadvisor.Interface
27+
cores int
28+
mem int64
29+
}
30+
31+
func NewMesosCadvisor(cores int, mem int64, port uint) (*MesosCadvisor, error) {
32+
c, err := cadvisor.New(port)
33+
if err != nil {
34+
return nil, err
35+
}
36+
return &MesosCadvisor{c, cores, mem}, nil
37+
}
38+
39+
func (mc *MesosCadvisor) MachineInfo() (*cadvisorApi.MachineInfo, error) {
40+
mi, err := mc.Interface.MachineInfo()
41+
if err != nil {
42+
return nil, err
43+
}
44+
45+
// set Mesos provided values
46+
mesosMi := *mi
47+
mesosMi.NumCores = mc.cores
48+
mesosMi.MemoryCapacity = mc.mem
49+
50+
return &mesosMi, nil
51+
}

contrib/mesos/pkg/executor/service/service.go

+20-4
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ func (s *KubeletExecutorServer) AddFlags(fs *pflag.FlagSet) {
7878
fs.DurationVar(&s.LaunchGracePeriod, "mesos-launch-grace-period", s.LaunchGracePeriod, "Launch grace period after which launching tasks will be cancelled. Zero disables launch cancellation.")
7979
}
8080

81-
func (s *KubeletExecutorServer) runExecutor(execUpdates chan<- kubetypes.PodUpdate, kubeletFinished <-chan struct{},
81+
func (s *KubeletExecutorServer) runExecutor(execUpdates chan<- kubetypes.PodUpdate, nodeInfos chan<- executor.NodeInfo, kubeletFinished <-chan struct{},
8282
staticPodsConfigPath string, apiclient *client.Client) error {
8383
exec := executor.New(executor.Config{
8484
Updates: execUpdates,
@@ -113,6 +113,7 @@ func (s *KubeletExecutorServer) runExecutor(execUpdates chan<- kubetypes.PodUpda
113113
PodLW: cache.NewListWatchFromClient(apiclient, "pods", api.NamespaceAll,
114114
fields.OneTermEqualSelector(client.PodHost, s.HostnameOverride),
115115
),
116+
NodeInfos: nodeInfos,
116117
})
117118

118119
// initialize driver and initialize the executor with it
@@ -139,7 +140,7 @@ func (s *KubeletExecutorServer) runExecutor(execUpdates chan<- kubetypes.PodUpda
139140
return nil
140141
}
141142

142-
func (s *KubeletExecutorServer) runKubelet(execUpdates <-chan kubetypes.PodUpdate, kubeletDone chan<- struct{},
143+
func (s *KubeletExecutorServer) runKubelet(execUpdates <-chan kubetypes.PodUpdate, nodeInfos <-chan executor.NodeInfo, kubeletDone chan<- struct{},
143144
staticPodsConfigPath string, apiclient *client.Client) error {
144145
kcfg, err := s.UnsecuredKubeletConfig()
145146
if err == nil {
@@ -179,6 +180,20 @@ func (s *KubeletExecutorServer) runKubelet(execUpdates <-chan kubetypes.PodUpdat
179180
panic("cloud provider must not be set")
180181
}
181182

183+
// create custom cAdvisor interface which return the resource values that Mesos reports
184+
ni := <-nodeInfos
185+
cAdvisorInterface, err := NewMesosCadvisor(ni.Cores, ni.Mem, s.CAdvisorPort)
186+
if err != nil {
187+
return err
188+
}
189+
kcfg.CAdvisorInterface = cAdvisorInterface
190+
go func() {
191+
for ni := range nodeInfos {
192+
// TODO(sttts): implement with MachineAllocable mechanism when https://github.com/kubernetes/kubernetes/issues/13984 is finished
193+
log.V(3).Infof("ignoring updated node resources: %v", ni)
194+
}
195+
}()
196+
182197
// create main pod source
183198
updates := kcfg.PodConfig.Channel(MESOS_CFG_SOURCE)
184199
go func() {
@@ -217,6 +232,7 @@ func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error {
217232
// create shared channels
218233
kubeletFinished := make(chan struct{})
219234
execUpdates := make(chan kubetypes.PodUpdate, 1)
235+
nodeInfos := make(chan executor.NodeInfo, 1)
220236

221237
// create static pods directory
222238
staticPodsConfigPath := filepath.Join(s.RootDirectory, "static-pods")
@@ -237,13 +253,13 @@ func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error {
237253
}
238254

239255
// start executor
240-
err = s.runExecutor(execUpdates, kubeletFinished, staticPodsConfigPath, apiclient)
256+
err = s.runExecutor(execUpdates, nodeInfos, kubeletFinished, staticPodsConfigPath, apiclient)
241257
if err != nil {
242258
return err
243259
}
244260

245261
// start kubelet, blocking
246-
return s.runKubelet(execUpdates, kubeletFinished, staticPodsConfigPath, apiclient)
262+
return s.runKubelet(execUpdates, nodeInfos, kubeletFinished, staticPodsConfigPath, apiclient)
247263
}
248264

249265
func defaultBindingAddress() string {

0 commit comments

Comments
 (0)