Skip to content

Commit f40bb44

Browse files
authoredSep 5, 2024
[GOBBLIN-2150] calculate flow status correctly (apache#4048)
* calculate flow status correctly * address review comments
1 parent 0ea2414 commit f40bb44

File tree

3 files changed

+62
-16
lines changed

3 files changed

+62
-16
lines changed
 

‎gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java

+20-2
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.Set;
2828
import java.util.concurrent.Future;
2929
import java.util.concurrent.TimeUnit;
30+
import java.util.stream.Collectors;
3031

3132
import com.google.common.base.Optional;
3233
import com.google.common.collect.Maps;
@@ -174,7 +175,7 @@ public static void cancelDagNode(Dag.DagNode<JobExecutionPlan> dagNodeToCancel,
174175
log.warn("No Job future when canceling DAG node - {}", dagNodeToCancel.getValue().getId());
175176
}
176177
DagManagerUtils.getSpecProducer(dagNodeToCancel).cancelJob(dagNodeToCancel.getValue().getJobSpec().getUri(), cancelJobArgs).get();
177-
sendCancellationEvent(dagNodeToCancel);
178+
sendJobCancellationEvent(dagNodeToCancel);
178179
log.info("Cancelled dag node {}, spec_producer_future {}", dagNodeToCancel.getValue().getId(), serializedFuture);
179180
} catch (Exception e) {
180181
throw new IOException(e);
@@ -190,7 +191,7 @@ public static void cancelDag(Dag<JobExecutionPlan> dag, DagManagementStateStore
190191
}
191192
}
192193

193-
private static void sendCancellationEvent(Dag.DagNode<JobExecutionPlan> dagNodeToCancel) {
194+
private static void sendJobCancellationEvent(Dag.DagNode<JobExecutionPlan> dagNodeToCancel) {
194195
JobExecutionPlan jobExecutionPlan = dagNodeToCancel.getValue();
195196
Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
196197
DagProc.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_CANCEL).stop(jobMetadata);
@@ -357,4 +358,21 @@ private static boolean areAllParentsInCanRun(Dag.DagNode<JobExecutionPlan> node,
357358
Set<Dag.DagNode<JobExecutionPlan>> canRun) {
358359
return node.getParentNodes() == null || canRun.containsAll(node.getParentNodes());
359360
}
361+
362+
public static String calcFlowStatus(Dag<JobExecutionPlan> dag) {
363+
Set<ExecutionStatus> jobsStatuses = dag.getNodes().stream().map(node -> node.getValue().getExecutionStatus())
364+
.collect(Collectors.toSet());
365+
366+
if (jobsStatuses.contains(FAILED)) {
367+
return TimingEvent.FlowTimings.FLOW_FAILED;
368+
} else if (jobsStatuses.contains(CANCELLED)) {
369+
return TimingEvent.FlowTimings.FLOW_CANCELLED;
370+
} else if (jobsStatuses.contains(PENDING_RESUME)) {
371+
return TimingEvent.FlowTimings.FLOW_PENDING_RESUME;
372+
} else if (jobsStatuses.stream().allMatch(jobStatus -> jobStatus == COMPLETE)) {
373+
return TimingEvent.FlowTimings.FLOW_SUCCEEDED;
374+
} else {
375+
return TimingEvent.FlowTimings.FLOW_RUNNING;
376+
}
377+
}
360378
}

‎gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java

+2-7
Original file line numberDiff line numberDiff line change
@@ -115,13 +115,8 @@ protected void act(DagManagementStateStore dagManagementStateStore, Pair<Optiona
115115
dag.setFlowEvent(null);
116116
DagProcUtils.submitJobToExecutor(dagManagementStateStore, dagNode, getDagId());
117117
} else if (DagProcUtils.isDagFinished(dag)) {
118-
if (dag.getFlowEvent() == null) {
119-
// If the dag flow event is not set and there are no more jobs running, then it is successful
120-
// also note that `onJobFinish` method does whatever is required to do after job finish, determining a Dag's
121-
// status is not possible on individual job's finish status
122-
dag.setFlowEvent(TimingEvent.FlowTimings.FLOW_SUCCEEDED);
123-
}
124-
String flowEvent = dag.getFlowEvent();
118+
String flowEvent = DagProcUtils.calcFlowStatus(dag);
119+
dag.setFlowEvent(flowEvent);
125120
DagProcUtils.setAndEmitFlowEvent(eventSubmitter, dag, flowEvent);
126121
if (flowEvent.equals(TimingEvent.FlowTimings.FLOW_SUCCEEDED)) {
127122
// todo - verify if work from PR#3641 is required

‎gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtilsTest.java

+40-7
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333

3434
import org.apache.gobblin.config.ConfigBuilder;
3535
import org.apache.gobblin.configuration.ConfigurationKeys;
36+
import org.apache.gobblin.metrics.event.TimingEvent;
3637
import org.apache.gobblin.runtime.api.JobSpec;
3738
import org.apache.gobblin.runtime.api.SpecExecutor;
3839
import org.apache.gobblin.runtime.spec_executorInstance.MockedSpecExecutor;
@@ -73,162 +74,194 @@ public void testGetJobSpecFromDag() throws Exception {
7374
}
7475

7576
@Test
76-
public void testIsDagFinishedSingleNode() throws URISyntaxException {
77+
public void testFlowStatusAndIsDagFinishedSingleNode() throws URISyntaxException {
7778
Dag<JobExecutionPlan> dag =
7879
DagManagerTest.buildDag(id, flowExecutionId, flowFailureOption, 1, proxyUser, additionalConfig);
7980

8081
setJobStatuses(dag, Collections.singletonList(COMPLETE));
8182
Assert.assertTrue(DagProcUtils.isDagFinished(dag));
83+
Assert.assertEquals(TimingEvent.FlowTimings.FLOW_SUCCEEDED, DagProcUtils.calcFlowStatus(dag));
8284

8385
setJobStatuses(dag, Collections.singletonList(FAILED));
8486
Assert.assertTrue(DagProcUtils.isDagFinished(dag));
87+
Assert.assertEquals(TimingEvent.FlowTimings.FLOW_FAILED, DagProcUtils.calcFlowStatus(dag));
8588

8689
setJobStatuses(dag, Collections.singletonList(CANCELLED));
8790
Assert.assertTrue(DagProcUtils.isDagFinished(dag));
91+
Assert.assertEquals(TimingEvent.FlowTimings.FLOW_CANCELLED, DagProcUtils.calcFlowStatus(dag));
8892

8993
setJobStatuses(dag, Collections.singletonList(PENDING));
9094
Assert.assertFalse(DagProcUtils.isDagFinished(dag));
95+
Assert.assertEquals(TimingEvent.FlowTimings.FLOW_RUNNING, DagProcUtils.calcFlowStatus(dag));
9196

9297
setJobStatuses(dag, Collections.singletonList(PENDING_RETRY));
9398
Assert.assertFalse(DagProcUtils.isDagFinished(dag));
99+
Assert.assertEquals(TimingEvent.FlowTimings.FLOW_RUNNING, DagProcUtils.calcFlowStatus(dag));
94100

95101
setJobStatuses(dag, Collections.singletonList(PENDING_RESUME));
96102
Assert.assertFalse(DagProcUtils.isDagFinished(dag));
103+
Assert.assertEquals(TimingEvent.FlowTimings.FLOW_PENDING_RESUME, DagProcUtils.calcFlowStatus(dag));
97104

98105
setJobStatuses(dag, Collections.singletonList(ORCHESTRATED));
99106
Assert.assertFalse(DagProcUtils.isDagFinished(dag));
107+
Assert.assertEquals(TimingEvent.FlowTimings.FLOW_RUNNING, DagProcUtils.calcFlowStatus(dag));
100108

101109
setJobStatuses(dag, Collections.singletonList(RUNNING));
102110
Assert.assertFalse(DagProcUtils.isDagFinished(dag));
111+
Assert.assertEquals(TimingEvent.FlowTimings.FLOW_RUNNING, DagProcUtils.calcFlowStatus(dag));
103112
}
104113

105114
@Test
106-
public void testIsDagFinishedTwoNodes() throws URISyntaxException {
115+
public void testFlowStatusAndIsDagFinishedTwoNodes() throws URISyntaxException {
107116
Dag<JobExecutionPlan> dag =
108117
DagManagerTest.buildDag(id, flowExecutionId, flowFailureOption, 2, proxyUser, additionalConfig);
109118

110119
setJobStatuses(dag, Arrays.asList(COMPLETE, PENDING));
111120
Assert.assertFalse(DagProcUtils.isDagFinished(dag));
121+
Assert.assertEquals(TimingEvent.FlowTimings.FLOW_RUNNING, DagProcUtils.calcFlowStatus(dag));
112122

113123
setJobStatuses(dag, Arrays.asList(COMPLETE, FAILED));
114124
Assert.assertTrue(DagProcUtils.isDagFinished(dag));
125+
Assert.assertEquals(TimingEvent.FlowTimings.FLOW_FAILED, DagProcUtils.calcFlowStatus(dag));
115126

116127
setJobStatuses(dag, Arrays.asList(FAILED, PENDING));
117128
Assert.assertTrue(DagProcUtils.isDagFinished(dag));
129+
Assert.assertEquals(TimingEvent.FlowTimings.FLOW_FAILED, DagProcUtils.calcFlowStatus(dag));
118130

119131
setJobStatuses(dag, Arrays.asList(CANCELLED, PENDING));
120132
Assert.assertTrue(DagProcUtils.isDagFinished(dag));
133+
Assert.assertEquals(TimingEvent.FlowTimings.FLOW_CANCELLED, DagProcUtils.calcFlowStatus(dag));
121134
}
122135

123136
@Test
124-
public void testIsDagFinishedThreeNodes() throws URISyntaxException {
137+
public void testFlowStatusAndIsDagFinishedThreeNodes() throws URISyntaxException {
125138
Dag<JobExecutionPlan> dag = buildComplexDag3();
126139

127140
setJobStatuses(dag, Arrays.asList(COMPLETE, PENDING, PENDING));
128141
Assert.assertFalse(DagProcUtils.isDagFinished(dag));
142+
Assert.assertEquals(TimingEvent.FlowTimings.FLOW_RUNNING, DagProcUtils.calcFlowStatus(dag));
129143

130144
setJobStatuses(dag, Arrays.asList(COMPLETE, FAILED, PENDING));
131145
Assert.assertTrue(DagProcUtils.isDagFinished(dag));
146+
Assert.assertEquals(TimingEvent.FlowTimings.FLOW_FAILED, DagProcUtils.calcFlowStatus(dag));
132147

133148
setJobStatuses(dag, Arrays.asList(COMPLETE, CANCELLED, PENDING));
134149
Assert.assertTrue(DagProcUtils.isDagFinished(dag));
150+
Assert.assertEquals(TimingEvent.FlowTimings.FLOW_CANCELLED, DagProcUtils.calcFlowStatus(dag));
135151
}
136152

137153
@Test
138-
public void testIsDagFinishedFourNodes() throws URISyntaxException {
154+
public void testFlowStatusAndIsDagFinishedFourNodes() throws URISyntaxException {
139155
Dag<JobExecutionPlan> dag = buildLinearDagOf4Nodes();
140156

141157
setJobStatuses(dag, Arrays.asList(COMPLETE, PENDING, PENDING, PENDING));
142158
Assert.assertFalse(DagProcUtils.isDagFinished(dag));
159+
Assert.assertEquals(TimingEvent.FlowTimings.FLOW_RUNNING, DagProcUtils.calcFlowStatus(dag));
143160

144161
setJobStatuses(dag, Arrays.asList(FAILED, PENDING, PENDING, PENDING));
145162
Assert.assertTrue(DagProcUtils.isDagFinished(dag));
163+
Assert.assertEquals(TimingEvent.FlowTimings.FLOW_FAILED, DagProcUtils.calcFlowStatus(dag));
146164

147165
setJobStatuses(dag, Arrays.asList(CANCELLED, PENDING, PENDING, PENDING));
148166
Assert.assertTrue(DagProcUtils.isDagFinished(dag));
167+
Assert.assertEquals(TimingEvent.FlowTimings.FLOW_CANCELLED, DagProcUtils.calcFlowStatus(dag));
149168

150169
setJobStatuses(dag, Arrays.asList(PENDING, PENDING, PENDING, PENDING));
151170
Assert.assertFalse(DagProcUtils.isDagFinished(dag));
171+
Assert.assertEquals(TimingEvent.FlowTimings.FLOW_RUNNING, DagProcUtils.calcFlowStatus(dag));
152172
}
153173

154174
@Test
155-
public void testIsDagFinishedMultiNodes() throws URISyntaxException {
175+
public void testFlowStatusAndIsDagFinishedMultiNodes() throws URISyntaxException {
156176
Dag<JobExecutionPlan> dag = buildComplexDag1();
157177
setJobStatuses(dag, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, COMPLETE, COMPLETE, COMPLETE, COMPLETE, COMPLETE, COMPLETE, COMPLETE));
158178
Assert.assertTrue(DagProcUtils.isDagFinished(dag));
159179
Collections.shuffle(dag.getNodes());
160180
Assert.assertTrue(DagProcUtils.isDagFinished(dag));
181+
Assert.assertEquals(TimingEvent.FlowTimings.FLOW_SUCCEEDED, DagProcUtils.calcFlowStatus(dag));
161182

162183
Dag<JobExecutionPlan> dag2 = buildComplexDag1();
163184
setJobStatuses(dag2, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, COMPLETE, PENDING, COMPLETE, COMPLETE, PENDING, COMPLETE, PENDING));
164185
Assert.assertFalse(DagProcUtils.isDagFinished(dag2));
165186
Collections.shuffle(dag2.getNodes());
166187
Assert.assertFalse(DagProcUtils.isDagFinished(dag2));
188+
Assert.assertEquals(TimingEvent.FlowTimings.FLOW_RUNNING, DagProcUtils.calcFlowStatus(dag2));
167189

168190
Dag<JobExecutionPlan> dag3 = buildComplexDag1();
169191
setJobStatuses(dag3, Arrays.asList(FAILED, COMPLETE, COMPLETE, COMPLETE, PENDING, COMPLETE, COMPLETE, PENDING, COMPLETE, PENDING));
170192
Assert.assertTrue(DagProcUtils.isDagFinished(dag3));
171193
Collections.shuffle(dag3.getNodes());
172194
Assert.assertTrue(DagProcUtils.isDagFinished(dag3));
195+
Assert.assertEquals(TimingEvent.FlowTimings.FLOW_FAILED, DagProcUtils.calcFlowStatus(dag3));
173196

174197
Dag<JobExecutionPlan> dag4 = buildComplexDag1();
175198
setJobStatuses(dag4, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, COMPLETE, COMPLETE, CANCELLED, COMPLETE, PENDING, PENDING, PENDING));
176199
Assert.assertFalse(DagProcUtils.isDagFinished(dag4));
177200
Collections.shuffle(dag4.getNodes());
178201
Assert.assertFalse(DagProcUtils.isDagFinished(dag4));
202+
Assert.assertEquals(TimingEvent.FlowTimings.FLOW_CANCELLED, DagProcUtils.calcFlowStatus(dag4));
179203

180204
Dag<JobExecutionPlan> dag5 = buildComplexDag1();
181205
setJobStatuses(dag5, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, COMPLETE, COMPLETE, CANCELLED, COMPLETE, COMPLETE, PENDING, PENDING));
182206
Assert.assertTrue(DagProcUtils.isDagFinished(dag5));
183207
Collections.shuffle(dag5.getNodes());
184208
Assert.assertTrue(DagProcUtils.isDagFinished(dag5));
209+
Assert.assertEquals(TimingEvent.FlowTimings.FLOW_CANCELLED, DagProcUtils.calcFlowStatus(dag5));
185210

186211
Dag<JobExecutionPlan> dag6 = buildComplexDag1();
187212
setJobStatuses(dag6, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, COMPLETE, COMPLETE, PENDING_RESUME, COMPLETE, COMPLETE, PENDING, PENDING));
188213
Assert.assertFalse(DagProcUtils.isDagFinished(dag6));
189214
Collections.shuffle(dag6.getNodes());
190215
Assert.assertFalse(DagProcUtils.isDagFinished(dag6));
216+
Assert.assertEquals(TimingEvent.FlowTimings.FLOW_PENDING_RESUME, DagProcUtils.calcFlowStatus(dag6));
191217

192218
Dag<JobExecutionPlan> dag7 = buildComplexDag1();
193219
setJobStatuses(dag7, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, COMPLETE, COMPLETE, PENDING_RETRY, COMPLETE, COMPLETE, PENDING, PENDING));
194220
Assert.assertFalse(DagProcUtils.isDagFinished(dag7));
195221
Collections.shuffle(dag7.getNodes());
196222
Assert.assertFalse(DagProcUtils.isDagFinished(dag7));
223+
Assert.assertEquals(TimingEvent.FlowTimings.FLOW_RUNNING, DagProcUtils.calcFlowStatus(dag7));
197224

198225
Dag<JobExecutionPlan> dag8 = buildComplexDag1();
199226
setJobStatuses(dag8, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, COMPLETE, COMPLETE, RUNNING, COMPLETE, COMPLETE, PENDING, PENDING));
200227
Assert.assertFalse(DagProcUtils.isDagFinished(dag8));
201228
Collections.shuffle(dag8.getNodes());
202229
Assert.assertFalse(DagProcUtils.isDagFinished(dag8));
230+
Assert.assertEquals(TimingEvent.FlowTimings.FLOW_RUNNING, DagProcUtils.calcFlowStatus(dag8));
203231

204232
Dag<JobExecutionPlan> dag9 = buildComplexDag1();
205233
setJobStatuses(dag9, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, FAILED, COMPLETE, COMPLETE, PENDING, COMPLETE, PENDING, COMPLETE));
206234
Assert.assertFalse(DagProcUtils.isDagFinished(dag9));
207235
Collections.shuffle(dag9.getNodes());
208236
Assert.assertFalse(DagProcUtils.isDagFinished(dag9));
237+
Assert.assertEquals(TimingEvent.FlowTimings.FLOW_FAILED, DagProcUtils.calcFlowStatus(dag9));
209238
}
210239

211240
@Test
212-
public void testIsDagFinishedWithFinishRunningFailureOptionTwoNodes() throws URISyntaxException {
241+
public void testFlowStatusAndIsDagFinishedWithFinishRunningFailureOptionTwoNodes() throws URISyntaxException {
213242
Dag<JobExecutionPlan> dag =
214243
DagManagerTest.buildDag(id, flowExecutionId, flowFailureOption, 2, proxyUser, additionalConfig);
215244

216245
setJobStatuses(dag, Arrays.asList(FAILED, PENDING));
217246
Assert.assertTrue(DagProcUtils.isDagFinished(dag));
247+
Assert.assertEquals(TimingEvent.FlowTimings.FLOW_FAILED, DagProcUtils.calcFlowStatus(dag));
218248

219249
setJobStatuses(dag, Arrays.asList(CANCELLED, PENDING));
220250
Assert.assertTrue(DagProcUtils.isDagFinished(dag));
251+
Assert.assertEquals(TimingEvent.FlowTimings.FLOW_CANCELLED, DagProcUtils.calcFlowStatus(dag));
221252
}
222253

223254
@Test
224-
public void testIsDagFinishedWithFinishRunningFailureOptionMultiNodes() throws URISyntaxException {
255+
public void testFlowStatusAndIsDagFinishedWithFinishRunningFailureOptionMultiNodes() throws URISyntaxException {
225256
Dag<JobExecutionPlan> dag = buildComplexDagWithFinishRunningFailureOption();
226257

227258
setJobStatuses(dag, Arrays.asList(COMPLETE, CANCELLED, COMPLETE, PENDING, PENDING));
228259
Assert.assertTrue(DagProcUtils.isDagFinished(dag));
260+
Assert.assertEquals(TimingEvent.FlowTimings.FLOW_CANCELLED, DagProcUtils.calcFlowStatus(dag));
229261

230262
setJobStatuses(dag, Arrays.asList(COMPLETE, CANCELLED, COMPLETE, RUNNING, PENDING));
231263
Assert.assertFalse(DagProcUtils.isDagFinished(dag));
264+
Assert.assertEquals(TimingEvent.FlowTimings.FLOW_CANCELLED, DagProcUtils.calcFlowStatus(dag));
232265
}
233266

234267
private void setJobStatuses(Dag<JobExecutionPlan> dag, List<ExecutionStatus> statuses) {

0 commit comments

Comments
 (0)
Please sign in to comment.