Skip to content
Open
217 changes: 173 additions & 44 deletions deployment/actionManger.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package deployment

import (
"errors"
"math/rand"
"sync"
"v9_deployment_manager/activator"
Expand All @@ -12,23 +13,35 @@ import (
const headHashSentinel = "HEAD"
const updaterChanSize = 1024

// HashAndInstanceCount is the value type stored in ActionManager.pathHashes:
// the hash recorded for a component path together with an instance count.
type HashAndInstanceCount struct {
// hash is the component hash recorded for the path.
hash string
// instanceCount is the count that ensureNWorkerIsRunning compares the
// currently running instance count against when deciding to scale.
instanceCount int
}

// PathAndInstanceCount is the message type carried on
// ActionManager.instanceCountUpdater: a component path and the instance count
// to record for it.
type PathAndInstanceCount struct {
// path identifies the component whose count is being updated.
path worker.ComponentPath
// instanceCount is the new value written to the matching pathHashes entry.
instanceCount int
}

type ActionManager struct {
driver *database.Driver

activator *activator.Activator
workers []*worker.V9Worker

pathHashMux sync.Mutex
pathHashes map[worker.ComponentPath]string
pathHashUpdater chan worker.ComponentID
pathHashMux sync.Mutex
pathHashes map[worker.ComponentPath]*HashAndInstanceCount
Comment thread
hjaensch7 marked this conversation as resolved.
pathHashUpdater chan worker.ComponentID
instanceCountUpdater chan PathAndInstanceCount

dirtyStateNotifier chan struct{}
}

func NewActionManager(activator *activator.Activator, dr *database.Driver, workers []*worker.V9Worker) *ActionManager {
pathHashes := make(map[worker.ComponentPath]string)
pathHashes := make(map[worker.ComponentPath]*HashAndInstanceCount)

pathHashUpdater := make(chan worker.ComponentID, updaterChanSize)
instanceCountUpdater := make(chan PathAndInstanceCount, updaterChanSize)
dirtyStateNotifier := make(chan struct{}, 1)

mgr := &ActionManager{
Expand All @@ -37,12 +50,21 @@ func NewActionManager(activator *activator.Activator, dr *database.Driver, worke
activator: activator,
workers: workers,

pathHashes: pathHashes,
pathHashUpdater: pathHashUpdater,
pathHashes: pathHashes,
pathHashUpdater: pathHashUpdater,
instanceCountUpdater: instanceCountUpdater,

dirtyStateNotifier: dirtyStateNotifier,
}
//Populate PathHashes with current system state
mgr.pathHashMux.Lock()
defer mgr.pathHashMux.Unlock()
err := mgr.PopulatePathHashes()
if err != nil {
log.Error.Println("Could not get current system state")
}

//Thread for handling hash changes
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

space before the beginning of comment

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wait, do we want a space or not?

go func() {
for {
updatedID := <-pathHashUpdater
Expand All @@ -52,7 +74,21 @@ func NewActionManager(activator *activator.Activator, dr *database.Driver, worke
}

mgr.pathHashMux.Lock()
mgr.pathHashes[path] = updatedID.Hash
mgr.pathHashes[path].hash = updatedID.Hash
mgr.pathHashMux.Unlock()

mgr.NotifyComponentStateChanged()
}
}()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add comment: should we be batching hash updates in one lock?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking this as well. The question is whether we want a separate thread for each channel: one would be fed periodically by the autoscaler, while the other would change the hash as needed.

//Thread for handling instance count changes
go func() {
for {
updatedID := <-instanceCountUpdater
path := updatedID.path

mgr.pathHashMux.Lock()
log.Info.Println("component pathHash:", mgr.pathHashes[path])
mgr.pathHashes[path].instanceCount = updatedID.instanceCount
mgr.pathHashMux.Unlock()

mgr.NotifyComponentStateChanged()
Expand All @@ -73,6 +109,19 @@ func NewActionManager(activator *activator.Activator, dr *database.Driver, worke
return mgr
}

// PopulatePathHashes seeds mgr.pathHashes from the instance state that
// getCurrentInstanceState reports for mgr.workers. Each reported component
// becomes an entry keyed by its User/Repo path, holding the component's hash
// and its reported instance count; an existing entry for the same path is
// replaced.
//
// The method does not lock pathHashMux itself; NewActionManager holds the
// mutex around its call. The returned error is always nil.
func (mgr *ActionManager) PopulatePathHashes() error {
	for id, stats := range getCurrentInstanceState(mgr.workers) {
		key := worker.ComponentPath{User: id.User, Repo: id.Repo}
		mgr.pathHashes[key] = &HashAndInstanceCount{
			hash:          id.Hash,
			instanceCount: stats.instanceCount,
		}
	}
	return nil
}

func (mgr *ActionManager) NotifyComponentStateChanged() {
// Put something in the `dirtyStateNotifier` -- unless someone else already notified that the state was dirty
select {
Expand All @@ -85,6 +134,10 @@ func (mgr *ActionManager) UpdateComponentHash(compID worker.ComponentID) {
mgr.pathHashUpdater <- compID
}

// UpdateInstanceCount queues a new instance count for compPath on
// instanceCountUpdater. The channel is buffered (updaterChanSize), so the call
// blocks only while that buffer is full.
//
// NOTE(review): the consumer goroutine in NewActionManager writes to
// mgr.pathHashes[path].instanceCount without checking that the entry exists,
// so an update for a path that has no entry yet looks like it would panic
// there — confirm before calling this for unknown paths.
func (mgr *ActionManager) UpdateInstanceCount(compPath worker.ComponentPath, instanceCount int) {
	update := PathAndInstanceCount{path: compPath, instanceCount: instanceCount}
	mgr.instanceCountUpdater <- update
}

func (mgr *ActionManager) HandleDirtyState() error {
// TODO: Parallelize this step (it basically single threads the deployment manager at the moment)

Expand Down Expand Up @@ -114,8 +167,8 @@ func (mgr *ActionManager) HandleDirtyState() error {
log.Info.Println("Starting active but not running components")
for _, activeComp := range active {
var hashToDeploy = headHashSentinel
if mapHash, ok := mgr.pathHashes[activeComp]; ok {
hashToDeploy = mapHash
if mapVal, ok := mgr.pathHashes[activeComp]; ok {
hashToDeploy = mapVal.hash
}

err = mgr.activateMissing(worker.ComponentID{
Expand All @@ -136,9 +189,9 @@ func (mgr *ActionManager) HandleDirtyState() error {
correctCompID := worker.ComponentID{
User: activeComp.User,
Repo: activeComp.Repo,
Hash: correctHash,
Hash: correctHash.hash,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

weird naming here

}
err = mgr.ensureSomeWorkerIsRunning(correctCompID)
err = mgr.ensureNWorkerIsRunning(correctCompID)
if err != nil {
return err
}
Expand All @@ -157,7 +210,7 @@ func (mgr *ActionManager) HandleDirtyState() error {
correctCompID := worker.ComponentID{
User: activeComp.User,
Repo: activeComp.Repo,
Hash: correctHash,
Hash: correctHash.hash,
}
for _, w := range mgr.workers {
err = mgr.deactivateIfHashDiffers(w, correctCompID)
Expand All @@ -167,6 +220,8 @@ func (mgr *ActionManager) HandleDirtyState() error {
}
}

// Scale up or down as requested by autoscaler
// TODO this is probably in the wrong place
log.Info.Println("Finished dirty state handling")
return nil
}
Expand Down Expand Up @@ -221,63 +276,137 @@ func (mgr *ActionManager) activateMissing(toCheck worker.ComponentID) error {
mgr.pathHashes[worker.ComponentPath{
User: toCheck.User,
Repo: toCheck.Repo,
}] = activatedHash
}].hash = activatedHash
}

return nil
}

func (mgr *ActionManager) ensureSomeWorkerIsRunning(compID worker.ComponentID) error {
func (mgr *ActionManager) ensureNWorkerIsRunning(compID worker.ComponentID) error {
Comment thread
hjaensch7 marked this conversation as resolved.
Comment thread
hjaensch7 marked this conversation as resolved.
compPath := worker.ComponentPath{
User: compID.User,
Repo: compID.Repo,
}

notRunningAnyVersion := make([]*worker.V9Worker, 0)

for _, w := range mgr.workers {
status, err := w.Status()
if err != nil {
return err
}

for _, runningComp := range status.ActiveComponents {
// If we find someone running exactly this ID, we have ensured some worker is running this ID
if runningComp.ID == compID {
//Get instance states
compMap := getCurrentInstanceState(mgr.workers)
// Check if the component is deployed on anything
if _, ok := compMap[compID]; ok {
//Check if should scale up current state instance count vs intended count
if compMap[compID].instanceCount < mgr.pathHashes[compPath].instanceCount {
log.Info.Println("SCALING UP:", compID)
//Find worker where this comp isn't deployed
workerToDeployTo, err := mgr.findWorkerToDeployTo(compID)
if err != nil {
log.Error.Println("Worker deployed on all nodes")
return nil
}
//Deploy component and update hash
err = mgr.deployToWorkerAndUpdateHash(workerToDeployTo, compID)
if err != nil {
return err
}
return nil
}

if !status.ContainsPath(compPath) {
notRunningAnyVersion = append(notRunningAnyVersion, w)
//Check if should scale down intended count vs actual count
if compMap[compID].instanceCount > mgr.pathHashes[compPath].instanceCount {
//Deactivate component on some worker
log.Info.Println("SCALING DOWN:", compID)
err := mgr.deactivateComponentOnSomeWorker(compID)
if err != nil {
return err
}
return nil
}
}
//First time deploy
//Deploy component to a random worker
err := mgr.deployToWorkerAndUpdateHash(mgr.workers[rand.Intn(len(mgr.workers))], compID)
if err != nil {
return err
}
mgr.pathHashes[compPath] = &HashAndInstanceCount{compID.Hash, 1}
return nil
}

// If we get here we need to deploy to some worker
var workerToDeployTo *worker.V9Worker
if len(notRunningAnyVersion) > 0 {
workerToDeployTo = notRunningAnyVersion[rand.Intn(len(notRunningAnyVersion))]
} else {
// If everyone is running it, then we need to create a place to deploy to
workerToDeployTo = mgr.workers[rand.Intn(len(mgr.workers))]
err := mgr.activator.Deactivate(compID, workerToDeployTo)
if err != nil {
return err
// deployToWorkerAndUpdateHash activates compID on w and then reconciles
// mgr.pathHashes with the hash the activator reports.
//
// If the component's path has no entry yet, one is created with the deployed
// hash and an instance count of 1. If an entry already exists, its hash is
// replaced only when compID asked for headHashSentinel; its instance count is
// left alone. An activation error is returned unchanged and the map is not
// touched.
//
// NOTE(review): pathHashes is accessed here without taking pathHashMux —
// presumably the caller already holds it; confirm.
func (mgr *ActionManager) deployToWorkerAndUpdateHash(w *worker.V9Worker, compID worker.ComponentID) error {
	deployedHash, err := mgr.activator.Activate(compID, w)
	if err != nil {
		return err
	}

	key := worker.ComponentPath{User: compID.User, Repo: compID.Repo}
	entry, known := mgr.pathHashes[key]
	if !known {
		// First time this path is seen: start tracking it with one instance.
		mgr.pathHashes[key] = &HashAndInstanceCount{hash: deployedHash, instanceCount: 1}
		return nil
	}
	if compID.Hash == headHashSentinel {
		// HEAD was requested, so store the concrete hash that got deployed.
		entry.hash = deployedHash
	}
	return nil
}

log.Info.Println("Doing to deploy to ensure", compID, "is on some worker", workerToDeployTo)
deployedHash, err := mgr.activator.Activate(compID, workerToDeployTo)
// deactivateComponentOnSomeWorker deactivates compID on one worker that is
// currently running it, as selected by findWorkerToDeactivateOn.
//
// When no worker can be selected, the failure is logged and nil is returned,
// so the caller carries on. The logged line includes the underlying error
// because findWorkerToDeactivateOn also fails when a worker's Status call
// fails, not only when no worker runs the component.
//
// An error from the activator's Deactivate call is returned unchanged.
func (mgr *ActionManager) deactivateComponentOnSomeWorker(compID worker.ComponentID) error {
	target, err := mgr.findWorkerToDeactivateOn(compID)
	if err != nil {
		// Non-fatal: the lookup failure is reported but not propagated.
		log.Error.Println("Could not pick a worker to deactivate", compID, "on:", err)
		return nil
	}
	return mgr.activator.Deactivate(compID, target)
}

// Update the hash we're storing if we had HEAD
if compID.Hash == headHashSentinel {
mgr.pathHashes[compPath] = deployedHash
func (mgr *ActionManager) findWorkerToDeployTo(compID worker.ComponentID) (*worker.V9Worker, error) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't this function already exist in this file?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function ensures that the worker it returns doesn't have the component running on it.

for _, w := range mgr.workers {
status, err := w.Status()
if err != nil {
return nil, err
}
compIsOnWorker := false
for _, runningComp := range status.ActiveComponents {
if runningComp.ID == compID {
compIsOnWorker = true
break
}
}
if !compIsOnWorker {
return w, nil
}
}
err := errors.New("comp on every worker")
return nil, err
}

return nil
// findWorkerToDeactivateOn returns the first worker, in mgr.workers order,
// whose status lists compID among its active components.
//
// A failing Status call aborts the search and its error is returned. If no
// worker is running compID, the error "comp on no workers" is returned.
func (mgr *ActionManager) findWorkerToDeactivateOn(compID worker.ComponentID) (*worker.V9Worker, error) {
	for _, candidate := range mgr.workers {
		status, err := candidate.Status()
		if err != nil {
			return nil, err
		}
		for _, active := range status.ActiveComponents {
			if active.ID == compID {
				return candidate, nil
			}
		}
	}
	return nil, errors.New("comp on no workers")
}

func (mgr *ActionManager) deactivateIfHashDiffers(w *worker.V9Worker, compID worker.ComponentID) error {
Expand Down
Loading