Commit 27c09eb

#698 - Add support for extra yarn tags (#702)
* #698 - Add support for extra yarn tags
1 parent 89c0765 commit 27c09eb

File tree

7 files changed, +175 -17 lines changed

src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/JobTemplateResolutionService.scala

Lines changed: 7 additions & 3 deletions

@@ -16,7 +16,7 @@
 package za.co.absa.hyperdrive.trigger.api.rest.services

 import org.springframework.stereotype.Service
-import za.co.absa.hyperdrive.trigger.configuration.application.JobDefinitionConfig.{KeysToMerge, MergedValuesSeparator}
+import za.co.absa.hyperdrive.trigger.configuration.application.JobDefinitionConfig.{SparkExtraJavaOptions, SparkTags}
 import za.co.absa.hyperdrive.trigger.models._
 import za.co.absa.hyperdrive.trigger.api.rest.utils.Extensions.{SparkConfigList, SparkConfigMap}

@@ -124,8 +124,12 @@ class JobTemplateResolutionServiceImpl extends JobTemplateResolutionService {
     secondary ++ primary

   private def mergeSortedMapEntries(key: String, firstValue: String, secondValue: String): String =
-    if (KeysToMerge.contains(key)) {
-      s"$secondValue$MergedValuesSeparator$firstValue".trim
+    if (SparkExtraJavaOptions.KeysToMerge.contains(key)) {
+      s"$secondValue${SparkExtraJavaOptions.MergedValuesSeparator}$firstValue".trim
+    } else if (SparkTags.KeysToMerge.contains(key)) {
+      (
+        secondValue.split(SparkTags.MergedValuesSeparator) ++ firstValue.split(SparkTags.MergedValuesSeparator)
+      ).toSet[String].map(_.trim).mkString(SparkTags.MergedValuesSeparator)
     } else {
       firstValue
     }
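
A minimal standalone sketch of what the new spark.yarn.tags branch does (illustrative names, not project code): split both values on the comma separator, deduplicate via a Set, trim each tag, and join again.

// Sketch only: mirrors the else-if branch above, assuming comma-separated
// tag lists. TagMergeSketch and mergeTags are hypothetical names.
object TagMergeSketch {
  private val Separator = ","

  def mergeTags(firstValue: String, secondValue: String): String =
    (secondValue.split(Separator) ++ firstValue.split(Separator))
      .toSet[String]   // drops tags that occur in both lists
      .map(_.trim)
      .mkString(Separator)

  def main(args: Array[String]): Unit =
    // "third" and "first" occur in both inputs but appear only once in the
    // output, e.g. "third,first,fourth,second" with the default Set.
    println(mergeTags("first,second,third", "third,first,fourth"))
}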

src/main/scala/za/co/absa/hyperdrive/trigger/configuration/application/JobDefinitionConfig.scala

Lines changed: 9 additions & 2 deletions

@@ -16,6 +16,13 @@
 package za.co.absa.hyperdrive.trigger.configuration.application

 object JobDefinitionConfig {
-  val KeysToMerge = Set("spark.executor.extraJavaOptions", "spark.driver.extraJavaOptions")
-  val MergedValuesSeparator = " "
+  object SparkExtraJavaOptions {
+    val KeysToMerge = Set("spark.executor.extraJavaOptions", "spark.driver.extraJavaOptions")
+    val MergedValuesSeparator = " "
+  }
+
+  object SparkTags {
+    val KeysToMerge = Set("spark.yarn.tags")
+    val MergedValuesSeparator = ","
+  }
 }

src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/SparkClusterService.scala

Lines changed: 17 additions & 6 deletions

@@ -15,10 +15,10 @@

 package za.co.absa.hyperdrive.trigger.scheduler.executors.spark

-import za.co.absa.hyperdrive.trigger.configuration.application.JobDefinitionConfig.{KeysToMerge, MergedValuesSeparator}
+import za.co.absa.hyperdrive.trigger.configuration.application.JobDefinitionConfig.{SparkExtraJavaOptions, SparkTags}
 import za.co.absa.hyperdrive.trigger.models.{JobInstance, SparkInstanceParameters}

-import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.Future

 trait SparkClusterService {
   def submitJob(
@@ -32,10 +32,21 @@ trait SparkClusterService {
   protected def mergeAdditionalSparkConfig(
     globalConfig: Map[String, String],
     jobConfig: Map[String, String]
-  ): Map[String, String] =
-    KeysToMerge.map { key =>
+  ): Map[String, String] = {
+    val extraJavaOptionsMerge = SparkExtraJavaOptions.KeysToMerge.map { key =>
       val globalValue = globalConfig.getOrElse(key, "")
       val jobValue = jobConfig.getOrElse(key, "")
-      key -> s"$globalValue$MergedValuesSeparator$jobValue".trim
-    }.toMap
+      key -> s"$globalValue${SparkExtraJavaOptions.MergedValuesSeparator}$jobValue".trim
+    }
+    val tagsOptions = SparkTags.KeysToMerge.map { key =>
+      val globalValue = globalConfig.get(key)
+      val jobValue = jobConfig.get(key)
+      val value = (
+        globalValue.map(_.split(SparkTags.MergedValuesSeparator)).getOrElse(Array.empty[String]) ++
+          jobValue.map(_.split(SparkTags.MergedValuesSeparator)).getOrElse(Array.empty[String])
+      ).toSet[String].map(_.trim).mkString(SparkTags.MergedValuesSeparator)
+      if (value.nonEmpty) Some(key -> value) else None
+    }
+    (extraJavaOptionsMerge ++ tagsOptions.flatten).toMap
+  }
 }
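
Note the asymmetry in the merged result: the extraJavaOptions keys are always emitted, as empty strings when neither input sets them, whereas the spark.yarn.tags entry is built as an Option and flattened away when both inputs are missing, so no empty tags entry is passed on to the launcher.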

src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/SparkEmrClusterServiceImpl.scala

Lines changed: 2 additions & 2 deletions

@@ -139,11 +139,11 @@ class SparkEmrClusterServiceImpl @Inject() (
   private def getSparkArgs(id: String, jobName: String, jobParameters: SparkInstanceParameters) = {
     val config = sparkConfig.emr
     val sparkSubmitConfs = Map("--deploy-mode" -> "cluster")
-    val confs = Map("spark.app.name" -> jobName, "spark.yarn.tags" -> id) ++
+    val confs = Map("spark.app.name" -> jobName) ++
       config.additionalConfs ++
       jobParameters.additionalSparkConfig.toKeyValueMap ++
       mergeAdditionalSparkConfig(
-        config.additionalConfs,
+        config.additionalConfs ++ Map("spark.yarn.tags" -> id),
         jobParameters.additionalSparkConfig.toKeyValueMap
       )
     val files = config.filesToDeploy ++ jobParameters.additionalFiles

src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/SparkYarnClusterServiceImpl.scala

Lines changed: 2 additions & 4 deletions

@@ -88,7 +88,6 @@ class SparkYarnClusterServiceImpl @Inject() (implicit
       .setAppResource(jobParameters.jobJar)
       .setAppName(jobName)
       .setConf("spark.app.name", jobName)
-      .setConf("spark.yarn.tags", id)
       .addAppArgs(jobParameters.appArguments.toSeq.map(fix_json_for_yarn): _*)
       .addSparkArg("--verbose")
     config.filesToDeploy.foreach(file => sparkLauncher.addFile(file))
@@ -97,10 +96,9 @@ class SparkYarnClusterServiceImpl @Inject() (implicit
     jobParameters.additionalFiles.foreach(sparkLauncher.addFile)
     jobParameters.additionalSparkConfig.foreach(conf => sparkLauncher.setConf(conf.key, conf.value))
     mergeAdditionalSparkConfig(
-      config.additionalConfs,
+      config.additionalConfs ++ Map("spark.yarn.tags" -> id),
       jobParameters.additionalSparkConfig.toKeyValueMap
-    )
-      .foreach(conf => sparkLauncher.setConf(conf._1, conf._2))
+    ).foreach(conf => sparkLauncher.setConf(conf._1, conf._2))

     sparkLauncher
   }
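
In both cluster services, the scheduler-generated workflow id is no longer set on the launcher as spark.yarn.tags directly; it is appended to the global confs before merging, so user-supplied spark.yarn.tags from templates or job definitions are combined with the id instead of being overridden by it.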

src/test/scala/za/co/absa/hyperdrive/trigger/api/rest/utils/JobTemplateResolutionServiceTest.scala

Lines changed: 35 additions & 0 deletions

@@ -369,6 +369,41 @@ class JobTemplateResolutionServiceTest extends FlatSpec with Matchers {
     )
   }

+  it should "in additionalSparkConfig, concatenate the values and filter duplicates if the key is spark.yarn.tags" in {
+    // given
+    val userParameters = SparkDefinitionParameters(
+      jobType = JobTypes.Spark,
+      jobJar = None,
+      mainClass = None,
+      additionalSparkConfig = List(
+        AdditionalSparkConfig("spark.yarn.tags", "first,second,third")
+      )
+    )
+    val templateParameters = SparkTemplateParameters(
+      jobType = JobTypes.Spark,
+      jobJar = "jobJar",
+      mainClass = "mainClass",
+      additionalSparkConfig = List(
+        AdditionalSparkConfig("spark.yarn.tags", "third,first,fourth")
+      )
+    )
+
+    val jobTemplate = GenericSparkJobTemplate.copy(jobParameters = templateParameters)
+    val jobDefinition = createJobDefinition().copy(jobTemplateId = Some(jobTemplate.id), jobParameters = userParameters)
+    val dagDefinitionJoined = createDagDefinitionJoined(jobDefinition)
+
+    // when
+    val resolvedJobDefinitions = underTest.resolveDagDefinitionJoined(dagDefinitionJoined, Seq(jobTemplate))
+
+    // then
+    val resolvedJobDefinition = resolvedJobDefinitions.head
+    resolvedJobDefinition.jobParameters
+      .asInstanceOf[SparkInstanceParameters]
+      .additionalSparkConfig should contain theSameElementsAs List(
+      AdditionalSparkConfig("spark.yarn.tags", "third,first,fourth,second")
+    )
+  }
+
 it should "throw an error if the jobTemplate is of the different type as job definiton" in {
   // given
   val jobTemplate = GenericShellJobTemplate
src/test/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/SparkClusterServiceTest.scala

Lines changed: 103 additions & 0 deletions

@@ -0,0 +1,103 @@
+/*
+ * Copyright 2018 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.hyperdrive.trigger.scheduler.executors.spark
+
+import org.scalatest.{FlatSpec, Matchers}
+import za.co.absa.hyperdrive.trigger.models.{JobInstance, SparkInstanceParameters}
+
+import scala.concurrent.Future
+
+class SparkClusterServiceTest extends FlatSpec with Matchers with SparkClusterService {
+
+  override def submitJob(
+    jobInstance: JobInstance,
+    jobParameters: SparkInstanceParameters,
+    updateJob: JobInstance => Future[Unit]
+  ): Future[Unit] = Future.successful()
+
+  override def handleMissingYarnStatus(
+    jobInstance: JobInstance,
+    updateJob: JobInstance => Future[Unit]
+  ): Future[Unit] = Future.successful()
+
+  "SparkClusterService.mergeAdditionalSparkConfig" should "merge empty inputs" in {
+    val first = Map.empty[String, String]
+    val second = Map.empty[String, String]
+
+    val result = this.mergeAdditionalSparkConfig(first, second)
+
+    result should contain theSameElementsAs Map(
+      "spark.executor.extraJavaOptions" -> "",
+      "spark.driver.extraJavaOptions" -> ""
+    )
+  }
+
+  it should "concatenate the values if the key is extraJavaOptions" in {
+    val first = Map(
+      "spark.driver.extraJavaOptions" -> "-user.prop=userDriver",
+      "spark.executor.extraJavaOptions" -> "-user.prop=userExecutor"
+    )
+    val second = Map(
+      "spark.driver.extraJavaOptions" -> "-template.prop=templateDriver",
+      "spark.executor.extraJavaOptions" -> "-template.prop=templateExecutor"
+    )
+
+    val result = this.mergeAdditionalSparkConfig(first, second)
+
+    result should contain theSameElementsAs Map(
+      "spark.driver.extraJavaOptions" -> "-user.prop=userDriver -template.prop=templateDriver",
+      "spark.executor.extraJavaOptions" -> "-user.prop=userExecutor -template.prop=templateExecutor"
+    )
+  }
+
+  it should "concatenate the values if the key is spark.yarn.tags" in {
+    val first = Map(
+      "spark.yarn.tags" -> "first,second,third"
+    )
+    val second = Map(
+      "spark.yarn.tags" -> "third,first,fourth"
+    )
+
+    val result = this.mergeAdditionalSparkConfig(first, second)
+
+    result should contain theSameElementsAs Map(
+      "spark.yarn.tags" -> "first,second,third,fourth",
+      "spark.driver.extraJavaOptions" -> "",
+      "spark.executor.extraJavaOptions" -> ""
+    )
+  }
+
+  it should "concatenate the values if the key is extraJavaOptions or spark.yarn.tags" in {
+    val first = Map(
+      "spark.driver.extraJavaOptions" -> "-user.prop=userDriver",
+      "spark.executor.extraJavaOptions" -> "-user.prop=userExecutor",
+      "spark.yarn.tags" -> "first,second,third"
+    )
+    val second = Map(
+      "spark.driver.extraJavaOptions" -> "-template.prop=templateDriver",
+      "spark.executor.extraJavaOptions" -> "-template.prop=templateExecutor",
+      "spark.yarn.tags" -> "third,first,fourth"
+    )
+
+    val result = this.mergeAdditionalSparkConfig(first, second)
+
+    result should contain theSameElementsAs Map(
+      "spark.driver.extraJavaOptions" -> "-user.prop=userDriver -template.prop=templateDriver",
+      "spark.executor.extraJavaOptions" -> "-user.prop=userExecutor -template.prop=templateExecutor",
+      "spark.yarn.tags" -> "first,second,third,fourth"
+    )
+  }
+}
