22
22
import java .util .Comparator ;
23
23
import java .util .HashMap ;
24
24
import java .util .HashSet ;
25
+ import java .util .List ;
25
26
import java .util .Map ;
26
27
import java .util .Objects ;
27
28
import java .util .PriorityQueue ;
32
33
import java .util .stream .Collectors ;
33
34
34
35
import org .apache .commons .compress .utils .Sets ;
36
+
37
+ import org .apache .gobblin .cluster .GobblinHelixConstants ;
35
38
import org .apache .gobblin .stream .WorkUnitChangeEvent ;
39
+
40
+ import org .apache .hadoop .yarn .api .records .Container ;
36
41
import org .apache .hadoop .yarn .api .records .Resource ;
37
42
import org .apache .helix .HelixDataAccessor ;
38
43
import org .apache .helix .HelixManager ;
60
65
import org .apache .gobblin .cluster .HelixUtils ;
61
66
import org .apache .gobblin .util .ConfigUtils ;
62
67
import org .apache .gobblin .util .ExecutorsUtils ;
68
+ import org .apache .gobblin .yarn .event .ContainerReleaseRequest ;
63
69
64
70
import static org .apache .gobblin .yarn .GobblinYarnTaskRunner .HELIX_YARN_INSTANCE_NAME_PREFIX ;
65
71
71
77
@ Slf4j
72
78
public class YarnAutoScalingManager extends AbstractIdleService {
73
79
private final String AUTO_SCALING_PREFIX = GobblinYarnConfigurationKeys .GOBBLIN_YARN_PREFIX + "autoScaling." ;
74
- private final String AUTO_SCALING_POLLING_INTERVAL_SECS =
75
- AUTO_SCALING_PREFIX + "pollingIntervalSeconds" ;
80
+ private final String AUTO_SCALING_POLLING_INTERVAL_SECS = AUTO_SCALING_PREFIX + "pollingIntervalSeconds" ;
76
81
private final String TASK_NUMBER_OF_ATTEMPTS_THRESHOLD = AUTO_SCALING_PREFIX + "taskAttemptsThreshold" ;
77
82
private final int DEFAULT_TASK_NUMBER_OF_ATTEMPTS_THRESHOLD = 20 ;
78
83
private final String SPLIT_WORKUNIT_REACH_ATTEMPTS_THRESHOLD = AUTO_SCALING_PREFIX + "splitWorkUnitReachThreshold" ;
@@ -82,21 +87,24 @@ public class YarnAutoScalingManager extends AbstractIdleService {
82
87
private final String AUTO_SCALING_PARTITIONS_PER_CONTAINER = AUTO_SCALING_PREFIX + "partitionsPerContainer" ;
83
88
private final int DEFAULT_AUTO_SCALING_PARTITIONS_PER_CONTAINER = 1 ;
84
89
private final String AUTO_SCALING_CONTAINER_OVERPROVISION_FACTOR = AUTO_SCALING_PREFIX + "overProvisionFactor" ;
90
+ private final String STUCK_TASK_CONTAINER_RELEASE_THRESHOLD_MINUTES =
91
+ AUTO_SCALING_PREFIX + "stuckTaskContainerReleaseThresholdMinutes" ;
92
+ private final String RELEASE_CONTAINER_IF_TASK_IS_STUCK = AUTO_SCALING_PREFIX + "releaseContainerIfTaskIsStuck" ;
93
+ private final String DETECT_IF_TASK_IS_STUCK = AUTO_SCALING_PREFIX + "detectIfTaskIsStuck" ;
94
+ private final String ENABLE_DETECTION_FOR_TASK_STATES = AUTO_SCALING_PREFIX + "enableDetectionForTaskStates" ;
85
95
private final double DEFAULT_AUTO_SCALING_CONTAINER_OVERPROVISION_FACTOR = 1.0 ;
96
+ private final String AUTO_SCALING_INITIAL_DELAY = AUTO_SCALING_PREFIX + "initialDelay" ;
97
+ private final int DEFAULT_AUTO_SCALING_INITIAL_DELAY_SECS = 60 ;
98
+ private final String AUTO_SCALING_WINDOW_SIZE = AUTO_SCALING_PREFIX + "windowSize" ;
99
+ public final static int DEFAULT_MAX_CONTAINER_IDLE_TIME_BEFORE_SCALING_DOWN_MINUTES = 10 ;
100
+ private final static int DEFAULT_MAX_TIME_MINUTES_TO_RELEASE_CONTAINER_HAVING_HELIX_TASK_THAT_IS_STUCK = 20 ;
101
+
86
102
// The cluster level default tags for Helix instances
87
103
private final String defaultHelixInstanceTags ;
88
104
private final int defaultContainerMemoryMbs ;
89
105
private final int defaultContainerCores ;
90
-
91
- private final String AUTO_SCALING_INITIAL_DELAY = AUTO_SCALING_PREFIX + "initialDelay" ;
92
- private final int DEFAULT_AUTO_SCALING_INITIAL_DELAY_SECS = 60 ;
93
106
private int taskAttemptsThreshold ;
94
107
private final boolean splitWorkUnitReachThreshold ;
95
-
96
- private final String AUTO_SCALING_WINDOW_SIZE = AUTO_SCALING_PREFIX + "windowSize" ;
97
-
98
- public final static int DEFAULT_MAX_CONTAINER_IDLE_TIME_BEFORE_SCALING_DOWN_MINUTES = 10 ;
99
-
100
108
private final Config config ;
101
109
private final HelixManager helixManager ;
102
110
private final ScheduledExecutorService autoScalingExecutor ;
@@ -105,6 +113,10 @@ public class YarnAutoScalingManager extends AbstractIdleService {
105
113
private final double overProvisionFactor ;
106
114
private final SlidingWindowReservoir slidingFixedSizeWindow ;
107
115
private static int maxIdleTimeInMinutesBeforeScalingDown = DEFAULT_MAX_CONTAINER_IDLE_TIME_BEFORE_SCALING_DOWN_MINUTES ;
116
+ private final int maxTimeInMinutesBeforeReleasingContainerHavingStuckTask ;
117
+ private final boolean enableReleasingContainerHavingStuckTask ;
118
+ private final boolean enableDetectionStuckTask ;
119
+ private final HashSet <TaskPartitionState > detectionForStuckTaskStates ;
108
120
private static final HashSet <TaskPartitionState >
109
121
UNUSUAL_HELIX_TASK_STATES = Sets .newHashSet (TaskPartitionState .ERROR , TaskPartitionState .DROPPED , TaskPartitionState .COMPLETED , TaskPartitionState .TIMED_OUT );
110
122
@@ -136,6 +148,34 @@ public YarnAutoScalingManager(GobblinApplicationMaster appMaster) {
136
148
DEFAULT_TASK_NUMBER_OF_ATTEMPTS_THRESHOLD );
137
149
this .splitWorkUnitReachThreshold = ConfigUtils .getBoolean (this .config , SPLIT_WORKUNIT_REACH_ATTEMPTS_THRESHOLD ,
138
150
DEFAULT_SPLIT_WORKUNIT_REACH_ATTEMPTS_THRESHOLD );
151
+ this .maxTimeInMinutesBeforeReleasingContainerHavingStuckTask = ConfigUtils .getInt (this .config ,
152
+ STUCK_TASK_CONTAINER_RELEASE_THRESHOLD_MINUTES ,
153
+ DEFAULT_MAX_TIME_MINUTES_TO_RELEASE_CONTAINER_HAVING_HELIX_TASK_THAT_IS_STUCK );
154
+ this .enableReleasingContainerHavingStuckTask = ConfigUtils .getBoolean (this .config ,
155
+ RELEASE_CONTAINER_IF_TASK_IS_STUCK , false );
156
+ this .enableDetectionStuckTask = ConfigUtils .getBoolean (this .config , DETECT_IF_TASK_IS_STUCK , false );
157
+ this .detectionForStuckTaskStates = getTaskStatesForWhichDetectionIsEnabled ();
158
+ }
159
+
160
+ private HashSet <TaskPartitionState > getTaskStatesForWhichDetectionIsEnabled () {
161
+ HashSet <TaskPartitionState > taskStates = new HashSet <>();
162
+ if (this .enableDetectionStuckTask ) {
163
+ List <String > taskStatesEnabledForDetection = ConfigUtils .getStringList (this .config , ENABLE_DETECTION_FOR_TASK_STATES );
164
+ for (String taskState : taskStatesEnabledForDetection ) {
165
+ try {
166
+ TaskPartitionState helixTaskState = TaskPartitionState .valueOf (taskState );
167
+ if (helixTaskState == TaskPartitionState .RUNNING ) {
168
+ log .warn ("Running state is not allowed for detection as it is not a stuck state, ignoring" );
169
+ continue ;
170
+ }
171
+ taskStates .add (helixTaskState );
172
+ } catch (IllegalArgumentException e ) {
173
+ log .warn ("Invalid task state {} provided for detection, ignoring" , taskState );
174
+ }
175
+ }
176
+ }
177
+ log .info ("Detection of task being stuck is enabled on following task states {}" , taskStates );
178
+ return taskStates ;
139
179
}
140
180
141
181
@ Override
@@ -150,7 +190,9 @@ protected void startUp() {
150
190
this .autoScalingExecutor .scheduleAtFixedRate (new YarnAutoScalingRunnable (new TaskDriver (this .helixManager ),
151
191
this .yarnService , this .partitionsPerContainer , this .overProvisionFactor ,
152
192
this .slidingFixedSizeWindow , this .helixManager .getHelixDataAccessor (), this .defaultHelixInstanceTags ,
153
- this .defaultContainerMemoryMbs , this .defaultContainerCores , this .taskAttemptsThreshold , this .splitWorkUnitReachThreshold ),
193
+ this .defaultContainerMemoryMbs , this .defaultContainerCores , this .taskAttemptsThreshold ,
194
+ this .splitWorkUnitReachThreshold , this .maxTimeInMinutesBeforeReleasingContainerHavingStuckTask ,
195
+ this .enableReleasingContainerHavingStuckTask , this .enableDetectionStuckTask , this .detectionForStuckTaskStates ),
154
196
initialDelay , scheduleInterval , TimeUnit .SECONDS );
155
197
}
156
198
@@ -179,13 +221,22 @@ static class YarnAutoScalingRunnable implements Runnable {
179
221
private final int defaultContainerCores ;
180
222
private final int taskAttemptsThreshold ;
181
223
private final boolean splitWorkUnitReachThreshold ;
224
+ private final int maxTimeInMinutesBeforeReleasingContainerHavingStuckTask ;
225
+ private final boolean enableReleasingContainerHavingStuckTask ;
226
+ private final boolean enableDetectionStuckTask ;
227
+ private final HashSet <TaskPartitionState > taskStates ;
182
228
183
229
/**
184
230
* A static map that keep track of an idle instance and its latest beginning idle time.
185
231
* If an instance is no longer idle when inspected, it will be dropped from this map.
186
232
*/
187
233
private static final Map <String , Long > instanceIdleSince = new HashMap <>();
188
-
234
+ /**
235
+ * A static nested map that keep track of an instances which contains the tasks which are present in any of the
236
+ * configured states along with its latest beginning idle time in any of those states. If an instance is no longer
237
+ * in the given states when inspected, it will be dropped from this map.
238
+ */
239
+ private static final Map <String , Long > instanceStuckSince = new HashMap <>();
189
240
190
241
@ Override
191
242
public void run () {
@@ -219,13 +270,30 @@ private String getInuseParticipantForHelixPartition(JobContext jobContext, int p
219
270
return null ;
220
271
}
221
272
273
+
274
+ private String getParticipantInGivenStateForHelixPartition (final JobContext jobContext , final int partition ,
275
+ final HashSet <TaskPartitionState > taskStates ) {
276
+ if (taskStates .contains (jobContext .getPartitionState (partition ))) {
277
+ log .info ("Helix task {} is in {} state at helix participant {}" ,
278
+ jobContext .getTaskIdForPartition (partition ), jobContext .getPartitionState (partition ),
279
+ jobContext .getAssignedParticipant (partition ));
280
+ return jobContext .getAssignedParticipant (partition );
281
+ }
282
+
283
+ return null ;
284
+ }
285
+
222
286
/**
223
287
* Iterate through the workflows configured in Helix to figure out the number of required partitions
224
288
* and request the {@link YarnService} to scale to the desired number of containers.
225
289
*/
226
290
@ VisibleForTesting
227
291
void runInternal () {
228
292
Set <String > inUseInstances = new HashSet <>();
293
+ // helixInstancesContainingStuckTasks maintains the set of helix instances/participants containing tasks that are
294
+ // stuck in any of the configured states.
295
+ final Set <String > helixInstancesContainingStuckTasks = new HashSet <>();
296
+
229
297
YarnContainerRequestBundle yarnContainerRequestBundle = new YarnContainerRequestBundle ();
230
298
for (Map .Entry <String , WorkflowConfig > workFlowEntry : taskDriver .getWorkflows ().entrySet ()) {
231
299
WorkflowContext workflowContext = taskDriver .getWorkflowContext (workFlowEntry .getKey ());
@@ -259,6 +327,13 @@ void runInternal() {
259
327
.map (i -> getInuseParticipantForHelixPartition (jobContext , i ))
260
328
.filter (Objects ::nonNull ).collect (Collectors .toSet ()));
261
329
330
+ if (enableDetectionStuckTask ) {
331
+ // if feature is not enabled the set helixInstancesContainingStuckTasks will always be empty
332
+ helixInstancesContainingStuckTasks .addAll (jobContext .getPartitionSet ().stream ()
333
+ .map (helixPartition -> getParticipantInGivenStateForHelixPartition (jobContext , helixPartition , taskStates ))
334
+ .filter (Objects ::nonNull ).collect (Collectors .toSet ()));
335
+ }
336
+
262
337
numPartitions = jobContext .getPartitionSet ().size ();
263
338
// Job level config for helix instance tags takes precedence over other tag configurations
264
339
if (jobConfig != null ) {
@@ -286,6 +361,8 @@ void runInternal() {
286
361
// and potentially replanner-instance.
287
362
Set <String > allParticipants = HelixUtils .getParticipants (helixDataAccessor , HELIX_YARN_INSTANCE_NAME_PREFIX );
288
363
364
+ final Set <Container > containersToRelease = new HashSet <>();
365
+
289
366
// Find all joined participants not in-use for this round of inspection.
290
367
// If idle time is beyond tolerance, mark the instance as unused by assigning timestamp as -1.
291
368
for (String participant : allParticipants ) {
@@ -299,27 +376,79 @@ void runInternal() {
299
376
// Remove this instance if existed in the tracking map.
300
377
instanceIdleSince .remove (participant );
301
378
}
379
+
380
+ if (helixInstancesContainingStuckTasks .contains (participant )) {
381
+ instanceStuckSince .putIfAbsent (participant , System .currentTimeMillis ());
382
+ if (isInstanceStuck (participant )) {
383
+ // release the corresponding container as the helix task is stuck for a long time
384
+ log .info ("Instance {} has some helix partition that is stuck for {} minutes, "
385
+ + "releasing the container enabled : {}" , participant ,
386
+ TimeUnit .MILLISECONDS .toMinutes (System .currentTimeMillis () - instanceStuckSince .get (participant )),
387
+ enableReleasingContainerHavingStuckTask );
388
+
389
+ // get container of the helix participant
390
+ Optional <Container > container = yarnService .getContainerInfoGivenHelixParticipant (participant );
391
+ instanceStuckSince .remove (participant );
392
+ String containerId = "" ;
393
+ if (container .isPresent ()) {
394
+ if (enableReleasingContainerHavingStuckTask ) {
395
+ containersToRelease .add (container .get ());
396
+ }
397
+ containerId = container .get ().getId ().toString ();
398
+ } else {
399
+ log .warn ("Container information for participant {} is not found" , participant );
400
+ }
401
+
402
+ if (this .yarnService .getEventSubmitter ().isPresent ()) {
403
+ // send GTE
404
+ this .yarnService .getEventSubmitter ().get ().submit (GobblinYarnEventConstants .EventNames .HELIX_PARTITION_STUCK ,
405
+ GobblinHelixConstants .HELIX_INSTANCE_NAME_KEY , participant ,
406
+ GobblinYarnMetricTagNames .CONTAINER_ID , containerId );
407
+ }
408
+ }
409
+ } else {
410
+ instanceStuckSince .remove (participant );
411
+ }
412
+ }
413
+
414
+ // release the containers
415
+ if (!containersToRelease .isEmpty ()) {
416
+ this .yarnService .getEventBus ().post (new ContainerReleaseRequest (containersToRelease , true ));
302
417
}
418
+
303
419
slidingWindowReservoir .add (yarnContainerRequestBundle );
304
420
421
+
305
422
log .debug ("There are {} containers being requested in total, tag-count map {}, tag-resource map {}" ,
306
423
yarnContainerRequestBundle .getTotalContainers (), yarnContainerRequestBundle .getHelixTagContainerCountMap (),
307
424
yarnContainerRequestBundle .getHelixTagResourceMap ());
308
425
309
426
this .yarnService .requestTargetNumberOfContainers (slidingWindowReservoir .getMax (), inUseInstances );
310
427
}
311
428
312
- @ VisibleForTesting
313
429
/**
314
430
* Return true is the condition for tagging an instance as "unused" holds.
315
431
* The condition, by default is that if an instance went back to
316
432
* active (having partition running on it) within {@link #maxIdleTimeInMinutesBeforeScalingDown} minutes, we will
317
433
* not tag that instance as "unused" and have that as the candidate for scaling down.
318
434
*/
435
+ @ VisibleForTesting
319
436
boolean isInstanceUnused (String participant ){
320
437
return System .currentTimeMillis () - instanceIdleSince .get (participant ) >
321
438
TimeUnit .MINUTES .toMillis (maxIdleTimeInMinutesBeforeScalingDown );
322
439
}
440
+
441
+ /**
442
+ * Return true is the condition for tagging an instance as stuck.
443
+ * The condition, by default is that if a task running on an instance went back to any other state other than given
444
+ * states within {@link #maxTimeInMinutesBeforeReleasingContainerHavingStuckTask} minutes, we will
445
+ * not tag that instance as stuck and the container will not be scaled down.
446
+ */
447
+ @ VisibleForTesting
448
+ boolean isInstanceStuck (final String participant ) {
449
+ return System .currentTimeMillis () - instanceStuckSince .get (participant ) >
450
+ TimeUnit .MINUTES .toMillis (maxTimeInMinutesBeforeReleasingContainerHavingStuckTask );
451
+ }
323
452
}
324
453
325
454
/**
0 commit comments