@@ -19,12 +19,13 @@ import za.co.absa.hyperdrive.trigger.models.{JobInstance, JobParameters}
19
19
20
20
import scala .concurrent .{ExecutionContext , Future }
21
21
import java .util .UUID .randomUUID
22
+ import java .util .concurrent .{CountDownLatch , TimeUnit }
22
23
23
24
import akka .actor .ActorSystem
24
25
import akka .stream .ActorMaterializer
25
26
26
27
import scala .collection .JavaConverters ._
27
- import org .apache .spark .launcher .SparkLauncher
28
+ import org .apache .spark .launcher .{ SparkAppHandle , SparkLauncher }
28
29
import play .api .libs .json .{JsValue , Json }
29
30
import play .api .libs .ws .ahc .StandaloneAhcWSClient
30
31
import za .co .absa .hyperdrive .trigger .models .enums .JobStatuses ._
@@ -51,9 +52,19 @@ object SparkExecutor extends Executor {
51
52
val id = randomUUID().toString
52
53
val ji = jobInstance.copy(executorJobId = Some (id), jobStatus = Submitting )
53
54
updateJob(ji).map { _ =>
54
- val running = getSparkLauncher(id, ji.jobName, ji.jobParameters).launch()
55
- Thread .sleep(SparkExecutorConfig .getSubmitTimeOut)
56
- running.destroyForcibly()
55
+ val submitTimeOut = SparkExecutorConfig .getSubmitTimeOut
56
+ val latch = new CountDownLatch (1 )
57
+ val sparkAppHandle = getSparkLauncher(id, ji.jobName, ji.jobParameters).startApplication(new SparkAppHandle .Listener {
58
+ override def stateChanged (handle : SparkAppHandle ): Unit =
59
+ if (handle.getState == SparkAppHandle .State .SUBMITTED ) {
60
+ latch.countDown()
61
+ }
62
+ override def infoChanged (handle : SparkAppHandle ): Unit = {
63
+ // do nothing
64
+ }
65
+ })
66
+ latch.await(submitTimeOut, TimeUnit .MILLISECONDS )
67
+ sparkAppHandle.kill()
57
68
}
58
69
}
59
70
0 commit comments