Skip to content

Commit d1e02d7

Browse files
Feature/405 persist yarn (#416)
* Persist application id to JobInstance * Frontend impl * Display N/A if there are no logs * Satisfy sonarcloud * Update db_latest sql
1 parent bb552dd commit d1e02d7

File tree

24 files changed

+148
-31
lines changed

24 files changed

+148
-31
lines changed

src/main/resources/db_scripts/db_script_latest.sql

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ create table "job_instance" (
3131
"key_value_pairs" VARCHAR NOT NULL,
3232
"job_status" VARCHAR NOT NULL,
3333
"executor_job_id" VARCHAR,
34+
"application_id" VARCHAR,
3435
"created" TIMESTAMP NOT NULL,
3536
"updated" TIMESTAMP,
3637
"order" INTEGER NOT NULL,

src/main/resources/db_scripts/liquibase/db.changelog.yml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,4 +47,8 @@ databaseChangeLog:
4747
- include:
4848
relativeToChangelogFile: true
4949
file: v0.3.4.add-scheduler-instances.yml
50+
- include:
51+
relativeToChangelogFile: true
52+
file: v0.4.0.add-application-id.yml
53+
5054

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
/*
2+
* Copyright 2018 ABSA Group Limited
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
alter table "job_instance" add column "application_id" VARCHAR;
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
#
2+
# Copyright 2018 ABSA Group Limited
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
#
15+
16+
databaseChangeLog:
17+
- changeSet:
18+
id: v0.4.0.add-application-id
19+
logicalFilePath: v0.4.0.add-application-id
20+
21+
context: default
22+
changes:
23+
- sqlFile:
24+
relativeToChangelogFile: true
25+
path: v0.4.0.add-application-id.sql

src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/controllers/AppInfoController.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,15 @@ class AppInfoController {
2525
val environment: String = ""
2626
@Value("${version:Unknown}")
2727
val version: String = ""
28+
@Value("${sparkYarnSink.hadoopResourceManagerUrlBase:Unknown}")
29+
val resourceManagerUrl: String = ""
2830

2931
@GetMapping(path = Array("/app/info"))
3032
def appInfo(): AppInfo = {
3133
AppInfo(
3234
environment = environment,
33-
version = version
35+
version = version,
36+
resourceManagerUrl = resourceManagerUrl
3437
)
3538
}
3639

src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/DagInstanceService.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ class DagInstanceServiceImpl(override val jobTemplateService: JobTemplateService
6060
jobParameters = resolvedJobDefinition.jobParameters,
6161
jobStatus = initialJobStatus,
6262
executorJobId = None,
63+
applicationId = None,
6364
created = now,
6465
updated = finished,
6566
order = resolvedJobDefinition.order,

src/main/scala/za/co/absa/hyperdrive/trigger/models/AppInfo.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,8 @@
1515

1616
package za.co.absa.hyperdrive.trigger.models
1717

18-
case class AppInfo(environment: String, version: String)
18+
case class AppInfo(
19+
environment: String,
20+
version: String,
21+
resourceManagerUrl: String
22+
)

src/main/scala/za/co/absa/hyperdrive/trigger/models/JobInstance.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ case class JobInstance(
2626
jobParameters: JobParameters,
2727
jobStatus: JobStatus,
2828
executorJobId: Option[String],
29+
applicationId: Option[String],
2930
created: LocalDateTime,
3031
updated: Option[LocalDateTime],
3132
order: Int,
@@ -41,6 +42,7 @@ case class JobInstanceJoined(
4142
jobParameters: JobParameters,
4243
jobStatus: JobStatus,
4344
executorJobId: Option[String],
45+
applicationId: Option[String],
4446
created: LocalDateTime,
4547
updated: Option[LocalDateTime],
4648
order: Int,

src/main/scala/za/co/absa/hyperdrive/trigger/models/tables/JobInstanceTable.scala

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ trait JobInstanceTable {
3737
def keyValuePairs: Rep[Map[String, SortedMap[String, String]]] = column[Map[String, SortedMap[String, String]]]("key_value_pairs")
3838
def jobStatus: Rep[JobStatus] = column[JobStatus]("job_status")
3939
def executorJobId: Rep[Option[String]] = column[Option[String]]("executor_job_id")
40+
def applicationId: Rep[Option[String]] = column[Option[String]]("application_id")
4041
def created: Rep[LocalDateTime] = column[LocalDateTime]("created")
4142
def updated: Rep[Option[LocalDateTime]] = column[Option[LocalDateTime]]("updated")
4243
def order: Rep[Int] = column[Int]("order")
@@ -54,6 +55,7 @@ trait JobInstanceTable {
5455
keyValuePairs,
5556
jobStatus,
5657
executorJobId,
58+
applicationId,
5759
created,
5860
updated,
5961
order,
@@ -71,11 +73,12 @@ trait JobInstanceTable {
7173
),
7274
jobStatus = jobInstanceTuple._6,
7375
executorJobId = jobInstanceTuple._7,
74-
created = jobInstanceTuple._8,
75-
updated = jobInstanceTuple._9,
76-
order = jobInstanceTuple._10,
77-
dagInstanceId = jobInstanceTuple._11,
78-
id = jobInstanceTuple._12
76+
applicationId = jobInstanceTuple._8,
77+
created = jobInstanceTuple._9,
78+
updated = jobInstanceTuple._10,
79+
order = jobInstanceTuple._11,
80+
dagInstanceId = jobInstanceTuple._12,
81+
id = jobInstanceTuple._13
7982
),
8083
(jobInstance: JobInstance) =>
8184
Option(
@@ -86,6 +89,7 @@ trait JobInstanceTable {
8689
jobInstance.jobParameters.keyValuePairs,
8790
jobInstance.jobStatus,
8891
jobInstance.executorJobId,
92+
jobInstance.applicationId,
8993
jobInstance.created,
9094
jobInstance.updated,
9195
jobInstance.order,

src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/SparkExecutor.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,9 @@ object SparkExecutor extends Executor {
6464
case Some(asd) => asd.apps.app
6565
case None => Seq.empty
6666
}) match {
67-
case Seq(first) => updateJob(jobInstance.copy(jobStatus = getStatus(first.finalStatus)))
67+
case Seq(first) => updateJob(jobInstance.copy(
68+
applicationId = Some(first.id),
69+
jobStatus = getStatus(first.finalStatus)))
6870
case _ if jobInstance.jobStatus == Submitting => updateJob(jobInstance.copy(jobStatus = SubmissionTimeout))
6971
case _ => updateJob(jobInstance.copy(jobStatus = Lost))
7072
}

0 commit comments

Comments
 (0)