Skip to content

Commit 791feed

Browse files
committed
Merge pull request #93 from mesosphere/jdef_fix_92
resource allocation-related fixes, etc
2 parents 1b83f80 + 5a40190 commit 791feed

File tree

5 files changed

+76
-31
lines changed

5 files changed

+76
-31
lines changed

cmd/etcd-mesos-executor/app.go

+18-2
Original file line number | Diff line number | Diff line change
@@ -20,6 +20,8 @@ package main
2020

2121
import (
2222
"flag"
23+
"net"
24+
"os"
2325
"time"
2426

2527
log "github.com/golang/glog"
@@ -29,15 +31,29 @@ import (
2931
)
3032

3133
func main() {
32-
launchTimeout :=
33-
flag.Uint("launch-timeout", 240,
34+
var (
35+
launchTimeout = flag.Uint("launch-timeout", 240,
3436
"Seconds to retry launching an etcd instance for before giving up. "+
3537
"This should be long enough for a port occupied by a killed process "+
3638
"to be vacated.")
39+
driverPort = flag.Uint("driver-port", 0, "Libprocess port for the executor driver")
40+
)
3741
flag.Parse()
42+
if *driverPort == 0 {
43+
log.Fatal("missing or incorrectly specified driver-port flag, must be > 0")
44+
}
3845
log.Infoln("Starting etcd Executor")
3946

47+
var address net.IP
48+
if libprocessIP := os.Getenv("LIBPROCESS_IP"); libprocessIP != "" {
49+
address = net.ParseIP(libprocessIP)
50+
if address == nil {
51+
log.Warningf("failed to parse IP address from LIBPROCESS_IP envvar %q", libprocessIP)
52+
}
53+
}
4054
dconfig := executor.DriverConfig{
55+
BindingAddress: address,
56+
BindingPort: uint16(*driverPort),
4157
Executor: etcdexecutor.New(
4258
time.Duration(*launchTimeout) * time.Second,
4359
),

executor/executor.go

+1-1
Original file line number | Diff line number | Diff line change
@@ -199,7 +199,7 @@ func (e *Executor) etcdHarness(
199199
}
200200
_, err = driver.SendStatusUpdate(runStatus)
201201
if err != nil {
202-
log.Errorf("Got error sending status update, terminating: ", err)
202+
log.Errorf("Got error sending status update, terminating: %v", err)
203203
handleFailure(driver, taskInfo)
204204
}
205205

offercache/offercache_test.go

+3-3
Original file line number | Diff line number | Diff line change
@@ -44,7 +44,7 @@ func TestPush(t *testing.T) {
4444
oc.Push(newOffer(o, o))
4545
}
4646
if got := oc.Len(); got != tt.want {
47-
t.Errorf("test #%d: got : %s, want: %s", i, got, tt.want)
47+
t.Errorf("test #%d: got : %d, want: %d", i, got, tt.want)
4848
}
4949
}
5050
}
@@ -71,7 +71,7 @@ func TestRescind(t *testing.T) {
7171
oc.Rescind(util.NewOfferID(r))
7272
}
7373
if got := oc.Len(); got != tt.want {
74-
t.Errorf("test #%d: got : %s, want: %s", i, got, tt.want)
74+
t.Errorf("test #%d: got : %d, want: %d", i, got, tt.want)
7575
}
7676
}
7777

@@ -117,7 +117,7 @@ func TestBlockingPop(t *testing.T) {
117117
}()
118118

119119
if got != tt.want {
120-
t.Errorf("test #%d: got : %s, want: %s", i, got, tt.want)
120+
t.Errorf("test #%d: got : %d, want: %d", i, got, tt.want)
121121
}
122122
}
123123
}

scheduler/mock_test.go

+6-1
Original file line number | Diff line number | Diff line change
@@ -151,7 +151,12 @@ func (m *MockSchedulerDriver) Wait() {
151151
defer m.Unlock()
152152
m.Called()
153153
}
154-
154+
func (m *MockSchedulerDriver) AcceptOffers(offerIDs []*mesos.OfferID, operations []*mesos.Offer_Operation, filters *mesos.Filters) (mesos.Status, error) {
155+
m.Lock()
156+
defer m.Unlock()
157+
args := m.Called(offerIDs, operations, filters)
158+
return status(args, 0), args.Error(1)
159+
}
155160
func status(args mock.Arguments, at int) (val mesos.Status) {
156161
if x := args.Get(at); x != nil {
157162
val = x.(mesos.Status)

scheduler/scheduler.go

+48-24
Original file line number | Diff line number | Diff line change
@@ -45,9 +45,14 @@ import (
4545
)
4646

4747
const (
48+
// portsPerTask: rpcPort, clientPort, httpPort
4849
portsPerTask = 3
4950
notReseeding = 0
5051
reseedUnderway = 1
52+
53+
executorWantsCpus = 0.1
54+
executorWantsMem = 32
55+
executorWantsPorts = 1
5156
)
5257

5358
// State represents the mutability of the scheduler.
@@ -221,6 +226,11 @@ func (s *EtcdScheduler) ResourceOffers(
221226
driver scheduler.SchedulerDriver,
222227
offers []*mesos.Offer,
223228
) {
229+
var (
230+
cpusWanted = s.cpusPerTask + executorWantsCpus
231+
memWanted = s.memPerTask + executorWantsMem
232+
portsWanted = uint64(portsPerTask + executorWantsPorts)
233+
)
224234
for _, offer := range offers {
225235
resources := parseOffer(offer)
226236

@@ -262,25 +272,25 @@ func (s *EtcdScheduler) ResourceOffers(
262272
log.V(2).Infoln("-single-instance-per-slave is false, continuing.")
263273
}
264274

265-
if resources.cpus < s.cpusPerTask {
275+
if resources.cpus < cpusWanted {
266276
log.V(1).Infoln("Offer cpu is insufficient.")
267277
}
268278

269-
if resources.mems < s.memPerTask {
279+
if resources.mems < memWanted {
270280
log.V(1).Infoln("Offer memory is insufficient.")
271281
}
272282

273-
if totalPorts < portsPerTask {
283+
if totalPorts < portsWanted {
274284
log.V(1).Infoln("Offer ports are insuffient.")
275285
}
276286

277287
if resources.disk < s.diskPerTask {
278288
log.V(1).Infoln("Offer disk is insufficient.")
279289
}
280290

281-
if resources.cpus >= s.cpusPerTask &&
282-
resources.mems >= s.memPerTask &&
283-
totalPorts >= portsPerTask &&
291+
if resources.cpus >= cpusWanted &&
292+
resources.mems >= memWanted &&
293+
totalPorts >= portsWanted &&
284294
resources.disk >= s.diskPerTask &&
285295
s.offerCache.Push(offer) {
286296

@@ -884,12 +894,15 @@ func (s *EtcdScheduler) launchOne(driver scheduler.SchedulerDriver) {
884894
return
885895
}
886896

887-
// TODO(tyler) this is a broken hack
888-
resources := parseOffer(offer)
889-
lowest := *resources.ports[0].Begin
890-
rpcPort := lowest
891-
clientPort := lowest + 1
892-
httpPort := lowest + 2
897+
// TODO(tyler) this is a broken hack; task gets low ports, executor gets high ports
898+
var (
899+
resources = parseOffer(offer)
900+
lowest = *resources.ports[0].Begin
901+
rpcPort = lowest
902+
clientPort = lowest + 1
903+
httpPort = lowest + 2
904+
libprocessPort = lowest + 3
905+
)
893906

894907
s.mut.Lock()
895908
var clusterType string
@@ -928,7 +941,7 @@ func (s *EtcdScheduler) launchOne(driver scheduler.SchedulerDriver) {
928941

929942
configSummary := node.String()
930943
taskID := &mesos.TaskID{Value: &configSummary}
931-
executor := s.newExecutorInfo(node, s.executorUris)
944+
executor := s.newExecutorInfo(node, s.executorUris, libprocessPort)
932945
task := &mesos.TaskInfo{
933946
Data: serializedNodes,
934947
Name: proto.String("etcd-server"),
@@ -940,7 +953,7 @@ func (s *EtcdScheduler) launchOne(driver scheduler.SchedulerDriver) {
940953
util.NewScalarResource("mem", s.memPerTask),
941954
util.NewScalarResource("disk", s.diskPerTask),
942955
util.NewRangesResource("ports", []*mesos.Value_Range{
943-
util.NewValueRange(uint64(rpcPort), uint64(httpPort)),
956+
util.NewValueRange(uint64(rpcPort), uint64(rpcPort+portsPerTask-1)),
944957
}),
945958
},
946959
Discovery: &mesos.DiscoveryInfo{
@@ -1198,22 +1211,33 @@ func ServeExecutorArtifact(path, address string, artifactPort int) (*string, err
11981211
func (s *EtcdScheduler) newExecutorInfo(
11991212
node *config.Node,
12001213
executorURIs []*mesos.CommandInfo_URI,
1214+
libprocessPort uint64,
12011215
) *mesos.ExecutorInfo {
12021216

1203-
_, bin := filepath.Split(s.ExecutorPath)
1204-
execmd := fmt.Sprintf("./%s -log_dir=./", bin)
1205-
1217+
var (
1218+
_, bin = filepath.Split(s.ExecutorPath)
1219+
execmd = fmt.Sprintf("./" + bin)
1220+
ci = &mesos.CommandInfo{
1221+
Value: proto.String(execmd),
1222+
Shell: proto.Bool(false),
1223+
Uris: executorURIs,
1224+
}
1225+
)
1226+
ci.Arguments = append(ci.Arguments, execmd)
1227+
ci.Arguments = append(ci.Arguments, "-log_dir=./")
1228+
ci.Arguments = append(ci.Arguments, "-driver-port="+strconv.Itoa(int(libprocessPort)))
12061229
return &mesos.ExecutorInfo{
12071230
ExecutorId: util.NewExecutorID(node.Name),
12081231
Name: proto.String("etcd"),
1209-
Source: proto.String("go_test"),
1210-
Command: &mesos.CommandInfo{
1211-
Value: proto.String(execmd),
1212-
Uris: executorURIs,
1213-
},
1232+
Source: proto.String(s.FrameworkName),
1233+
Command: ci,
12141234
Resources: []*mesos.Resource{
1215-
util.NewScalarResource("cpus", 0.1),
1216-
util.NewScalarResource("mem", 32),
1235+
util.NewScalarResource("cpus", executorWantsCpus),
1236+
util.NewScalarResource("mem", executorWantsMem),
1237+
util.NewRangesResource("ports", []*mesos.Value_Range{
1238+
// see hack in launchOne(), libprocessPort is the base of the executor port resource range
1239+
util.NewValueRange(libprocessPort, libprocessPort+executorWantsPorts-1),
1240+
}),
12171241
},
12181242
}
12191243
}

0 commit comments

Comments
 (0)