diff --git a/ompi/mca/part/persist/part_persist.h b/ompi/mca/part/persist/part_persist.h
index ccc8f8f1971..25fd2981952 100644
--- a/ompi/mca/part/persist/part_persist.h
+++ b/ompi/mca/part/persist/part_persist.h
@@ -66,16 +66,14 @@ struct ompi_part_persist_t {
     int32_t next_recv_tag;
     ompi_communicator_t *part_comm; /* This approach requires a separate tag space, so we need a dedicated communicator. */
     ompi_request_t *part_comm_req;
-    int32_t part_comm_ready;
     ompi_communicator_t *part_comm_setup; /* We create a second communicator to send set-up messages (rational: these messages go in the opposite direction of normal messages, need to use MPI_ANY_SOURCE to support different communicators, and thus need to have a unique tag. Because tags are controlled by the sender in this model, we cannot assume that the tag will be unused in part_comm. */
     ompi_request_t *part_comm_sreq;
-    int32_t part_comm_sready;
-    int32_t init_comms;
     int32_t init_world;
+    int32_t init_world_step;
     int32_t my_world_rank; /* Because the back end communicators use a world rank, we need to communicate ours to set up the requests. */

     opal_atomic_int32_t block_entry;
@@ -105,6 +103,7 @@ mca_part_persist_free_req(struct mca_part_persist_request_t* req)
     if( MCA_PART_PERSIST_REQUEST_PRECV == req->req_type ) {
         MCA_PART_PERSIST_PRECV_REQUEST_RETURN(req);
     } else {
+        free(req->part_ready);
         MCA_PART_PERSIST_PSEND_REQUEST_RETURN(req);
     }
     return err;
@@ -138,12 +137,12 @@ __opal_attribute_always_inline__ static inline void
 mca_part_persist_complete(struct mca_part_persist_request_t* request)
 {
     if(MCA_PART_PERSIST_REQUEST_PRECV == request->req_type) {
-        request->req_ompi.req_status.MPI_SOURCE = request->req_peer;
+        request->req_ompi.req_status.MPI_SOURCE = request->req_peer;
     } else {
         request->req_ompi.req_status.MPI_SOURCE = request->req_comm->c_my_rank;
     }
     request->req_ompi.req_complete_cb = NULL;
-    request->req_ompi.req_status.MPI_TAG = request->req_tag;
+    request->req_ompi.req_status.MPI_TAG = request->req_tag;
     request->req_ompi.req_status._ucount = request->req_bytes;
     request->req_ompi.req_status.MPI_ERROR = OMPI_SUCCESS;
     request->req_part_complete = true;
@@ -159,15 +158,15 @@ __opal_attribute_always_inline__ static inline int
 mca_part_persist_progress(void)
 {
     mca_part_persist_list_t *current;
-    int err;
+    int err = OMPI_SUCCESS;
+    int completed = 0;
     size_t i;

     /* prevent re-entry, */
     int block_entry = opal_atomic_add_fetch_32(&(ompi_part_persist.block_entry), 1);
-    if(1 < block_entry)
-    {
+    if(1 < block_entry) {
         block_entry = opal_atomic_add_fetch_32(&(ompi_part_persist.block_entry), -1);
-        return OMPI_SUCCESS;
+        return completed;
     }

     OPAL_THREAD_LOCK(&ompi_part_persist.lock);
@@ -175,62 +174,53 @@ mca_part_persist_progress(void)
     mca_part_persist_request_t* to_delete = NULL;

     /* Don't do anything till a function in the module is called. */
-    if(-1 == ompi_part_persist.init_world)
-    {
-        OPAL_THREAD_UNLOCK(&ompi_part_persist.lock);
-        block_entry = opal_atomic_add_fetch_32(&(ompi_part_persist.block_entry), -1);
-        return OMPI_SUCCESS;
+    if(-1 == ompi_part_persist.init_world) {
+        goto end_part_progress;
     }

     /* Can't do anything if we don't have world */
     if(0 == ompi_part_persist.init_world) {
-        ompi_part_persist.my_world_rank = ompi_comm_rank(&ompi_mpi_comm_world.comm);
-        err = ompi_comm_idup(&ompi_mpi_comm_world.comm, &ompi_part_persist.part_comm, &ompi_part_persist.part_comm_req);
-        if(err != OMPI_SUCCESS) {
-            exit(-1);
-        }
-        ompi_part_persist.part_comm_ready = 0;
-        err = ompi_comm_idup(&ompi_mpi_comm_world.comm, &ompi_part_persist.part_comm_setup, &ompi_part_persist.part_comm_sreq);
-        if(err != OMPI_SUCCESS) {
-            exit(-1);
-        }
-        ompi_part_persist.part_comm_sready = 0;
-        ompi_part_persist.init_world = 1;
-
-        OPAL_THREAD_UNLOCK(&ompi_part_persist.lock);
-        block_entry = opal_atomic_add_fetch_32(&(ompi_part_persist.block_entry), -1);
-        return OMPI_SUCCESS;
-    }
-
-    /* Check to see if Comms are setup */
-    if(0 == ompi_part_persist.init_comms) {
-        if(0 == ompi_part_persist.part_comm_ready) {
-            ompi_request_test(&ompi_part_persist.part_comm_req, &ompi_part_persist.part_comm_ready, MPI_STATUS_IGNORE);
-        }
-        if(0 == ompi_part_persist.part_comm_sready) {
-            ompi_request_test(&ompi_part_persist.part_comm_sreq, &ompi_part_persist.part_comm_sready, MPI_STATUS_IGNORE);
+        int done = 0;
+        switch (ompi_part_persist.init_world_step) {
+        case 0:
+            ompi_part_persist.my_world_rank = ompi_comm_rank(&ompi_mpi_comm_world.comm);
+            err = ompi_comm_idup(&ompi_mpi_comm_world.comm, &ompi_part_persist.part_comm, &ompi_part_persist.part_comm_req);
+            done = 1;
+            break;
+        case 1:
+            err = ompi_request_test(&ompi_part_persist.part_comm_req, &done, MPI_STATUS_IGNORE);
+            break;
+        case 2:
+            err = ompi_comm_idup(&ompi_mpi_comm_world.comm, &ompi_part_persist.part_comm_setup, &ompi_part_persist.part_comm_sreq);
+            done = 1;
+            break;
+        case 3:
+            err = ompi_request_test(&ompi_part_persist.part_comm_sreq, &done, MPI_STATUS_IGNORE);
+            break;
+        default:
+            ompi_part_persist.init_world = 1;
+            break;
         }
-        if(0 != ompi_part_persist.part_comm_ready && 0 != ompi_part_persist.part_comm_sready) {
-            ompi_part_persist.init_comms = 1;
-        }
-        OPAL_THREAD_UNLOCK(&ompi_part_persist.lock);
-        block_entry = opal_atomic_add_fetch_32(&(ompi_part_persist.block_entry), -1);
-        return OMPI_SUCCESS;
+        ompi_part_persist.init_world_step += 0 != done;
+        completed += 0 != done;
+        goto end_part_progress;
     }

     OPAL_LIST_FOREACH(current, ompi_part_persist.progress_list, mca_part_persist_list_t) {
         mca_part_persist_request_t *req = (mca_part_persist_request_t *) current->item;

-        /* Check to see if request is initilaized */
+        /* Check to see if request is initialized */
         if(false == req->initialized) {
-            int done = 0;
-
+            int done = 0;
+
             if(true == req->flag_post_setup_recv) {
                 err = MCA_PML_CALL(irecv(&(req->setup_info[1]), sizeof(struct ompi_mca_persist_setup_t), MPI_BYTE, OMPI_ANY_SOURCE, req->my_recv_tag, ompi_part_persist.part_comm_setup, &req->setup_req[1]));
+                if(OMPI_SUCCESS != err) goto end_part_progress;
                 req->flag_post_setup_recv = false;
-            }
-
-            ompi_request_test(&(req->setup_req[1]), &done, MPI_STATUS_IGNORE);
+            }
+
+            err = ompi_request_test(&(req->setup_req[1]), &done, MPI_STATUS_IGNORE);
+            if(OMPI_SUCCESS != err) goto end_part_progress;

             if(done) {
                 size_t dt_size_;
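The hunk above replaces the old pair of readiness flags with a state machine driven by init_world_step, so each call to mca_part_persist_progress() advances the setup of the two duplicated communicators by at most one step and never blocks. The stand-alone sketch below is not part of the patch; async_begin()/async_test(), step, and setup_done are invented stand-ins for ompi_comm_idup(), ompi_request_test(), init_world_step, and init_world, used only to illustrate the one-step-per-call pattern.

```c
#include <stdio.h>

static int step = 0;          /* mirrors ompi_part_persist.init_world_step       */
static int setup_done = 0;    /* mirrors ompi_part_persist.init_world becoming 1 */
static int pending = 0;       /* ticks left before the fake async op finishes    */

static void async_begin(int cost) { pending = cost; }               /* stand-in for ompi_comm_idup()   */
static int  async_test(void)      { return (pending > 0) ? (--pending, 0) : 1; } /* stand-in for ompi_request_test() */

static int progress(void)
{
    int done = 0;
    if (setup_done) return 0;
    switch (step) {
    case 0: async_begin(3); done = 1; break;   /* start first duplication   */
    case 1: done = async_test(); break;        /* poll first duplication    */
    case 2: async_begin(2); done = 1; break;   /* start second duplication  */
    case 3: done = async_test(); break;        /* poll second duplication   */
    default: setup_done = 1; break;            /* setup finished            */
    }
    step += (0 != done);                       /* advance only on progress  */
    return done;                               /* "work performed" counter  */
}

int main(void)
{
    int calls = 0;
    while (!setup_done) { progress(); calls++; }
    printf("setup finished after %d progress calls\n", calls);
    return 0;
}
```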
@@ -238,108 +228,132 @@ mca_part_persist_progress(void)

                 if(MCA_PART_PERSIST_REQUEST_PSEND == req->req_type) {
                     /* parse message */
-                    req->world_peer = req->setup_info[1].world_rank;
+                    req->world_peer = req->setup_info[1].world_rank;

                     err = opal_datatype_type_size(&(req->req_datatype->super), &dt_size_);
-                    if(OMPI_SUCCESS != err) return OMPI_ERROR;
-                    dt_size = (dt_size_ > (size_t) UINT_MAX) ? MPI_UNDEFINED : (uint32_t) dt_size_;
+                    if(OMPI_SUCCESS != err) goto end_part_progress;
+                    if(dt_size_ > (size_t) UINT_MAX) {
+                        err = OMPI_ERR_UNKNOWN_DATA_TYPE;
+                        goto end_part_progress;
+                    }
+                    dt_size = (uint32_t) dt_size_;
                     uint32_t bytes = req->real_count * dt_size;

                     /* Set up persistent sends */
                     req->persist_reqs = (ompi_request_t**) malloc(sizeof(ompi_request_t*)*(req->real_parts));
                     for(i = 0; i < req->real_parts; i++) {
-                        void *buf = ((void*) (((char*)req->req_addr) + (bytes * i)));
-                        err = MCA_PML_CALL(isend_init(buf, req->real_count, req->req_datatype, req->world_peer, req->my_send_tag+i, MCA_PML_BASE_SEND_STANDARD, ompi_part_persist.part_comm, &(req->persist_reqs[i])));
-                    }
+                        void *buf = ((void*) (((char*)req->req_addr) + (bytes * i)));
+                        err = MCA_PML_CALL(isend_init(buf, req->real_count, req->req_datatype, req->world_peer, req->my_send_tag+i, MCA_PML_BASE_SEND_STANDARD, ompi_part_persist.part_comm, &(req->persist_reqs[i])));
+                        if(OMPI_SUCCESS != err) goto end_part_progress;
+                    }
                 } else {
                     /* parse message */
-                    req->world_peer = req->setup_info[1].world_rank;
+                    req->world_peer = req->setup_info[1].world_rank;
                     req->my_send_tag = req->setup_info[1].start_tag;
                     req->my_recv_tag = req->setup_info[1].setup_tag;
                     req->real_parts = req->setup_info[1].num_parts;
                     req->real_count = req->setup_info[1].count;
                     req->real_dt_size = req->setup_info[1].dt_size;
-
                     err = opal_datatype_type_size(&(req->req_datatype->super), &dt_size_);
-                    if(OMPI_SUCCESS != err) return OMPI_ERROR;
-                    dt_size = (dt_size_ > (size_t) UINT_MAX) ? MPI_UNDEFINED : (uint32_t) dt_size_;
+                    if(OMPI_SUCCESS != err) goto end_part_progress;
+                    if(dt_size_ > (size_t) UINT_MAX) {
+                        err = OMPI_ERR_UNKNOWN_DATA_TYPE;
+                        goto end_part_progress;
+                    }
+                    dt_size = (uint32_t) dt_size_;
                     uint32_t bytes = req->real_count * dt_size;
-
-
-                    /* Set up persistent sends */
-                    req->persist_reqs = (ompi_request_t**) malloc(sizeof(ompi_request_t*)*(req->real_parts));
-                    req->flags = (int*) calloc(req->real_parts,sizeof(int));
-
+                    /* Set up persistent receives */
+                    req->persist_reqs = (ompi_request_t**) malloc(sizeof(ompi_request_t*)*req->real_parts);
+                    req->flags = (mca_part_persist_partition_state_t*) malloc(sizeof(mca_part_persist_partition_state_t)*req->real_parts);
+                    for(i = 0; i < req->real_parts; i++) {
+                        req->flags[i] = MCA_PART_PERSIST_PARTITION_STARTED;
+                    }
                     if(req->real_dt_size == dt_size) {
-
-                        for(i = 0; i < req->real_parts; i++) {
+                        for(i = 0; i < req->real_parts; i++) {
                             void *buf = ((void*) (((char*)req->req_addr) + (bytes * i)));
                             err = MCA_PML_CALL(irecv_init(buf, req->real_count, req->req_datatype, req->world_peer, req->my_send_tag+i, ompi_part_persist.part_comm, &(req->persist_reqs[i])));
+                            if(OMPI_SUCCESS != err) goto end_part_progress;
                         }
                     } else {
                         for(i = 0; i < req->real_parts; i++) {
                             void *buf = ((void*) (((char*)req->req_addr) + (req->real_count * req->real_dt_size * i)));
                             err = MCA_PML_CALL(irecv_init(buf, req->real_count * req->real_dt_size, MPI_BYTE, req->world_peer, req->my_send_tag+i, ompi_part_persist.part_comm, &(req->persist_reqs[i])));
+                            if(OMPI_SUCCESS != err) goto end_part_progress;
                         }
-                    }
-                    err = req->persist_reqs[0]->req_start(req->real_parts, (&(req->persist_reqs[0])));
+                    }
+                    err = req->persist_reqs[0]->req_start(req->real_parts, (&(req->persist_reqs[0])));
+                    if(OMPI_SUCCESS != err) goto end_part_progress;

                     /* Send back a message */
                     req->setup_info[0].world_rank = ompi_part_persist.my_world_rank;
                     err = MCA_PML_CALL(isend(&(req->setup_info[0]), sizeof(struct ompi_mca_persist_setup_t), MPI_BYTE, req->world_peer, req->my_recv_tag, MCA_PML_BASE_SEND_STANDARD, ompi_part_persist.part_comm_setup, &req->setup_req[0]));
-                    if(OMPI_SUCCESS != err) return OMPI_ERROR;
+                    if(OMPI_SUCCESS != err) goto end_part_progress;
                 }

-                req->initialized = true;
+                completed++;
+                req->initialized = true;
             }
         } else {
             if(false == req->req_part_complete && REQUEST_COMPLETED != req->req_ompi.req_complete && OMPI_REQUEST_ACTIVE == req->req_ompi.req_state) {
-                for(i = 0; i < req->real_parts; i++) {
-
-                    /* Check to see if partition is queued for being started. Only applicable to sends. */
-                    if(-2 == req->flags[i]) {
-                        err = req->persist_reqs[i]->req_start(1, (&(req->persist_reqs[i])));
-                        req->flags[i] = 0;
+                int part_done;
+                size_t done_count = 0;
+                for(i = 0; i < req->real_parts; i++) {
+                    /* Letting 'MPI_Pready' change 'req->flags' directly would lead to concurrency
+                     * issues, therefore 'req->part_ready' acts as a proxy for thread safety.
+                     * 'req->part_ready' is only relevant for send requests. */
+                    if(NULL != req->part_ready && 0 == req->part_ready[i]) continue;
+
+                    /* Check to see if partition is queued for being started. Only applicable to sends. */
+                    if(MCA_PART_PERSIST_PARTITION_QUEUED == req->flags[i]) {
+                        err = req->persist_reqs[i]->req_start(1, &req->persist_reqs[i]);
+                        if(OMPI_SUCCESS != err) goto end_part_progress;
+                        req->flags[i] = MCA_PART_PERSIST_PARTITION_STARTED;
                     }
-                    if(0 == req->flags[i])
-                    {
-                        ompi_request_test(&(req->persist_reqs[i]), &(req->flags[i]), MPI_STATUS_IGNORE);
-                        if(0 != req->flags[i]) req->done_count++;
+                    if(MCA_PART_PERSIST_PARTITION_STARTED == req->flags[i] && OMPI_REQUEST_ACTIVE == req->persist_reqs[i]->req_state) {
+                        err = ompi_request_test(&(req->persist_reqs[i]), &part_done, MPI_STATUS_IGNORE);
+                        if(OMPI_SUCCESS != err) goto end_part_progress;
+                        if(0 != part_done) {
+                            req->flags[i] = MCA_PART_PERSIST_PARTITION_COMPLETE;
+                        }
                     }
+
+                    done_count += MCA_PART_PERSIST_PARTITION_COMPLETE == req->flags[i];
                 }

                 /* Check for completion and complete the requests */
-                if(req->done_count == req->real_parts)
-                {
+                if(done_count == req->real_parts) {
                     req->first_send = false;
                     mca_part_persist_complete(req);
-                }
+                    completed++;
+                }
             }

             if(true == req->req_free_called && true == req->req_part_complete && REQUEST_COMPLETED == req->req_ompi.req_complete && OMPI_REQUEST_INACTIVE == req->req_ompi.req_state) {
                 to_delete = req;
             }
         }
+    }
+
+    if(NULL != to_delete && OPAL_SUCCESS == err) {
+        err = mca_part_persist_free_req(to_delete);
+    }
+end_part_progress:
+    if(OMPI_SUCCESS != err) {
+        ompi_rte_abort(err, "part progress internal failure");
     }

     OPAL_THREAD_UNLOCK(&ompi_part_persist.lock);
     block_entry = opal_atomic_add_fetch_32(&(ompi_part_persist.block_entry), -1);
-    if(to_delete) {
-        err = mca_part_persist_free_req(to_delete);
-        if (OMPI_SUCCESS != err) {
-            return OMPI_ERROR;
-        }
-    }
-    return OMPI_SUCCESS;
+    return completed;
 }

 __opal_attribute_always_inline__ static inline int
 mca_part_persist_precv_init(void *buf,
-                            size_t parts,
+                            size_t parts,
                             size_t count,
                             ompi_datatype_t * datatype,
                             int src,
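The progress-loop changes above reduce each partition of a send request to an explicit state machine: MPI_Pready only raises part_ready[i], and the progress loop is the sole writer of flags[i], moving a partition from QUEUED (start deferred until the channel exists) or STARTED to COMPLETE. The following stand-alone sketch is not part of the patch; pready(), progress_pass(), and transfer_finished() are invented stand-ins for the component's MPI_Pready path, progress loop, and ompi_request_test(), shown only to make the state transitions concrete.

```c
#include <stddef.h>
#include <stdio.h>

typedef enum { STARTED = 0, COMPLETE = 1, QUEUED = 2 } part_state_t;

enum { NPARTS = 4 };
static part_state_t flags[NPARTS];       /* written only by the progress loop      */
static int part_ready[NPARTS];           /* written by the application (MPI_Pready) */

static int transfer_finished(size_t i) { (void)i; return 1; } /* stand-in for ompi_request_test() */

/* Called from the MPI_Pready path: mark the partition, but never touch flags[]. */
static void pready(size_t i) { part_ready[i] = 1; }

/* One pass of the progress loop; returns the number of partitions now complete. */
static size_t progress_pass(void)
{
    size_t done_count = 0;
    for (size_t i = 0; i < NPARTS; i++) {
        if (!part_ready[i]) continue;                 /* proxy flag not raised yet       */
        if (QUEUED == flags[i]) flags[i] = STARTED;   /* start the deferred partition    */
        if (STARTED == flags[i] && transfer_finished(i)) flags[i] = COMPLETE;
        done_count += (COMPLETE == flags[i]);
    }
    return done_count;
}

int main(void)
{
    for (size_t i = 0; i < NPARTS; i++) flags[i] = QUEUED;   /* first activation */
    pready(1);
    pready(3);
    printf("complete after one pass: %zu of %d\n", progress_pass(), NPARTS);
    return 0;
}
```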
@@ -349,8 +363,7 @@ mca_part_persist_precv_init(void *buf,
                             struct ompi_request_t **request)
 {
     int err = OMPI_SUCCESS;
-    size_t dt_size_;
-    uint32_t dt_size;
+    size_t dt_size;
     mca_part_persist_list_t* new_progress_elem = NULL;
     mca_part_persist_precv_request_t *recvreq;
@@ -370,20 +383,21 @@ mca_part_persist_precv_init(void *buf,

     mca_part_persist_request_t *req = (mca_part_persist_request_t *) recvreq;

-    /* Set lazy initializion flags */
+    /* Set lazy initialization flags */
     req->initialized = false;
-    req->first_send = true;
+    req->first_send = true;
     req->flag_post_setup_recv = false;
     req->flags = NULL;
+    req->part_ready = NULL;

     /* Non-blocking receive on setup info */
     err = MCA_PML_CALL(irecv(&req->setup_info[1], sizeof(struct ompi_mca_persist_setup_t), MPI_BYTE, src, tag, comm, &req->setup_req[1]));
     if(OMPI_SUCCESS != err) return OMPI_ERROR;

     /* Compute total number of bytes */
-    err = opal_datatype_type_size(&(req->req_datatype->super), &dt_size_);
+    err = opal_datatype_type_size(&(req->req_datatype->super), &dt_size);
     if(OMPI_SUCCESS != err) return OMPI_ERROR;
-    dt_size = (dt_size_ > (size_t) UINT_MAX) ? MPI_UNDEFINED : (uint32_t) dt_size_;
-    req->req_bytes = parts * count * dt_size;
+    if(dt_size > (size_t) UINT_MAX) return OMPI_ERR_UNKNOWN_DATA_TYPE;
+    req->req_bytes = parts * count * ((uint32_t) dt_size);

     /* Set ompi request initial values */
     req->req_ompi.req_persistent = true;
@@ -394,7 +408,7 @@ mca_part_persist_precv_init(void *buf,
     /* Add element to progress engine */
     new_progress_elem = OBJ_NEW(mca_part_persist_list_t);
     new_progress_elem->item = req;
-    req->progress_elem = new_progress_elem;
+    req->progress_elem = new_progress_elem;
     OPAL_THREAD_LOCK(&ompi_part_persist.lock);
     opal_list_append(ompi_part_persist.progress_list, (opal_list_item_t*)new_progress_elem);
     OPAL_THREAD_UNLOCK(&ompi_part_persist.lock);
@@ -416,7 +430,7 @@ mca_part_persist_psend_init(const void* buf,
                             ompi_request_t** request)
 {
     int err = OMPI_SUCCESS;
-    size_t dt_size_;
+    size_t dt_size_, i;
     uint32_t dt_size;
     mca_part_persist_list_t* new_progress_elem = NULL;
     mca_part_persist_psend_request_t *sendreq;
@@ -437,17 +451,17 @@ mca_part_persist_psend_init(const void* buf,
     /* Set lazy initialization variables */
     req->initialized = false;
     req->first_send = true;
-
     /* Determine total bytes to send. */
     err = opal_datatype_type_size(&(req->req_datatype->super), &dt_size_);
     if(OMPI_SUCCESS != err) return OMPI_ERROR;
-    dt_size = (dt_size_ > (size_t) UINT_MAX) ? MPI_UNDEFINED : (uint32_t) dt_size_;
+    if(dt_size_ > (size_t) UINT_MAX) return OMPI_ERR_UNKNOWN_DATA_TYPE;
+    dt_size = (uint32_t) dt_size_;
     req->req_bytes = parts * count * dt_size;

     /* non-blocking send set-up data */
     req->setup_info[0].world_rank = ompi_comm_rank(&ompi_mpi_comm_world.comm);
-    req->setup_info[0].start_tag = ompi_part_persist.next_send_tag; ompi_part_persist.next_send_tag += parts;
+    req->setup_info[0].start_tag = ompi_part_persist.next_send_tag; ompi_part_persist.next_send_tag += parts;
     req->my_send_tag = req->setup_info[0].start_tag;
     req->setup_info[0].setup_tag = ompi_part_persist.next_recv_tag; ompi_part_persist.next_recv_tag++;
     req->my_recv_tag = req->setup_info[0].setup_tag;
@@ -457,13 +471,18 @@ mca_part_persist_psend_init(const void* buf,
     req->real_count = count;
     req->setup_info[0].dt_size = dt_size;

-    req->flags = (int*) calloc(req->real_parts, sizeof(int));
+    req->flags = (mca_part_persist_partition_state_t*) malloc(sizeof(mca_part_persist_partition_state_t) * req->real_parts);
+    req->part_ready = (int32_t*) malloc(sizeof(int32_t) * req->real_parts);
+    for (i = 0; i < req->real_parts; i++) {
+        req->flags[i] = MCA_PART_PERSIST_PARTITION_STARTED;
+        req->part_ready[i] = 0;
+    }
     err = MCA_PML_CALL(isend(&(req->setup_info[0]), sizeof(struct ompi_mca_persist_setup_t), MPI_BYTE, dst, tag, MCA_PML_BASE_SEND_STANDARD, comm, &req->setup_req[0]));
     if(OMPI_SUCCESS != err) return OMPI_ERROR;

     /* Non-blocking receive on setup info */
-    if(1 == ompi_part_persist.init_comms) {
+    if(1 == ompi_part_persist.init_world) {
         err = MCA_PML_CALL(irecv(&(req->setup_info[1]), sizeof(struct ompi_mca_persist_setup_t), MPI_BYTE, MPI_ANY_SOURCE, req->my_recv_tag, ompi_part_persist.part_comm_setup, &req->setup_req[1]));
         if(OMPI_SUCCESS != err) return OMPI_ERROR;
         req->flag_post_setup_recv = false;
@@ -471,7 +490,7 @@ mca_part_persist_psend_init(const void* buf,
         req->flag_post_setup_recv = true;
     }

-    /* Initilaize completion variables */
+    /* Initialize completion variables */
     sendreq->req_base.req_ompi.req_persistent = true;
     req->req_part_complete = true;
     req->req_ompi.req_complete = REQUEST_COMPLETED;
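For context on the psend_init hunks above: the sender fills setup_info[0] and sends it to the receiver, whose progress loop parses it as setup_info[1] and answers with only its world rank on the dedicated setup communicator. The sketch below is a hypothetical, self-contained rendering of that rendezvous, not part of the patch; the field names follow ompi_mca_persist_setup_t as referenced in the diff, but the field types and the example values are assumptions.

```c
#include <stdint.h>
#include <stdio.h>
#include <stddef.h>

struct setup_msg {                  /* assumed layout mirroring ompi_mca_persist_setup_t */
    int32_t world_rank;             /* sender's rank in MPI_COMM_WORLD                  */
    int32_t start_tag;              /* first tag of the per-partition tag range          */
    int32_t setup_tag;              /* tag the receiver must use for its reply           */
    size_t  num_parts;              /* partitions as seen by the sender                  */
    size_t  count;                  /* elements per partition                            */
    size_t  dt_size;                /* datatype size; lets the receiver fall back to MPI_BYTE */
};

int main(void)
{
    /* Sender side (psend_init): advertise its layout and claim a tag range. */
    struct setup_msg to_receiver = { .world_rank = 0, .start_tag = 100, .setup_tag = 7,
                                     .num_parts = 8, .count = 1024, .dt_size = 8 };

    /* Receiver side (progress loop): adopt the sender's layout for its persistent recvs... */
    struct setup_msg on_receiver = to_receiver;
    printf("receiver posts %zu recvs, tags %d..%d\n",
           on_receiver.num_parts, (int) on_receiver.start_tag,
           (int) (on_receiver.start_tag + on_receiver.num_parts - 1));

    /* ...and replies with only its own world rank, sent on setup_tag. */
    struct setup_msg to_sender = { .world_rank = 1 };
    printf("reply carries world_rank=%d on tag %d\n",
           (int) to_sender.world_rank, (int) on_receiver.setup_tag);
    return 0;
}
```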
@@ -503,30 +522,29 @@ mca_part_persist_start(size_t count, ompi_request_t** requests)

         if(false == req->first_send) {
             if(MCA_PART_PERSIST_REQUEST_PSEND == req->req_type) {
-                req->done_count = 0;
-                memset((void*)req->flags,0,sizeof(uint32_t)*req->real_parts);
+                for(i = 0; i < req->real_parts; i++) {
+                    req->flags[i] = MCA_PART_PERSIST_PARTITION_STARTED;
+                    req->part_ready[i] = 0;
+                }
             } else {
-                req->done_count = 0;
                 err = req->persist_reqs[0]->req_start(req->real_parts, req->persist_reqs);
-                memset((void*)req->flags,0,sizeof(uint32_t)*req->real_parts);
-            }
-        } else {
-            if(MCA_PART_PERSIST_REQUEST_PSEND == req->req_type) {
-                req->done_count = 0;
-                for(i = 0; i < req->real_parts && OMPI_SUCCESS == err; i++) {
-                    req->flags[i] = -1;
+                for(i = 0; i < req->real_parts; i++) {
+                    req->flags[i] = MCA_PART_PERSIST_PARTITION_STARTED;
                 }
-            } else {
-                req->done_count = 0;
             }
-        }
-        req->req_ompi.req_state = OMPI_REQUEST_ACTIVE;
+        } else if(MCA_PART_PERSIST_REQUEST_PSEND == req->req_type) {
+            for(i = 0; i < req->real_parts; i++) {
+                req->flags[i] = MCA_PART_PERSIST_PARTITION_QUEUED;
+                req->part_ready[i] = 0;
+            }
+        }
+        req->req_ompi.req_state = OMPI_REQUEST_ACTIVE;
         req->req_ompi.req_status.MPI_TAG = MPI_ANY_TAG;
         req->req_ompi.req_status.MPI_ERROR = OMPI_SUCCESS;
         req->req_ompi.req_status._cancelled = 0;
         req->req_part_complete = false;
         req->req_ompi.req_complete = false;
-        OPAL_ATOMIC_SWAP_PTR(&req->req_ompi.req_complete, REQUEST_PENDING);
+        OPAL_ATOMIC_SWAP_PTR(&req->req_ompi.req_complete, REQUEST_PENDING);
     }

     return err;
@@ -541,17 +559,17 @@ mca_part_persist_pready(size_t min_part,
     size_t i;
     mca_part_persist_request_t *req = (mca_part_persist_request_t *)(request);

-    if(true == req->initialized)
-    {
-        err = req->persist_reqs[min_part]->req_start(max_part-min_part+1, (&(req->persist_reqs[min_part])));
-        for(i = min_part; i <= max_part && OMPI_SUCCESS == err; i++) {
-            req->flags[i] = 0; /* Mark partition as ready for testing */
+    if(MCA_PART_PERSIST_REQUEST_PSEND != req->req_type) {
+        err = OMPI_ERROR;
+    } else {
+        if(true == req->initialized) {
+            err = req->persist_reqs[min_part]->req_start(max_part-min_part+1, &req->persist_reqs[min_part]);
+            for(i = min_part; i <= max_part; i++) {
+                req->flags[i] = MCA_PART_PERSIST_PARTITION_STARTED;
+            }
         }
-    }
-    else
-    {
-        for(i = min_part; i <= max_part && OMPI_SUCCESS == err; i++) {
-            req->flags[i] = -2; /* Mark partition as queued */
+        for(i = min_part; i <= max_part; i++) {
+            req->part_ready[i] = 1;
         }
     }
     return err;
@@ -560,7 +578,7 @@ mca_part_persist_pready(size_t min_part,
 __opal_attribute_always_inline__ static inline int
 mca_part_persist_parrived(size_t min_part,
                           size_t max_part,
-                          int* flag,
+                          int* flag,
                           ompi_request_t* request)
 {
     int err = OMPI_SUCCESS;
@@ -568,25 +586,29 @@ mca_part_persist_parrived(size_t min_part,
     int _flag = false;
     mca_part_persist_request_t *req = (mca_part_persist_request_t *)request;

-    if(0 != req->flags) {
+    if(MCA_PART_PERSIST_REQUEST_PRECV != req->req_type) {
+        err = OMPI_ERROR;
+    } else if(OMPI_REQUEST_INACTIVE == req->req_ompi.req_state) {
+        _flag = 1;
+    } else if(0 != req->flags) {
         _flag = 1;
         if(req->req_parts == req->real_parts) {
             for(i = min_part; i <= max_part; i++) {
-                _flag = _flag && req->flags[i];
+                _flag &= MCA_PART_PERSIST_PARTITION_COMPLETE == req->flags[i];
             }
         } else {
             float convert = ((float)req->real_parts) / ((float)req->req_parts);
             size_t _min = floor(convert * min_part);
             size_t _max = ceil(convert * max_part);
             for(i = _min; i <= _max; i++) {
-                _flag = _flag && req->flags[i];
+                _flag &= MCA_PART_PERSIST_PARTITION_COMPLETE == req->flags[i];
             }
         }
     }

-    if(!_flag) {
+    if(!_flag && OMPI_SUCCESS == err) {
         opal_progress();
-    }
+    }
     *flag = _flag;
     return err;
 }
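The parrived hunk above keeps the existing mapping between the receiver's requested partitioning (req_parts) and the sender's actual one (real_parts), now comparing each internal partition against MCA_PART_PERSIST_PARTITION_COMPLETE. A small worked example of that index arithmetic, not part of the patch, with purely illustrative values:

```c
#include <math.h>
#include <stdio.h>

int main(void)
{
    size_t req_parts = 4, real_parts = 8;    /* receiver's view vs. sender's (wire) view */
    size_t min_part = 1, max_part = 2;       /* user-level range passed to MPI_Parrived  */

    float convert = ((float) real_parts) / ((float) req_parts);   /* 2.0 */
    size_t lo = (size_t) floor(convert * min_part);               /* 2   */
    size_t hi = (size_t) ceil(convert * max_part);                /* 4   */

    /* All internal partitions lo..hi must be COMPLETE before the user-level
     * range min_part..max_part may be reported as arrived. */
    printf("user parts %zu..%zu -> internal parts %zu..%zu\n", min_part, max_part, lo, hi);
    return 0;
}
```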
@@ -610,4 +632,4 @@ mca_part_persist_free(ompi_request_t** request)

 END_C_DECLS

-#endif /* PART_PERSIST_H_HAS_BEEN_INCLUDED */
+#endif /* PART_PERSIST_H */
diff --git a/ompi/mca/part/persist/part_persist_component.c b/ompi/mca/part/persist/part_persist_component.c
index 919284476b9..a0f474ff348 100644
--- a/ompi/mca/part/persist/part_persist_component.c
+++ b/ompi/mca/part/persist/part_persist_component.c
@@ -98,11 +98,8 @@ mca_part_persist_component_open(void)

     mca_part_persist_init_lists();

-    ompi_part_persist.init_comms = 0;
     ompi_part_persist.init_world = -1;
-
-    ompi_part_persist.part_comm_ready = 0;
-    ompi_part_persist.part_comm_ready = 0;
+    ompi_part_persist.init_world_step = 0;

     ompi_part_persist.block_entry = 0;
     return OMPI_SUCCESS;
diff --git a/ompi/mca/part/persist/part_persist_request.h b/ompi/mca/part/persist/part_persist_request.h
index ba55c6bd920..02e902675f2 100644
--- a/ompi/mca/part/persist/part_persist_request.h
+++ b/ompi/mca/part/persist/part_persist_request.h
@@ -44,6 +44,11 @@ struct ompi_mca_persist_setup_t {
     size_t count;
 };

+typedef enum {
+    MCA_PART_PERSIST_PARTITION_STARTED = 0,  /* partition request started. The default after initialization, or after MPI_Start for receive requests */
+    MCA_PART_PERSIST_PARTITION_COMPLETE = 1, /* partition request complete */
+    MCA_PART_PERSIST_PARTITION_QUEUED = 2    /* next progress loop will start the partition request (send only) */
+} mca_part_persist_partition_state_t;

 /**
  * Base type for PART PERSIST requests
@@ -89,10 +94,10 @@ struct mca_part_persist_request_t {
     int32_t initialized;                /**< flag for initialized state */
     int32_t first_send;                 /**< flag for whether the first send has happened */
-    int32_t flag_post_setup_recv;
-    size_t done_count;                  /**< counter for the number of partitions marked ready */
+    int32_t flag_post_setup_recv;

-    int32_t *flags;                     /**< array of flags to determine whether a partition has arrived */
+    mca_part_persist_partition_state_t* flags; /**< the state of each partition */
+    int32_t *part_ready;                /**< readiness flag of each partition, NULL for receive requests */

     struct ompi_mca_persist_setup_t setup_info[2]; /**< Setup info to send during initialization. */
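For reference, a minimal user-level sketch of the MPI 4.0 partitioned communication calls that this component backs (MPI_Psend_init, MPI_Precv_init, MPI_Pready, MPI_Parrived). It is not part of the patch, assumes exactly two ranks and an MPI 4.0 library, and leaves the buffer contents unspecified.

```c
#include <mpi.h>
#include <stdio.h>

#define PARTS 4
#define COUNT 1024

int main(int argc, char **argv)
{
    double buf[PARTS * COUNT];
    MPI_Request req;
    int rank;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    if (0 == rank) {
        /* Sender: one persistent partitioned send, PARTS partitions of COUNT doubles. */
        MPI_Psend_init(buf, PARTS, COUNT, MPI_DOUBLE, 1, 0,
                       MPI_COMM_WORLD, MPI_INFO_NULL, &req);
        MPI_Start(&req);                       /* activate the request                    */
        for (int p = 0; p < PARTS; p++) {
            /* ... fill partition p of buf ... */
            MPI_Pready(p, req);                /* partition p may now be transferred      */
        }
        MPI_Wait(&req, MPI_STATUS_IGNORE);
        MPI_Request_free(&req);
    } else if (1 == rank) {
        int flag = 0;
        MPI_Precv_init(buf, PARTS, COUNT, MPI_DOUBLE, 0, 0,
                       MPI_COMM_WORLD, MPI_INFO_NULL, &req);
        MPI_Start(&req);
        MPI_Parrived(req, 0, 0, &flag);        /* poll a single partition before waiting  */
        MPI_Wait(&req, MPI_STATUS_IGNORE);
        MPI_Request_free(&req);
        printf("partition 0 had arrived before wait: %d\n", flag);
    }

    MPI_Finalize();
    return 0;
}
```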