Skip to content

Commit f877bb3

Browse files
author
James DeFelice
committed
fixes #92: properly allocate libprocess port for executor driver; fix resource allocation to take into account executor resources; executor should attempt to use LIBPROCESS_IP as binding address for compat on multihomed hosts
1 parent 1b83f80 commit f877bb3

File tree

2 files changed

+66
-26
lines changed

2 files changed

+66
-26
lines changed

cmd/etcd-mesos-executor/app.go

+18-2
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ package main
2020

2121
import (
2222
"flag"
23+
"net"
24+
"os"
2325
"time"
2426

2527
log "github.com/golang/glog"
@@ -29,15 +31,29 @@ import (
2931
)
3032

3133
func main() {
32-
launchTimeout :=
33-
flag.Uint("launch-timeout", 240,
34+
var (
35+
launchTimeout = flag.Uint("launch-timeout", 240,
3436
"Seconds to retry launching an etcd instance for before giving up. "+
3537
"This should be long enough for a port occupied by a killed process "+
3638
"to be vacated.")
39+
driverPort = flag.Uint("driver-port", 0, "Libprocess port for the executor driver")
40+
)
3741
flag.Parse()
42+
if *driverPort == 0 {
43+
log.Fatal("missing or incorrectly specified driver-port flag, must be > 0")
44+
}
3845
log.Infoln("Starting etcd Executor")
3946

47+
var address net.IP
48+
if libprocessIP := os.Getenv("LIBPROCESS_IP"); libprocessIP != "" {
49+
address = net.ParseIP(libprocessIP)
50+
if address == nil {
51+
log.Warning("failed to parse IP address from LIBPROCESS_IP envvar %q", libprocessIP)
52+
}
53+
}
4054
dconfig := executor.DriverConfig{
55+
BindingAddress: address,
56+
BindingPort: uint16(*driverPort),
4157
Executor: etcdexecutor.New(
4258
time.Duration(*launchTimeout) * time.Second,
4359
),

scheduler/scheduler.go

+48-24
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,14 @@ import (
4545
)
4646

4747
const (
48+
// portsPerTask: rpcPort, clientPort, httpPort
4849
portsPerTask = 3
4950
notReseeding = 0
5051
reseedUnderway = 1
52+
53+
executorWantsCpus = 0.1
54+
executorWantsMem = 32
55+
executorWantsPorts = 1
5156
)
5257

5358
// State represents the mutability of the scheduler.
@@ -221,6 +226,11 @@ func (s *EtcdScheduler) ResourceOffers(
221226
driver scheduler.SchedulerDriver,
222227
offers []*mesos.Offer,
223228
) {
229+
var (
230+
cpusWanted = s.cpusPerTask + executorWantsCpus
231+
memWanted = s.memPerTask + executorWantsMem
232+
portsWanted = uint64(portsPerTask + executorWantsPorts)
233+
)
224234
for _, offer := range offers {
225235
resources := parseOffer(offer)
226236

@@ -262,25 +272,25 @@ func (s *EtcdScheduler) ResourceOffers(
262272
log.V(2).Infoln("-single-instance-per-slave is false, continuing.")
263273
}
264274

265-
if resources.cpus < s.cpusPerTask {
275+
if resources.cpus < cpusWanted {
266276
log.V(1).Infoln("Offer cpu is insufficient.")
267277
}
268278

269-
if resources.mems < s.memPerTask {
279+
if resources.mems < memWanted {
270280
log.V(1).Infoln("Offer memory is insufficient.")
271281
}
272282

273-
if totalPorts < portsPerTask {
283+
if totalPorts < portsWanted {
274284
log.V(1).Infoln("Offer ports are insuffient.")
275285
}
276286

277287
if resources.disk < s.diskPerTask {
278288
log.V(1).Infoln("Offer disk is insufficient.")
279289
}
280290

281-
if resources.cpus >= s.cpusPerTask &&
282-
resources.mems >= s.memPerTask &&
283-
totalPorts >= portsPerTask &&
291+
if resources.cpus >= cpusWanted &&
292+
resources.mems >= memWanted &&
293+
totalPorts >= portsWanted &&
284294
resources.disk >= s.diskPerTask &&
285295
s.offerCache.Push(offer) {
286296

@@ -884,12 +894,15 @@ func (s *EtcdScheduler) launchOne(driver scheduler.SchedulerDriver) {
884894
return
885895
}
886896

887-
// TODO(tyler) this is a broken hack
888-
resources := parseOffer(offer)
889-
lowest := *resources.ports[0].Begin
890-
rpcPort := lowest
891-
clientPort := lowest + 1
892-
httpPort := lowest + 2
897+
// TODO(tyler) this is a broken hack; task gets low ports, executor gets high ports
898+
var (
899+
resources = parseOffer(offer)
900+
lowest = *resources.ports[0].Begin
901+
rpcPort = lowest
902+
clientPort = lowest + 1
903+
httpPort = lowest + 2
904+
libprocessPort = lowest + 3
905+
)
893906

894907
s.mut.Lock()
895908
var clusterType string
@@ -928,7 +941,7 @@ func (s *EtcdScheduler) launchOne(driver scheduler.SchedulerDriver) {
928941

929942
configSummary := node.String()
930943
taskID := &mesos.TaskID{Value: &configSummary}
931-
executor := s.newExecutorInfo(node, s.executorUris)
944+
executor := s.newExecutorInfo(node, s.executorUris, libprocessPort)
932945
task := &mesos.TaskInfo{
933946
Data: serializedNodes,
934947
Name: proto.String("etcd-server"),
@@ -940,7 +953,7 @@ func (s *EtcdScheduler) launchOne(driver scheduler.SchedulerDriver) {
940953
util.NewScalarResource("mem", s.memPerTask),
941954
util.NewScalarResource("disk", s.diskPerTask),
942955
util.NewRangesResource("ports", []*mesos.Value_Range{
943-
util.NewValueRange(uint64(rpcPort), uint64(httpPort)),
956+
util.NewValueRange(uint64(rpcPort), uint64(rpcPort+portsPerTask-1)),
944957
}),
945958
},
946959
Discovery: &mesos.DiscoveryInfo{
@@ -1198,22 +1211,33 @@ func ServeExecutorArtifact(path, address string, artifactPort int) (*string, err
11981211
func (s *EtcdScheduler) newExecutorInfo(
11991212
node *config.Node,
12001213
executorURIs []*mesos.CommandInfo_URI,
1214+
libprocessPort uint64,
12011215
) *mesos.ExecutorInfo {
12021216

1203-
_, bin := filepath.Split(s.ExecutorPath)
1204-
execmd := fmt.Sprintf("./%s -log_dir=./", bin)
1205-
1217+
var (
1218+
_, bin = filepath.Split(s.ExecutorPath)
1219+
execmd = fmt.Sprintf("./" + bin)
1220+
ci = &mesos.CommandInfo{
1221+
Value: proto.String(execmd),
1222+
Shell: proto.Bool(false),
1223+
Uris: executorURIs,
1224+
}
1225+
)
1226+
ci.Arguments = append(ci.Arguments, execmd)
1227+
ci.Arguments = append(ci.Arguments, "-log_dir=./")
1228+
ci.Arguments = append(ci.Arguments, "-driver-port="+strconv.Itoa(int(libprocessPort)))
12061229
return &mesos.ExecutorInfo{
12071230
ExecutorId: util.NewExecutorID(node.Name),
12081231
Name: proto.String("etcd"),
1209-
Source: proto.String("go_test"),
1210-
Command: &mesos.CommandInfo{
1211-
Value: proto.String(execmd),
1212-
Uris: executorURIs,
1213-
},
1232+
Source: proto.String(s.FrameworkName),
1233+
Command: ci,
12141234
Resources: []*mesos.Resource{
1215-
util.NewScalarResource("cpus", 0.1),
1216-
util.NewScalarResource("mem", 32),
1235+
util.NewScalarResource("cpus", executorWantsCpus),
1236+
util.NewScalarResource("mem", executorWantsMem),
1237+
util.NewRangesResource("ports", []*mesos.Value_Range{
1238+
// see hack in launchOne(), libprocessPort is the base of the executor port resource range
1239+
util.NewValueRange(libprocessPort, libprocessPort+executorWantsPorts-1),
1240+
}),
12171241
},
12181242
}
12191243
}

0 commit comments

Comments
 (0)