Commit 03fca8e

Feature/819 store error messages (#827)
* Add error_message field to DB
* Store diagnostics in job instance table
* Rename error_message -> diagnostics
* Add diagnostics to notification
* List all causes
* Add bulletpoints
* Fixes
* Fix format
1 parent: 51f440e

10 files changed (+266, -12 lines)

src/main/resources/db_scripts/db_script_latest.sql

Lines changed: 6 additions & 4 deletions
@@ -34,7 +34,8 @@ create table "job_instance" (
     "id" BIGSERIAL NOT NULL PRIMARY KEY,
     "application_id" VARCHAR,
     "step_id" VARCHAR,
-    "job_parameters" JSONB NOT NULL DEFAULT '{}'
+    "job_parameters" JSONB NOT NULL DEFAULT '{}',
+    "diagnostics" VARCHAR
 );
 
 create table "job_definition" (
@@ -148,7 +149,8 @@ create table archive_job_instance
         references archive_dag_instance,
     id bigint primary key,
     application_id varchar,
-    step_id varchar
+    step_id varchar,
+    diagnostics varchar
 );
 
 create table archive_event
@@ -317,8 +319,8 @@ BEGIN
     GET DIAGNOSTICS _cnt = ROW_COUNT;
     RAISE NOTICE 'Archived % dag instances from % to %', _cnt, i_min_id, i_max_id;
 
-    INSERT INTO archive_job_instance (job_name, job_status, executor_job_id, created, updated, "order", dag_instance_id, id, application_id, step_id)
-    SELECT ji.job_name, ji.job_status, ji.executor_job_id, ji.created, ji.updated, ji."order", ji.dag_instance_id, ji.id, ji.application_id, ji.step_id
+    INSERT INTO archive_job_instance (job_name, job_status, executor_job_id, created, updated, "order", dag_instance_id, id, application_id, step_id, diagnostics)
+    SELECT ji.job_name, ji.job_status, ji.executor_job_id, ji.created, ji.updated, ji."order", ji.dag_instance_id, ji.id, ji.application_id, ji.step_id, ji.diagnostics
     FROM job_instance ji
     JOIN dag_instance_ids_to_archive diita ON ji.dag_instance_id = diita.id
     ON CONFLICT (id) DO NOTHING;

src/main/resources/db_scripts/liquibase/db.changelog.yml

Lines changed: 3 additions & 0 deletions
@@ -95,3 +95,6 @@ databaseChangeLog:
   - include:
       relativeToChangelogFile: true
      file: v0.5.14.remove-deprecated-columns.yml
+  - include:
+      relativeToChangelogFile: true
+      file: v0.5.20.add-diagnostics-field.yml

src/main/resources/db_scripts/liquibase/v0.5.20.add-diagnostics-field.sql

Lines changed: 108 additions & 0 deletions
@@ -0,0 +1,108 @@
+/*
+ * Copyright 2018 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ALTER TABLE "job_instance"
+    ADD COLUMN "diagnostics" VARCHAR;
+ALTER TABLE "archive_job_instance"
+    ADD COLUMN "diagnostics" VARCHAR;
+
+
+
+CREATE OR REPLACE PROCEDURE archive_dag_instances_chunk(
+    IN i_min_id BIGINT,
+    IN i_max_id BIGINT
+)
+AS $$
+-------------------------------------------------------------------------------
+--
+-- Procedure: archive_dag_instances_chunk(2)
+--      Copies dag_instances with a final status from i_min_id to i_max_id to the
+--      archive_dag_instance table.
+--      Along with dag_instance, referenced job_instances and events are
+--      archived to the archive_job_instance and archive_event tables, respectively.
+--      This procedure should not be called directly. Instead, use archive_dag_instances.
+--
+-- Parameters:
+--      i_min_id - Minimum dag instance id to archive
+--      i_max_id - Maximum dag instance id to archive
+--
+-------------------------------------------------------------------------------
+DECLARE
+    _cnt INT;
+BEGIN
+    RAISE NOTICE '=============';
+    RAISE NOTICE ' START BATCH';
+    RAISE NOTICE '=============';
+
+    CREATE TEMPORARY TABLE dag_instance_ids_to_archive AS
+    SELECT di.id
+    FROM dag_instance di
+    WHERE di.status NOT IN ('Running', 'InQueue')
+      AND di.id >= i_min_id
+      AND di.id <= i_max_id;
+    GET DIAGNOSTICS _cnt = ROW_COUNT;
+    RAISE NOTICE 'Going to archive % dag instances from % to %', _cnt, i_min_id, i_max_id;
+
+    INSERT INTO archive_dag_instance (status, workflow_id, id, started, finished, triggered_by)
+    SELECT di.status, di.workflow_id, di.id, di.started, di.finished, di.triggered_by
+    FROM dag_instance di
+    JOIN dag_instance_ids_to_archive diita ON di.id = diita.id
+    ON CONFLICT (id) DO NOTHING;
+    GET DIAGNOSTICS _cnt = ROW_COUNT;
+    RAISE NOTICE 'Archived % dag instances from % to %', _cnt, i_min_id, i_max_id;
+
+    INSERT INTO archive_job_instance (job_name, job_status, executor_job_id, created, updated, "order", dag_instance_id, id, application_id, step_id, diagnostics)
+    SELECT ji.job_name, ji.job_status, ji.executor_job_id, ji.created, ji.updated, ji."order", ji.dag_instance_id, ji.id, ji.application_id, ji.step_id, ji.diagnostics
+    FROM job_instance ji
+    JOIN dag_instance_ids_to_archive diita ON ji.dag_instance_id = diita.id
+    ON CONFLICT (id) DO NOTHING;
+    GET DIAGNOSTICS _cnt = ROW_COUNT;
+    RAISE NOTICE 'Archived % job instances', _cnt;
+
+    INSERT INTO archive_event (sensor_event_id, sensor_id, dag_instance_id, id, payload)
+    SELECT e.sensor_event_id, e.sensor_id, e.dag_instance_id, e.id, e.payload
+    FROM "event" e
+    JOIN dag_instance_ids_to_archive diita ON e.dag_instance_id = diita.id
+    ON CONFLICT (id) DO NOTHING;
+    GET DIAGNOSTICS _cnt = ROW_COUNT;
+    RAISE NOTICE 'Archived % events', _cnt;
+
+    RAISE NOTICE 'Going to delete dag instances';
+
+    DELETE FROM job_instance ji
+    USING dag_instance_ids_to_archive diita
+    WHERE ji.dag_instance_id = diita.id;
+    GET DIAGNOSTICS _cnt = ROW_COUNT;
+    RAISE NOTICE 'Deleted % job instances', _cnt;
+
+    DELETE FROM "event" e
+    USING dag_instance_ids_to_archive diita
+    WHERE e.dag_instance_id = diita.id;
+    GET DIAGNOSTICS _cnt = ROW_COUNT;
+    RAISE NOTICE 'Deleted % events', _cnt;
+
+    DELETE FROM dag_instance di
+    USING dag_instance_ids_to_archive diita
+    WHERE di.id = diita.id;
+    GET DIAGNOSTICS _cnt = ROW_COUNT;
+    RAISE NOTICE 'Deleted % dag instances', _cnt;
+
+    DROP TABLE dag_instance_ids_to_archive;
+
+    RAISE NOTICE '=============';
+    RAISE NOTICE ' END BATCH';
+    RAISE NOTICE '=============';
+END;
+$$ LANGUAGE plpgsql;

src/main/resources/db_scripts/liquibase/v0.5.20.add-diagnostics-field.yml

Lines changed: 26 additions & 0 deletions
@@ -0,0 +1,26 @@
+#
+# Copyright 2018 ABSA Group Limited
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+databaseChangeLog:
+  - changeSet:
+      id: v0.5.20.add-diagnostics-field
+      logicalFilePath: v0.5.20.add-diagnostics-field
+
+      context: default
+      changes:
+        - sqlFile:
+            relativeToChangelogFile: true
+            path: v0.5.20.add-diagnostics-field.sql
+            splitStatements: false

src/main/scala/za/co/absa/hyperdrive/trigger/models/JobInstance.scala

Lines changed: 1 addition & 0 deletions
@@ -23,6 +23,7 @@ case class JobInstance(
   jobName: String,
   jobParameters: JobInstanceParameters,
   jobStatus: JobStatus,
+  diagnostics: Option[String] = None,
   executorJobId: Option[String],
   applicationId: Option[String],
   stepId: Option[String],

src/main/scala/za/co/absa/hyperdrive/trigger/models/tables/JobInstanceTable.scala

Lines changed: 2 additions & 0 deletions
@@ -30,6 +30,7 @@ trait JobInstanceTable {
   def jobName: Rep[String] = column[String]("job_name")
   def jobParameters: Rep[JobInstanceParameters] = column[JobInstanceParameters]("job_parameters", O.SqlType("JSONB"))
   def jobStatus: Rep[JobStatus] = column[JobStatus]("job_status")
+  def diagnostics: Rep[Option[String]] = column[Option[String]]("diagnostics")
   def executorJobId: Rep[Option[String]] = column[Option[String]]("executor_job_id")
   def applicationId: Rep[Option[String]] = column[Option[String]]("application_id")
   def stepId: Rep[Option[String]] = column[Option[String]]("step_id")
@@ -46,6 +47,7 @@ trait JobInstanceTable {
     jobName,
     jobParameters,
     jobStatus,
+    diagnostics,
     executorJobId,
     applicationId,
     stepId,
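
For readers unfamiliar with Slick's column mapping, here is a minimal, self-contained sketch of the pattern used above (the cut-down table below is illustrative; the real JobInstanceTable carries many more columns): an Option[String] column maps to a nullable VARCHAR, so rows written before the migration simply read back as None.

    import slick.jdbc.PostgresProfile.api._

    case class JobInstanceRow(id: Long, jobName: String, diagnostics: Option[String])

    // Cut-down stand-in for JobInstanceTable: Option[String] maps to a nullable
    // VARCHAR column, so a NULL diagnostics value round-trips as None.
    class JobInstances(tag: Tag) extends Table[JobInstanceRow](tag, "job_instance") {
      def id: Rep[Long] = column[Long]("id", O.PrimaryKey, O.AutoInc)
      def jobName: Rep[String] = column[String]("job_name")
      def diagnostics: Rep[Option[String]] = column[Option[String]]("diagnostics")
      def * = (id, jobName, diagnostics) <> (JobInstanceRow.tupled, JobInstanceRow.unapply)
    }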

src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/AppsResponse.scala

Lines changed: 1 addition & 1 deletion
@@ -17,7 +17,7 @@ package za.co.absa.hyperdrive.trigger.scheduler.executors.spark
 
 import play.api.libs.json.{Json, OFormat}
 
-case class App(id: String, name: String, state: String, finalStatus: String)
+case class App(id: String, name: String, state: String, finalStatus: String, diagnostics: String)
 
 case class Apps(app: Seq[App])
 
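The new diagnostics field is populated from YARN's cluster-apps REST payload. A minimal sketch of how such a response could be deserialized with Play JSON (the AppsResponse wrapper, the object name, and the sample payload are illustrative assumptions; only App and Apps appear in this file):

    import play.api.libs.json.{Json, OFormat}

    case class App(id: String, name: String, state: String, finalStatus: String, diagnostics: String)
    case class Apps(app: Seq[App])
    // Assumed wrapper for the top-level "apps" object; not shown in this diff.
    case class AppsResponse(apps: Apps)

    object YarnDiagnosticsSketch {
      implicit val appFormat: OFormat[App] = Json.format[App]
      implicit val appsFormat: OFormat[Apps] = Json.format[Apps]
      implicit val appsResponseFormat: OFormat[AppsResponse] = Json.format[AppsResponse]

      def main(args: Array[String]): Unit = {
        // Roughly the payload shape of /ws/v1/cluster/apps; for a failed
        // application, "diagnostics" carries the driver's error and stack trace.
        val json = Json.parse(
          """{"apps": {"app": [{
            |  "id": "application_1234_4567",
            |  "name": "my-job",
            |  "state": "FINISHED",
            |  "finalStatus": "FAILED",
            |  "diagnostics": "User class threw exception: ..."
            |}]}}""".stripMargin
        )
        println(json.as[AppsResponse].apps.app.headOption.map(_.diagnostics))
        // Some(User class threw exception: ...)
      }
    }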
src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/SparkExecutor.scala

Lines changed: 18 additions & 1 deletion
@@ -54,7 +54,7 @@ object SparkExecutor {
       case None => Seq.empty
     }) match {
       case Seq(first) =>
-        updateJob(jobInstance.copy(applicationId = Some(first.id), jobStatus = getStatus(first.finalStatus)))
+        updateJob(getUpdatedJobInstance(jobInstance, first))
       case _
           // It relies on the same value set for sparkYarnSink.submitTimeout in multi instance deployment
           if jobInstance.jobStatus == JobStatuses.Submitting && jobInstance.updated
@@ -69,6 +69,23 @@ object SparkExecutor {
   private def getStatusUrl(executorJobId: String)(implicit sparkConfig: SparkConfig): String =
     s"${sparkConfig.hadoopResourceManagerUrlBase}/ws/v1/cluster/apps?applicationTags=$executorJobId"
 
+  private def getUpdatedJobInstance(
+    jobInstance: JobInstance,
+    app: App
+  ): JobInstance = {
+    val diagnostics = app.diagnostics match {
+      case "" => None
+      case _  => Some(app.diagnostics)
+    }
+
+    jobInstance.copy(
+      jobStatus = getStatus(app.finalStatus),
+      applicationId = Some(app.id),
+      updated = Option(LocalDateTime.now()),
+      diagnostics = diagnostics
+    )
+  }
+
   private def getStatus(finalStatus: String): JobStatus =
     finalStatus match {
       case fs if fs == YarnFinalStatuses.Undefined.name => Running
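
The match on the empty string is presumably there because YARN reports diagnostics as an empty string, rather than omitting the field, when an application has nothing to report; normalizing to None keeps the database column NULL in that case. A REPL-style sketch of the behavior (the helper name is illustrative):

    // Illustrative helper mirroring the match in getUpdatedJobInstance above.
    def nonEmptyDiagnostics(raw: String): Option[String] =
      if (raw.isEmpty) None else Some(raw)

    nonEmptyDiagnostics("")                     // None: nothing to store
    nonEmptyDiagnostics("User class threw ...") // Some("User class threw ...")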

src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/notifications/NotificationSender.scala

Lines changed: 32 additions & 6 deletions
@@ -52,6 +52,7 @@ class NotificationSenderImpl(
   private val yarnBaseUrl = sparkConfig.hadoopResourceManagerUrlBase
   private val dateTimeFormatter = DateTimeFormatter.ISO_LOCAL_DATE_TIME
   private val messageQueue = new ConcurrentLinkedQueue[Message]
+  private val causedByPattern = """Caused by: (.*)\n""".r
 
   def createNotifications(dagInstance: DagInstance, jobInstances: Seq[JobInstance])(
     implicit ec: ExecutionContext
@@ -114,15 +115,40 @@
       "Finished" -> dagInstance.finished.map(_.format(dateTimeFormatter)).getOrElse("Couldn't get finish time"),
       "Status" -> dagInstance.status.name
     )
-    jobInstances
+    val failedJob = jobInstances
       .sortBy(_.order)(Ordering.Int.reverse)
       .find(_.jobStatus.isFailed)
-      .map(_.applicationId.map { appId =>
-        val applicationUrl = s"${yarnBaseUrl.stripSuffix("/")}/cluster/app/$appId"
-        messageMap += ("Failed application" -> applicationUrl)
-      })
+    failedJob.map(_.applicationId.map { appId =>
+      val applicationUrl = s"${yarnBaseUrl.stripSuffix("/")}/cluster/app/$appId"
+      messageMap += ("Failed application" -> applicationUrl)
+    })
+
     messageMap += ("Notification rule ID" -> notificationRule.id.toString)
-    val message = messageMap.map { case (key, value) => s"$key: $value" }.reduce(_ + "\n" + _) + "\n\n" + footer
+
+    val diagnosticsOpt = failedJob.flatMap(_.diagnostics)
+    val causes = diagnosticsOpt
+      .map { diagnostics =>
+        causedByPattern
+          .findAllMatchIn(diagnostics)
+          .map(_.group(1))
+          .toSeq
+          .map("- " + _)
+          .reduce(_ + "\n" + _)
+      }
+      .map("Causes:\n" + _ + "\n\n")
+      .getOrElse("")
+
+    val stackTrace = diagnosticsOpt
+      .map { diagnostics =>
+        s"Stack trace:\n$diagnostics\n\n"
+      }
+      .getOrElse("")
+
+    val message = messageMap.map { case (key, value) => s"$key: $value" }.reduce(_ + "\n" + _) +
+      "\n\n" +
+      causes +
+      stackTrace +
+      footer
     Message(notificationRule.recipients, subject, message, 1)
   }
 }
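
A REPL-style sketch of what the causedByPattern extraction yields for a typical diagnostics string (the sample input is illustrative; mkString replaces the diff's reduce here so that an input without any "Caused by:" lines yields an empty string instead of throwing):

    val causedByPattern = """Caused by: (.*)\n""".r

    // Illustrative diagnostics string, as YARN would report it for a failed app.
    val diagnostics =
      """User class threw exception: java.lang.RuntimeException: ingestion failed
        |Caused by: org.apache.spark.SparkException: Job aborted.
        |at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198)
        |Caused by: java.lang.IllegalStateException: 29.compact doesn't exist when compacting batch 39
        |""".stripMargin

    // Each "Caused by:" line becomes one bullet; group(1) is the text after the prefix.
    val causes = causedByPattern
      .findAllMatchIn(diagnostics)
      .map(_.group(1))
      .toSeq
      .map("- " + _)
      .mkString("\n")
    // causes:
    // - org.apache.spark.SparkException: Job aborted.
    // - java.lang.IllegalStateException: 29.compact doesn't exist when compacting batch 39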

src/test/scala/za/co/absa/hyperdrive/trigger/scheduler/notifications/NotificationSenderTest.scala

Lines changed: 69 additions & 0 deletions
@@ -131,6 +131,75 @@ class NotificationSenderTest extends FlatSpec with MockitoSugar with Matchers wi
     messagesCaptor.getValue should include(s"Failed application: $clusterBaseUrl/cluster/app/application_9876_4567")
   }
 
+  it should "print the error message if diagnostics are available" in {
+    // given
+    val di = createDagInstance().copy(
+      status = DagInstanceStatuses.Failed,
+      started = LocalDateTime.of(LocalDate.of(2020, 3, 2), LocalTime.of(12, 30)),
+      finished = Some(LocalDateTime.of(LocalDate.of(2020, 3, 2), LocalTime.of(14, 30)))
+    )
+
+    val diagnostics =
+      """User class threw exception: za.co.absa.hyperdrive.shared.exceptions.IngestionException: PROBABLY FAILED INGESTION
+        |Caused by: org.apache.spark.sql.streaming.StreamingQueryException: Job aborted.
+        |=== Streaming Query ===
+        |Caused by: org.apache.spark.SparkException: Job aborted.
+        |at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198)
+        |Caused by: java.lang.IllegalStateException: 29.compact doesn't exist when compacting batch 39
+        |at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog$$anonfun$4$$anonfun$apply$1.apply(CompactibleFileStreamLog.scala:174)
+        |""".stripMargin
+    val ji =
+      createJobInstance().copy(
+        jobStatus = JobStatuses.Failed,
+        applicationId = Some("application_1234_4567"),
+        order = 1,
+        diagnostics = Some(diagnostics)
+      )
+
+    val nr1 = createNotificationRule().copy(id = 1, recipients = Seq("[email protected]", "[email protected]"))
+    val w = createWorkflow()
+
+    when(notificationRuleService.getMatchingNotificationRules(eqTo(di.workflowId), eqTo(di.status))(any()))
+      .thenReturn(Future(Some(Seq(nr1), w)))
+
+    // when
+    await(underTest.createNotifications(di, Seq(ji)))
+    underTest.sendNotifications()
+
+    // then
+    val expectedMessage =
+      """Environment: TEST
+        |Project: project
+        |Workflow Name: workflow
+        |Started: 2020-03-02T12:30:00
+        |Finished: 2020-03-02T14:30:00
+        |Status: Failed
+        |Failed application: http://localhost:8088/cluster/app/application_1234_4567
+        |Notification rule ID: 1
+        |
+        |Causes:
+        |- org.apache.spark.sql.streaming.StreamingQueryException: Job aborted.
+        |- org.apache.spark.SparkException: Job aborted.
+        |- java.lang.IllegalStateException: 29.compact doesn't exist when compacting batch 39
+        |
+        |Stack trace:
+        |User class threw exception: za.co.absa.hyperdrive.shared.exceptions.IngestionException: PROBABLY FAILED INGESTION
+        |Caused by: org.apache.spark.sql.streaming.StreamingQueryException: Job aborted.
+        |=== Streaming Query ===
+        |Caused by: org.apache.spark.SparkException: Job aborted.
+        |at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198)
+        |Caused by: java.lang.IllegalStateException: 29.compact doesn't exist when compacting batch 39
+        |at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog$$anonfun$4$$anonfun$apply$1.apply(CompactibleFileStreamLog.scala:174)
+        |
+        |
+        |This message has been generated automatically. Please don't reply to it.
+        |
+        |HyperdriveDevTeam""".stripMargin
+    val messagesCaptor: ArgumentCaptor[String] = ArgumentCaptor.forClass(classOf[String])
+    verify(emailService).sendMessageToBccRecipients(any(), any(), any(), messagesCaptor.capture())
+    messagesCaptor.getValue shouldBe expectedMessage
+  }
+
   it should "retry sending the message at most maxRetries times" in {
     // given
     val maxRetries = 5
