 def getAWSJsonOutput(cmd):
     process = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE)
     out, err = process.communicate()
-    requestInfo = json.loads(out)
+    try:
+        requestInfo = json.loads(out)
+    except ValueError as e:
+        if str(e) == "No JSON object could be decoded":
+            requestInfo = out
+        else:
+            # stderr is not piped above, so err is None; guard the concatenation
+            requestInfo = out + (err or '')
     return requestInfo

 def loadConfig(configFile):
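A quick usage sketch of this helper, hedged: the cluster name below is made up, and the key lookups assume the usual `aws ecs describe-clusters` response shape that this file also relies on. Because the helper falls back to raw stdout when the CLI emits non-JSON, callers should check the type of the return value.

# Hypothetical usage; 'MyCluster' is illustrative, not from this commit.
result = getAWSJsonOutput('aws ecs describe-clusters --clusters MyCluster')
if isinstance(result, dict):
    running = result['clusters'][0]['runningTasksCount']
else:
    # The helper fell back to raw CLI output instead of parsed JSON
    raise RuntimeError('CLI did not return JSON: ' + result)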
@@ -114,6 +120,17 @@ def removeClusterIfUnused(clusterName):
     if sum([result['clusters'][0]['pendingTasksCount'], result['clusters'][0]['runningTasksCount'], result['clusters'][0]['activeServicesCount']]) == 0:
         cmd = 'aws ecs delete-cluster --cluster ' + clusterName
         result = getAWSJsonOutput(cmd)
+
+def downscaleSpotFleet(queue, spotFleetID):
+    visible, nonvisible = queue.returnLoad()
+    if visible > 0:
+        return
+    else:
+        cmd = 'aws ec2 describe-spot-fleet-instances --spot-fleet-request-id ' + spotFleetID
+        status = getAWSJsonOutput(cmd)
+        if nonvisible < len(status['ActiveInstances']):
+            cmd = "aws ec2 modify-spot-fleet-request --excess-capacity-termination-policy NoTermination --target-capacity " + str(nonvisible) + " --spot-fleet-request-id " + spotFleetID
+            result = getAWSJsonOutput(cmd)

 #################################
 # CLASS TO HANDLE SQS QUEUE
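For readers using the SDK rather than the CLI, a minimal boto3 sketch of the same downscale logic, assuming configured AWS credentials (this commit shells out to the AWS CLI instead; the function name is mine):

import boto3

def downscale_sketch(spot_fleet_id, in_flight):
    ec2 = boto3.client('ec2')
    status = ec2.describe_spot_fleet_instances(SpotFleetRequestId=spot_fleet_id)
    if in_flight < len(status['ActiveInstances']):
        # noTermination lowers the target without force-killing instances;
        # excess machines simply are not replaced once they exit.
        ec2.modify_spot_fleet_request(
            SpotFleetRequestId=spot_fleet_id,
            TargetCapacity=in_flight,
            ExcessCapacityTerminationPolicy='noTermination',
        )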
@@ -149,6 +166,12 @@ def pendingLoad(self):
         else:
             return False

+    def returnLoad(self):
+        self.queue.load()
+        visible = int(self.queue.attributes['ApproximateNumberOfMessages'])
+        nonVis = int(self.queue.attributes['ApproximateNumberOfMessagesNotVisible'])
+        return visible, nonVis
+

 #################################
 # SERVICE 1: SUBMIT JOB
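returnLoad reads the two SQS counters that drive downscaling: ApproximateNumberOfMessages (visible, i.e. queued but not yet picked up) and ApproximateNumberOfMessagesNotVisible (received but not yet deleted, i.e. in flight). The commit uses the older boto Queue object, whose .load() refreshes .attributes; a boto3 equivalent, as an assumption-labeled sketch:

import boto3

def return_load_sketch(queue_url):
    # queue_url is hypothetical; the commit resolves the queue by name via boto
    sqs = boto3.client('sqs')
    attrs = sqs.get_queue_attributes(
        QueueUrl=queue_url,
        AttributeNames=['ApproximateNumberOfMessages',
                        'ApproximateNumberOfMessagesNotVisible'],
    )['Attributes']
    return (int(attrs['ApproximateNumberOfMessages']),
            int(attrs['ApproximateNumberOfMessagesNotVisible']))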
@@ -278,11 +301,17 @@ def monitor():
     # Step 1: Create job and count messages periodically
     queue = JobQueue(name=queueId)
     while queue.pendingLoad():
-        #Once a day check for terminated machines and delete their alarms.
-        #These records are only kept for 48 hours, which is why we don't just do it at the end
+        #Once an hour check for terminated machines and delete their alarms.
+        #This is slooooooow, which is why we don't just do it at the end
         curtime = datetime.datetime.now().strftime('%H%M')
-        if curtime == '1200':
+        if curtime[-2:] == '00':
             killdeadAlarms(fleetId, monitorapp)
+        #Once every 10 minutes, check if all jobs are in process, and if so scale the spot fleet size to match
+        #the number of jobs still in process WITHOUT force terminating them.
+        #This can help keep costs down if, for example, you start up 100+ machines to run a large job, and
+        #1-10 jobs with errors are keeping it rattling around for hours.
+        if curtime[-1:] == '9':
+            downscaleSpotFleet(queue, fleetId)
         time.sleep(MONITOR_TIME)

     # Step 2: When no messages are pending, stop service
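The scheduling above gates on the wall clock rather than a counter: a '%H%M' timestamp ending in '00' fires the hourly alarm sweep, and one ending in '9' fires the ten-minute downscale check. A small standalone sketch of that gating, with one caveat worth noting:

import datetime

curtime = datetime.datetime.now().strftime('%H%M')  # e.g. '1439'
hourly = (curtime[-2:] == '00')       # minute 00 of each hour
every_ten = (curtime[-1:] == '9')     # minutes 09, 19, ..., 59
# Caveat: if MONITOR_TIME is under 60 seconds, the loop can wake more than
# once within a matching minute and repeat the check.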
@@ -309,6 +338,7 @@ def monitor():
     for eachinstance in result['ActiveInstances']:
         delalarm = 'aws cloudwatch delete-alarms --alarm-name ' + monitorapp + '_' + eachinstance["InstanceId"]
         subprocess.Popen(delalarm.split())
+    time.sleep(3)
     killdeadAlarms(fleetId, monitorapp)

     # Step 4: Read spot fleet id and terminate all EC2 instances
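The added time.sleep(3) presumably gives the fire-and-forget Popen calls a moment to finish before the final killdeadAlarms sweep. A hedged alternative sketch that waits on the delete processes deterministically instead of sleeping (same variables as the loop above):

procs = []
for eachinstance in result['ActiveInstances']:
    delalarm = ('aws cloudwatch delete-alarms --alarm-name '
                + monitorapp + '_' + eachinstance['InstanceId'])
    procs.append(subprocess.Popen(delalarm.split()))
for p in procs:
    p.wait()  # block until every delete-alarms call has exited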