Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
package org.trustedanalytics.atk.engine.command.mgmt

import java.util.concurrent.TimeUnit
import java.net._

import org.joda.time.{ DateTimeZone, LocalDateTime }
import org.trustedanalytics.atk.domain.jobcontext.JobContext
import org.trustedanalytics.atk.engine.{ EngineConfig, Engine }
import org.trustedanalytics.atk.engine.plugin.Invocation
Expand All @@ -28,22 +30,37 @@ import org.trustedanalytics.atk.event.EventLogging
class YarnJobsMonitor(engine: Engine)(implicit invocation: Invocation) extends Runnable with EventLogging {

lazy val timeoutMinutes: Long = EngineConfig.yarnMonitorTaskTimeout
lazy val timeoutMillis: Long = timeoutMinutes * 60 * 1000

def run(): Unit = {
info(s"YarnJobsMonitor started. Task timeout is $timeoutMinutes minutes.")
val localHost = InetAddress.getLocalHost
val nowMillis = System.currentTimeMillis()
info(s"YarnJobsMonitor started on $localHost at $nowMillis total ms. Task timeout is $timeoutMinutes minutes (or $timeoutMillis ms).")
while (true) {
engine.getCommandsNotComplete().foreach { command =>
engine.getCommandJobContext(command) match {
case Some(context) => if (hasStaleContext(context)) {
engine.cancelCommand(command.id, Some(s" by ATK context monitor due to timeout. The job context ${context.clientId} has not provided an update for more than $timeoutMinutes minutes. This may indicate that a task is running for a very long time. Try increasing the 'trustedanalytics.atk.engine.yarn-monitor-task-timeout' config setting."))
}
case Some(context) =>
val (answer, msg) = hasStaleContext(context)
if (answer) {
engine.cancelCommand(command.id, Some(s" by ATK context monitor due to timeout. The job context ${context.clientId} has not provided an update for more than $timeoutMinutes minutes. This may indicate that a task is running for a very long time. Try increasing the 'trustedanalytics.atk.engine.yarn-monitor-task-timeout' config setting. Details: $msg"))
}
case None => ; // there is no know YARN job to shutdown (command remains not complete, but this is not the responsibility of a YARN jobs monitor
}
}
TimeUnit.MINUTES.sleep(timeoutMinutes)
}
}

def hasStaleContext(context: JobContext): Boolean =
System.currentTimeMillis() - context.modifiedOn.getMillis > timeoutMinutes * 60 * 1000
def hasStaleContext(context: JobContext): (Boolean, String) = {
val localHost = InetAddress.getLocalHost
//val nowMillis = System.currentTimeMillis()
val now = new LocalDateTime().toDateTime(DateTimeZone.UTC)
val nowMillis = now.getMillis
val lastModMillis = context.modifiedOn.getMillis
val msg = s"YarnJobsMonitor hasStaleContext check called by $localHost: $nowMillis - $lastModMillis > $timeoutMillis"
info(msg)
//System.currentTimeMillis() - context.modifiedOn.getMillis > timeoutMinutes * 60 * 1000
val answer = nowMillis - lastModMillis > timeoutMillis
(answer, msg)
}
}
3 changes: 2 additions & 1 deletion engine/interfaces/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ trustedanalytics.atk {
fs {
# the system will create an "intelanalaytics" folder at this location.
# All Trusted Analytics Toolkit files will be stored somewhere under that base location.
root = "hdfs://invalid-fsroot-host/user/atkuser"
//root = "hdfs://invalid-fsroot-host/user/atkuser"
root = "hdfs://paulsimon.hf.intel.com:8020/user/iauser"

# Directory to load checkpoints into
checkpoint-directory = ${trustedanalytics.atk.engine.fs.root}"/checkpoints"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@
package org.trustedanalytics.atk.repository

import org.apache.commons.dbcp.BasicDataSource
import java.net._
import com.github.tototoshi.slick.GenericJodaSupport

import org.trustedanalytics.atk.domain.gc.{ GarbageCollectionEntryTemplate, GarbageCollectionEntry, GarbageCollection, GarbageCollectionTemplate }
import org.trustedanalytics.atk.domain.jobcontext.{ JobContext, JobContextTemplate }
import org.trustedanalytics.atk.domain.schema.Schema
import org.joda.time.DateTime
import org.joda.time.{ LocalDateTime, DateTimeZone, DateTime }
import scala.slick.driver.JdbcDriver
import org.flywaydb.core.Flyway
import spray.json._
Expand Down Expand Up @@ -58,6 +59,8 @@ trait SlickMetaStoreComponent extends MetaStoreComponent with EventLogging {
// Different versions of implicits are imported here based on the driver
import genericJodaSupport._

def getNowTime(): DateTime = new LocalDateTime().toDateTime(DateTimeZone.UTC)

// Defining mappings for custom column types
implicit val schemaColumnType = MappedColumnType.base[Schema, String](
{ schema => schema.toJson.prettyPrint }, // Schema to String
Expand Down Expand Up @@ -1230,7 +1233,7 @@ trait SlickMetaStoreComponent extends MetaStoreComponent with EventLogging {
}

override def insert(jobContext: JobContextTemplate)(implicit session: Session): Try[JobContext] = Try {
val m = JobContext(1, jobContext.userId, None, None, jobContext.clientId, new DateTime(), new DateTime(), None, None)
val m = JobContext(1, jobContext.userId, None, None, jobContext.clientId, getNowTime(), getNowTime(), None, None)
jobContextAutoInc.insert(m)
}

Expand All @@ -1244,18 +1247,24 @@ trait SlickMetaStoreComponent extends MetaStoreComponent with EventLogging {

override def updateJobServerUri(id: Long, uri: String)(implicit session: Session): Unit = {
val columns = for (c <- jobContextTable if c.id === id) yield (c.jobServerUri, c.modifiedOn)
columns.update(Some(uri), new DateTime)
columns.update(Some(uri), getNowTime())
}

override def updateProgress(id: Long, progress: String)(implicit session: Session): Unit = {
val columns = for (c <- jobContextTable if c.id === id) yield (c.progress, c.modifiedOn)
columns.update(Some(progress), new DateTime)
val localHost = InetAddress.getLocalHost
//val now = new DateTime
val now = getNowTime()
val nowMillis = now.getMillis
info(s"JobContextTable.updateProgress from host $localHost at $now ($nowMillis)")
columns.update(Some(progress), now)
//columns.update(Some(progress), new DateTime)

}

override def updateYarnAppName(id: Long, yarnAppName: String)(implicit session: Session): Unit = {
val columns = for (c <- jobContextTable if c.id === id) yield (c.yarnAppName, c.modifiedOn)
columns.update(Some(yarnAppName), new DateTime)
columns.update(Some(yarnAppName), getNowTime)
}

override def scan(offset: Int = 0, count: Int = defaultScanCount)(implicit session: Session): Seq[JobContext] = {
Expand All @@ -1278,7 +1287,7 @@ trait SlickMetaStoreComponent extends MetaStoreComponent with EventLogging {
}

override def lookupRecentlyActive(seconds: Int)(implicit session: Session): Seq[JobContext] = {
val recentTime = (new DateTime).minusSeconds(seconds)
val recentTime = (getNowTime()).minusSeconds(seconds)
jobContextTable.where(_.modifiedOn >= recentTime).list
}

Expand Down