Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

resource allocation-related fixes, etc #93

Merged
merged 3 commits into from
Feb 20, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions cmd/etcd-mesos-executor/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ package main

import (
"flag"
"net"
"os"
"time"

log "github.com/golang/glog"
Expand All @@ -29,15 +31,29 @@ import (
)

func main() {
launchTimeout :=
flag.Uint("launch-timeout", 240,
var (
launchTimeout = flag.Uint("launch-timeout", 240,
"Seconds to retry launching an etcd instance for before giving up. "+
"This should be long enough for a port occupied by a killed process "+
"to be vacated.")
driverPort = flag.Uint("driver-port", 0, "Libprocess port for the executor driver")
)
flag.Parse()
if *driverPort == 0 {
log.Fatal("missing or incorrectly specified driver-port flag, must be > 0")
}
log.Infoln("Starting etcd Executor")

var address net.IP
if libprocessIP := os.Getenv("LIBPROCESS_IP"); libprocessIP != "" {
address = net.ParseIP(libprocessIP)
if address == nil {
log.Warningf("failed to parse IP address from LIBPROCESS_IP envvar %q", libprocessIP)
}
}
dconfig := executor.DriverConfig{
BindingAddress: address,
BindingPort: uint16(*driverPort),
Executor: etcdexecutor.New(
time.Duration(*launchTimeout) * time.Second,
),
Expand Down
2 changes: 1 addition & 1 deletion executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func (e *Executor) etcdHarness(
}
_, err = driver.SendStatusUpdate(runStatus)
if err != nil {
log.Errorf("Got error sending status update, terminating: ", err)
log.Errorf("Got error sending status update, terminating: %v", err)
handleFailure(driver, taskInfo)
}

Expand Down
6 changes: 3 additions & 3 deletions offercache/offercache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func TestPush(t *testing.T) {
oc.Push(newOffer(o, o))
}
if got := oc.Len(); got != tt.want {
t.Errorf("test #%d: got : %s, want: %s", i, got, tt.want)
t.Errorf("test #%d: got : %d, want: %d", i, got, tt.want)
}
}
}
Expand All @@ -71,7 +71,7 @@ func TestRescind(t *testing.T) {
oc.Rescind(util.NewOfferID(r))
}
if got := oc.Len(); got != tt.want {
t.Errorf("test #%d: got : %s, want: %s", i, got, tt.want)
t.Errorf("test #%d: got : %d, want: %d", i, got, tt.want)
}
}

Expand Down Expand Up @@ -117,7 +117,7 @@ func TestBlockingPop(t *testing.T) {
}()

if got != tt.want {
t.Errorf("test #%d: got : %s, want: %s", i, got, tt.want)
t.Errorf("test #%d: got : %d, want: %d", i, got, tt.want)
}
}
}
Expand Down
7 changes: 6 additions & 1 deletion scheduler/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,12 @@ func (m *MockSchedulerDriver) Wait() {
defer m.Unlock()
m.Called()
}

// AcceptOffers records the call on the mock while holding the mock's lock and
// returns the status (return slot 0) and error (return slot 1) registered for it.
func (m *MockSchedulerDriver) AcceptOffers(
	offerIDs []*mesos.OfferID,
	operations []*mesos.Offer_Operation,
	filters *mesos.Filters,
) (mesos.Status, error) {
	m.Lock()
	defer m.Unlock()

	ret := m.Called(offerIDs, operations, filters)
	st := status(ret, 0)
	err := ret.Error(1)
	return st, err
}
func status(args mock.Arguments, at int) (val mesos.Status) {
if x := args.Get(at); x != nil {
val = x.(mesos.Status)
Expand Down
72 changes: 48 additions & 24 deletions scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,14 @@ import (
)

// portsPerTask is the number of ports each etcd task consumes:
// rpcPort, clientPort, httpPort.
const portsPerTask = 3

// Reseed status values.
const (
	notReseeding = iota
	reseedUnderway
)

// Resources claimed by the executor itself. They are added on top of the
// per-task amounts when deciding whether an offer is large enough, and they
// are what the executor's own resource list requests.
const (
	executorWantsCpus  = 0.1
	executorWantsMem   = 32
	executorWantsPorts = 1
)

// State represents the mutability of the scheduler.
Expand Down Expand Up @@ -221,6 +226,11 @@ func (s *EtcdScheduler) ResourceOffers(
driver scheduler.SchedulerDriver,
offers []*mesos.Offer,
) {
var (
cpusWanted = s.cpusPerTask + executorWantsCpus
memWanted = s.memPerTask + executorWantsMem
portsWanted = uint64(portsPerTask + executorWantsPorts)
)
for _, offer := range offers {
resources := parseOffer(offer)

Expand Down Expand Up @@ -262,25 +272,25 @@ func (s *EtcdScheduler) ResourceOffers(
log.V(2).Infoln("-single-instance-per-slave is false, continuing.")
}

if resources.cpus < s.cpusPerTask {
if resources.cpus < cpusWanted {
log.V(1).Infoln("Offer cpu is insufficient.")
}

if resources.mems < s.memPerTask {
if resources.mems < memWanted {
log.V(1).Infoln("Offer memory is insufficient.")
}

if totalPorts < portsPerTask {
if totalPorts < portsWanted {
log.V(1).Infoln("Offer ports are insuffient.")
}

if resources.disk < s.diskPerTask {
log.V(1).Infoln("Offer disk is insufficient.")
}

if resources.cpus >= s.cpusPerTask &&
resources.mems >= s.memPerTask &&
totalPorts >= portsPerTask &&
if resources.cpus >= cpusWanted &&
resources.mems >= memWanted &&
totalPorts >= portsWanted &&
resources.disk >= s.diskPerTask &&
s.offerCache.Push(offer) {

Expand Down Expand Up @@ -884,12 +894,15 @@ func (s *EtcdScheduler) launchOne(driver scheduler.SchedulerDriver) {
return
}

// TODO(tyler) this is a broken hack
resources := parseOffer(offer)
lowest := *resources.ports[0].Begin
rpcPort := lowest
clientPort := lowest + 1
httpPort := lowest + 2
// TODO(tyler) this is a broken hack; task gets low ports, executor gets high ports
var (
resources = parseOffer(offer)
lowest = *resources.ports[0].Begin
rpcPort = lowest
clientPort = lowest + 1
httpPort = lowest + 2
libprocessPort = lowest + 3
)

s.mut.Lock()
var clusterType string
Expand Down Expand Up @@ -928,7 +941,7 @@ func (s *EtcdScheduler) launchOne(driver scheduler.SchedulerDriver) {

configSummary := node.String()
taskID := &mesos.TaskID{Value: &configSummary}
executor := s.newExecutorInfo(node, s.executorUris)
executor := s.newExecutorInfo(node, s.executorUris, libprocessPort)
task := &mesos.TaskInfo{
Data: serializedNodes,
Name: proto.String("etcd-server"),
Expand All @@ -940,7 +953,7 @@ func (s *EtcdScheduler) launchOne(driver scheduler.SchedulerDriver) {
util.NewScalarResource("mem", s.memPerTask),
util.NewScalarResource("disk", s.diskPerTask),
util.NewRangesResource("ports", []*mesos.Value_Range{
util.NewValueRange(uint64(rpcPort), uint64(httpPort)),
util.NewValueRange(uint64(rpcPort), uint64(rpcPort+portsPerTask-1)),
}),
},
Discovery: &mesos.DiscoveryInfo{
Expand Down Expand Up @@ -1198,22 +1211,33 @@ func ServeExecutorArtifact(path, address string, artifactPort int) (*string, err
// newExecutorInfo builds the ExecutorInfo used to launch the etcd executor
// for node.
//
// The command runs the base name of s.ExecutorPath as "./<bin>" with Shell
// disabled, so the flags are passed as an argument list rather than embedded
// in a shell string. libprocessPort is handed to the executor through the
// -driver-port flag and is also the start of the ports range the executor
// claims as its own resource.
func (s *EtcdScheduler) newExecutorInfo(
	node *config.Node,
	executorURIs []*mesos.CommandInfo_URI,
	libprocessPort uint64,
) *mesos.ExecutorInfo {
	_, bin := filepath.Split(s.ExecutorPath)
	// Plain concatenation: the binary name must never be treated as a format
	// string, otherwise a '%' in it would be mangled.
	execmd := "./" + bin

	ci := &mesos.CommandInfo{
		Value: proto.String(execmd),
		Shell: proto.Bool(false),
		Uris:  executorURIs,
	}
	// execmd is repeated as the first argument (presumably argv[0] when Shell
	// is false — confirm against the Mesos CommandInfo documentation).
	ci.Arguments = append(ci.Arguments,
		execmd,
		"-log_dir=./",
		"-driver-port="+strconv.FormatUint(libprocessPort, 10),
	)

	return &mesos.ExecutorInfo{
		ExecutorId: util.NewExecutorID(node.Name),
		Name:       proto.String("etcd"),
		Source:     proto.String(s.FrameworkName),
		Command:    ci,
		Resources: []*mesos.Resource{
			util.NewScalarResource("cpus", executorWantsCpus),
			util.NewScalarResource("mem", executorWantsMem),
			util.NewRangesResource("ports", []*mesos.Value_Range{
				// see hack in launchOne(), libprocessPort is the base of the executor port resource range
				util.NewValueRange(libprocessPort, libprocessPort+executorWantsPorts-1),
			}),
		},
	}
}