@@ -189,7 +189,6 @@ def mark_test_complete(
189
189
self .check_schedule (node , duration = duration )
190
190
191
191
def mark_test_pending (self , item : str ) -> None :
192
-
193
192
assert self .collection is not None
194
193
self .pending .insert (
195
194
0 ,
@@ -205,7 +204,9 @@ def remove_pending_tests_from_node(
205
204
) -> None :
206
205
raise NotImplementedError ()
207
206
208
- def check_schedule (self , node : WorkerController , duration : float = 0 , from_dsession : bool = False ) -> None :
207
+ def check_schedule (
208
+ self , node : WorkerController , duration : float = 0 , from_dsession : bool = False
209
+ ) -> None :
209
210
"""Maybe schedule new items on the node.
210
211
211
212
If there are any globally pending nodes left then this will
@@ -214,7 +215,9 @@ def check_schedule(self, node: WorkerController, duration: float = 0, from_dsess
214
215
heuristic to influence how many tests the node is assigned.
215
216
"""
216
217
if node .shutting_down :
217
- self .report_line (f"[-] [csg] { node .workerinput ['workerid' ]} is already shutting down" )
218
+ self .report_line (
219
+ f"[-] [csg] { node .workerinput ['workerid' ]} is already shutting down"
220
+ )
218
221
return
219
222
220
223
if self .pending :
@@ -227,18 +230,25 @@ def check_schedule(self, node: WorkerController, duration: float = 0, from_dsess
227
230
if self .pending_groups :
228
231
dist_group_key = self .pending_groups .pop (0 )
229
232
dist_group = self .dist_groups [dist_group_key ]
230
- nodes = cycle (self .nodes [0 :dist_group ['group_workers' ]])
231
- schedule_log : dict [str , Any ] = {n .gateway .id :[] for n in self .nodes [0 :dist_group ['group_workers' ]]}
232
- for _ in range (len (dist_group ['test_indices' ])):
233
+ nodes = cycle (self .nodes [0 : dist_group ["group_workers" ]])
234
+ schedule_log : dict [str , Any ] = {
235
+ n .gateway .id : []
236
+ for n in self .nodes [0 : dist_group ["group_workers" ]]
237
+ }
238
+ for _ in range (len (dist_group ["test_indices" ])):
233
239
n = next (nodes )
234
- #needs cleaner way to be identified
235
- tests_per_node = self .dist_groups [dist_group_key ]['pending_indices' ][:1 ]
240
+ # needs cleaner way to be identified
241
+ tests_per_node = self .dist_groups [dist_group_key ][
242
+ "pending_indices"
243
+ ][:1 ]
236
244
schedule_log [n .gateway .id ].extend (tests_per_node )
237
245
238
246
self ._send_tests_group (n , 1 , dist_group_key )
239
247
del self .dist_groups [dist_group_key ]
240
- message = (f"\n [-] [csg] check_schedule: processed scheduling for { dist_group_key } :"
241
- f" { ' ' .join ([f'{ nid } ({ len (nt )} )' for nid ,nt in schedule_log .items ()])} " )
248
+ message = (
249
+ f"\n [-] [csg] check_schedule: processed scheduling for { dist_group_key } :"
250
+ f" { ' ' .join ([f'{ nid } ({ len (nt )} )' for nid ,nt in schedule_log .items ()])} "
251
+ )
242
252
self .report_line (message )
243
253
244
254
else :
@@ -310,26 +320,28 @@ def schedule(self) -> None:
310
320
311
321
if self .is_first_time :
312
322
for i , test in enumerate (self .collection ):
313
- if '@' in test :
314
- group_mark = test .split ('@' )[- 1 ]
315
- group_workers = int (group_mark .split ('_' )[- 1 ])
323
+ if "@" in test :
324
+ group_mark = test .split ("@" )[- 1 ]
325
+ group_workers = int (group_mark .split ("_" )[- 1 ])
316
326
if group_workers > len (self .nodes ):
317
327
# We can only distribute across as many nodes as we have available
318
328
# If a group requests more, we fallback to our actual max
319
329
group_workers = len (self .nodes )
320
330
else :
321
- group_mark = ' default'
331
+ group_mark = " default"
322
332
group_workers = len (self .nodes )
323
- existing_tests = dist_groups .get (group_mark , {}).get (' tests' , [])
333
+ existing_tests = dist_groups .get (group_mark , {}).get (" tests" , [])
324
334
existing_tests .append (test )
325
- existing_indices = dist_groups .get (group_mark , {}).get ('test_indices' , [])
335
+ existing_indices = dist_groups .get (group_mark , {}).get (
336
+ "test_indices" , []
337
+ )
326
338
existing_indices .append (i )
327
339
328
340
dist_groups [group_mark ] = {
329
- ' tests' : existing_tests ,
330
- ' group_workers' : group_workers ,
331
- ' test_indices' : existing_indices ,
332
- ' pending_indices' : existing_indices
341
+ " tests" : existing_tests ,
342
+ " group_workers" : group_workers ,
343
+ " test_indices" : existing_indices ,
344
+ " pending_indices" : existing_indices ,
333
345
}
334
346
self .dist_groups = dist_groups
335
347
self .pending_groups = list (dist_groups .keys ())
@@ -342,17 +354,21 @@ def schedule(self) -> None:
342
354
return
343
355
dist_group_key = self .pending_groups .pop (0 )
344
356
dist_group = self .dist_groups [dist_group_key ]
345
- nodes = cycle (self .nodes [0 :dist_group ['group_workers' ]])
346
- schedule_log : dict [str , Any ] = {n .gateway .id : [] for n in self .nodes [0 :dist_group ['group_workers' ]]}
347
- for _ in range (len (dist_group ['test_indices' ])):
357
+ nodes = cycle (self .nodes [0 : dist_group ["group_workers" ]])
358
+ schedule_log : dict [str , Any ] = {
359
+ n .gateway .id : [] for n in self .nodes [0 : dist_group ["group_workers" ]]
360
+ }
361
+ for _ in range (len (dist_group ["test_indices" ])):
348
362
n = next (nodes )
349
363
# needs cleaner way to be identified
350
- tests_per_node = self .dist_groups [dist_group_key ][' pending_indices' ][:1 ]
364
+ tests_per_node = self .dist_groups [dist_group_key ][" pending_indices" ][:1 ]
351
365
schedule_log [n .gateway .id ].extend (tests_per_node )
352
366
self ._send_tests_group (n , 1 , dist_group_key )
353
367
del self .dist_groups [dist_group_key ]
354
- message = ("\n [-] [csg] schedule: processed scheduling for "
355
- f"{ dist_group_key } : { ' ' .join ([f'{ nid } ({ len (nt )} )' for nid , nt in schedule_log .items ()])} " )
368
+ message = (
369
+ "\n [-] [csg] schedule: processed scheduling for "
370
+ f"{ dist_group_key } : { ' ' .join ([f'{ nid } ({ len (nt )} )' for nid , nt in schedule_log .items ()])} "
371
+ )
356
372
self .report_line (message )
357
373
358
374
def _send_tests (self , node : WorkerController , num : int ) -> None :
@@ -362,16 +378,17 @@ def _send_tests(self, node: WorkerController, num: int) -> None:
362
378
self .node2pending [node ].extend (tests_per_node )
363
379
node .send_runtest_some (tests_per_node )
364
380
365
- def _send_tests_group (self , node : WorkerController , num : int , dist_group_key : str ) -> None :
366
- tests_per_node = self .dist_groups [dist_group_key ]['pending_indices' ][:num ]
381
+ def _send_tests_group (
382
+ self , node : WorkerController , num : int , dist_group_key : str
383
+ ) -> None :
384
+ tests_per_node = self .dist_groups [dist_group_key ]["pending_indices" ][:num ]
367
385
if tests_per_node :
368
- del self .dist_groups [dist_group_key ][' pending_indices' ][:num ]
386
+ del self .dist_groups [dist_group_key ][" pending_indices" ][:num ]
369
387
for test_index in tests_per_node :
370
388
self .pending .remove (test_index )
371
389
self .node2pending [node ].extend (tests_per_node )
372
390
node .send_runtest_some (tests_per_node )
373
391
374
-
375
392
def _check_nodes_have_same_collection (self ) -> bool :
376
393
"""Return True if all nodes have collected the same items.
377
394
0 commit comments