Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@
<artifactId>scala-xml</artifactId>
<version>2.11.0-M4</version>
</dependency>
<dependency>
<groupId>com.typesafe.play</groupId>
<artifactId>play-json_2.11</artifactId>
<version>2.4.6</version>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_2.11</artifactId>
Expand All @@ -102,8 +107,6 @@
<version>1.9.5</version>
</dependency>
</dependencies>


<build>
<plugins>
<plugin>
Expand Down
2 changes: 2 additions & 0 deletions src/main/scala/com/indix/mesos/FrameworkConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,7 @@ class FrameworkConfig(config: Config) {
val goUserName = rootConfig.getString("go-server.user-name")
val goPassword = rootConfig.getString("go-server.password")

val goAuthEnabled = if(rootConfig.hasPath("go-server.auth-enabled")) rootConfig.getBoolean("go-server.auth-enabled") else false

val goAgentKey = if(rootConfig.hasPath("go-agent.auto-register-key")) Some(rootConfig.getString("go-agent.auto-register-key")) else None
}
69 changes: 61 additions & 8 deletions src/main/scala/com/indix/mesos/GOCDPoller.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,42 @@ import com.google.common.io.BaseEncoding

import scala.collection.mutable
import scalaj.http._
import play.api.libs.json._

case class GOCDPoller(conf: FrameworkConfig) {

val authToken = BaseEncoding.base64().encode(s"${conf.goUserName}:${conf.goPassword}".getBytes("UTF-8"));

val responseHistory: scala.collection.mutable.MutableList[Int] = mutable.MutableList.empty[Int]


private[mesos] def request(url: String) = {
val request = if(conf.goAuthEnabled) {
Http(url)
.header("Authorization", s"Basic $authToken")
} else {
Http(url)
}.header("Accept", "application/vnd.go.cd.v1+json")
request.asString.body
}

private[mesos] def jsonRequest(url: String) = {
Json.parse(request(url))
}

private[mesos] def xmlRequest(url: String) = {
scala.xml.XML.loadString(request(url))
}

private def buildUrl() = {
s"http://${conf.goServerHost}:${conf.goServerPort}"
}


def goTaskQueueSize(): Int = {
println("Polling GO Server for scheduled jobs")
try {
val response: HttpResponse[String] = Http(s"http://${conf.goServerHost}:${conf.goServerPort}" + "/go/api/jobs/scheduled.xml").asString //.header("Authorization", s"Basic ${authToken}").asString
val responseXml = scala.xml.XML.loadString(response.body)
val responseXml = xmlRequest(buildUrl() + "/go/api/jobs/scheduled.xml")
(responseXml \\ "scheduledJobs" \\ "job").size
} catch {
case e: SocketTimeoutException => {
Expand All @@ -28,19 +52,48 @@ case class GOCDPoller(conf: FrameworkConfig) {
}

def goIdleAgentsCount() = {
println("Polling Go server for idle agents")
try {
val responseJson = jsonRequest(buildUrl() + "/go/api/agents")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tamizhgeek I know the api.go.cd response says this is the schema of the response, but can you quickly check the same on our servers? Our main build is running the latest 16.2.0. They look different actually. We might want to file a bug against it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ashwanthkumar The difference in the responses are because of the Accept header we send. The difference is this:

curl -X GET http://build.gocd.io/go/api/agents -H "Authorization: Basic asdadsadasdasd="

and

curl -X GET http://build.gocd.io/go/api/agents -H "Authorization: Basic adsdasdasda=" -H "Accept: application/vnd.go.cd.v1+json"

:)

(responseJson \ "_embedded" \ "agents").toOption.map(jsValue => {
jsValue.as[JsArray].value.map(agent => (agent \ "status").get.as[String]).count(_.equalsIgnoreCase("idle"))
}).getOrElse(0)
} catch {
case e: SocketTimeoutException => {
println("GOCD Server timed out!!")
1
}
}
}

// Return true when the demand remains, same or increased during the last five attempts.
private[mesos] def isDemandPositive(): Boolean = {
val demandTrend = responseHistory.foldRight(0)((h1, h2) => {
if(h2 >= h1) {
1
} else {
-1
}
})
demandTrend == 1
}

def pollAndAddTask() = {
val scheduled : Int = goTaskQueueSize()
println(s"Go server has ${scheduled.toString} pending jobs to be scheduled")
if(scheduled > 0)
if(scheduled > 0) {
responseHistory += scheduled

}
if(responseHistory.size > 5) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would really like the 5 number (scheduledJobs > 5) to be a configuration :)

println(s"More than 5 jobs pending in the GOCD. queuing a new agent launch now.")
TaskQueue.enqueue(GoTask("", conf.goAgentDocker, ""))
responseHistory.clear()
println(s"More than 5 jobs pending in the GOCD. checking if any agents are idle.")
if(goIdleAgentsCount() == 0 && isDemandPositive()) {
println("No idle agents found. Launching a new agent launch now.")
TaskQueue.enqueue(GoTask("", conf.goAgentDocker, ""))
responseHistory.clear()
} else {
println("Idle agents found. Not launching any new Go Agent now.")
responseHistory.drop(0)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we do a drop(0) here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My idea is to always keep the last 5(basically, any n) responses in the history before taking decisions. Once the decision is taken (upscale/downscale), we start from a clean slate. Would love to have other ideas on how to implement it better.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Azhagu - looking at last 5 polls for job queue makes sense. But, starting
clean after a decision is made, doesn't seem to. Shouldn't you dropping the
oldest one and still use the rest of them..?

On Sat, Feb 20, 2016 at 8:41 PM, Azhaguselvan SP notifications@github.com
wrote:

In src/main/scala/com/indix/mesos/GOCDPoller.scala
#3 (comment):

 if(responseHistory.size > 5) {
  •  println(s"More than 5 jobs pending in the GOCD. queuing a new agent launch now.")
    
  •  TaskQueue.enqueue(GoTask("", conf.goAgentDocker, ""))
    
  •  responseHistory.clear()
    
  •  println(s"More than 5 jobs pending in the GOCD. checking if any agents are idle.")
    
  •  if(goIdleAgentsCount() == 0 && isDemandPositive()) {
    
  •    println("No idle agents found. Launching a new agent launch now.")
    
  •    TaskQueue.enqueue(GoTask("", conf.goAgentDocker, ""))
    
  •    responseHistory.clear()
    
  •  } else {
    
  •    println("Idle agents found. Not launching any new Go Agent now.")
    
  •    responseHistory.drop(0)
    

My idea is to always keep the last 5(basically, any n) responses in the
history before taking decisions. Once the decision is taken
(upscale/downscale), we start from a clean slate. Would love to have other
ideas on how to implement it better.


Reply to this email directly or view it on GitHub
https://github.com/ind9/gocd-mesos/pull/3/files#r53551737.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Sriram-R Hmmm. Once a new agent is launched, we don't expect the jobs to get scheduled immediately. There is some lag between that. If we don't clean the history, isn't there is a possibility that before this agent is launched and the job is scheduled, another one might also get launched?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally, that's handled via "UPSCALING" state in your GoCD mesos
framework.

On Sat, Feb 20, 2016 at 8:54 PM, Azhaguselvan SP notifications@github.com
wrote:

In src/main/scala/com/indix/mesos/GOCDPoller.scala
#3 (comment):

 if(responseHistory.size > 5) {
  •  println(s"More than 5 jobs pending in the GOCD. queuing a new agent launch now.")
    
  •  TaskQueue.enqueue(GoTask("", conf.goAgentDocker, ""))
    
  •  responseHistory.clear()
    
  •  println(s"More than 5 jobs pending in the GOCD. checking if any agents are idle.")
    
  •  if(goIdleAgentsCount() == 0 && isDemandPositive()) {
    
  •    println("No idle agents found. Launching a new agent launch now.")
    
  •    TaskQueue.enqueue(GoTask("", conf.goAgentDocker, ""))
    
  •    responseHistory.clear()
    
  •  } else {
    
  •    println("Idle agents found. Not launching any new Go Agent now.")
    
  •    responseHistory.drop(0)
    

@Sriram-R https://github.com/sriram-r Hmmm. Once a new agent is
launched, we don't expect the jobs to get scheduled immediately. There is
some lag between that. If we don't clean the history, isn't there is a
possibility that before this agent is launched and the job is scheduled,
another one might also get launched?


Reply to this email directly or view it on GitHub
https://github.com/ind9/gocd-mesos/pull/3/files#r53551958.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, during "UPSCALING" state, we don't schedule any more agent launches. Thanks @Sriram-R will think in those lines 👍

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, this is what I had in my mind:

  • Poll for job queue status
  • If there's an upscale action needed, determine how many agents need to be
    brought up.
  • Issue scale event for the # of agents
  • Mark the Scalar state to UPSCALING
  • Meanwhile, polling continues, but, use it only to update history, not to
    trigger events.
  • Mark Scalar state to IDLE once upscaling completes
  • Everything continues to chug along

On Sat, Feb 20, 2016 at 8:56 PM, Sriram Ramachandrasekaran <sriram@indix.com

wrote:

Generally, that's handled via "UPSCALING" state in your GoCD mesos
framework.

On Sat, Feb 20, 2016 at 8:54 PM, Azhaguselvan SP <notifications@github.com

wrote:

In src/main/scala/com/indix/mesos/GOCDPoller.scala
#3 (comment):

 if(responseHistory.size > 5) {
  •  println(s"More than 5 jobs pending in the GOCD. queuing a new agent launch now.")
    
  •  TaskQueue.enqueue(GoTask("", conf.goAgentDocker, ""))
    
  •  responseHistory.clear()
    
  •  println(s"More than 5 jobs pending in the GOCD. checking if any agents are idle.")
    
  •  if(goIdleAgentsCount() == 0 && isDemandPositive()) {
    
  •    println("No idle agents found. Launching a new agent launch now.")
    
  •    TaskQueue.enqueue(GoTask("", conf.goAgentDocker, ""))
    
  •    responseHistory.clear()
    
  •  } else {
    
  •    println("Idle agents found. Not launching any new Go Agent now.")
    
  •    responseHistory.drop(0)
    

@Sriram-R https://github.com/sriram-r Hmmm. Once a new agent is
launched, we don't expect the jobs to get scheduled immediately. There is
some lag between that. If we don't clean the history, isn't there is a
possibility that before this agent is launched and the job is scheduled,
another one might also get launched?


Reply to this email directly or view it on GitHub
https://github.com/ind9/gocd-mesos/pull/3/files#r53551958.

}
}
}

}
123 changes: 119 additions & 4 deletions src/test/scala/com/indix/mesos/GoCDPollerSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package com.indix.mesos
import com.typesafe.config.ConfigFactory
import org.scalatest._
import org.mockito.Mockito._
import play.api.libs.json.Json

class GoCDPollerSpec extends FlatSpec with Matchers with BeforeAndAfterEach {

Expand All @@ -19,8 +20,10 @@ class GoCDPollerSpec extends FlatSpec with Matchers with BeforeAndAfterEach {

"GoCDPoller#pollAndAddTask" should "add a goTask after 5 polls" in {
// Given
val pollerSpy = spy(poller);
val pollerSpy = spy(poller)
doReturn(1).when(pollerSpy).goTaskQueueSize()
doReturn(0).when(pollerSpy).goIdleAgentsCount()
doReturn(true).when(pollerSpy).isDemandPositive()

// When
for(i <- 0 to 5) {
Expand All @@ -31,25 +34,64 @@ class GoCDPollerSpec extends FlatSpec with Matchers with BeforeAndAfterEach {
TaskQueue.queue.size() should be(1)
}

"GoCDPoller#pollAndAddTask" should "not add a goTask after 5 polls if Idle agents count is > 0" in {
// Given
val pollerSpy = spy(poller)
doReturn(1).when(pollerSpy).goTaskQueueSize()
doReturn(2).when(pollerSpy).goIdleAgentsCount()
doReturn(true).when(pollerSpy).isDemandPositive()

// When
for(i <- 0 to 5) {
pollerSpy.pollAndAddTask()
}

// Then
TaskQueue.queue.size() should be(0)
}

"GoCDPoller#pollAndAddTask" should "not add a goTask after 5 polls if isDemandPositive returns false" in {
// Given
val pollerSpy = spy(poller)
doReturn(1).when(pollerSpy).goTaskQueueSize()
doReturn(0).when(pollerSpy).goIdleAgentsCount()
doReturn(false).when(pollerSpy).isDemandPositive()

// When
for(i <- 0 to 5) {
pollerSpy.pollAndAddTask()
}

// Then
TaskQueue.queue.size() should be(0)
}



"GoCDPoller#pollAndAddTask" should "add a goTask after 15 polls" in {
// Given
val pollerSpy = spy(poller);
val pollerSpy = spy(poller)
doReturn(1).when(pollerSpy).goTaskQueueSize()
doReturn(0).when(pollerSpy).goIdleAgentsCount()
doReturn(true).when(pollerSpy).isDemandPositive()


// When
for(i <- 0 to 20) {
pollerSpy.pollAndAddTask()
}

// Then
TaskQueue.queue.size() should be(3)
TaskQueue.queue.size() should be(4)
}

"GoCDPoller#pollAndAddTask" should "add a goTask with expected attributes" in {
// Given
val pollerSpy = spy(poller);
val pollerSpy = spy(poller)
doReturn(1).when(pollerSpy).goTaskQueueSize()
doReturn(0).when(pollerSpy).goIdleAgentsCount()
doReturn(true).when(pollerSpy).isDemandPositive()


// When
for(i <- 0 to 5) {
Expand All @@ -60,4 +102,77 @@ class GoCDPollerSpec extends FlatSpec with Matchers with BeforeAndAfterEach {
TaskQueue.queue.size() should be(1)
TaskQueue.dequeue.dockerImage should be ("travix/gocd-agent:latest")
}

"GoCDPoller#goIdleAgentsCount" should "fetch idle agents count given a valid json" in {
val responseJson = Json.parse(
"""
|
|{
| "_links": {
| "self": {
| "href": "https://ci.example.com/go/api/agents"
| },
| "doc": {
| "href": "http://api.go.cd/#agents"
| }
| },
| "_embedded": {
| "agents": [
| {
| "_links": {
| "self": {
| "href": "https://ci.example.com/go/api/agents/adb9540a-b954-4571-9d9b-2f330739d4da"
| },
| "doc": {
| "href": "http://api.go.cd/#agents"
| },
| "find": {
| "href": "https://ci.example.com/go/api/agents/:uuid"
| }
| },
| "uuid": "adb9540a-b954-4571-9d9b-2f330739d4da",
| "hostname": "agent01.example.com",
| "ip_address": "10.12.20.47",
| "enabled": true,
| "sandbox": "/Users/ketanpadegaonkar/projects/gocd/gocd/agent",
| "status": "Idle",
| "operating_system": "Mac OS X",
| "free_space": 84983328768,
| "resources": ["java", "linux", "firefox"],
| "environments": ["perf", "UAT"]
| }
| ]
| }
|}
|
""".stripMargin)
// Given
val pollerSpy = spy(poller)
doReturn(responseJson).when(pollerSpy).jsonRequest("http://localhost:8080/go/api/agents")

// When, then
pollerSpy.goIdleAgentsCount() should be(1)
}

"GoCDPoller#goTaskQueueSize" should "fetch pending jobs count given a valid json" in {
val responseJson =
"""
|<scheduledJobs>
| <job name="job1" id="6">
| <link rel="self" href="https://ci.example.com/go/tab/build/detail/mypipeline/5/defaultStage/1/job1"/>
| <buildLocator>mypipeline/5/defaultStage/1/job1</buildLocator>
| </job>
| <job name="job2" id="7">
| <link rel="self" href="https://ci.example.com/go/tab/build/detail/mypipeline/5/defaultStage/1/job2"/>
| <buildLocator>mypipeline/5/defaultStage/1/job2</buildLocator>
| </job>
|</scheduledJobs>
""".stripMargin
// Given
val pollerSpy = spy(poller)
doReturn(responseJson).when(pollerSpy).request("http://localhost:8080/go/api/jobs/scheduled.xml")

// When, then
pollerSpy.goTaskQueueSize() should be(2)
}
}