Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ompi/communicator/comm_cid.c
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
#include "pmix.h"

/* for use when we don't have a PMIx that supports CID generation */
opal_atomic_int64_t ompi_comm_next_base_cid = 1;
opal_atomic_int64_t ompi_comm_next_base_cid = OPAL_ATOMIC_INIT(1);

/* A macro comparing two CIDs */
#define OMPI_COMM_CID_IS_LOWER(comm1,comm2) ( ((comm1)->c_index < (comm2)->c_index)? 1:0)
Expand Down
5 changes: 3 additions & 2 deletions ompi/communicator/comm_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/

#include "comm_request.h"
#include "opal/sys/atomic.h"

#include "opal/class/opal_free_list.h"
#include "opal/include/opal/sys/atomic.h"
Expand Down Expand Up @@ -109,7 +110,7 @@ int ompi_comm_request_schedule_append_w_flags(ompi_comm_request_t *request, ompi
static int ompi_comm_request_progress (void)
{
ompi_comm_request_t *request, *next;
static opal_atomic_int32_t progressing = 0;
static opal_atomic_int32_t progressing = OPAL_ATOMIC_INIT(0);
int completed = 0;

/* don't allow re-entry */
Expand Down Expand Up @@ -175,7 +176,7 @@ static int ompi_comm_request_progress (void)
}

opal_mutex_unlock (&ompi_comm_request_mutex);
progressing = 0;
opal_atomic_store_32(&progressing, 0);

return completed;
}
Expand Down
20 changes: 12 additions & 8 deletions ompi/communicator/ft/comm_ft_detector.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "opal/mca/threads/threads.h"

#include "ompi/runtime/params.h"
#include "opal/sys/atomic.h"
#include "ompi/communicator/communicator.h"
#include "ompi/mca/pml/pml.h"
#include "ompi/mca/bml/bml.h"
Expand Down Expand Up @@ -94,7 +95,7 @@ static opal_event_base_t* fd_event_base = NULL;
static void fd_event_cb(int fd, short flags, void* pdetector);

static bool comm_detector_use_thread = false;
static opal_atomic_int32_t fd_thread_active = 0;
static opal_atomic_int32_t fd_thread_active = OPAL_ATOMIC_INIT(0);
static opal_thread_t fd_thread;
static void* fd_progress(opal_object_t* obj);

Expand Down Expand Up @@ -168,8 +169,9 @@ int ompi_comm_failure_detector_init(void) {
fd_thread.t_arg = NULL;
ret = opal_thread_start(&fd_thread);
if( OPAL_SUCCESS != ret ) goto cleanup;
while( 0 == fd_thread_active ); /* wait for the fd thread initialization */
if( 0 > fd_thread_active ) goto cleanup;
while (0 == opal_atomic_load_32(&fd_thread_active)) { /* wait for the fd thread initialization */
}
if (0 > opal_atomic_load_32(&fd_thread_active)) goto cleanup;
}

return OMPI_SUCCESS;
Expand Down Expand Up @@ -218,18 +220,19 @@ int ompi_comm_failure_detector_finalize(void) {
#endif
while( observing == detector->hb_observing ) {
/* If observed process changed, recheck if local*/
if( !(0 < fd_thread_active) )
if (!(0 < opal_atomic_load_32(&fd_thread_active)))
{
opal_progress();
}
}
}

if( 0 < fd_thread_active ) {
if (0 < opal_atomic_load_32(&fd_thread_active)) {
void* tret;
/* this is not a race condition. Accesses are serialized, we use the
* atomic for the mfence part of it. */
OPAL_THREAD_ADD_FETCH32(&fd_thread_active, -fd_thread_active);
int32_t active = opal_atomic_load_32(&fd_thread_active);
OPAL_THREAD_ADD_FETCH32(&fd_thread_active, -active);
opal_event_base_loopbreak(fd_event_base);
opal_thread_join(&fd_thread, &tret);
}
Expand Down Expand Up @@ -587,9 +590,10 @@ void* fd_progress(opal_object_t* obj) {
return OPAL_THREAD_CANCELLED;
}
OPAL_THREAD_ADD_FETCH32(&fd_thread_active, 1);
while( 1 == fd_thread_active ); /* wait for init stage 2: start_detector */
while (1 == opal_atomic_load_32(&fd_thread_active)) { /* wait for init stage 2: start_detector */
}
ret = MCA_PML_CALL(irecv(NULL, 0, MPI_BYTE, 0, MCA_COLL_BASE_TAG_FT_END, &ompi_mpi_comm_self.comm, &req));
while( fd_thread_active ) {
while (opal_atomic_load_32(&fd_thread_active)) {
opal_event_loop(fd_event_base, OPAL_EVLOOP_ONCE);
#if 0
/* This test disabled because rdma emulation over TCP would not work without
Expand Down
3 changes: 2 additions & 1 deletion ompi/communicator/ft/comm_ft_reliable_bcast.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "ompi/mca/pml/pml.h"
#include "ompi/mca/bml/bml.h"
#include "ompi/mca/bml/base/base.h"
#include "opal/sys/atomic.h"
#include "ompi/mca/coll/base/base.h"
#include "ompi/mca/coll/base/coll_tags.h"

Expand Down Expand Up @@ -208,7 +209,7 @@ static void ompi_comm_rbcast_bml_recv_cb(
* that we keep receiving messages after we deregistered the type.
* Any other time, this is indicative of a problem.
*/
assert(ompi_mpi_state >= OMPI_MPI_STATE_FINALIZE_STARTED);
assert(opal_atomic_load_32(&ompi_mpi_state) >= OMPI_MPI_STATE_FINALIZE_STARTED);
}
}

Expand Down
2 changes: 1 addition & 1 deletion ompi/errhandler/errhandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ extern opal_atomic_int32_t ompi_instance_count;
*/
#define OMPI_ERR_INIT_FINALIZE(name) \
{ \
if (OPAL_UNLIKELY(0 == ompi_instance_count)) { \
if (OPAL_UNLIKELY(0 == opal_atomic_load_32(&ompi_instance_count))) { \
ompi_errhandler_invoke(NULL, NULL, -1, \
ompi_errcode_get_mpi_code(MPI_ERR_ARG), \
name); \
Expand Down
2 changes: 1 addition & 1 deletion ompi/errhandler/errhandler_invoke.c
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ int ompi_errhandler_invoke(ompi_errhandler_t *errhandler, void *mpi_object,
/* If we got no errorhandler, then route the error to the appropriate
* predefined error handler */
if (NULL == errhandler) {
int32_t state = ompi_mpi_state;
int32_t state = opal_atomic_load_32(&ompi_mpi_state);
if (state >= OMPI_MPI_STATE_INIT_COMPLETED &&
state < OMPI_MPI_STATE_FINALIZE_PAST_COMM_SELF_DESTRUCT) {
comm = (ompi_mpi_compat_mpi3)? &ompi_mpi_comm_world.comm: &ompi_mpi_comm_self.comm;
Expand Down
4 changes: 2 additions & 2 deletions ompi/errhandler/errhandler_predefined.c
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ void ompi_mpi_errors_return_instance_handler (struct ompi_instance_t **instance,
static void out(char *str, char *arg)
{
if (ompi_rte_initialized &&
ompi_mpi_state < OMPI_MPI_STATE_FINALIZE_PAST_COMM_SELF_DESTRUCT) {
opal_atomic_load_32(&ompi_mpi_state) < OMPI_MPI_STATE_FINALIZE_PAST_COMM_SELF_DESTRUCT) {
if (NULL != arg) {
opal_output(0, str, arg);
} else {
Expand Down Expand Up @@ -400,7 +400,7 @@ static void backend_abort_no_aggregate(int fatal, char *type,
{
char *arg;

int32_t state = ompi_mpi_state;
int32_t state = opal_atomic_load_32(&ompi_mpi_state);
assert(state < OMPI_MPI_STATE_INIT_COMPLETED ||
state >= OMPI_MPI_STATE_FINALIZE_PAST_COMM_SELF_DESTRUCT);

Expand Down
2 changes: 1 addition & 1 deletion ompi/instance/instance.c
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ enum {
OMPI_INSTANCE_FINALIZING = -2,
};

opal_atomic_int32_t ompi_instance_count = 0;
opal_atomic_int32_t ompi_instance_count = OPAL_ATOMIC_INIT(0);

static const char *ompi_instance_builtin_psets[] = {
"mpi://WORLD",
Expand Down
3 changes: 2 additions & 1 deletion ompi/mca/coll/hcoll/coll_hcoll_ops.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@
#include "hcoll/api/hcoll_constants.h"
#include "coll_hcoll_dtypes.h"
#include "hcoll/api/hcoll_dte.h"
#include "opal/sys/atomic.h"
int mca_coll_hcoll_barrier(struct ompi_communicator_t *comm,
mca_coll_base_module_t *module){
int rc;
mca_coll_hcoll_module_t *hcoll_module = (mca_coll_hcoll_module_t*)module;
HCOL_VERBOSE(20,"RUNNING HCOL BARRIER");

if (OPAL_UNLIKELY(ompi_mpi_state >= OMPI_MPI_STATE_FINALIZE_STARTED)) {
if (OPAL_UNLIKELY(opal_atomic_load_32(&ompi_mpi_state) >= OMPI_MPI_STATE_FINALIZE_STARTED)) {
HCOL_VERBOSE(5, "In finalize, reverting to previous barrier");
goto orig_barrier;
}
Expand Down
2 changes: 1 addition & 1 deletion ompi/mca/common/monitoring/common_monitoring.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@

/*** Monitoring specific variables ***/
/* Keep tracks of how many components are currently using the common part */
static opal_atomic_int32_t mca_common_monitoring_hold = 0;
static opal_atomic_int32_t mca_common_monitoring_hold = OPAL_ATOMIC_INIT(0);
/* Output parameters */
int mca_common_monitoring_output_stream_id = -1;
static opal_output_stream_t mca_common_monitoring_output_stream_obj = {
Expand Down
7 changes: 4 additions & 3 deletions ompi/mca/common/ompio/common_ompio_buffer.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,14 @@
#include "opal/mca/allocator/base/base.h"
#include "common_ompio.h"
#include "common_ompio_buffer.h"
#include "opal/sys/atomic.h"


static opal_mutex_t mca_common_ompio_buffer_mutex; /* lock for thread safety */
static mca_allocator_base_component_t* mca_common_ompio_allocator_component=NULL;
static mca_allocator_base_module_t* mca_common_ompio_allocator=NULL;

static opal_atomic_int32_t mca_common_ompio_buffer_init = 0;
static opal_atomic_int32_t mca_common_ompio_buffer_init = OPAL_ATOMIC_INIT(0);
static int32_t mca_common_ompio_pagesize=4096;
static void* mca_common_ompio_buffer_alloc_seg ( void *ctx, size_t *size );
static void mca_common_ompio_buffer_free_seg ( void *ctx, void *buf );
Expand Down Expand Up @@ -145,7 +146,7 @@ void *mca_common_ompio_alloc_buf ( ompio_file_t *fh, size_t bufsize )
{
char *tmp=NULL;

if ( !mca_common_ompio_buffer_init ){
if (!opal_atomic_load_32(&mca_common_ompio_buffer_init)){
mca_common_ompio_buffer_alloc_init ();
}

Expand All @@ -159,7 +160,7 @@ void *mca_common_ompio_alloc_buf ( ompio_file_t *fh, size_t bufsize )
void mca_common_ompio_release_buf ( ompio_file_t *fh, void *buf )
{

if ( !mca_common_ompio_buffer_init ){
if (!opal_atomic_load_32(&mca_common_ompio_buffer_init)){
/* Should not happen. You can not release a buf without
** having it allocated first.
*/
Expand Down
3 changes: 2 additions & 1 deletion ompi/mca/osc/monitoring/osc_monitoring_module.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "ompi/info/info.h"
#include "ompi/win/win.h"
#include "ompi/mca/osc/osc.h"
#include "opal/sys/atomic.h"

/* Define once and for all the module_template variable name */
#define OMPI_OSC_MONITORING_MODULE_VARIABLE(template) \
Expand Down Expand Up @@ -50,7 +51,7 @@
OSC_MONITORING_SET_TEMPLATE_FCT_NAME(template) (ompi_osc_base_module_t*module) \
{ \
/* Define the ompi_osc_monitoring_module_## template ##_init_done variable */ \
opal_atomic_int32_t init_done = 0; \
opal_atomic_int32_t init_done = OPAL_ATOMIC_INIT(0); \
/* Define and set the ompi_osc_monitoring_## template \
* ##_template variable. The functions recorded here are \
* linked to the original functions of the original \
Expand Down
5 changes: 3 additions & 2 deletions ompi/mca/osc/ucx/osc_ucx_component.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include "ompi/mca/osc/osc.h"
#include "ompi/mca/osc/base/base.h"
#include "opal/sys/atomic.h"
#include "ompi/mca/osc/base/osc_base_obj_convert.h"
#include "opal/mca/common/ucx/common_ucx.h"

Expand Down Expand Up @@ -358,8 +359,8 @@ static int component_finalize(void) {
}
opal_common_ucx_wpool_free(mca_osc_ucx_component.wpool);

assert(opal_common_ucx_ep_counts == 0);
assert(opal_common_ucx_unpacked_rkey_counts == 0);
assert(opal_atomic_load_64(&opal_common_ucx_ep_counts) == 0);
assert(opal_atomic_load_64(&opal_common_ucx_unpacked_rkey_counts) == 0);
return OMPI_SUCCESS;
}

Expand Down
2 changes: 1 addition & 1 deletion ompi/mca/pml/base/pml_base_bsend.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ static size_t mca_pml_bsend_size; /* adjusted size of user buffe
static size_t mca_pml_bsend_count; /* number of outstanding requests */
static size_t mca_pml_bsend_pagesz; /* mmap page size */
static int mca_pml_bsend_pagebits; /* number of bits in pagesz */
static opal_atomic_int32_t mca_pml_bsend_init = 0;
static opal_atomic_int32_t mca_pml_bsend_init = OPAL_ATOMIC_INIT(0);

/* defined in pml_base_open.c */
extern char *ompi_pml_base_bsend_allocator_name;
Expand Down
2 changes: 1 addition & 1 deletion ompi/mca/pml/base/pml_base_sendreq.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ static void mca_pml_base_send_request_destruct(mca_pml_base_send_request_t* req)
#if MPI_VERSION >= 4
int mca_pml_cancel_send_callback(struct ompi_request_t *request, int flag)
{
static opal_atomic_int32_t send_deprecate_count = 0;
static opal_atomic_int32_t send_deprecate_count = OPAL_ATOMIC_INIT(0);
int32_t val;

val = opal_atomic_add_fetch_32(&send_deprecate_count, 1);
Expand Down
3 changes: 2 additions & 1 deletion ompi/mca/pml/ob1/pml_ob1.c
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
#include "ompi/errhandler/errhandler.h"
#include "opal/mca/pmix/pmix-internal.h"
#include "ompi/runtime/ompi_spc.h"
#include "opal/sys/atomic.h"

#include "pml_ob1.h"
#include "pml_ob1_component.h"
Expand Down Expand Up @@ -908,7 +909,7 @@ void mca_pml_ob1_error_handler(
* termination. Lets simply ignore such errors after MPI is not supposed to
* be operational anyway.
*/
if(ompi_mpi_state >= OMPI_MPI_STATE_FINALIZE_PAST_COMM_SELF_DESTRUCT) {
if (opal_atomic_load_32(&ompi_mpi_state) >= OMPI_MPI_STATE_FINALIZE_PAST_COMM_SELF_DESTRUCT) {
return;
}

Expand Down
2 changes: 1 addition & 1 deletion ompi/mca/pml/ob1/pml_ob1_progress.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ static inline int mca_pml_ob1_process_pending_accelerator_async_copies(void)
return count;
}

static opal_atomic_int32_t mca_pml_ob1_progress_needed = 0;
static opal_atomic_int32_t mca_pml_ob1_progress_needed = OPAL_ATOMIC_INIT(0);
int mca_pml_ob1_enable_progress(int32_t count)
{
int32_t progress_count = OPAL_ATOMIC_ADD_FETCH32(&mca_pml_ob1_progress_needed, count);
Expand Down
2 changes: 1 addition & 1 deletion ompi/mpi/c/finalized.c.in
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ PROTOTYPE ERROR_CLASS finalized(INT_OUT flag)
{
ompi_hook_base_mpi_finalized_top(flag);

int32_t state = ompi_mpi_state;
int32_t state = opal_atomic_load_32(&ompi_mpi_state);

if (MPI_PARAM_CHECK) {
if (NULL == flag) {
Expand Down
2 changes: 1 addition & 1 deletion ompi/mpi/c/get_library_version.c.in
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ PROTOTYPE ERROR_CLASS get_library_version(STRING_OUT version, INT_OUT resultlen)
(i.e., use a NULL communicator, which will end up at the
default errhandler, which is abort). */

int32_t state = ompi_mpi_state;
int32_t state = opal_atomic_load_32(&ompi_mpi_state);
if (state >= OMPI_MPI_STATE_INIT_COMPLETED &&
state < OMPI_MPI_STATE_FINALIZE_PAST_COMM_SELF_DESTRUCT) {
return OMPI_ERRHANDLER_NOHANDLE_INVOKE(MPI_ERR_ARG,
Expand Down
2 changes: 1 addition & 1 deletion ompi/mpi/c/get_version.c.in
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ PROTOTYPE ERROR_CLASS get_version(INT_OUT version, INT_OUT subversion)
(i.e., use a NULL communicator, which will end up at the
default errhandler, which is abort). */

int32_t state = ompi_mpi_state;
int32_t state = opal_atomic_load_32(&ompi_mpi_state);
if (state >= OMPI_MPI_STATE_INIT_COMPLETED &&
state < OMPI_MPI_STATE_FINALIZE_PAST_COMM_SELF_DESTRUCT) {
return OMPI_ERRHANDLER_NOHANDLE_INVOKE(MPI_ERR_ARG,
Expand Down
2 changes: 1 addition & 1 deletion ompi/mpi/c/initialized.c.in
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ PROTOTYPE ERROR_CLASS initialized(INT_OUT flag)
{
ompi_hook_base_mpi_initialized_top(flag);

int32_t state = ompi_mpi_state;
int32_t state = opal_atomic_load_32(&ompi_mpi_state);

if (MPI_PARAM_CHECK) {
if (NULL == flag) {
Expand Down
3 changes: 2 additions & 1 deletion ompi/mpi/tool/finalize.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "ompi/mpi/tool/mpit-internal.h"

#include "ompi/runtime/ompi_info_support.h"
#include "opal/sys/atomic.h"
#include "opal/include/opal/sys/atomic.h"
#include "opal/runtime/opal.h"

Expand All @@ -40,7 +41,7 @@ int MPI_T_finalize (void)
if (0 == --ompi_mpit_init_count) {
(void) ompi_info_close_components ();

int32_t state = ompi_mpi_state;
int32_t state = opal_atomic_load_32(&ompi_mpi_state);
if ((state < OMPI_MPI_STATE_INIT_COMPLETED ||
state >= OMPI_MPI_STATE_FINALIZE_PAST_COMM_SELF_DESTRUCT) &&
(NULL != ompi_mpi_main_thread)) {
Expand Down
3 changes: 2 additions & 1 deletion ompi/peruse/peruse.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "ompi/peruse/peruse.h"
#include "ompi/peruse/peruse-internal.h"
#include "ompi/communicator/communicator.h"
#include "opal/sys/atomic.h"
#include "ompi/runtime/params.h"

/*
Expand Down Expand Up @@ -65,7 +66,7 @@ const int PERUSE_num_events = (sizeof(PERUSE_events) / sizeof(peruse_event_assoc
int PERUSE_Init (void)
{
if (MPI_PARAM_CHECK) {
int32_t state = ompi_mpi_state;
int32_t state = opal_atomic_load_32(&ompi_mpi_state);
if (state < OMPI_MPI_STATE_INIT_COMPLETED ||
state >= OMPI_MPI_STATE_FINALIZE_STARTED) {
return PERUSE_ERR_INIT;
Expand Down
2 changes: 1 addition & 1 deletion ompi/request/grequestx.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

static bool requests_initialized = false;
static opal_list_t requests;
static opal_atomic_int32_t active_requests = 0;
static opal_atomic_int32_t active_requests = OPAL_ATOMIC_INIT(0);
static bool in_progress = false;
static opal_mutex_t lock = OPAL_MUTEX_STATIC_INIT;

Expand Down
2 changes: 1 addition & 1 deletion ompi/runtime/ompi_mpi_abort.c
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ ompi_mpi_abort(struct ompi_communicator_t* comm,

/* If the RTE isn't setup yet/any more, then don't even try
killing everyone. Sorry, Charlie... */
int32_t state = ompi_mpi_state;
int32_t state = opal_atomic_load_32(&ompi_mpi_state);
if (!ompi_rte_initialized) {
fprintf(stderr, "[%s:%05d] Local abort %s completed successfully, but am not able to aggregate error messages, and not able to guarantee that all other processes were killed!\n",
host, (int) pid,
Expand Down
2 changes: 1 addition & 1 deletion ompi/runtime/ompi_mpi_finalize.c
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ int ompi_mpi_finalize(void)

ompi_hook_base_mpi_finalize_top();

int32_t state = ompi_mpi_state;
int32_t state = opal_atomic_load_32(&ompi_mpi_state);
if (state < OMPI_MPI_STATE_INIT_COMPLETED ||
state >= OMPI_MPI_STATE_FINALIZE_STARTED) {
/* Note that if we're not initialized or already finalized, we
Expand Down
Loading
Loading