@@ -2,7 +2,6 @@
     get_faasm_worker_ips,
     get_faasm_worker_names,
 )
-from faasmctl.util.flush import flush_workers
 from faasmctl.util.planner import (
     get_in_fligh_apps as planner_get_in_fligh_apps,
     set_next_evicted_host as planner_set_next_evicted_host,
@@ -25,17 +24,19 @@
     TaskObject,
     WorkQueueItem,
 )
-from tasks.makespan.env import (
-    DGEMM_DOCKER_BINARY,
-    DGEMM_FAASM_FUNC,
-    DGEMM_FAASM_USER,
-    get_dgemm_cmdline,
+from tasks.util.elastic import (
+    ELASTIC_KERNEL,
+    OPENMP_ELASTIC_FUNCTION,
+    OPENMP_ELASTIC_NATIVE_BINARY,
+    OPENMP_ELASTIC_USER,
+    get_elastic_input_data,
 )
 from tasks.util.faasm import (
     get_faasm_exec_time_from_json,
     has_app_failed,
     post_async_msg_and_get_result_json,
 )
+from tasks.util.kernels import get_openmp_kernel_cmdline
 from tasks.util.k8s import wait_for_pods as wait_for_native_mpi_pods
 from tasks.util.lammps import (
     LAMMPS_FAASM_USER,
@@ -53,12 +54,14 @@
     ALLOWED_BASELINES,
     EXEC_TASK_INFO_FILE_PREFIX,
     GRANNY_BASELINES,
+    GRANNY_ELASTIC_BASELINES,
     GRANNY_FT_BASELINES,
     GRANNY_MIGRATE_BASELINES,
     MPI_MIGRATE_WORKLOADS,
     MPI_WORKLOADS,
     NATIVE_BASELINES,
     NATIVE_FT_BASELINES,
+    OPENMP_WORKLOADS,
     SCHEDULING_INFO_FILE_PREFIX,
     get_num_cpus_per_vm_from_trace,
     get_user_id_from_task,
@@ -255,10 +258,7 @@ def thread_print(msg):
 
         # Choose the right data file if running a LAMMPS simulation
         if work_item.task.app in MPI_WORKLOADS:
-            # We always use the same LAMMPS benchmark ("compute-xl")
-            # TODO: FIXME: delete me!
-            data_file = get_faasm_benchmark("compute")["data"][0]
-            # data_file = get_faasm_benchmark(LAMMPS_SIM_WORKLOAD)["data"][0]
+            data_file = get_faasm_benchmark(LAMMPS_SIM_WORKLOAD)["data"][0]
 
         # Record the start timestamp
         start_ts = 0
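
As a reading aid, this is the structure the call site assumes `get_faasm_benchmark` returns: a dict whose `"data"` entry is a list of candidate data files, of which the first is used. The file names below are made up for illustration:

```python
# Assumed shape, inferred from the call site get_faasm_benchmark(...)["data"][0]
benchmark = {"data": ["lammps/in.network", "lammps/in.network-alt"]}
data_file = benchmark["data"][0]
```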
@@ -301,13 +301,11 @@ def thread_print(msg):
                 "su mpirun -c '{}'".format(mpirun_cmd),
             ]
             exec_cmd = " ".join(exec_cmd)
-        elif work_item.task.app == "omp":
-            # TODO(omp): should we set the parallelism level to be
-            # min(work_item.task.size, num_slots_per_vm) ? I.e. what will
-            # happen when we oversubscribe?
-            openmp_cmd = "bash -c '{} {}'".format(
-                DGEMM_DOCKER_BINARY,
-                get_dgemm_cmdline(work_item.task.size),
+        elif work_item.task.app in OPENMP_WORKLOADS:
+            openmp_cmd = "bash -c '{} {} {}'".format(
+                get_elastic_input_data(native=True),
+                OPENMP_ELASTIC_NATIVE_BINARY,
+                get_openmp_kernel_cmdline(ELASTIC_KERNEL, work_item.task.size),
            )
 
            exec_cmd = [
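
For reference, a minimal sketch of the native command string this hunk now composes. The values below are assumptions for illustration only; the real outputs of `get_elastic_input_data(native=True)` and `get_openmp_kernel_cmdline` come from `tasks.util.elastic` and `tasks.util.kernels`:

```python
# Hypothetical stand-ins, only to show the shape of the composed command
input_data = "/data/elastic-input"    # assumed get_elastic_input_data(native=True)
binary = "/usr/local/bin/omp_kernel"  # assumed OPENMP_ELASTIC_NATIVE_BINARY
cmdline = "8 10 1024"                 # assumed get_openmp_kernel_cmdline(ELASTIC_KERNEL, 8)

openmp_cmd = "bash -c '{} {} {}'".format(input_data, binary, cmdline)
# -> "bash -c '/data/elastic-input /usr/local/bin/omp_kernel 8 10 1024'"
```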
@@ -358,43 +356,40 @@ def thread_print(msg):
                 msg["input_data"] = get_lammps_migration_params(
                     check_every=check_every
                 )
-            elif work_item.task.app == "omp":
+            elif work_item.task.app in OPENMP_WORKLOADS:
                 if work_item.task.size > num_cpus_per_vm:
                     print(
                         "Requested OpenMP execution with more parallelism "
                         "than slots in the current environment: "
                         "{} > {}".format(work_item.task.size, num_cpus_per_vm)
                     )
                     raise RuntimeError("Error in OpenMP task trace!")
-                user = DGEMM_FAASM_USER
-                func = "{}_{}".format(DGEMM_FAASM_FUNC, work_item.task.task_id)
+                user = OPENMP_ELASTIC_USER
+                func = OPENMP_ELASTIC_FUNCTION
                 msg = {
                     "user": user,
                     "function": func,
-                    # The input_data is the number of OMP threads
-                    "cmdline": get_dgemm_cmdline(work_item.task.size),
+                    "input_data": get_elastic_input_data(),
+                    "cmdline": get_openmp_kernel_cmdline(ELASTIC_KERNEL, work_item.task.size),
+                    "isOmp": True,
+                    "ompNumThreads": work_item.task.size,
                 }
 
         req["user"] = user
         req["function"] = func
+        req["singleHostHint"] = True
+        req["elasticScaleHint"] = baseline in GRANNY_ELASTIC_BASELINES
 
-        start_ts = time()
         # Post async request and wait for JSON result
-        try:
-            result_json = post_async_msg_and_get_result_json(msg, req_dict=req)
-            actual_time = int(get_faasm_exec_time_from_json(result_json))
-            has_failed = has_app_failed(result_json)
-            thread_print(
-                "Finished executiong app {} (time: {})".format(
-                    result_json[0]["appId"], actual_time
-                )
-            )
-        except RuntimeError:
-            print("WEE EVER HERE?? DELETE THIS CATCH")
-            actual_time = -1
-            sch_logger.error(
-                "Error executing task {}".format(work_item.task.task_id)
+        start_ts = time()
+        result_json = post_async_msg_and_get_result_json(msg, req_dict=req)
+        actual_time = int(get_faasm_exec_time_from_json(result_json))
+        has_failed = has_app_failed(result_json)
+        thread_print(
+            "Finished executing app {} (time: {})".format(
+                result_json[0]["appId"], actual_time
            )
+        )
 
         end_ts = time()
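
To summarise the noisy hunk above, this is the request shape now posted for a Granny OpenMP task. Every concrete value below is an assumption shown for illustration; the real ones come from `tasks.util.elastic` and the task trace:

```python
# All values are assumptions, shown only to collect the new fields in one place
msg = {
    "user": "omp",           # assumed value of OPENMP_ELASTIC_USER
    "function": "elastic",   # assumed value of OPENMP_ELASTIC_FUNCTION
    "input_data": "...",     # get_elastic_input_data()
    "cmdline": "8 10 1024",  # assumed get_openmp_kernel_cmdline(ELASTIC_KERNEL, 8)
    "isOmp": True,           # mark the app as OpenMP for the planner
    "ompNumThreads": 8,      # initial parallelism (work_item.task.size)
}

req = {
    "user": msg["user"],
    "function": msg["function"],
    "singleHostHint": True,    # OpenMP apps must stay on a single host
    "elasticScaleHint": True,  # only for baselines in GRANNY_ELASTIC_BASELINES
}
```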
@@ -432,7 +427,7 @@ class SchedulerState:
     # number of cpus per vm
     trace_str: str
     # The workload indicates the type of application we are running. It can
-    # either be `omp` or `mpi-migrate`, or `mpi-evict`
+    # either be `omp-elastic`, `mpi-migrate`, `mpi-evict`, or `mpi-spot`
     workload: str
     num_tasks: int
     num_cpus_per_vm: int
@@ -833,16 +828,25 @@ def num_available_slots_from_vm_list(self, vm_list):
     # Helper method to know if we have enough slots to schedule a task
     def have_enough_slots_for_task(self, task: TaskObject):
         if self.state.baseline in NATIVE_BASELINES:
-            # For `mpi-evict` we run a multi-tenant trace, and prevent apps
-            # from different users from running in the same VM
             if self.state.workload == "mpi-evict":
+                # For `mpi-evict` we run a multi-tenant trace, and prevent apps
+                # from different users from running in the same VM
                 sorted_vms = sorted(
                     self.state.vm_map.items(), key=lambda item: item[1], reverse=True
                 )
 
                 pruned_vms = self.prune_node_list_from_different_users(sorted_vms, task)
 
                 return self.num_available_slots_from_vm_list(pruned_vms) >= task.size
+            elif self.state.workload in OPENMP_WORKLOADS:
+                # For OpenMP workloads, we can only allocate them in one VM, so
+                # we compare the requested size with the largest capacity we
+                # have in one VM
+                sorted_vms = sorted(
+                    self.state.vm_map.items(), key=lambda item: item[1], reverse=True
+                )
+
+                return sorted_vms[0][1] >= task.size
             else:
                 return self.state.total_available_slots >= task.size
         else:
@@ -862,6 +866,13 @@ def have_enough_slots_for_task(self, task: TaskObject):
                 num_evicted_vms=self.state.num_faults,
             ) >= task.size
 
+            if self.state.workload in OPENMP_WORKLOADS:
+                return get_num_available_slots_from_in_flight_apps(
+                    self.state.num_vms,
+                    self.state.num_cpus_per_vm,
+                    openmp=True,
+                ) >= task.size
+
             return get_num_available_slots_from_in_flight_apps(
                 self.state.num_vms,
                 self.state.num_cpus_per_vm
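
The single-VM constraint is the key difference from the MPI path: the native check compares the task against the largest free capacity in any one VM, not the cluster-wide total. A small worked example with a made-up `vm_map` shows why:

```python
# Hypothetical free-slot map: 9 free slots in total across three VMs
vm_map = {"vm0": 3, "vm1": 4, "vm2": 2}
task_size = 5

# The MPI-style check on the cluster total would admit the task...
assert sum(vm_map.values()) >= task_size

# ...but no single VM can host all 5 OpenMP threads, so the new
# OpenMP check correctly rejects it
sorted_vms = sorted(vm_map.items(), key=lambda item: item[1], reverse=True)
assert not sorted_vms[0][1] >= task_size
```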
@@ -898,10 +909,12 @@ def schedule_task_to_vm(
         if self.state.workload == "mpi-evict":
             sorted_vms = self.prune_node_list_from_different_users(sorted_vms, task)
 
+        if self.state.workload in OPENMP_WORKLOADS:
+            sorted_vms = [sorted_vms[0]]
+
         # For GRANNY baselines we can skip the python-side accounting as the
         # planner has all the scheduling information
-        # TODO(omp): why should it be any different with OpenMP?
-        if self.state.baseline in NATIVE_BASELINES and task.app in MPI_WORKLOADS:
+        if self.state.baseline in NATIVE_BASELINES:
             for vm, num_slots in sorted_vms:
                 # Work out how many slots can we take up in this pod
                 if self.state.baseline == "batch":
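
Truncating `sorted_vms` to its head element means the generic native allocation loop can only ever place the task on the VM with the most free slots, which is what pins an OpenMP task to a single VM. A minimal sketch with assumed values:

```python
# Hypothetical candidates, sorted by free slots (descending)
sorted_vms = [("vm1", 4), ("vm0", 3), ("vm2", 2)]
task_size = 4

sorted_vms = [sorted_vms[0]]  # OpenMP: keep only the emptiest VM

scheduling_decision = []
for vm, num_slots in sorted_vms:
    # Take as many slots on this VM as the task needs and the VM has
    num_on_this_vm = min(num_slots, task_size)
    scheduling_decision.append((vm, num_on_this_vm))

assert scheduling_decision == [("vm1", 4)]  # the whole task lands on one VM
```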
@@ -935,36 +948,6 @@ def schedule_task_to_vm(
             raise RuntimeError(
                 "Scheduling error: inconsistent scheduler state"
             )
-        """
-        elif task.app == "omp":
-            if len(sorted_vms) == 0:
-                # TODO: maybe we should raise an inconsistent state error here
-                return NOT_ENOUGH_SLOTS
-            vm, num_slots = sorted_vms[0]
-            if num_slots == 0:
-                # TODO: maybe we should raise an inconsistent state error here
-                return NOT_ENOUGH_SLOTS
-            if self.state.baseline in NATIVE_BASELINES:
-                if task.size > self.state.num_cpus_per_vm:
-                    print(
-                        "Overcomitting for task {} ({} > {})".format(
-                            task.task_id,
-                            task.size,
-                            self.state.num_cpus_per_vm,
-                        )
-                    )
-                    num_on_this_vm = self.state.num_cpus_per_vm
-                else:
-                    if num_slots < task.size:
-                        return NOT_ENOUGH_SLOTS
-                    num_on_this_vm = task.size
-
-                scheduling_decision.append((vm, num_on_this_vm))
-                self.state.vm_map[vm] -= num_on_this_vm
-                # TODO: when we overcommit, do we substract the number of cores
-                # we occupy, or the ones we agree to run?
-                self.state.total_available_slots -= num_on_this_vm
-        """
 
         # Before returning, persist the scheduling decision to state
         self.state.in_flight_tasks[task.task_id] = scheduling_decision