diff --git a/deployment/actionManger.go b/deployment/actionManger.go index 421745d..db66d69 100644 --- a/deployment/actionManger.go +++ b/deployment/actionManger.go @@ -1,6 +1,7 @@ package deployment import ( + "errors" "math/rand" "sync" "v9_deployment_manager/activator" @@ -12,23 +13,35 @@ import ( const headHashSentinel = "HEAD" const updaterChanSize = 1024 +type HashAndInstanceCount struct { + hash string + instanceCount int +} + +type PathAndInstanceCount struct { + path worker.ComponentPath + instanceCount int +} + type ActionManager struct { driver *database.Driver activator *activator.Activator workers []*worker.V9Worker - pathHashMux sync.Mutex - pathHashes map[worker.ComponentPath]string - pathHashUpdater chan worker.ComponentID + pathHashMux sync.Mutex + pathHashes map[worker.ComponentPath]*HashAndInstanceCount + pathHashUpdater chan worker.ComponentID + instanceCountUpdater chan PathAndInstanceCount dirtyStateNotifier chan struct{} } func NewActionManager(activator *activator.Activator, dr *database.Driver, workers []*worker.V9Worker) *ActionManager { - pathHashes := make(map[worker.ComponentPath]string) + pathHashes := make(map[worker.ComponentPath]*HashAndInstanceCount) pathHashUpdater := make(chan worker.ComponentID, updaterChanSize) + instanceCountUpdater := make(chan PathAndInstanceCount, updaterChanSize) dirtyStateNotifier := make(chan struct{}, 1) mgr := &ActionManager{ @@ -37,12 +50,21 @@ func NewActionManager(activator *activator.Activator, dr *database.Driver, worke activator: activator, workers: workers, - pathHashes: pathHashes, - pathHashUpdater: pathHashUpdater, + pathHashes: pathHashes, + pathHashUpdater: pathHashUpdater, + instanceCountUpdater: instanceCountUpdater, dirtyStateNotifier: dirtyStateNotifier, } + //Populate PathHashes with current system state + mgr.pathHashMux.Lock() + defer mgr.pathHashMux.Unlock() + err := mgr.PopulatePathHashes() + if err != nil { + log.Error.Println("Could not get current system state") + } + //Thread for handling hash changes go func() { for { updatedID := <-pathHashUpdater @@ -52,7 +74,21 @@ func NewActionManager(activator *activator.Activator, dr *database.Driver, worke } mgr.pathHashMux.Lock() - mgr.pathHashes[path] = updatedID.Hash + mgr.pathHashes[path].hash = updatedID.Hash + mgr.pathHashMux.Unlock() + + mgr.NotifyComponentStateChanged() + } + }() + //Thread for handling instance count changes + go func() { + for { + updatedID := <-instanceCountUpdater + path := updatedID.path + + mgr.pathHashMux.Lock() + log.Info.Println("component pathHash:", mgr.pathHashes[path]) + mgr.pathHashes[path].instanceCount = updatedID.instanceCount mgr.pathHashMux.Unlock() mgr.NotifyComponentStateChanged() @@ -73,6 +109,19 @@ func NewActionManager(activator *activator.Activator, dr *database.Driver, worke return mgr } +func (mgr *ActionManager) PopulatePathHashes() error { + compMap := getCurrentInstanceState(mgr.workers) + for compID, compStats := range compMap { + tmp := &HashAndInstanceCount{compID.Hash, compStats.instanceCount} + mgr.pathHashes[worker.ComponentPath{ + User: compID.User, + Repo: compID.Repo, + }] = tmp + } + + return nil +} + func (mgr *ActionManager) NotifyComponentStateChanged() { // Put something in the `dirtyStateNotifier` -- unless someone else already notified that the state was dirty select { @@ -85,6 +134,10 @@ func (mgr *ActionManager) UpdateComponentHash(compID worker.ComponentID) { mgr.pathHashUpdater <- compID } +func (mgr *ActionManager) UpdateInstanceCount(compPath worker.ComponentPath, instanceCount int) { + mgr.instanceCountUpdater <- PathAndInstanceCount{compPath, instanceCount} +} + func (mgr *ActionManager) HandleDirtyState() error { // TODO: Parallelize this step (it basically single threads the deployment manager at the moment) @@ -114,8 +167,8 @@ func (mgr *ActionManager) HandleDirtyState() error { log.Info.Println("Starting active but not running components") for _, activeComp := range active { var hashToDeploy = headHashSentinel - if mapHash, ok := mgr.pathHashes[activeComp]; ok { - hashToDeploy = mapHash + if mapVal, ok := mgr.pathHashes[activeComp]; ok { + hashToDeploy = mapVal.hash } err = mgr.activateMissing(worker.ComponentID{ @@ -136,9 +189,9 @@ func (mgr *ActionManager) HandleDirtyState() error { correctCompID := worker.ComponentID{ User: activeComp.User, Repo: activeComp.Repo, - Hash: correctHash, + Hash: correctHash.hash, } - err = mgr.ensureSomeWorkerIsRunning(correctCompID) + err = mgr.ensureNWorkerIsRunning(correctCompID) if err != nil { return err } @@ -157,7 +210,7 @@ func (mgr *ActionManager) HandleDirtyState() error { correctCompID := worker.ComponentID{ User: activeComp.User, Repo: activeComp.Repo, - Hash: correctHash, + Hash: correctHash.hash, } for _, w := range mgr.workers { err = mgr.deactivateIfHashDiffers(w, correctCompID) @@ -167,6 +220,8 @@ func (mgr *ActionManager) HandleDirtyState() error { } } + // Scale up or down as requested by autoscaler + // TODO this is probably in the wrong place log.Info.Println("Finished dirty state handling") return nil } @@ -221,63 +276,137 @@ func (mgr *ActionManager) activateMissing(toCheck worker.ComponentID) error { mgr.pathHashes[worker.ComponentPath{ User: toCheck.User, Repo: toCheck.Repo, - }] = activatedHash + }].hash = activatedHash } return nil } -func (mgr *ActionManager) ensureSomeWorkerIsRunning(compID worker.ComponentID) error { +func (mgr *ActionManager) ensureNWorkerIsRunning(compID worker.ComponentID) error { compPath := worker.ComponentPath{ User: compID.User, Repo: compID.Repo, } - notRunningAnyVersion := make([]*worker.V9Worker, 0) - - for _, w := range mgr.workers { - status, err := w.Status() - if err != nil { - return err - } - - for _, runningComp := range status.ActiveComponents { - // If we find someone running exactly this ID, we have ensured some worker is running this ID - if runningComp.ID == compID { + //Get instance states + compMap := getCurrentInstanceState(mgr.workers) + // Check if the component is deployed on anything + if _, ok := compMap[compID]; ok { + //Check if should scale up current state instance count vs intended count + if compMap[compID].instanceCount < mgr.pathHashes[compPath].instanceCount { + log.Info.Println("SCALING UP:", compID) + //Find worker where this comp isn't deployed + workerToDeployTo, err := mgr.findWorkerToDeployTo(compID) + if err != nil { + log.Error.Println("Worker deployed on all nodes") return nil } + //Deploy component and update hash + err = mgr.deployToWorkerAndUpdateHash(workerToDeployTo, compID) + if err != nil { + return err + } + return nil } - - if !status.ContainsPath(compPath) { - notRunningAnyVersion = append(notRunningAnyVersion, w) + //Check if should scale down intended count vs actual count + if compMap[compID].instanceCount > mgr.pathHashes[compPath].instanceCount { + //Deactivate component on some worker + log.Info.Println("SCALING DOWN:", compID) + err := mgr.deactivateComponentOnSomeWorker(compID) + if err != nil { + return err + } + return nil } } + //First time deploy + //Deploy component to a random worker + err := mgr.deployToWorkerAndUpdateHash(mgr.workers[rand.Intn(len(mgr.workers))], compID) + if err != nil { + return err + } + mgr.pathHashes[compPath] = &HashAndInstanceCount{compID.Hash, 1} + return nil +} - // If we get here we need to deploy to some worker - var workerToDeployTo *worker.V9Worker - if len(notRunningAnyVersion) > 0 { - workerToDeployTo = notRunningAnyVersion[rand.Intn(len(notRunningAnyVersion))] - } else { - // If everyone is running it, then we need to create a place to deploy to - workerToDeployTo = mgr.workers[rand.Intn(len(mgr.workers))] - err := mgr.activator.Deactivate(compID, workerToDeployTo) - if err != nil { - return err +func (mgr *ActionManager) deployToWorkerAndUpdateHash(w *worker.V9Worker, compID worker.ComponentID) error { + compPath := worker.ComponentPath{ + User: compID.User, + Repo: compID.Repo, + } + deployedHash, err := mgr.activator.Activate(compID, w) + if err != nil { + return err + } + + //If this comp isn't in the map add it + if _, ok := mgr.pathHashes[compPath]; ok { + // Update the hash we're storing if we had HEAD + if compID.Hash == headHashSentinel { + mgr.pathHashes[compPath].hash = deployedHash } + } else { + tmp := &HashAndInstanceCount{deployedHash, 1} + mgr.pathHashes[compPath] = tmp } + return nil +} - log.Info.Println("Doing to deploy to ensure", compID, "is on some worker", workerToDeployTo) - deployedHash, err := mgr.activator.Activate(compID, workerToDeployTo) +func (mgr *ActionManager) deactivateComponentOnSomeWorker(compID worker.ComponentID) error { + //Find worker where this comp is deployed + workerToDeactivateOn, err := mgr.findWorkerToDeactivateOn(compID) + if err != nil { + log.Error.Println("Comp not running on any workers") + return nil + } + //Deactivate + err = mgr.activator.Deactivate(compID, workerToDeactivateOn) if err != nil { return err } + return nil +} - // Update the hash we're storing if we had HEAD - if compID.Hash == headHashSentinel { - mgr.pathHashes[compPath] = deployedHash +func (mgr *ActionManager) findWorkerToDeployTo(compID worker.ComponentID) (*worker.V9Worker, error) { + for _, w := range mgr.workers { + status, err := w.Status() + if err != nil { + return nil, err + } + compIsOnWorker := false + for _, runningComp := range status.ActiveComponents { + if runningComp.ID == compID { + compIsOnWorker = true + break + } + } + if !compIsOnWorker { + return w, nil + } } + err := errors.New("comp on every worker") + return nil, err +} - return nil +func (mgr *ActionManager) findWorkerToDeactivateOn(compID worker.ComponentID) (*worker.V9Worker, error) { + for _, w := range mgr.workers { + status, err := w.Status() + if err != nil { + return nil, err + } + compIsOnWorker := false + for _, runningComp := range status.ActiveComponents { + if runningComp.ID == compID { + compIsOnWorker = true + break + } + } + if compIsOnWorker { + return w, nil + } + } + err := errors.New("comp on no workers") + return nil, err } func (mgr *ActionManager) deactivateIfHashDiffers(w *worker.V9Worker, compID worker.ComponentID) error { diff --git a/deployment/auto_scaler.go b/deployment/auto_scaler.go new file mode 100644 index 0000000..66ab234 --- /dev/null +++ b/deployment/auto_scaler.go @@ -0,0 +1,105 @@ +package deployment + +import ( + "fmt" + "time" + "v9_deployment_manager/database" + "v9_deployment_manager/log" + "v9_deployment_manager/worker" +) + +type AutoScaler struct { + actionManager *ActionManager + driver *database.Driver + workers []*worker.V9Worker +} + +type ComponentStatAndInstances struct { + instanceCount int + averageStats worker.ComponentStats +} + +var MaxHits = 100.0 +var MinHits = 15.0 + +func (scaler *AutoScaler) AutoScale() { + //Get formatted worker ids + // TODO: Pull this out into a helper function + workerIDs := make([]string, len(scaler.workers)) + for i := range scaler.workers { + name := fmt.Sprintf("worker_%d", i) + id, err := scaler.driver.FindWorkerID(name) + if err != nil { + log.Error.Println("error getting worker id:", err) + continue + } + workerIDs[i] = id + } + + // Collect status of each comp on each worker + compMap := getCurrentInstanceState(scaler.workers) + + log.Info.Println("----------------------------") + for _, stats := range compMap { + hits := stats.averageStats.Hits + repo := stats.averageStats.ID.Repo + log.Info.Println("repo: ", repo, "hits: ", hits) + //Evaluate if scaling up is needed + if stats.averageStats.Hits > MaxHits { + log.Info.Println("This repo needs scaling UP repo: ", repo) + scaler.actionManager.UpdateInstanceCount(worker.ComponentPath{ + User: stats.averageStats.ID.User, + Repo: stats.averageStats.ID.Repo, + }, stats.instanceCount+1) + } + //Evaluate if scaling down is needed + if stats.instanceCount > 1 && stats.averageStats.Hits < MinHits { + log.Info.Println("This repo needs scaling DOWN repo: ", repo) + scaler.actionManager.UpdateInstanceCount(worker.ComponentPath{ + User: stats.averageStats.ID.User, + Repo: stats.averageStats.ID.Repo, + }, stats.instanceCount-1) + } + } +} + +func getCurrentInstanceState(workers []*worker.V9Worker) map[worker.ComponentID]*ComponentStatAndInstances { + // Collect status of each comp on each worker + compMap := make(map[worker.ComponentID]*ComponentStatAndInstances) + for _, w := range workers { + status, err := w.Status() + if err != nil { + log.Warning.Println("error getting worker status:", err) + continue + } + + // Keep track of what components are running + for _, componentStats := range status.ActiveComponents { + cID := componentStats.ID + if _, ok := compMap[cID]; ok { + //If CID already in map then average Hits + compMap[cID].instanceCount++ + compMap[cID].averageStats.Hits += componentStats.Hits + compMap[cID].averageStats.Hits /= float64(compMap[cID].instanceCount) + } else { + compMap[cID] = &ComponentStatAndInstances{1, componentStats} + } + } + } + return compMap +} + +func StartAutoScaler(actionManager *ActionManager, driver *database.Driver, workers []*worker.V9Worker, cadence time.Duration) { + scaler := AutoScaler{ + actionManager: actionManager, + driver: driver, + workers: workers, + } + + go func() { + for { + scaler.AutoScale() + time.Sleep(cadence) + } + }() +} diff --git a/main.go b/main.go index d41cffd..20ea09a 100644 --- a/main.go +++ b/main.go @@ -20,6 +20,7 @@ import ( ) const databasePollingInterval = time.Second * 3 +const autoScalerPollingInterval = time.Second * 15 func main() { //Initialize default ports @@ -71,6 +72,9 @@ func main() { // State may be dirty when we start actionManager.NotifyComponentStateChanged() + //Start Auto Scaler + deployment.StartAutoScaler(actionManager, driver, workers, autoScalerPollingInterval) + http.Handle("/payload", handlers.NewPushHandler(actionManager, driver)) http.Handle("/api/set_deployment_intention", handlers.NewDeploymentIntentionHandler(actionManager, driver)) log.Info.Println("Starting Server...")