diff --git a/config/opal_configure_options.m4 b/config/opal_configure_options.m4 index 0cfb7090418..1302c5e8460 100644 --- a/config/opal_configure_options.m4 +++ b/config/opal_configure_options.m4 @@ -544,9 +544,21 @@ fi AC_DEFINE_UNQUOTED([OPAL_ENABLE_GETPWUID], [$opal_want_getpwuid], [Disable getpwuid support (default: enabled)]) -dnl We no longer support the old OPAL_ENABLE_PROGRESS_THREADS. At -dnl some point, this should die. -AC_DEFINE([OPAL_ENABLE_PROGRESS_THREADS], - [0], - [Whether we want BTL progress threads enabled]) +# +# Disable progress threads +# +AC_MSG_CHECKING([if want asynchronous progress threads]) +AC_ARG_ENABLE([progress_threads], + [AS_HELP_STRING([--disable-progress-threads], + [Disable asynchronous progress threads (default: enabled)])]) +if test "$enable_progress_threads" = "no"; then + AC_MSG_RESULT([no]) + opal_want_progress_threads=0 +else + AC_MSG_RESULT([yes]) + opal_want_progress_threads=1 +fi +AC_DEFINE_UNQUOTED([OPAL_ENABLE_PROGRESS_THREADS], [$opal_want_progress_threads], + [Disable BTL asynchronous progress threads (default: enabled)]) + ])dnl diff --git a/ompi/instance/instance.c b/ompi/instance/instance.c index 4c28d7b69a0..24710bb5c7b 100644 --- a/ompi/instance/instance.c +++ b/ompi/instance/instance.c @@ -512,7 +512,7 @@ static int ompi_mpi_instance_init_common (int argc, char **argv) ddt_init, but before mca_coll_base_open, since some collective modules (e.g., the hierarchical coll component) may need ops in their query function. */ - if (OMPI_SUCCESS != (ret = ompi_op_base_find_available (OPAL_ENABLE_PROGRESS_THREADS, ompi_mpi_thread_multiple))) { + if (OMPI_SUCCESS != (ret = ompi_op_base_find_available (opal_async_progress_thread_spawned, ompi_mpi_thread_multiple))) { return ompi_instance_print_error ("ompi_op_base_find_available() failed", ret); } @@ -532,7 +532,7 @@ static int ompi_mpi_instance_init_common (int argc, char **argv) return ompi_instance_print_error ("mca_smsc_base_select() failed", ret); } - if (OMPI_SUCCESS != (ret = mca_pml_base_select (OPAL_ENABLE_PROGRESS_THREADS, ompi_mpi_thread_multiple))) { + if (OMPI_SUCCESS != (ret = mca_pml_base_select (opal_async_progress_thread_spawned, ompi_mpi_thread_multiple))) { return ompi_instance_print_error ("mca_pml_base_select() failed", ret); } @@ -613,11 +613,11 @@ static int ompi_mpi_instance_init_common (int argc, char **argv) return ompi_instance_print_error ("mca_pml_base_bsend_init() failed", ret); } - if (OMPI_SUCCESS != (ret = mca_coll_base_find_available (OPAL_ENABLE_PROGRESS_THREADS, ompi_mpi_thread_multiple))) { + if (OMPI_SUCCESS != (ret = mca_coll_base_find_available (opal_async_progress_thread_spawned, ompi_mpi_thread_multiple))) { return ompi_instance_print_error ("mca_coll_base_find_available() failed", ret); } - if (OMPI_SUCCESS != (ret = ompi_osc_base_find_available (OPAL_ENABLE_PROGRESS_THREADS, ompi_mpi_thread_multiple))) { + if (OMPI_SUCCESS != (ret = ompi_osc_base_find_available (opal_async_progress_thread_spawned, ompi_mpi_thread_multiple))) { return ompi_instance_print_error ("ompi_osc_base_find_available() failed", ret); } diff --git a/ompi/mpi/c/request_get_status.c b/ompi/mpi/c/request_get_status.c index f97e3af4b0b..8f3157fd728 100644 --- a/ompi/mpi/c/request_get_status.c +++ b/ompi/mpi/c/request_get_status.c @@ -45,9 +45,7 @@ static const char FUNC_NAME[] = "MPI_Request_get_status"; int MPI_Request_get_status(MPI_Request request, int *flag, MPI_Status *status) { -#if OPAL_ENABLE_PROGRESS_THREADS == 0 int do_it_once = 0; -#endif MEMCHECKER( memchecker_request(&request); @@ -63,9 +61,7 @@ int MPI_Request_get_status(MPI_Request request, int *flag, } } -#if OPAL_ENABLE_PROGRESS_THREADS == 0 recheck_request_status: -#endif opal_atomic_mb(); if( (request == MPI_REQUEST_NULL) || (request->req_state == OMPI_REQUEST_INACTIVE) ) { *flag = true; @@ -87,8 +83,8 @@ int MPI_Request_get_status(MPI_Request request, int *flag, } return MPI_SUCCESS; } -#if OPAL_ENABLE_PROGRESS_THREADS == 0 - if( 0 == do_it_once ) { + + if( 0 == do_it_once && !opal_async_progress_thread_spawned ) { /* If we run the opal_progress then check the status of the request before leaving. We will call the opal_progress only once per call. */ @@ -96,7 +92,7 @@ int MPI_Request_get_status(MPI_Request request, int *flag, do_it_once++; goto recheck_request_status; } -#endif + *flag = false; return MPI_SUCCESS; } diff --git a/ompi/request/req_test.c b/ompi/request/req_test.c index b28ade8a67a..20ecc75f0cf 100644 --- a/ompi/request/req_test.c +++ b/ompi/request/req_test.c @@ -32,11 +32,9 @@ int ompi_request_default_test(ompi_request_t ** rptr, { ompi_request_t *request = *rptr; -#if OPAL_ENABLE_PROGRESS_THREADS == 0 int do_it_once = 0; recheck_request_status: -#endif if( request->req_state == OMPI_REQUEST_INACTIVE ) { *completed = true; if (MPI_STATUS_IGNORE != status) { @@ -81,8 +79,8 @@ int ompi_request_default_test(ompi_request_t ** rptr, return MPI_ERR_PROC_FAILED_PENDING; } #endif -#if OPAL_ENABLE_PROGRESS_THREADS == 0 - if( 0 == do_it_once ) { + + if( 0 == do_it_once && !opal_async_progress_thread_spawned ) { /** * If we run the opal_progress then check the status of the request before * leaving. We will call the opal_progress only once per call. @@ -92,7 +90,7 @@ int ompi_request_default_test(ompi_request_t ** rptr, goto recheck_request_status; } } -#endif + *completed = false; return OMPI_SUCCESS; } @@ -163,9 +161,9 @@ int ompi_request_default_test_any( *index = MPI_UNDEFINED; if(num_requests_null_inactive != count) { *completed = false; -#if OPAL_ENABLE_PROGRESS_THREADS == 0 - opal_progress(); -#endif + if (!opal_async_progress_thread_spawned) { + opal_progress(); + } } else { *completed = true; if (MPI_STATUS_IGNORE != status) { @@ -208,8 +206,8 @@ int ompi_request_default_test_all( return MPI_ERR_PROC_FAILED_PENDING; } #endif /* OPAL_ENABLE_FT_MPI */ -#if OPAL_ENABLE_PROGRESS_THREADS == 0 - if (0 == do_it_once) { + + if (0 == do_it_once && !opal_async_progress_thread_spawned) { ++do_it_once; if (0 != opal_progress()) { /* continue walking the list, retest the current request */ @@ -217,7 +215,7 @@ int ompi_request_default_test_all( continue; } } -#endif /* OPAL_ENABLE_PROGRESS_THREADS */ + /* short-circuit */ break; } @@ -353,9 +351,9 @@ int ompi_request_default_test_some( *outcount = num_requests_done; if (num_requests_done == 0) { -#if OPAL_ENABLE_PROGRESS_THREADS == 0 - opal_progress(); -#endif + if (!opal_async_progress_thread_spawned) { + opal_progress(); + } return OMPI_SUCCESS; } diff --git a/ompi/runtime/ompi_mpi_finalize.c b/ompi/runtime/ompi_mpi_finalize.c index ad8a328dc55..8e4d7e66c52 100644 --- a/ompi/runtime/ompi_mpi_finalize.c +++ b/ompi/runtime/ompi_mpi_finalize.c @@ -193,9 +193,10 @@ int ompi_mpi_finalize(void) opal_atomic_swap_32(&ompi_mpi_state, OMPI_MPI_STATE_FINALIZE_PAST_COMM_SELF_DESTRUCT); -#if OPAL_ENABLE_PROGRESS_THREADS == 0 - opal_progress_set_event_flag(OPAL_EVLOOP_ONCE | OPAL_EVLOOP_NONBLOCK); -#endif + /* shutdown async progress thread before tearing down further services */ + if (opal_async_progress_thread_spawned) { + opal_progress_shutdown_async_progress_thread(); + } /* NOTE: MPI-2.1 requires that MPI_FINALIZE is "collective" across *all* connected processes. This only means that all processes diff --git a/ompi/runtime/ompi_mpi_init.c b/ompi/runtime/ompi_mpi_init.c index 19c0999d163..cc539aa3b5f 100644 --- a/ompi/runtime/ompi_mpi_init.c +++ b/ompi/runtime/ompi_mpi_init.c @@ -522,16 +522,16 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided, time if so, then start the clock again */ OMPI_TIMING_NEXT("barrier"); -#if OPAL_ENABLE_PROGRESS_THREADS == 0 /* Start setting up the event engine for MPI operations. Don't block in the event library, so that communications don't take forever between procs in the dynamic code. This will increase CPU utilization for the remainder of MPI_INIT when we are blocking on RTE-level events, but may greatly reduce non-TCP latency. */ - int old_event_flags = opal_progress_set_event_flag(0); - opal_progress_set_event_flag(old_event_flags | OPAL_EVLOOP_NONBLOCK); -#endif + if (!opal_async_progress_thread_spawned) { + int old_event_flags = opal_progress_set_event_flag(0); + opal_progress_set_event_flag(old_event_flags | OPAL_EVLOOP_NONBLOCK); + } /* wire up the mpi interface, if requested. Do this after the non-block switch for non-TCP performance. Do before the diff --git a/opal/mca/btl/smcuda/btl_smcuda.c b/opal/mca/btl/smcuda/btl_smcuda.c index c4389d422f2..ad2537a0c12 100644 --- a/opal/mca/btl/smcuda/btl_smcuda.c +++ b/opal/mca/btl/smcuda/btl_smcuda.c @@ -487,7 +487,7 @@ static struct mca_btl_base_endpoint_t *create_sm_endpoint(int local_proc, struct OBJ_CONSTRUCT(&ep->endpoint_lock, opal_mutex_t); #if OPAL_ENABLE_PROGRESS_THREADS == 1 sprintf(path, "%s" OPAL_PATH_SEP "sm_fifo.%lu", opal_process_info.job_session_dir, - (unsigned long) proc->proc_name); + (unsigned long) proc->proc_name.vpid); ep->fifo_fd = open(path, O_WRONLY); if (ep->fifo_fd < 0) { opal_output(0, "mca_btl_smcuda_add_procs: open(%s) failed with errno=%d\n", path, errno); diff --git a/opal/mca/btl/smcuda/btl_smcuda_component.c b/opal/mca/btl/smcuda/btl_smcuda_component.c index a3662e2db0e..774607d7168 100644 --- a/opal/mca/btl/smcuda/btl_smcuda_component.c +++ b/opal/mca/btl/smcuda/btl_smcuda_component.c @@ -859,7 +859,7 @@ mca_btl_smcuda_component_init(int *num_btls, bool enable_progress_threads, bool #if OPAL_ENABLE_PROGRESS_THREADS == 1 /* create a named pipe to receive events */ sprintf(mca_btl_smcuda_component.sm_fifo_path, "%s" OPAL_PATH_SEP "sm_fifo.%lu", - opal_process_info.job_session_dir, (unsigned long) OPAL_PROC_MY_NAME->vpid); + opal_process_info.job_session_dir, (unsigned long) OPAL_PROC_MY_NAME.vpid); if (mkfifo(mca_btl_smcuda_component.sm_fifo_path, 0660) < 0) { opal_output(0, "mca_btl_smcuda_component_init: mkfifo failed with errno=%d\n", errno); return NULL; diff --git a/opal/runtime/opal_params_core.h b/opal/runtime/opal_params_core.h index b90d3fc2961..3c1f515d9f1 100644 --- a/opal/runtime/opal_params_core.h +++ b/opal/runtime/opal_params_core.h @@ -59,6 +59,7 @@ OPAL_DECLSPEC extern int opal_initialized; OPAL_DECLSPEC extern bool opal_built_with_cuda_support; OPAL_DECLSPEC extern bool opal_built_with_rocm_support; OPAL_DECLSPEC extern bool opal_built_with_ze_support; +OPAL_DECLSPEC extern bool opal_async_progress_thread_spawned; /** * * Whether we want to enable CUDA GPU buffer send and receive support. diff --git a/opal/runtime/opal_progress.c b/opal/runtime/opal_progress.c index 7bca660b9d6..ddeda52d847 100644 --- a/opal/runtime/opal_progress.c +++ b/opal/runtime/opal_progress.c @@ -50,6 +50,32 @@ bool opal_progress_debug = false; static int opal_progress_event_flag = OPAL_EVLOOP_ONCE | OPAL_EVLOOP_NONBLOCK; int opal_progress_spin_count = 10000; +/* track whether the async progress thread was spawned */ +/* default=disabled. Enabled with runtime env var OPAL_ASYNC_PROGRESS=1 */ +bool opal_async_progress_thread_spawned = false; + +#if OPAL_ENABLE_PROGRESS_THREADS == 1 +/* async progress thread info & args */ +typedef struct thread_args_s { + /* number of events reported. + * This is updated by the async progress thread, to be read and resetted + * by the application threads */ + opal_atomic_int64_t nb_events_reported; + + /* should continue running ? */ + volatile bool running; +} thread_args_t; + +/* async progress thread routine */ +static void *opal_progress_async_thread_engine(opal_object_t *obj); + +static opal_thread_t opal_progress_async_thread; +static thread_args_t thread_arg = { + .nb_events_reported = 0, + .running = false +}; +#endif + /* * Local variables */ @@ -119,9 +145,29 @@ static void opal_progress_finalize(void) opal_atomic_unlock(&progress_lock); } +void opal_progress_shutdown_async_progress_thread(void) +{ +#if OPAL_ENABLE_PROGRESS_THREADS == 1 + if (opal_async_progress_thread_spawned) { + /* shutdown the async thread */ + thread_arg.running = false; + int err = opal_thread_join(&opal_progress_async_thread, NULL); + if (OPAL_SUCCESS == err) { + opal_set_using_threads(false); + opal_async_progress_thread_spawned = false; + } else { + OPAL_OUTPUT( + (debug_output, "progress: Failed to join async progress thread: err=%d", err)); + } + } +#endif +} + /* init the progress engine - called from orte_init */ int opal_progress_init(void) { + int err = OPAL_SUCCESS; + /* reentrant issues */ opal_atomic_lock_init(&progress_lock, OPAL_ATOMIC_LOCK_UNLOCKED); @@ -155,6 +201,41 @@ int opal_progress_init(void) callbacks_lp[i] = fake_cb; } +#if OPAL_ENABLE_PROGRESS_THREADS == 1 + // check if the env var OMPI_ASYNC_PROGRESS or OPAL_ASYNC_PROGRESS is set + // at runtime. + char *evar_ompi = getenv("OMPI_ASYNC_PROGRESS"); + char *evar_opal = getenv("OPAL_ASYNC_PROGRESS"); + // prefer OMPI_ASYNC_PROGRESS to OPAL_ASYNC_PROGRESS + char *evar = evar_ompi ? evar_ompi : evar_opal; + + const bool opal_async_progress_enabled = evar ? (atoi(evar) != 0) : false; + + if (opal_async_progress_enabled && !opal_async_progress_thread_spawned) { + /* prepare the thread */ + thread_arg.nb_events_reported = 0; + thread_arg.running = true; + OBJ_CONSTRUCT(&opal_progress_async_thread, opal_thread_t); + opal_progress_async_thread.t_run = opal_progress_async_thread_engine; + opal_progress_async_thread.t_arg = &thread_arg; + + /* optimistic setting for asynchronism here, but we know that these might + still be changed by other *init() routines :( */ + opal_progress_set_yield_when_idle(true); + opal_progress_set_event_flag(opal_progress_event_flag | OPAL_EVLOOP_NONBLOCK); + + err = opal_thread_start(&opal_progress_async_thread); + if (OPAL_SUCCESS == err) { + opal_set_using_threads(true); + opal_async_progress_thread_spawned = true; + } else { + thread_arg.running = false; + OPAL_OUTPUT( + (debug_output, "progress: Failed to start async progress thread: err=%d", err)); + } + } +#endif + OPAL_OUTPUT( (debug_output, "progress: initialized event flag to: %x", opal_progress_event_flag)); OPAL_OUTPUT((debug_output, "progress: initialized yield_when_idle to: %s", @@ -165,7 +246,7 @@ int opal_progress_init(void) opal_finalize_register_cleanup(opal_progress_finalize); - return OPAL_SUCCESS; + return err; } static int opal_progress_events(void) @@ -213,7 +294,7 @@ static int opal_progress_events(void) * care, as the cost of that happening is far outweighed by the cost * of the if checks (they were resulting in bad pipe stalling behavior) */ -int opal_progress(void) +static int _opal_progress(void) { static uint32_t num_calls = 0; size_t i; @@ -224,13 +305,17 @@ int opal_progress(void) events += (callbacks[i])(); } - /* Run low priority callbacks and events once every 8 calls to opal_progress(). + /* Run low priority callbacks and events once every calls to opal_progress(). * Even though "num_calls" can be modified by multiple threads, we do not use * atomic operations here, for performance reasons. In case of a race, the * number of calls may be inaccurate, but since it will eventually be incremented, * it's not a problem. + * If opal_async_progress_thread_spawned == false, then N = 8 + * otherwise let's pick N = 256 for the moment (George's recommendation) and + * adapt it later if it takes too many resources. */ - if (((num_calls++) & 0x7) == 0) { + const uint32_t mod = opal_async_progress_thread_spawned ? 0xFF : 0x7; + if (((num_calls++) & mod) == 0) { for (i = 0; i < callbacks_lp_len; ++i) { events += (callbacks_lp[i])(); } @@ -254,6 +339,40 @@ int opal_progress(void) return events; } +#if OPAL_ENABLE_PROGRESS_THREADS == 1 +static void *opal_progress_async_thread_engine(opal_object_t *obj) +{ + opal_thread_t *current_thread = (opal_thread_t *) obj; + thread_args_t *p_thread_arg = (thread_args_t *) current_thread->t_arg; + + while (p_thread_arg->running) { + const int64_t new_events = _opal_progress(); + opal_atomic_add_fetch_64(&p_thread_arg->nb_events_reported, new_events); + } + + return OPAL_THREAD_CANCELLED; +} +#endif + +int opal_progress(void) +{ +#if OPAL_ENABLE_PROGRESS_THREADS == 1 + if (opal_async_progress_thread_spawned) { + /* async progress thread alongside may has processed new events, + * atomically read and reset nb_events_reported to zero. + */ + return opal_atomic_swap_64(&thread_arg.nb_events_reported, 0); + } else { +#endif + + /* no async progress thread, call the normal progress routine like before */ + return _opal_progress(); + +#if OPAL_ENABLE_PROGRESS_THREADS == 1 + } +#endif +} + int opal_progress_set_event_flag(int flag) { int tmp = opal_progress_event_flag; diff --git a/opal/runtime/opal_progress.h b/opal/runtime/opal_progress.h index 86a0f8add50..d6a782330d8 100644 --- a/opal/runtime/opal_progress.h +++ b/opal/runtime/opal_progress.h @@ -142,7 +142,7 @@ OPAL_DECLSPEC void opal_progress_set_event_poll_rate(int microseconds); * * Prototype for the a progress function callback. Progress function * callbacks can be registered with opal_progress_register() and - * deregistered with opal_progress_deregister(). It should be noted + * deregistered with opal_progress_unregister(). It should be noted * that either registering or deregistering a function callback is an * extraordinarily expensive operation and should not be used for * potentially short callback lifetimes. @@ -195,6 +195,9 @@ static inline bool opal_progress_spin(volatile bool *complete) return false; } +/* shutdown the async progress thread. Do nothing if disabled */ +OPAL_DECLSPEC void opal_progress_shutdown_async_progress_thread(void); + END_C_DECLS #endif