
Commit 89c0765

Feature/689 keep order of key value input fields (#695)
#689 - keep order of key value input fields
1 parent ddc6a4b commit 89c0765

39 files changed: +794 additions, −125 deletions

src/main/resources/db_scripts/liquibase/db.changelog.yml

Lines changed: 5 additions & 1 deletion
@@ -82,4 +82,8 @@ databaseChangeLog:
       file: v0.5.5.optimize-dag-run-query.yml
   - include:
       relativeToChangelogFile: true
-      file: v0.5.7.add-workflow-version-field.yml
+      file: v0.5.7.add-workflow-version-field.yml
+  - include:
+      relativeToChangelogFile: true
+      file: v0.5.11.additional-spark-config-map-to-array.yml
+
src/main/resources/db_scripts/liquibase/v0.5.11.additional-spark-config-map-to-array.sql

Lines changed: 150 additions & 0 deletions

/*
 * Copyright 2018 ABSA Group Limited
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

-- Convert job_definition's additionalSparkConfig from map into array
update job_definition
set job_parameters = jsonb_set(
    job_definition.job_parameters::jsonb,
    array ['additionalSparkConfig'],
    updated_additional_spark_config.additional_spark_config,
    true
)
from (
    select job_definition_for_join.id, coalesce(aggregated_additional_spark_config.additional_spark_config::jsonb, '[]'::jsonb) as additional_spark_config
    from (
        select flattened_additional_spark_config.id, jsonb_agg(flattened_additional_spark_config.additional_spark_config)::jsonb as additional_spark_config
        from (
            select id, jsonb_each_text(job_definition.job_parameters::jsonb -> 'additionalSparkConfig') as additional_spark_config
            from job_definition
        ) as flattened_additional_spark_config
        group by flattened_additional_spark_config.id
    ) as aggregated_additional_spark_config
    right join job_definition as job_definition_for_join
        on job_definition_for_join.id = aggregated_additional_spark_config.id
) as updated_additional_spark_config
where updated_additional_spark_config.id = job_definition.id;

-- Convert job_instance's additionalSparkConfig from map into array
update job_instance
set job_parameters = jsonb_set(
    job_instance.job_parameters::jsonb,
    array ['additionalSparkConfig'],
    updated_additional_spark_config.additional_spark_config,
    true
)
from (
    select job_instance_for_join.id, coalesce(aggregated_additional_spark_config.additional_spark_config::jsonb, '[]'::jsonb) as additional_spark_config
    from (
        select flattened_additional_spark_config.id, jsonb_agg(flattened_additional_spark_config.additional_spark_config)::jsonb as additional_spark_config
        from (
            select id, jsonb_each_text(job_instance.job_parameters::jsonb -> 'additionalSparkConfig') as additional_spark_config
            from job_instance
        ) as flattened_additional_spark_config
        group by flattened_additional_spark_config.id
    ) as aggregated_additional_spark_config
    right join job_instance as job_instance_for_join
        on job_instance_for_join.id = aggregated_additional_spark_config.id
) as updated_additional_spark_config
where updated_additional_spark_config.id = job_instance.id;

-- Convert job_template's additionalSparkConfig from map into array
update job_template
set job_parameters = jsonb_set(
    job_template.job_parameters::jsonb,
    array ['additionalSparkConfig'],
    (updated_additional_spark_config.additional_spark_config),
    true
)
from (
    select job_template_for_join.id, coalesce(aggregated_additional_spark_config.additional_spark_config::jsonb, '[]'::jsonb) as additional_spark_config
    from (
        select flattened_additional_spark_config.id, jsonb_agg(flattened_additional_spark_config.additional_spark_config)::jsonb as additional_spark_config
        from (
            select id, jsonb_each_text(job_template.job_parameters::jsonb -> 'additionalSparkConfig') as additional_spark_config
            from job_template
        ) as flattened_additional_spark_config
        group by flattened_additional_spark_config.id
    ) as aggregated_additional_spark_config
    right join job_template as job_template_for_join
        on job_template_for_join.id = aggregated_additional_spark_config.id
) as updated_additional_spark_config
where updated_additional_spark_config.id = job_template.id;

-- Convert job_template_history's additionalSparkConfig from map into array
update job_template_history
set job_template = jsonb_set(
    job_template_history.job_template::jsonb,
    array ['jobParameters','additionalSparkConfig'],
    (updated_additional_spark_config.additional_spark_config),
    true
)
from (
    select job_template_history_for_join.id, coalesce(aggregated_additional_spark_config.additional_spark_config::jsonb, '[]'::jsonb) as additional_spark_config
    from (
        select flattened_additional_spark_config.id, jsonb_agg(flattened_additional_spark_config.additional_spark_config)::jsonb as additional_spark_config
        from (
            select id, jsonb_each_text(job_template_history.job_template::jsonb -> 'jobParameters' -> 'additionalSparkConfig') as additional_spark_config
            from job_template_history
        ) as flattened_additional_spark_config
        group by flattened_additional_spark_config.id
    ) as aggregated_additional_spark_config
    right join job_template_history as job_template_history_for_join
        on job_template_history_for_join.id = aggregated_additional_spark_config.id
) as updated_additional_spark_config
where updated_additional_spark_config.id = job_template_history.id;

-- Convert workflow_history's additionalSparkConfig from map into array
with flattened_job_definitions as (
    select id, jsonb_array_elements(workflow::jsonb -> 'dagDefinitionJoined' -> 'jobDefinitions') as job_definition
    from workflow_history
),
flattened_job_definitions_with_ids as (
    select flattened_job_definitions.id, flattened_job_definitions.job_definition::jsonb -> 'id' as job_id, flattened_job_definitions.job_definition as job_definition
    from flattened_job_definitions
),
updated_additional_spark_config as (
    select flattened_job_definitions_with_ids_for_join.id, flattened_job_definitions_with_ids_for_join.job_id, coalesce(aggregated_additional_spark_config.additional_spark_config::jsonb, '[]'::jsonb) as additional_spark_config
    from (
        select flattened_additional_spark_config.id, job_id, jsonb_agg(flattened_additional_spark_config.additional_spark_config)::jsonb as additional_spark_config
        from (
            select id, job_id, jsonb_each_text(job_definition::jsonb -> 'jobParameters' -> 'additionalSparkConfig') as additional_spark_config
            from flattened_job_definitions_with_ids
        ) as flattened_additional_spark_config
        group by flattened_additional_spark_config.id, job_id
    ) as aggregated_additional_spark_config
    right join flattened_job_definitions_with_ids as flattened_job_definitions_with_ids_for_join
        on flattened_job_definitions_with_ids_for_join.id = aggregated_additional_spark_config.id
        AND flattened_job_definitions_with_ids_for_join.job_id = aggregated_additional_spark_config.job_id
),
updated_job_definitons as (
    select flattened_job_definitions_with_ids.id as id, flattened_job_definitions_with_ids.job_id,
           jsonb_set(
               flattened_job_definitions_with_ids.job_definition::jsonb,
               array ['jobParameters','additionalSparkConfig'],
               (updated_additional_spark_config.additional_spark_config),
               true
           ) as job_definitions
    from flattened_job_definitions_with_ids, updated_additional_spark_config
    where flattened_job_definitions_with_ids.id = updated_additional_spark_config.id AND flattened_job_definitions_with_ids.job_id = updated_additional_spark_config.job_id
),
aggregated_job_definitions as (
    select updated_job_definitons.id, jsonb_agg(updated_job_definitons.job_definitions) as job_definitions
    from updated_job_definitons
    group by updated_job_definitons.id
)
update workflow_history
set workflow = jsonb_set(workflow::jsonb, array['dagDefinitionJoined', 'jobDefinitions'], aggregated_job_definitions.job_definitions)
from aggregated_job_definitions
where aggregated_job_definitions.id=workflow_history.id;
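
Taken together, the five statements above rewrite each additionalSparkConfig entry from a JSON object into a JSON array of {"key": ..., "value": ...} objects: jsonb_each_text flattens the map into (key, value) records, jsonb_agg packs those records into an array, and the right join plus coalesce gives rows without any entries an empty array instead of NULL. A plain-Scala sketch of the per-row shape change (the sample values are hypothetical):

    import za.co.absa.hyperdrive.trigger.models.AdditionalSparkConfig

    // Before the migration: {"spark.executor.cores": "2", "spark.executor.memory": "2g"}
    val asMap: Map[String, String] =
      Map("spark.executor.cores" -> "2", "spark.executor.memory" -> "2g")

    // After the migration: [{"key": "spark.executor.cores", "value": "2"},
    //                       {"key": "spark.executor.memory", "value": "2g"}]
    val asArray: List[AdditionalSparkConfig] =
      asMap.map { case (key, value) => AdditionalSparkConfig(key, value) }.toList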
src/main/resources/db_scripts/liquibase/v0.5.11.additional-spark-config-map-to-array.yml

Lines changed: 25 additions & 0 deletions

#
# Copyright 2018 ABSA Group Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

databaseChangeLog:
  - changeSet:
      id: v0.5.11.additional-spark-config-map-to-array
      logicalFilePath: v0.5.11.additional-spark-config-map-to-array

      context: default
      changes:
        - sqlFile:
            relativeToChangelogFile: true
            path: v0.5.11.additional-spark-config-map-to-array.sql

src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/JobTemplateResolutionService.scala

Lines changed: 6 additions & 2 deletions
@@ -18,6 +18,7 @@ package za.co.absa.hyperdrive.trigger.api.rest.services
 import org.springframework.stereotype.Service
 import za.co.absa.hyperdrive.trigger.configuration.application.JobDefinitionConfig.{KeysToMerge, MergedValuesSeparator}
 import za.co.absa.hyperdrive.trigger.models._
+import za.co.absa.hyperdrive.trigger.api.rest.utils.Extensions.{SparkConfigList, SparkConfigMap}

 import scala.util.{Failure, Success, Try}

@@ -103,8 +104,11 @@ class JobTemplateResolutionServiceImpl extends JobTemplateResolutionService {
       appArguments = mergeLists(definitionParams.appArguments, templateParams.appArguments),
       additionalJars = mergeLists(definitionParams.additionalJars, templateParams.additionalJars),
       additionalFiles = mergeLists(definitionParams.additionalFiles, templateParams.additionalFiles),
-      additionalSparkConfig =
-        mergeMaps(definitionParams.additionalSparkConfig, templateParams.additionalSparkConfig, mergeSortedMapEntries)
+      additionalSparkConfig = mergeMaps(
+        definitionParams.additionalSparkConfig.toKeyValueMap,
+        templateParams.additionalSparkConfig.toKeyValueMap,
+        mergeSortedMapEntries
+      ).toAdditionalSparkConfigList
     )

   private def mergeShellParameters(
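
Resolution still merges definition and template entries by key: both lists are converted to maps, merged, and converted back to a list. A minimal sketch of that round trip, with a stand-in merge function because mergeMaps and mergeSortedMapEntries are not part of this diff (values hypothetical):

    import za.co.absa.hyperdrive.trigger.api.rest.utils.Extensions.{SparkConfigList, SparkConfigMap}
    import za.co.absa.hyperdrive.trigger.models.AdditionalSparkConfig

    val fromTemplate = List(AdditionalSparkConfig("spark.executor.memory", "2g"))
    val fromDefinition = List(AdditionalSparkConfig("spark.driver.memory", "4g"))

    // Stand-in for mergeMaps/mergeSortedMapEntries: definition entries win on
    // key clashes (an assumption; the real merge logic is not shown here).
    def naiveMerge(definition: Map[String, String], template: Map[String, String]): Map[String, String] =
      template ++ definition

    val resolved: List[AdditionalSparkConfig] =
      naiveMerge(fromDefinition.toKeyValueMap, fromTemplate.toKeyValueMap).toAdditionalSparkConfigList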

ui/src/app/models/jobParameters.model.ts renamed to src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/utils/Extensions.scala

Lines changed: 10 additions & 14 deletions
@@ -13,22 +13,18 @@
  * limitations under the License.
  */

-export type JobParametersModel = {
-  variables: Map<string, string>;
-  maps: Map<string, Set<string>>;
-  keyValuePairs: Map<string, Map<string, string>>;
-};
+package za.co.absa.hyperdrive.trigger.api.rest.utils

-export class JobParametersModelFactory {
-  static create(
-    variables: Map<string, string>,
-    maps: Map<string, Set<string>>,
-    keyValuePairs: Map<string, Map<string, string>>,
-  ): JobParametersModel {
-    return { variables: variables, maps: maps, keyValuePairs: keyValuePairs };
+import za.co.absa.hyperdrive.trigger.models.AdditionalSparkConfig
+
+object Extensions {
+  implicit class SparkConfigList(list: List[AdditionalSparkConfig]) {
+    def toKeyValueMap: Map[String, String] =
+      list.map(element => element.key -> element.value).toMap
   }

-  static createEmpty(): JobParametersModel {
-    return this.create(new Map(), new Map(), new Map());
+  implicit class SparkConfigMap(map: Map[String, String]) {
+    def toAdditionalSparkConfigList: List[AdditionalSparkConfig] =
+      map.map(element => AdditionalSparkConfig(element._1, element._2)).toList
   }
 }
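
The two implicit classes make the list and map representations interchangeable at call sites. A short usage sketch (values hypothetical); note that the round trip through Map[String, String] is where entry order can get lost, which is consistent with this PR's goal of keeping the order of key-value input fields in the persisted List:

    import za.co.absa.hyperdrive.trigger.api.rest.utils.Extensions.{SparkConfigList, SparkConfigMap}
    import za.co.absa.hyperdrive.trigger.models.AdditionalSparkConfig

    val configs = List(
      AdditionalSparkConfig("spark.executor.instances", "4"),
      AdditionalSparkConfig("spark.executor.memory", "2g")
    )

    val asMap: Map[String, String] = configs.toKeyValueMap
    val back: List[AdditionalSparkConfig] = asMap.toAdditionalSparkConfigList // order not guaranteed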
src/main/scala/za/co/absa/hyperdrive/trigger/models/AdditionalSparkConfig.scala

Lines changed: 25 additions & 0 deletions

/*
 * Copyright 2018 ABSA Group Limited
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package za.co.absa.hyperdrive.trigger.models

import play.api.libs.json.{Json, OFormat}

case class AdditionalSparkConfig(key: String, value: String)

object AdditionalSparkConfig {
  implicit val additionalSparkConfigFormat: OFormat[AdditionalSparkConfig] =
    Json.using[Json.WithDefaultValues].format[AdditionalSparkConfig]
}
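
The implicit OFormat in the companion object is what Play JSON picks up when (de)serializing these entries, yielding exactly the {"key": ..., "value": ...} objects the migration script produces. A minimal round-trip sketch:

    import play.api.libs.json.Json
    import za.co.absa.hyperdrive.trigger.models.AdditionalSparkConfig

    val entry = AdditionalSparkConfig("spark.executor.memory", "2g")
    val json = Json.toJson(entry)               // {"key":"spark.executor.memory","value":"2g"}
    val parsed = json.as[AdditionalSparkConfig] // equal to entry
    assert(parsed == entry)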

src/main/scala/za/co/absa/hyperdrive/trigger/models/JobDefinitionParameters.scala

Lines changed: 1 addition & 1 deletion
@@ -30,7 +30,7 @@ case class SparkDefinitionParameters(
   appArguments: List[String] = List.empty[String],
   additionalJars: List[String] = List.empty[String],
   additionalFiles: List[String] = List.empty[String],
-  additionalSparkConfig: Map[String, String] = Map.empty[String, String]
+  additionalSparkConfig: List[AdditionalSparkConfig] = List.empty[AdditionalSparkConfig]
 ) extends JobDefinitionParameters

 case class ShellDefinitionParameters(jobType: JobType = JobTypes.Shell, scriptLocation: Option[String])

src/main/scala/za/co/absa/hyperdrive/trigger/models/JobInstanceParameters.scala

Lines changed: 1 addition & 1 deletion
@@ -30,7 +30,7 @@ case class SparkInstanceParameters(
   appArguments: List[String] = List.empty[String],
   additionalJars: List[String] = List.empty[String],
   additionalFiles: List[String] = List.empty[String],
-  additionalSparkConfig: Map[String, String] = Map.empty[String, String]
+  additionalSparkConfig: List[AdditionalSparkConfig] = List.empty[AdditionalSparkConfig]
 ) extends JobInstanceParameters

 case class ShellInstanceParameters(jobType: JobType = JobTypes.Shell, scriptLocation: String)

src/main/scala/za/co/absa/hyperdrive/trigger/models/JobTemplateParameters.scala

Lines changed: 1 addition & 1 deletion
@@ -30,7 +30,7 @@ case class SparkTemplateParameters(
   appArguments: List[String] = List.empty[String],
   additionalJars: List[String] = List.empty[String],
   additionalFiles: List[String] = List.empty[String],
-  additionalSparkConfig: Map[String, String] = Map.empty[String, String]
+  additionalSparkConfig: List[AdditionalSparkConfig] = List.empty[AdditionalSparkConfig]
 ) extends JobTemplateParameters

 case class ShellTemplateParameters(jobType: JobType = JobTypes.Shell, scriptLocation: String)

src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/SparkEmrClusterServiceImpl.scala

Lines changed: 6 additions & 2 deletions
@@ -39,6 +39,7 @@ import javax.inject.Inject
 import scala.annotation.tailrec
 import scala.concurrent.{ExecutionContext, Future}
 import scala.util.{Failure, Success, Try}
+import za.co.absa.hyperdrive.trigger.api.rest.utils.Extensions.SparkConfigList

 @Service
 class SparkEmrClusterServiceImpl @Inject() (

@@ -140,8 +141,11 @@ class SparkEmrClusterServiceImpl @Inject() (
     val sparkSubmitConfs = Map("--deploy-mode" -> "cluster")
     val confs = Map("spark.app.name" -> jobName, "spark.yarn.tags" -> id) ++
       config.additionalConfs ++
-      jobParameters.additionalSparkConfig ++
-      mergeAdditionalSparkConfig(config.additionalConfs, jobParameters.additionalSparkConfig)
+      jobParameters.additionalSparkConfig.toKeyValueMap ++
+      mergeAdditionalSparkConfig(
+        config.additionalConfs,
+        jobParameters.additionalSparkConfig.toKeyValueMap
+      )
     val files = config.filesToDeploy ++ jobParameters.additionalFiles
     SparkEmrArgs(
       mainClass = jobParameters.mainClass,
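
Because ++ on Scala's Map is last-wins, job-level entries (converted back to a map via toKeyValueMap) still override config.additionalConfs on key clashes, while mergeAdditionalSparkConfig, not shown in this diff, presumably combines values for the designated mergeable keys (cf. the KeysToMerge/MergedValuesSeparator import in JobTemplateResolutionService). An illustrative sketch with made-up values:

    // ++ keeps the right-hand operand's value when keys collide.
    val base = Map("spark.app.name" -> "my-job")
    val fromConfig = Map("spark.executor.memory" -> "1g")
    val fromJob = Map("spark.executor.memory" -> "2g")

    val confs = base ++ fromConfig ++ fromJob
    assert(confs("spark.executor.memory") == "2g")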
