Commit 3a141a0

#738: Resurrect scheduler (#740)
1 parent: 60bc41f

File tree

2 files changed: +19 -4 lines


src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/JobScheduler.scala

Lines changed: 7 additions & 4 deletions
@@ -84,8 +84,7 @@ class JobScheduler @Inject() (
   def stopManager(): Future[Unit] = {
     logger.info("Stopping Manager")
     isManagerRunningAtomic.set(false)
-    sensors.cleanUpSensors()
-    workflowBalancer.resetSchedulerInstanceId()
+    cleanUp()
     runningScheduler
   }

@@ -97,8 +96,8 @@ class JobScheduler @Inject() (
     runningAssignWorkflows = workflowBalancer
       .getAssignedWorkflows(runningDags.keys.map(_.workflowId).toSeq)
       .recover { case e: SchedulerInstanceAlreadyDeactivatedException =>
-        logger.error("Stopping scheduler because the instance has already been deactivated", e)
-        stopManager()
+        logger.warn("Restarting scheduler because the instance has been deactivated by other instance", e)
+        cleanUp()
         throw e
       }
       .map(_.map(_.id))
@@ -166,4 +165,8 @@ class JobScheduler @Inject() (
     }
   }

+  private def cleanUp(): Unit = {
+    sensors.cleanUpSensors()
+    workflowBalancer.resetSchedulerInstanceId()
+  }
 }
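This change is what "resurrects" the scheduler: previously, a SchedulerInstanceAlreadyDeactivatedException triggered stopManager(), which set isManagerRunningAtomic to false and shut the manager down for good. Now the recovery path only runs the extracted cleanUp() (releasing sensors and the scheduler instance id) and rethrows, leaving the running flag set so the scheduling loop can re-register under a fresh instance id. A minimal standalone sketch of that pattern, using hypothetical names (DeactivatedException, isRunning, tick) rather than the project's actual API:

import scala.util.control.NonFatal

object ResurrectSketch {
  final class DeactivatedException extends RuntimeException("instance deactivated")

  @volatile private var isRunning = true
  private var ticks = 0

  // Stand-in for sensors.cleanUpSensors() and workflowBalancer.resetSchedulerInstanceId():
  // releases per-instance state without touching the running flag.
  private def cleanUp(): Unit = println("cleaned up instance state")

  // One scheduler tick; fails once to simulate being deactivated by another instance.
  private def tick(): Unit = {
    ticks += 1
    if (ticks == 2) throw new DeactivatedException
    if (ticks == 4) isRunning = false // deliberate stop after a few ticks
    println(s"tick $ticks")
  }

  def main(args: Array[String]): Unit = {
    while (isRunning) {
      try tick()
      catch {
        case e: DeactivatedException =>
          cleanUp() // resurrect: clean up only; isRunning stays true, so the loop retries
          println(s"re-registering after: ${e.getMessage}")
        case NonFatal(e) => println(s"tick failed: $e") // log and continue
      }
    }
    cleanUp() // deliberate-stop path: flag already false, so only the cleanup remains
  }
}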

src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/SparkExecutor.scala

Lines changed: 12 additions & 0 deletions
@@ -19,13 +19,18 @@ import play.api.libs.json.{JsValue, Json}
 import play.api.libs.ws.JsonBodyReadables._
 import za.co.absa.hyperdrive.trigger.api.rest.utils.WSClientProvider
 import za.co.absa.hyperdrive.trigger.configuration.application.SparkConfig
+import za.co.absa.hyperdrive.trigger.models.enums.JobStatuses
 import za.co.absa.hyperdrive.trigger.models.enums.JobStatuses._
 import za.co.absa.hyperdrive.trigger.models.{JobInstance, SparkInstanceParameters}
 import za.co.absa.hyperdrive.trigger.scheduler.executors.spark.{FinalStatuses => YarnFinalStatuses}

+import java.time.LocalDateTime
+import java.time.temporal.ChronoUnit
 import scala.concurrent.{ExecutionContext, Future}

 object SparkExecutor {
+  private val ExtraSubmitTimeout = 60000
+
   def execute(
     jobInstance: JobInstance,
     jobParameters: SparkInstanceParameters,
@@ -50,6 +55,13 @@ object SparkExecutor {
     }) match {
       case Seq(first) =>
         updateJob(jobInstance.copy(applicationId = Some(first.id), jobStatus = getStatus(first.finalStatus)))
+      case _
+          // It relies on the same value set for sparkYarnSink.submitTimeout in multi instance deployment
+          if jobInstance.jobStatus == JobStatuses.Submitting && jobInstance.updated
+            .map(lastUpdated => ChronoUnit.MILLIS.between(lastUpdated, LocalDateTime.now()))
+            .exists(_ < sparkConfig.yarn.submitTimeout + ExtraSubmitTimeout) =>
+        // Do nothing for submit timeout period to avoid two parallel job submissions/executions
+        Future((): Unit)
       case _ => sparkClusterService.handleMissingYarnStatus(jobInstance, updateJob)
     }
   }
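The new guard keeps a scheduler instance from re-submitting a job whose YARN application has not appeared yet: while the job is still in Submitting and less than submitTimeout plus a fixed 60-second margin (ExtraSubmitTimeout) has elapsed since its last update, the executor does nothing. As the in-line comment notes, this relies on every instance being configured with the same sparkYarnSink.submitTimeout. A self-contained sketch of the elapsed-time check, with hypothetical names (withinGraceWindow, submitTimeoutMs) chosen for illustration:

import java.time.LocalDateTime
import java.time.temporal.ChronoUnit

object SubmitGraceSketch {
  private val ExtraSubmitTimeoutMs = 60000L

  // True while the job's last update is still inside the grace window, i.e.
  // fewer than submitTimeoutMs + ExtraSubmitTimeoutMs milliseconds have elapsed.
  def withinGraceWindow(lastUpdated: Option[LocalDateTime], submitTimeoutMs: Long): Boolean =
    lastUpdated
      .map(updated => ChronoUnit.MILLIS.between(updated, LocalDateTime.now()))
      .exists(_ < submitTimeoutMs + ExtraSubmitTimeoutMs)

  def main(args: Array[String]): Unit = {
    val justSubmitted = Some(LocalDateTime.now().minusSeconds(30))
    val stale = Some(LocalDateTime.now().minusMinutes(10))
    println(withinGraceWindow(justSubmitted, submitTimeoutMs = 120000L)) // true: do nothing yet
    println(withinGraceWindow(stale, submitTimeoutMs = 120000L))         // false: handle missing YARN status
  }
}

The fixed extra margin absorbs clock skew and update lag between instances, so a job is only treated as missing from YARN once the shared submit timeout has comfortably passed.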
