
Commit 8c71ecb

Feature/467 poc emr submitter (#493)
* Refactor submitJob out into a separate service
* Refactor SparkYarnSinkConfig to SparkConfig
* Add SparkEmrClusterService
* Add custom validation for spark config
* Small refactorings
* Add support for local aws profile
* Fixes
* Add config validation
* Add test
* Fixes after merge
1 parent 5601253 commit 8c71ecb

29 files changed: +804 -239 lines changed

pom.xml

Lines changed: 17 additions & 0 deletions
@@ -121,6 +121,7 @@
     <snakeyaml.version>1.26</snakeyaml.version>
     <commons-validator.version>1.7</commons-validator.version>
     <hibernate.validator.version>6.2.0.Final</hibernate.validator.version>
+    <aws.java.sdk.bom.version>1.12.29</aws.java.sdk.bom.version>

     <!-- Deployment -->
     <gpg.plugin.version>1.6</gpg.plugin.version>
@@ -133,6 +134,18 @@
     <skip.postgres.tests>true</skip.postgres.tests>
   </properties>

+  <dependencyManagement>
+    <dependencies>
+      <dependency>
+        <groupId>com.amazonaws</groupId>
+        <artifactId>aws-java-sdk-bom</artifactId>
+        <version>${aws.java.sdk.bom.version}</version>
+        <type>pom</type>
+        <scope>import</scope>
+      </dependency>
+    </dependencies>
+  </dependencyManagement>
+
   <dependencies>
     <dependency>
       <groupId>com.fasterxml.jackson.module</groupId>
@@ -250,6 +263,10 @@
       <artifactId>hibernate-validator</artifactId>
       <version>${hibernate.validator.version}</version>
     </dependency>
+    <dependency>
+      <groupId>com.amazonaws</groupId>
+      <artifactId>aws-java-sdk-emr</artifactId>
+    </dependency>

     <!-- For Liquibase yaml -->
     <dependency>
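
The aws-java-sdk-emr artifact (version-managed through the imported BOM) is the SDK the new EMR submitter builds on. The SparkEmrClusterService itself is not part of this excerpt, so the following Scala sketch only illustrates how a Spark job could be submitted as an EMR step with this SDK; the helper name, step name and arguments are assumptions, not the commit's code.

    import com.amazonaws.auth.profile.ProfileCredentialsProvider
    import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClientBuilder
    import com.amazonaws.services.elasticmapreduce.model.{AddJobFlowStepsRequest, HadoopJarStepConfig, StepConfig}

    // Hypothetical submitter sketch, not the SparkEmrClusterService from this commit:
    // adds a spark-submit step to an existing EMR cluster and returns the step id.
    def submitSparkStep(clusterId: String, awsProfile: String, region: String,
                        jobJar: String, mainClass: String, appArgs: Seq[String]): String = {
      val emr = AmazonElasticMapReduceClientBuilder.standard()
        .withCredentials(new ProfileCredentialsProvider(awsProfile)) // local AWS profile, cf. spark.emr.awsProfile
        .withRegion(region)                                          // cf. spark.emr.region
        .build()
      val step = new StepConfig()
        .withName("hyperdrive-job")
        .withActionOnFailure("CONTINUE")
        .withHadoopJarStep(new HadoopJarStepConfig()
          .withJar("command-runner.jar") // EMR's wrapper jar for running spark-submit on the cluster
          .withArgs((Seq("spark-submit", "--deploy-mode", "cluster", "--class", mainClass, jobJar) ++ appArgs): _*))
      emr.addJobFlowSteps(new AddJobFlowStepsRequest().withJobFlowId(clusterId).withSteps(step)) // cf. spark.emr.clusterId
        .getStepIds.get(0)
    }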

src/main/resources/application.properties

Lines changed: 8 additions & 1 deletion
@@ -88,7 +88,9 @@ kafkaSource.properties.sasl.jaas.config=
 shellExecutor.executablesFolder=/

 #Spark yarn sink properties. Properties used to deploy and run Spark job in Yarn.
+spark.submitApi=yarn
 sparkYarnSink.hadoopResourceManagerUrlBase=http://localhost:8088
+sparkYarnSink.userUsedToKillJob=
 sparkYarnSink.hadoopConfDir=/opt/hadoop
 sparkYarnSink.sparkHome=/opt/spark
 sparkYarnSink.master=yarn
@@ -106,7 +108,12 @@ sparkYarnSink.additionalConfs.spark.yarn.principal=
 sparkYarnSink.additionalConfs.spark.shuffle.service.enabled=true
 sparkYarnSink.additionalConfs.spark.dynamicAllocation.enabled=true
 sparkYarnSink.executablesFolder=/
-sparkYarnSink.userUsedToKillJob=
+
+spark.emr.clusterId=
+spark.emr.filesToDeploy=
+spark.emr.additionalConfs=
+spark.emr.awsProfile=
+spark.emr.region=

 #Postgresql properties for connection to trigger metastore
 db.driver=net.bull.javamelody.JdbcDriver
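
The new keys are left empty by default. Purely for illustration (the values below are made up, and the nested additionalConfs syntax mirrors the sparkYarnSink.additionalConfs.* keys above), an EMR-backed deployment might set:

    spark.submitApi=emr
    spark.emr.clusterId=j-XXXXXXXXXXXXX
    spark.emr.region=eu-west-1
    spark.emr.awsProfile=my-sandbox-profile
    spark.emr.filesToDeploy=/app/job.properties,/app/extra.conf
    spark.emr.additionalConfs.spark.executor.instances=2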

src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/controllers/AppInfoController.scala

Lines changed: 3 additions & 3 deletions
@@ -16,16 +16,16 @@
 package za.co.absa.hyperdrive.trigger.api.rest.controllers

 import org.springframework.web.bind.annotation._
-import za.co.absa.hyperdrive.trigger.configuration.application.{GeneralConfig, SparkYarnSinkConfig}
+import za.co.absa.hyperdrive.trigger.configuration.application.{GeneralConfig, SparkConfig}
 import za.co.absa.hyperdrive.trigger.models.AppInfo

 import javax.inject.Inject

 @RestController
-class AppInfoController @Inject()(sparkYarnSinkConfig: SparkYarnSinkConfig, generalConfig: GeneralConfig) {
+class AppInfoController @Inject()(sparkConfig: SparkConfig, generalConfig: GeneralConfig) {
   val environment: String = generalConfig.environment
   val version: String = generalConfig.version
-  val resourceManagerUrl: String = sparkYarnSinkConfig.hadoopResourceManagerUrlBase
+  val resourceManagerUrl: String = sparkConfig.hadoopResourceManagerUrlBase

   @GetMapping(path = Array("/app/info"))
   def appInfo(): AppInfo = {

src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/health/YarnConnectionHealthIndicator.scala

Lines changed: 3 additions & 3 deletions
@@ -18,18 +18,18 @@ package za.co.absa.hyperdrive.trigger.api.rest.health

 import org.springframework.boot.actuate.health.{Health, HealthIndicator}
 import org.springframework.stereotype.Component
-import za.co.absa.hyperdrive.trigger.configuration.application.{HealthConfig, SparkYarnSinkConfig}
+import za.co.absa.hyperdrive.trigger.configuration.application.{HealthConfig, SparkConfig}

 import java.net.{HttpURLConnection, MalformedURLException, URL}
 import javax.inject.Inject
 import scala.util.{Failure, Success, Try}

 @Component
-class YarnConnectionHealthIndicator @Inject()(sparkYarnSinkConfig: SparkYarnSinkConfig, healthConfig: HealthConfig) extends HealthIndicator {
+class YarnConnectionHealthIndicator @Inject()(sparkConfig: SparkConfig, healthConfig: HealthConfig) extends HealthIndicator {
   val successCode = 200

   override protected def health(): Health = {
-    val yarnBaseUrl = sparkYarnSinkConfig.hadoopResourceManagerUrlBase.stripSuffix("/")
+    val yarnBaseUrl = sparkConfig.hadoopResourceManagerUrlBase.stripSuffix("/")
     val yarnTestEndpoint = healthConfig.yarnConnectionTestEndpoint.stripPrefix("/")

     Try(new URL(s"$yarnBaseUrl/$yarnTestEndpoint")).flatMap(url =>

src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/JobInstanceService.scala

Lines changed: 3 additions & 3 deletions
@@ -21,7 +21,7 @@ import za.co.absa.hyperdrive.trigger.api.rest.utils.WSClientProvider
 import za.co.absa.hyperdrive.trigger.models.JobInstance
 import za.co.absa.hyperdrive.trigger.persistance.JobInstanceRepository
 import play.api.libs.ws.JsonBodyWritables.writeableOf_JsValue
-import za.co.absa.hyperdrive.trigger.configuration.application.SparkYarnSinkConfig
+import za.co.absa.hyperdrive.trigger.configuration.application.SparkConfig

 import scala.concurrent.{ExecutionContext, Future}
 import scala.util.Try
@@ -34,15 +34,15 @@ trait JobInstanceService {

 @Service
 class JobInstanceServiceImpl(override val jobInstanceRepository: JobInstanceRepository,
-                             sparkYarnSinkConfig: SparkYarnSinkConfig) extends JobInstanceService {
+                             sparkConfig: SparkConfig) extends JobInstanceService {

   override def getJobInstances(dagInstanceId: Long)(implicit ec: ExecutionContext): Future[Seq[JobInstance]] = {
     jobInstanceRepository.getJobInstances(dagInstanceId)
   }

   override def killJob(applicationId: String)(implicit ec: ExecutionContext): Future[Boolean] = {
     val url: String =
-      s"${sparkYarnSinkConfig.hadoopResourceManagerUrlBase}/ws/v1/cluster/apps/$applicationId/state?user.name=${sparkYarnSinkConfig.userUsedToKillJob}"
+      s"${sparkConfig.hadoopResourceManagerUrlBase}/ws/v1/cluster/apps/$applicationId/state?user.name=${sparkConfig.userUsedToKillJob}"
     val data = Json.obj(
       "state" -> "KILLED"
     )

src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/JobTemplateResolutionService.scala

Lines changed: 8 additions & 8 deletions
@@ -17,7 +17,7 @@ package za.co.absa.hyperdrive.trigger.api.rest.services

 import org.springframework.stereotype.Service
 import za.co.absa.hyperdrive.trigger.configuration.application.JobDefinitionConfig.{KeysToMerge, MergedValuesSeparator}
-import za.co.absa.hyperdrive.trigger.configuration.application.{ShellExecutorConfig, SparkYarnSinkConfig}
+import za.co.absa.hyperdrive.trigger.configuration.application.{ShellExecutorConfig, SparkConfig}
 import za.co.absa.hyperdrive.trigger.models._

 import java.nio.file.Paths
@@ -28,7 +28,7 @@ trait JobTemplateResolutionService {
 }

 @Service
-class JobTemplateResolutionServiceImpl(sparkYarnSinkConfig: SparkYarnSinkConfig, shellExecutorConfig: ShellExecutorConfig) extends JobTemplateResolutionService {
+class JobTemplateResolutionServiceImpl(sparkConfig: SparkConfig, shellExecutorConfig: ShellExecutorConfig) extends JobTemplateResolutionService {
   def resolveDagDefinitionJoined(dagDefinitionJoined: DagDefinitionJoined, jobTemplates: Seq[JobTemplate]): Seq[ResolvedJobDefinition] = {
     val jobTemplatesLookup = jobTemplates.map(t => t.id -> t).toMap
     dagDefinitionJoined.jobDefinitions.map(jd => {
@@ -63,22 +63,22 @@ class JobTemplateResolutionServiceImpl(sparkYarnSinkConfig: SparkYarnSinkConfig,

   private def mergeSparkAndHyperdriveParameters(definitionParams: HyperdriveDefinitionParameters, templateParams: SparkTemplateParameters): SparkInstanceParameters = {
     SparkInstanceParameters(
-      jobJar = Paths.get(sparkYarnSinkConfig.executablesFolder, templateParams.jobJar.getOrElse("")).toString,
+      jobJar = Paths.get(sparkConfig.yarn.executablesFolder, templateParams.jobJar.getOrElse("")).toString,
       mainClass = templateParams.mainClass.getOrElse(""),
       appArguments = mergeLists(definitionParams.appArguments, templateParams.appArguments),
-      additionalJars = mergeLists(definitionParams.additionalJars, templateParams.additionalJars).map(jar => Paths.get(sparkYarnSinkConfig.executablesFolder, jar).toString),
-      additionalFiles = mergeLists(definitionParams.additionalFiles, templateParams.additionalFiles).map(file => Paths.get(sparkYarnSinkConfig.executablesFolder, file).toString),
+      additionalJars = mergeLists(definitionParams.additionalJars, templateParams.additionalJars).map(jar => Paths.get(sparkConfig.yarn.executablesFolder, jar).toString),
+      additionalFiles = mergeLists(definitionParams.additionalFiles, templateParams.additionalFiles).map(file => Paths.get(sparkConfig.yarn.executablesFolder, file).toString),
       additionalSparkConfig = mergeMaps(definitionParams.additionalSparkConfig, templateParams.additionalSparkConfig, mergeSortedMapEntries)
     )
   }

   private def mergeSparkParameters(definitionParams: SparkDefinitionParameters, templateParams: SparkTemplateParameters): SparkInstanceParameters = {
     SparkInstanceParameters(
-      jobJar = Paths.get(sparkYarnSinkConfig.executablesFolder, mergeOptionStrings(definitionParams.jobJar, templateParams.jobJar)).toString,
+      jobJar = Paths.get(sparkConfig.yarn.executablesFolder, mergeOptionStrings(definitionParams.jobJar, templateParams.jobJar)).toString,
       mainClass = mergeOptionStrings(definitionParams.mainClass, templateParams.mainClass),
       appArguments = mergeLists(definitionParams.appArguments, templateParams.appArguments),
-      additionalJars = mergeLists(definitionParams.additionalJars, templateParams.additionalJars).map(jar => Paths.get(sparkYarnSinkConfig.executablesFolder, jar).toString),
-      additionalFiles = mergeLists(definitionParams.additionalFiles, templateParams.additionalFiles).map(file => Paths.get(sparkYarnSinkConfig.executablesFolder, file).toString),
+      additionalJars = mergeLists(definitionParams.additionalJars, templateParams.additionalJars).map(jar => Paths.get(sparkConfig.yarn.executablesFolder, jar).toString),
+      additionalFiles = mergeLists(definitionParams.additionalFiles, templateParams.additionalFiles).map(file => Paths.get(sparkConfig.yarn.executablesFolder, file).toString),
       additionalSparkConfig = mergeMaps(definitionParams.additionalSparkConfig, templateParams.additionalSparkConfig, mergeSortedMapEntries)
     )
   }
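
The executablesFolder prefixing above resolves jar and file names against the configured folder; for illustration (folder value made up):

    Paths.get("/opt/executables", "my-job.jar").toString // "/opt/executables/my-job.jar"
    Paths.get("/opt/executables", "").toString           // "/opt/executables" (an absent jobJar leaves just the folder)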
AwsRegion.java (new file)

Lines changed: 37 additions & 0 deletions
@@ -0,0 +1,37 @@
+
+/*
+ * Copyright 2018 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.hyperdrive.trigger.configuration.application;
+
+import javax.validation.Constraint;
+import javax.validation.Payload;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Target({ ElementType.METHOD, ElementType.FIELD })
+@Retention(RetentionPolicy.RUNTIME)
+@Constraint(validatedBy = AwsRegionValidator.class)
+public @interface AwsRegion {
+
+  String message() default "{awsRegion.constraint}";
+
+  Class<?>[] groups() default {};
+
+  Class<? extends Payload>[] payload() default {};
+}

src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/Executor.scala renamed to src/main/scala/za/co/absa/hyperdrive/trigger/configuration/application/AwsRegionValidator.scala

Lines changed: 9 additions & 7 deletions
@@ -1,3 +1,4 @@
+
 /*
  * Copyright 2018 ABSA Group Limited
  *
@@ -13,13 +14,14 @@
  * limitations under the License.
  */

-package za.co.absa.hyperdrive.trigger.scheduler.executors
-
-import za.co.absa.hyperdrive.trigger.models.{JobInstance, JobInstanceParameters}
+package za.co.absa.hyperdrive.trigger.configuration.application

-import scala.concurrent.{ExecutionContext, Future}
+import com.amazonaws.regions.Regions
+import javax.validation.{ConstraintValidator, ConstraintValidatorContext}
+import scala.util.Try

-trait Executor[T <: JobInstanceParameters] {
-  def execute(jobInstance: JobInstance, jobParameters: T, updateJob: JobInstance => Future[Unit])
-             (implicit executionContext: ExecutionContext, executorConfig: ExecutorConfig): Future[Unit]
+class AwsRegionValidator extends ConstraintValidator[AwsRegion, String]{
+  override def isValid(awsRegion: String, constraintValidatorContext: ConstraintValidatorContext): Boolean = {
+    awsRegion == null || awsRegion.isEmpty || Try(Regions.fromName(awsRegion)).isSuccess
+  }
 }
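
The validator accepts an absent or empty region and otherwise defers to the SDK's region registry (Regions.fromName throws for unknown names, hence the Try). A quick illustrative check of that behaviour:

    import com.amazonaws.regions.Regions
    import scala.util.Try

    Try(Regions.fromName("eu-west-1")).isSuccess    // true  -> passes the @AwsRegion constraint
    Try(Regions.fromName("not-a-region")).isSuccess // false -> rejected with "Not a valid aws region"
    new AwsRegionValidator().isValid("", null)      // true  -> empty region is left to other constraints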
SparkConfig.scala (new file)

Lines changed: 110 additions & 0 deletions
@@ -0,0 +1,110 @@
+
+/*
+ * Copyright 2018 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.hyperdrive.trigger.configuration.application
+
+import org.springframework.boot.context.properties.bind.{DefaultValue, Name}
+import org.springframework.boot.context.properties.{ConfigurationProperties, ConstructorBinding}
+import org.springframework.validation.annotation.Validated
+import org.springframework.validation.{Errors, Validator}
+import za.co.absa.hyperdrive.trigger.configuration.application.SparkConfig.{toNonEmptyOption, transformAdditionalConfsProperty, transformFilesProperty}
+
+import java.util.Properties
+import javax.validation.constraints.{NotBlank, NotNull}
+import scala.annotation.meta.field
+
+@ConfigurationProperties
+@ConstructorBinding
+@Validated
+class SparkConfig (
+  @DefaultValue(Array("yarn"))
+  @Name("spark.submitApi")
+  val submitApi: String,
+  @Name("sparkYarnSink")
+  val yarn: SparkYarnSinkConfig,
+  @Name("spark.emr")
+  val emr: SparkEmrSinkConfig,
+  @(NotBlank @field)
+  @Name("sparkYarnSink.hadoopResourceManagerUrlBase")
+  val hadoopResourceManagerUrlBase: String,
+  @DefaultValue(Array("Unknown"))
+  @Name("sparkYarnSink.userUsedToKillJob")
+  val userUsedToKillJob: String
+) {
+  if (submitApi == "yarn" && yarn == null) {
+    throw new RuntimeException("If spark.submitApi is yarn, sparkYarnSink arguments are required")
+  } else if (submitApi == "emr" && emr == null) {
+    throw new RuntimeException("If spark.submitApi is emr, spark.emr arguments are required")
+  } else if (submitApi != "yarn" && submitApi != "emr") {
+    throw new RuntimeException("spark.submitApi has to be either 'yarn' or 'emr'")
+  }
+}
+
+object SparkConfig {
+  def transformFilesProperty(filesToDeployInternal: String): Seq[String] = Option(filesToDeployInternal)
+    .map(_.split(",").toSeq)
+    .getOrElse(Seq())
+    .filter(_.nonEmpty)
+
+  def transformAdditionalConfsProperty(additionalConfsInternal: Properties): Map[String, String] = {
+    import scala.collection.JavaConverters._
+    Option(additionalConfsInternal)
+      .map(_.asScala.toMap).getOrElse(Map())
+  }
+
+  def toNonEmptyOption(string: String): Option[String] = {
+    Option(string).collect { case x if x.trim.nonEmpty => x }
+  }
+}
+
+class SparkYarnSinkConfig (
+  @NotNull
+  val submitTimeout: Int,
+  @(NotBlank @field)
+  val hadoopConfDir: String,
+  @(NotBlank @field)
+  val master: String,
+  @(NotBlank @field)
+  val sparkHome: String,
+  @Name("filesToDeploy")
+  filesToDeployInternal: String,
+  @Name("additionalConfs")
+  additionalConfsInternal: Properties,
+  @NotNull
+  val executablesFolder: String
+) {
+  val filesToDeploy: Seq[String] = transformFilesProperty(filesToDeployInternal)
+  val additionalConfs: Map[String, String] = transformAdditionalConfsProperty(additionalConfsInternal)
+}
+
+class SparkEmrSinkConfig (
+  @NotNull
+  val clusterId: String,
+  @Name("awsProfile")
+  awsProfileInternal: String,
+  @Name("region")
+  @(AwsRegion @field)(message = "Not a valid aws region")
+  val regionInternal: String,
+  @Name("filesToDeploy")
+  filesToDeployInternal: String,
+  @Name("additionalConfs")
+  additionalConfsInternal: Properties
+) {
+  val awsProfile: Option[String] = toNonEmptyOption(awsProfileInternal)
+  val region: Option[String] = toNonEmptyOption(regionInternal)
+  val filesToDeploy: Seq[String] = transformFilesProperty(filesToDeployInternal)
+  val additionalConfs: Map[String, String] = transformAdditionalConfsProperty(additionalConfsInternal)
+}
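
The companion-object helpers normalise the raw property values before they are exposed; a few illustrative evaluations based on the definitions above:

    SparkConfig.transformFilesProperty("a.jar,b.jar,") // Seq("a.jar", "b.jar") - empty segments are dropped
    SparkConfig.transformFilesProperty(null)           // Seq()                 - property not set
    SparkConfig.toNonEmptyOption("   ")                // None                  - blank collapses to None
    SparkConfig.toNonEmptyOption("my-profile")         // Some("my-profile")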
