diff --git a/.github/metrics/clone_history.csv b/.github/metrics/clone_history.csv deleted file mode 100644 index 8b1378917..000000000 --- a/.github/metrics/clone_history.csv +++ /dev/null @@ -1 +0,0 @@ - diff --git a/.github/workflows/traffic.yml b/.github/workflows/traffic.yml deleted file mode 100644 index 33229a73d..000000000 --- a/.github/workflows/traffic.yml +++ /dev/null @@ -1,59 +0,0 @@ -name: Track Traffic -on: - schedule: - - cron: "0 0 * * *" # Runs at 00:00, every day - workflow_dispatch: # Or can manually run -permissions: - contents: write - actions: read -jobs: - capture-traffic: - runs-on: ubuntu-latest - - steps: - # Checkout the track-traffic branch - - name: Checkout repo - uses: actions/checkout@v3 - with: - ref: track-traffic - - - name: Get yesterday's clone traffic - id: clones - env: - GH_TOKEN: ${{ secrets.TRAFFIC_SECRET }} - run: | - # yesterday's clones have date = `yesterday` - YESTERDAY=$(date -I -d "yesterday") - - # get all traffic from yesterday - DAILY_JSON=$(gh api repos/${GITHUB_REPOSITORY}/traffic/clones \ - --jq ".clones[] | select(.timestamp | startswith(\"$YESTERDAY\"))") - - # get total clone count and unique clone count - DAILY_COUNT=$(echo "$DAILY_JSON" | jq -r '.count // 0') - DAILY_UNIQUES=$(echo "$DAILY_JSON" | jq -r '.uniques // 0') - - # Output variables date, count, and uniques to next step - echo "date=$YESTERDAY" >> $GITHUB_OUTPUT - echo "count=$DAILY_COUNT" >> $GITHUB_OUTPUT - echo "uniques=$DAILY_UNIQUES" >> $GITHUB_OUTPUT - - - name: Append to clone_history.csv - run: | - # Create csv on first run - if [ ! -f .github/metrics/clone_history.csv ]; then - echo "date,daily_clones,daily_unique_cloners" > .github/metrics/clone_history.csv - fi - - # Append yesterday’s entry to file - echo "${{ steps.clones.outputs.date }},${{ steps.clones.outputs.count }},${{ steps.clones.outputs.uniques }}" >> .github/metrics/clone_history.csv - - - name: Commit and push - run: | - git config --local user.email "actions@github.com" - git config --local user.name "GitHub Actions" - - git add .github/metrics/clone_history.csv - git commit -m "Daily clone stats (${{ steps.clones.outputs.date }})" || echo "No changes" - git push origin HEAD:track-traffic - diff --git a/benchmarks/CMakeLists.txt b/benchmarks/CMakeLists.txt index 7bd8d4c24..fcc7a687a 100644 --- a/benchmarks/CMakeLists.txt +++ b/benchmarks/CMakeLists.txt @@ -7,6 +7,7 @@ list(APPEND CPP_SOURCES "alltoallv_crs.cpp") #list(APPEND CPP_SOURCES "microbenchmarks.cpp") list(APPEND CPP_SOURCES "p2p_alltoall.cpp") list(APPEND CPP_SOURCES "p2p_alltoallv.cpp") +list(APPEND CPP_SOURCES "reorder_msgs.cpp") ## But some need special requirements -- removed until necessary parameters obtained. # if(USE_GPU) diff --git a/benchmarks/reorder_msgs.cpp b/benchmarks/reorder_msgs.cpp new file mode 100644 index 000000000..de64c168d --- /dev/null +++ b/benchmarks/reorder_msgs.cpp @@ -0,0 +1,357 @@ +#include +#include +#include + +#include "locality_aware.h" +#include "par_binary_IO.hpp" +#include "sparse_mat.hpp" + +void spmv(Mat& A, + std::vector& x, + std::vector& b, + double alpha, + double beta) +{ + for (int i = 0; i < A.n_rows; i++) + { + double sum = 0; + for (int j = A.rowptr[i]; j < A.rowptr[i+1]; j++) + sum += A.data[j] * x[A.col_idx[j]]; + b[i] = alpha*sum + beta*b[i]; + } +} + +void comm_init(ParMat& A, + std::vector& sendbuf, + std::vector& recvbuf, + std::vector& send_order, + std::vector& recv_order, + std::vector& send_req, + std::vector& recv_req, + MPIL_Comm* xcomm) +{ + int idx; + int tag; + MPIL_Comm_tag(xcomm, &tag); + + // Start communication + for (int i = 0; i < A.recv_comm.n_msgs; i++) + { + idx = recv_order[i]; + MPI_Irecv(&(recvbuf[A.recv_comm.ptr[idx]]), + A.recv_comm.counts[idx], + MPI_DOUBLE, + A.recv_comm.procs[idx], + tag, + MPI_COMM_WORLD, + &(recv_req[i])); + } + for (int i = 0; i < A.send_comm.n_msgs; i++) + { + idx = send_order[i]; + MPI_Isend(&(sendbuf[A.send_comm.ptr[idx]]), + A.send_comm.counts[idx], + MPI_DOUBLE, + A.send_comm.procs[idx], + tag, + MPI_COMM_WORLD, + &(send_req[i])); + } +} + +void par_spmv(ParMat& A, + std::vector& x, + std::vector& b, + std::vector& sendbuf, + std::vector& recvbuf, + std::vector& send_order, + std::vector& recv_order, + std::vector& send_req, + std::vector& recv_req, + MPIL_Comm* xcomm) +{ + int idx; + + // Pack sendbuf + for (int i = 0; i < A.send_comm.size_msgs; i++) + sendbuf[i] = x[A.send_comm.idx[i]]; + + comm_init(A, sendbuf, recvbuf, send_order, recv_order, + send_req, recv_req, xcomm); + + // Fully local SpMV + spmv(A.on_proc, x, b, 1.0, 0.0); + + // Wait for communication to finish + MPI_Waitall(recv_req.size(), recv_req.data(), MPI_STATUSES_IGNORE); + MPI_Waitall(send_req.size(), send_req.data(), MPI_STATUSES_IGNORE); + + // Off-process SpMV + spmv(A.off_proc, recvbuf, b, 1.0, 1.0); +} + +double time_par_spmv(ParMat& A, + std::vector& x, + std::vector& b, + std::vector& sendbuf, + std::vector& recvbuf, + std::vector& send_order, + std::vector& recv_order, + std::vector& send_req, + std::vector& recv_req, + MPIL_Comm* xcomm) +{ + double t0, tfinal; + int n_iter = 1; + + // Warm-Up + par_spmv(A, x, b, sendbuf, recvbuf, send_order, recv_order, + send_req, recv_req, xcomm); + + // Time single iteration + MPI_Barrier(MPI_COMM_WORLD); + t0 = MPI_Wtime(); + par_spmv(A, x, b, sendbuf, recvbuf, send_order, recv_order, + send_req, recv_req, xcomm); + tfinal = MPI_Wtime() - t0; + MPI_Allreduce(&tfinal, &t0, 1, MPI_DOUBLE, MPI_MAX, MPI_COMM_WORLD); + + if (t0 < 0.1) + { + n_iter = 10; + MPI_Barrier(MPI_COMM_WORLD); + t0 = MPI_Wtime(); + for (int i = 0; i < n_iter; i++) + par_spmv(A, x, b, sendbuf, recvbuf, send_order, recv_order, + send_req, recv_req, xcomm); + tfinal = (MPI_Wtime() - t0) / n_iter; + MPI_Allreduce(&tfinal, &t0, 1, MPI_DOUBLE, MPI_MAX, MPI_COMM_WORLD); + n_iter = 1.0 / t0; + } + + // Actual Timer + MPI_Barrier(MPI_COMM_WORLD); + t0 = MPI_Wtime(); + for (int i = 0; i < n_iter; i++) + par_spmv(A, x, b, sendbuf, recvbuf, send_order, recv_order, + send_req, recv_req, xcomm); + tfinal = (MPI_Wtime() - t0) / n_iter; + MPI_Allreduce(&tfinal, &t0, 1, MPI_DOUBLE, MPI_MAX, MPI_COMM_WORLD); + + return t0; +} + + +void compare(std::vector& recvbuf_std, + std::vector& recvbuf_new) +{ + for (int i = 0; i < recvbuf_std.size(); i++) + { + if (std::isnan(recvbuf_std[i]) && std::isnan(recvbuf_new[i])) + continue; + if (recvbuf_std[i] != recvbuf_new[i]) + { + fprintf(stderr, "Difference at position %d, xorig %e, xnew %e\n", + i, recvbuf_std[i], recvbuf_new[i]); + MPI_Abort(MPI_COMM_WORLD, -1); + } + } +} + +void reorder_recvs(ParMat& A, + std::vector& sendbuf, + std::vector& recvbuf, + std::vector& send_order, + std::vector& recv_order, + std::vector& send_req, + std::vector& recv_req, + MPIL_Comm* xcomm) +{ + int num_procs; + MPI_Comm_size(MPI_COMM_WORLD, &num_procs); + std::vector procs_to_idx(num_procs); + for (int i = 0; i < A.recv_comm.n_msgs; i++) + procs_to_idx[A.recv_comm.procs[i]] = i; + + int idx; + int tag; + MPIL_Comm_tag(xcomm, &tag); + + // Start sends + for (int i = 0; i < A.send_comm.n_msgs; i++) + { + idx = send_order[i]; + MPI_Isend(&(sendbuf[A.send_comm.ptr[idx]]), + A.send_comm.counts[idx], + MPI_DOUBLE, + A.send_comm.procs[idx], + tag, + MPI_COMM_WORLD, + &(send_req[i])); + } + + MPI_Status status; + for (int i = 0; i < recv_req.size(); i++) + { + MPI_Probe(MPI_ANY_SOURCE, tag, MPI_COMM_WORLD, &status); + int proc = status.MPI_SOURCE; + idx = procs_to_idx[proc]; + MPI_Recv(&(recvbuf[A.recv_comm.ptr[idx]]), + A.recv_comm.counts[idx], + MPI_DOUBLE, + A.recv_comm.procs[idx], + tag, + MPI_COMM_WORLD, + &status); + recv_order[i] = idx; + } + + MPI_Waitall(send_req.size(), send_req.data(), MPI_STATUSES_IGNORE); +} + +void reorder_sends(ParMat& A, + std::vector& sendbuf, + std::vector& recvbuf, + std::vector& send_order, + std::vector& recv_order, + std::vector& send_req, + std::vector& recv_req, + MPIL_Comm* xcomm) +{ + int num_procs; + MPI_Comm_size(MPI_COMM_WORLD, &num_procs); + std::vector procs_to_idx(num_procs); + for (int i = 0; i < A.send_comm.n_msgs; i++) + procs_to_idx[A.send_comm.procs[i]] = i; + + int idx; + int tag; + MPIL_Comm_tag(xcomm, &tag); + + + for (int i = 0; i < A.recv_comm.n_msgs; i++) + { + idx = recv_order[i]; + MPI_Isend(&(recvbuf[A.recv_comm.ptr[idx]]), + A.recv_comm.counts[idx], + MPI_DOUBLE, + A.recv_comm.procs[idx], + tag, + MPI_COMM_WORLD, + &(recv_req[i])); + } + + + MPI_Status status; + for (int i = 0; i < send_req.size(); i++) + { + MPI_Probe(MPI_ANY_SOURCE, tag, MPI_COMM_WORLD, &status); + int proc = status.MPI_SOURCE; + idx = procs_to_idx[proc]; + MPI_Recv(&(sendbuf[A.send_comm.ptr[idx]]), + A.send_comm.counts[idx], + MPI_DOUBLE, + A.send_comm.procs[idx], + tag, + MPI_COMM_WORLD, + &status); + send_order[i] = idx; + } + + MPI_Waitall(recv_req.size(), recv_req.data(), MPI_STATUSES_IGNORE); +} + +int main(int argc, char* argv[]) +{ + MPI_Init(&argc, &argv); + + int rank, num_procs; + MPI_Comm_rank(MPI_COMM_WORLD, &rank); + MPI_Comm_size(MPI_COMM_WORLD, &num_procs); + + if (argc == 1) + { + if (rank == 0) + { + printf("Pass Matrix Filename as Command Line Arg!\n"); + } + MPI_Finalize(); + return 1; + } + char* filename = argv[1]; + + // Read suitesparse matrix + ParMat A; + int file_error = readParMatrix(filename, A); + if (file_error) + { + return 1; + } + + // Form Communication Package (A.send_comm, A.recv_comm) + form_comm(A); + + double t0; + + std::vector x(A.on_proc.n_rows); + std::vector b(A.on_proc.n_rows); + std::vector b_new(A.on_proc.n_rows); + std::generate(x.begin(), x.end(), + []() { return (double)rand() / RAND_MAX; }); + + std::vector sendbuf(A.send_comm.size_msgs); + std::vector recvbuf(A.recv_comm.size_msgs); + + MPIL_Comm* xcomm; + MPIL_Comm_init(&xcomm, MPI_COMM_WORLD); + + std::vector send_req(A.send_comm.n_msgs); + std::vector recv_req(A.recv_comm.n_msgs); + + std::vector send_order(A.send_comm.n_msgs); + std::vector recv_order(A.recv_comm.n_msgs); + + // Standard Communication + std::iota(send_order.begin(), send_order.end(), 0); + std::iota(recv_order.begin(), recv_order.end(), 0); + for (int iter = 0; iter < 5; iter++) + { + t0 = time_par_spmv(A, x, b, sendbuf, recvbuf, + send_order, recv_order, send_req, recv_req, xcomm); + if (rank == 0) printf("Iter %d: Original SpMV Time: %e\n", iter, t0); + } + + // Reorder Recvs + std::iota(send_order.begin(), send_order.end(), 0); + std::iota(recv_order.begin(), recv_order.end(), 0); + reorder_recvs(A, sendbuf, recvbuf, send_order, recv_order, + send_req, recv_req, xcomm); + for (int iter = 0; iter < 5; iter++) + { + t0 = time_par_spmv(A, x, b_new, sendbuf, recvbuf, + send_order, recv_order, send_req, recv_req, xcomm); + if (rank == 0) printf("Iter %d: Reordered Recvs SpMV Time: %e\n", iter, t0); + } + compare(b, b_new); + + // Reorder Sends and Recvs + std::fill(b_new.begin(), b_new.end(), 0); + std::iota(send_order.begin(), send_order.end(), 0); + std::iota(recv_order.begin(), recv_order.end(), 0); + reorder_sends(A, sendbuf, recvbuf, send_order, recv_order, + send_req, recv_req, xcomm); + reorder_recvs(A, sendbuf, recvbuf, send_order, recv_order, + send_req, recv_req, xcomm); + for (int iter = 0; iter < 5; iter++) + { + t0 = time_par_spmv(A, x, b_new, sendbuf, recvbuf, + send_order, recv_order, send_req, recv_req, xcomm); + if (rank == 0) printf("Iter %d: Reordered Sends/Recvs SpMV Time: %e\n", iter, t0); + } + compare(b, b_new); + + MPIL_Comm_free(&xcomm); + + MPI_Finalize(); + return 0; +} diff --git a/library/bindings/MPIL_Request/MPIL_Request_free.cpp b/library/bindings/MPIL_Request/MPIL_Request_free.cpp index fbd48d23e..b085acd9b 100644 --- a/library/bindings/MPIL_Request/MPIL_Request_free.cpp +++ b/library/bindings/MPIL_Request/MPIL_Request_free.cpp @@ -14,44 +14,55 @@ int MPIL_Request_free(MPIL_Request** request_ptr) { MPIL_Request* request = *request_ptr; - if (request->local_L_n_msgs) + /** Free any local request objects. In these objects, + * sendbuf and recvbuf were malloc'd, so they must be freed */ + if (request->local_L_request != NULL) { - for (int i = 0; i < request->local_L_n_msgs; i++) - { - MPI_Request_free(&(request->local_L_requests[i])); - } - free(request->local_L_requests); + MPIL_Request_free(&(request->local_L_request)); + request->local_L_request = NULL; } - if (request->local_S_n_msgs) + if (request->local_S_request != NULL) { - for (int i = 0; i < request->local_S_n_msgs; i++) - { - MPI_Request_free(&(request->local_S_requests[i])); - } - free(request->local_S_requests); + MPIL_Request_free(&(request->local_S_request)); + request->local_S_request = NULL; } - if (request->local_R_n_msgs) + if (request->local_R_request != NULL) { - for (int i = 0; i < request->local_R_n_msgs; i++) - { - MPI_Request_free(&(request->local_R_requests[i])); - } - free(request->local_R_requests); + MPIL_Request_free(&(request->local_R_request)); + request->local_R_request = NULL; } - if (request->global_n_msgs) + + if (request->n_msgs) { - for (int i = 0; i < request->global_n_msgs; i++) + for (int i = 0; i < request->n_msgs; i++) { - MPI_Request_free(&(request->global_requests[i])); + MPI_Request_free(&(request->requests[i])); } - free(request->global_requests); + free(request->requests); + request->n_msgs = 0; } - // If Locality-Aware - if (request->locality != NULL) + if (request->size_sends) { - destroy_locality_comm(request->locality); + free(request->tmp_sendbuf); + request->tmp_sendbuf = NULL; + + free(request->send_indices); + request->send_indices = NULL; + + request->size_sends = 0; } + if (request->size_recvs) + { + free(request->tmp_recvbuf); + request->tmp_recvbuf = NULL; + + free(request->recv_indices); + request->recv_indices = NULL; + + request->size_recvs = 0; + } + // TODO : for safety, may want to check if allocated with malloc? #ifdef GPU // Assuming cpu buffers allocated in pinned memory diff --git a/library/include/communicator/comm_data.h b/library/include/communicator/comm_data.h deleted file mode 100644 index cb3885aef..000000000 --- a/library/include/communicator/comm_data.h +++ /dev/null @@ -1,43 +0,0 @@ -#ifndef MPI_ADVANCE_COMM_DATA_H -#define MPI_ADVANCE_COMM_DATA_H - -#include - -#ifdef __cplusplus -extern "C" { -#endif -/** @brief Structure containing metadata about a message**/ -typedef struct _CommData -{ - /** @brief Number of messages sent between sender and receiver **/ - int num_msgs; - /** @brief size of the message sent between sender and receiver **/ - int size_msgs; - /** @brief Size of the datatype used in the communication **/ - int datatype_size; - /** @brief Number of processes involved in the communication **/ - int* procs; - /** @brief pointer to index **/ - int* indptr; - /** @brief indexes for message **/ - int* indices; - /** @brief buffer containing copy of message **/ - char* buffer; -} CommData; - -/** @brief ::CommData constructor that sets all values to 0, except for CommData::datatype_size **/ -void init_comm_data(CommData** comm_data_ptr, MPI_Datatype datatype); -/** @brief ::CommData destructor that frees allocated memory **/ -void destroy_comm_data(CommData* data); -/** @brief Sets the CommData::num_msgs to provided value */ -void init_num_msgs(CommData* data, int num_msgs); -/** @brief Sets CommData::size_msgs and allocates CommData::indices for indexing messages **/ -void init_size_msgs(CommData* data, int size_msgs); -/** @brief Allocates CommData::buffer to the size of `(CommData::size_msgs * CommData::datatype_size)` bytes **/ -void finalize_comm_data(CommData* data); - -#ifdef __cplusplus -} -#endif - -#endif diff --git a/library/include/communicator/comm_pkg.h b/library/include/communicator/comm_pkg.h deleted file mode 100644 index 0d4ef6a92..000000000 --- a/library/include/communicator/comm_pkg.h +++ /dev/null @@ -1,32 +0,0 @@ -#ifndef MPI_ADVANCE_COMM_PKG_H -#define MPI_ADVANCE_COMM_PKG_H - -#include "comm_data.h" - -/** @brief Struct for storing directional message data for a group of processes. - * @details - * One ::CommPkg per process. - * One ::CommData object for each process send to and received from by that process. - **/ -typedef struct _CommPkg -{ - /** @brief Information on outgoing messages **/ - CommData* send_data; - /** @brief Information on incoming messages **/ - CommData* recv_data; - /** @brief Tag value to use for communications (see ::get_tag()). **/ - int tag; -} CommPkg; - -/** @brief Allocate and initialize a ::CommPkg **/ -void init_comm_pkg(CommPkg** comm_ptr, - MPI_Datatype sendtype, - MPI_Datatype recvtype, - int _tag); -/** @brief Calls finalize_comm_data() on CommPkg::send_data and CommPkg::recv_data **/ -void finalize_comm_pkg(CommPkg* comm); -/** @brief ::CommPkg destructor that cleans up CommPkg::send_data and CommPkg::recv_data - * **/ -void destroy_comm_pkg(CommPkg* comm); - -#endif diff --git a/library/include/communicator/locality_comm.h b/library/include/communicator/locality_comm.h deleted file mode 100644 index 5f284d1c3..000000000 --- a/library/include/communicator/locality_comm.h +++ /dev/null @@ -1,42 +0,0 @@ -#ifndef MPI_ADVANCE_LOCALITY_COMM_H -#define MPI_ADVANCE_LOCALITY_COMM_H - -#include - -#include "MPIL_Comm.hpp" -#include "comm_pkg.h" - -// Declarations of C++ methods -#ifdef __cplusplus -extern "C" { -#endif -/** @brief Struct for messaging metadata for the different groups of processes**/ -typedef struct LocalityComm -{ - /** @brief Metadata about local (on-node) communications. **/ - CommPkg* local_L_comm; - /** @brief Metadata about original messages before aggregation. **/ - CommPkg* local_S_comm; - /** @brief Metadata to redistribute aggravated messages. **/ - CommPkg* local_R_comm; - /** @brief Metadata on messages sent between nodes. **/ - CommPkg* global_comm; - /** @brief Pointer to MPIL_Comm used in the locality mapping. **/ - MPIL_Comm* communicators; -} LocalityComm; - -/** @brief Constructor for ::LocalityComm. Datatypes used to create message metadata. */ -void init_locality_comm(LocalityComm** locality_ptr, - MPIL_Comm* comm, - MPI_Datatype sendtype, - MPI_Datatype recvtype); -/** @brief Finalize the ::CommPkg objects inside this object. */ -void finalize_locality_comm(LocalityComm* locality); -/** @brief Destructor for a ::LocalityComm object. */ -void destroy_locality_comm(LocalityComm* locality); - -#ifdef __cplusplus -} -#endif - -#endif diff --git a/library/include/neighborhood/neighbor_locality.h b/library/include/neighborhood/neighbor_locality.h index 062ca38e5..682670cbf 100644 --- a/library/include/neighborhood/neighbor_locality.h +++ b/library/include/neighborhood/neighbor_locality.h @@ -4,17 +4,33 @@ #include #include -#include "communicator/comm_data.h" -#include "communicator/comm_pkg.h" -#include "communicator/locality_comm.h" +#include "communicator/MPIL_Comm.hpp" -void map_procs_to_nodes(LocalityComm* locality, - const int orig_num_msgs, +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct _CommData +{ + int num_msgs; + int size_msgs; + int datatype_size; + int* procs; + int* indptr; + int* counts; + int* indices; + char* buffer; +} CommData; + + + +void map_procs_to_nodes(const int orig_num_msgs, const int* orig_procs, const int* orig_counts, std::vector& msg_nodes, std::vector& msg_node_to_local, - bool incr); + bool incr, + MPIL_Comm* mpil_comm); void form_local_comm(const int orig_num_sends, const int* orig_send_procs, const int* orig_send_ptr, @@ -25,22 +41,26 @@ void form_local_comm(const int orig_num_sends, CommData* recv_data, CommData* local_data, std::vector& recv_idx_nodes, - LocalityComm* locality, - const int tag); + MPIL_Comm* mpil_comm); void form_global_comm(CommData* local_data, CommData* global_data, std::vector& local_data_nodes, - const MPIL_Comm* mpil_comm, - int tag); -void update_global_comm(LocalityComm* locality); + MPIL_Comm* mpil_comm); +void update_global_comm(CommData* global_send_data, + CommData* global_recv_data, + MPIL_Comm* mpil_comm); void form_global_map(const CommData* map_data, std::map& global_map); -void map_indices(CommData* idx_data, std::map& global_map); -void map_indices(CommData* idx_data, const CommData* map_data); void remove_duplicates(CommData* comm_pkg); -void remove_duplicates(CommPkg* data); -void remove_duplicates(LocalityComm* locality); -void update_indices(LocalityComm* locality, - std::map& send_global_to_local, - std::map& recv_global_to_local); -#endif \ No newline at end of file + +void destroy_comm_data(CommData* data); +/** @brief Sets the CommData::num_msgs to provided value */ +void init_num_msgs(CommData* data, int num_msgs); +/** @brief Sets CommData::size_msgs and allocates CommData::indices for indexing messages **/ +void init_size_msgs(CommData* data, int size_msgs); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/library/include/neighborhood/neighborhood_init.h b/library/include/neighborhood/neighborhood_init.h index 86a86a48b..14e9ef1d8 100644 --- a/library/include/neighborhood/neighborhood_init.h +++ b/library/include/neighborhood/neighborhood_init.h @@ -40,6 +40,20 @@ int neighbor_alltoallv_init_standard(const void* sendbuf, MPIL_Info* info, MPIL_Request** request_ptr); +int neighbor_alltoallv_init_standard_helper(const void* sendbuf, + const int sendcounts[], + const int sdispls[], + MPI_Datatype sendtype, + void* recvbuf, + const int recvcounts[], + const int rdispls[], + MPI_Datatype recvtype, + MPIL_Topo* topo, + MPI_Comm comm, + MPIL_Info* info, + int tag, + MPIL_Request* request); + int neighbor_alltoallv_init_locality(const void* sendbuf, const int sendcounts[], const int sdispls[], @@ -72,10 +86,12 @@ void init_locality(const int n_sends, const int* send_procs, const int* send_indptr, const int* sendcounts, + const void* sendbuffer, const int n_recvs, const int* recv_procs, const int* recv_indptr, const int* recvcounts, + void* recvbuffer, const long* global_send_indices, const long* global_recv_indices, const MPI_Datatype sendtype, @@ -83,20 +99,17 @@ void init_locality(const int n_sends, MPIL_Comm* mpil_comm, MPIL_Request* request); -int init_communication(const void* sendbuffer, - int n_sends, - const int* send_procs, - const int* send_ptr, - MPI_Datatype sendtype, - void* recvbuffer, - int n_recvs, - const int* recv_procs, - const int* recv_ptr, - MPI_Datatype recvtype, - int tag, - MPI_Comm comm, - int* n_request_ptr, - MPI_Request** request_ptr); + +void init_packing_buffers(MPIL_Request* request, + int size_sends, + int* send_indices, + int send_size, + const void* _sendbuf, + int size_recvs, + int* recv_indices, + int recv_size, + void* _recvbuf); + #ifdef __cplusplus } diff --git a/library/include/persistent/MPIL_Request.h b/library/include/persistent/MPIL_Request.h index b4244a76e..86f6b354a 100644 --- a/library/include/persistent/MPIL_Request.h +++ b/library/include/persistent/MPIL_Request.h @@ -3,53 +3,60 @@ #include -#include "communicator/locality_comm.h" - #ifdef __cplusplus extern "C" { #endif +typedef struct _MPIL_Request MPIL_Request; /** @brief A custom MPI_Request struct used for the library's persistent collectives * @details For external users, there is limited direct access to class members through * API calls. Contains multiple requests and buffers to manage complex communication. * Contains function pointer to appropriate start and wait functions. */ -typedef struct _MPIL_Request +struct _MPIL_Request { - // Message counts; Will only use global unless locality-aware - /** @brief Intra-node message count **/ - int local_L_n_msgs; - /** @brief Sent message count **/ - int local_S_n_msgs; - /** @brief Received message count **/ - int local_R_n_msgs; - /** @brief Number of inter-node messages **/ - int global_n_msgs; - - // MPI Request arrays; Will only use global unless locality-aware - /** @brief Requests to manage of intra-node messages **/ - MPI_Request* local_L_requests; - /** @brief Requests to control sent messages **/ - MPI_Request* local_S_requests; - /** @brief Requests to control received messages **/ - MPI_Request* local_R_requests; - /** @brief Requests to manage of inter-node messages **/ - MPI_Request* global_requests; + /** @brief Number of messages **/ + int n_msgs; + /** @brief array of MPI Requests **/ + MPI_Request* requests; - /** @brief Pointer to locality communication information if using locality-aware - * variants **/ - LocalityComm* locality; - - /** @brief Pointers to the user's original send buffer */ + /** @brief Pointer to the user's original send buffer*/ const void* sendbuf; /** @brief Pointer to the user's original receive buffer */ void* recvbuf; + /** @brief Pointer to new send buffer for data to be + * packed into at intermediate steps */ + void* tmp_sendbuf; + /** @brief Pointer to new recv buffer for data to be + * recvd into at intermediate steps */ + void* tmp_recvbuf; + /** @brief indices of input buffer to be packed **/ + int* send_indices; + /** @brief indices of received buffer to be unpacked **/ + int* recv_indices; + /** @brief size of sendbuf/send_indices **/ + int size_sends; + /** @brief size of recvbuf/recv_indices **/ + int size_recvs; + /** @brief size of sendtype **/ + int send_size; + /** @brief size of recvtype **/ + + // Pointers to MPI_Requests for aggregated communication + /** @brief Fully local communication **/ + MPIL_Request* local_L_request; + /** @brief Initial local aggregation **/ + MPIL_Request* local_S_request; + /** @brief Final local disaggrgation **/ + MPIL_Request* local_R_request; + /** @brief Number of bytes per receive object, locality-aware only **/ int recv_size; /** @brief Block size for strided/blocked communication **/ int block_size; - + /** @brief Flag for if we want MPIL to reorder requests based on order of arrival + * during first iteration **/ int reorder; #ifdef GPU @@ -62,7 +69,7 @@ typedef struct _MPIL_Request int (*start_function)(struct _MPIL_Request* request); /** @brief Function pointer to MPIL_Wait or MPIL_neighbor_wait **/ int (*wait_function)(struct _MPIL_Request* request, MPI_Status* status); -} MPIL_Request; +}; /** @brief Constructor for ::MPIL_Request. Initializes most members to 0. */ void init_request(MPIL_Request** request_ptr); @@ -71,10 +78,10 @@ void init_request(MPIL_Request** request_ptr); @param [in] n_request how many requests need space @param [out] request_ptr pointer to start of allocated memory **/ -void allocate_requests(int n_requests, MPI_Request** request_ptr); +void allocate_requests(int n_requests, MPIL_Request* request); #ifdef __cplusplus } #endif -#endif \ No newline at end of file +#endif diff --git a/library/source/communicator/CMakeLists.txt b/library/source/communicator/CMakeLists.txt index 611d17452..0cf82ed1e 100644 --- a/library/source/communicator/CMakeLists.txt +++ b/library/source/communicator/CMakeLists.txt @@ -1,9 +1,6 @@ target_sources(locality_aware PRIVATE - comm_data.cpp - comm_pkg.cpp - get_tag.cpp global_comms.cpp - initializers.cpp - locality_comm.cpp topology_functions.cpp -) \ No newline at end of file + get_tag.cpp + initializers.cpp +) diff --git a/library/source/communicator/comm_data.cpp b/library/source/communicator/comm_data.cpp deleted file mode 100644 index e9af8e104..000000000 --- a/library/source/communicator/comm_data.cpp +++ /dev/null @@ -1,57 +0,0 @@ -#include "communicator/comm_data.h" - -#include - -void init_comm_data(CommData** comm_data_ptr, MPI_Datatype datatype) -{ - CommData* data = (CommData*)malloc(sizeof(CommData)); - - data->num_msgs = 0; - data->size_msgs = 0; - MPI_Type_size(datatype, &(data->datatype_size)); - data->procs = NULL; - data->indptr = NULL; - data->indices = NULL; - data->buffer = NULL; - - *comm_data_ptr = data; -} - -void destroy_comm_data(CommData* data) -{ - free(data->procs); - free(data->indptr); - free(data->indices); - free(data->buffer); - - free(data); -} - -void init_num_msgs(CommData* data, int num_msgs) -{ - data->num_msgs = num_msgs; - if (data->num_msgs) - { - data->procs = (int*)malloc(sizeof(int) * data->num_msgs); - } - data->indptr = (int*)malloc(sizeof(int) * (data->num_msgs + 1)); - data->indptr[0] = 0; -} - -void init_size_msgs(CommData* data, int size_msgs) -{ - data->size_msgs = size_msgs; - if (data->size_msgs) - { - data->indices = (int*)malloc(data->size_msgs * sizeof(int)); - } -} - -void finalize_comm_data(CommData* data) -{ - if (data->size_msgs) - { - data->buffer = - (char*)malloc(data->size_msgs * data->datatype_size * sizeof(char)); - } -} diff --git a/library/source/communicator/comm_pkg.cpp b/library/source/communicator/comm_pkg.cpp deleted file mode 100644 index bcc2a4140..000000000 --- a/library/source/communicator/comm_pkg.cpp +++ /dev/null @@ -1,31 +0,0 @@ -#include "communicator/comm_pkg.h" - -#include - -void init_comm_pkg(CommPkg** comm_ptr, - MPI_Datatype sendtype, - MPI_Datatype recvtype, - int _tag) -{ - CommPkg* comm = (CommPkg*)malloc(sizeof(CommPkg)); - - init_comm_data(&(comm->send_data), sendtype); - init_comm_data(&(comm->recv_data), recvtype); - comm->tag = _tag; - - *comm_ptr = comm; -} - -void finalize_comm_pkg(CommPkg* comm_pkg) -{ - finalize_comm_data(comm_pkg->send_data); - finalize_comm_data(comm_pkg->recv_data); -} - -void destroy_comm_pkg(CommPkg* comm) -{ - destroy_comm_data(comm->send_data); - destroy_comm_data(comm->recv_data); - - free(comm); -} diff --git a/library/source/communicator/locality_comm.cpp b/library/source/communicator/locality_comm.cpp deleted file mode 100644 index 1484e9068..000000000 --- a/library/source/communicator/locality_comm.cpp +++ /dev/null @@ -1,48 +0,0 @@ -#include "communicator/locality_comm.h" - -#include - -#include "locality_aware.h" - -void init_locality_comm(LocalityComm** locality_ptr, - MPIL_Comm* mpil_comm, - MPI_Datatype sendtype, - MPI_Datatype recvtype) -{ - LocalityComm* locality = (LocalityComm*)malloc(sizeof(LocalityComm)); - - int tag; - get_tag(mpil_comm, &tag); - init_comm_pkg(&(locality->local_L_comm), sendtype, recvtype, tag); - - get_tag(mpil_comm, &tag); - init_comm_pkg(&(locality->local_S_comm), sendtype, recvtype, tag); - - get_tag(mpil_comm, &tag); - init_comm_pkg(&(locality->local_R_comm), recvtype, recvtype, tag); - - get_tag(mpil_comm, &tag); - init_comm_pkg(&(locality->global_comm), recvtype, recvtype, tag); - - locality->communicators = mpil_comm; - - *locality_ptr = locality; -} - -void finalize_locality_comm(LocalityComm* locality) -{ - finalize_comm_pkg(locality->local_L_comm); - finalize_comm_pkg(locality->local_S_comm); - finalize_comm_pkg(locality->local_R_comm); - finalize_comm_pkg(locality->global_comm); -} - -void destroy_locality_comm(LocalityComm* locality) -{ - destroy_comm_pkg(locality->local_L_comm); - destroy_comm_pkg(locality->local_S_comm); - destroy_comm_pkg(locality->local_R_comm); - destroy_comm_pkg(locality->global_comm); - - free(locality); -} diff --git a/library/source/neighborhood/CMakeLists.txt b/library/source/neighborhood/CMakeLists.txt index 88230aca4..58d2db13a 100644 --- a/library/source/neighborhood/CMakeLists.txt +++ b/library/source/neighborhood/CMakeLists.txt @@ -6,10 +6,9 @@ target_sources(locality_aware PRIVATE locality/neighbor_alltoallv_locality.cpp locality/neighbor_alltoallv_init_locality.cpp locality/neighbor_alltoallv_init_locality_ext.cpp - persistent/init_communication.cpp persistent/init_neighbor_request.cpp persistent/neighbor_start.cpp persistent/neighbor_wait.cpp standard/neighbor_alltoallv_standard.cpp standard/neighbor_alltoallv_init_standard.cpp -) \ No newline at end of file +) diff --git a/library/source/neighborhood/locality/neighbor_alltoallv_init_locality_ext.cpp b/library/source/neighborhood/locality/neighbor_alltoallv_init_locality_ext.cpp index 232ab3550..9565e45a3 100644 --- a/library/source/neighborhood/locality/neighbor_alltoallv_init_locality_ext.cpp +++ b/library/source/neighborhood/locality/neighbor_alltoallv_init_locality_ext.cpp @@ -30,6 +30,9 @@ int neighbor_alltoallv_init_locality_ext(const void* sendbuffer, MPIL_Request* request; init_neighbor_request(&request); + init_neighbor_request(&(request->local_L_request)); + init_neighbor_request(&(request->local_S_request)); + init_neighbor_request(&(request->local_R_request)); int indegree = 0; int outdegree = 0; @@ -76,6 +79,7 @@ int neighbor_alltoallv_init_locality_ext(const void* sendbuffer, } } + // Initialize Locality-Aware Communication Strategy (3-Step) // E.G. Determine which processes talk to each other at every step // TODO : instead of mpi_comm, use comm @@ -84,10 +88,12 @@ int neighbor_alltoallv_init_locality_ext(const void* sendbuffer, destinations, dest_displs, dest_counts, + sendbuffer, indegree, sources, source_displs, source_counts, + recvbuffer, global_sindices, global_rindices, sendtype, @@ -95,74 +101,6 @@ int neighbor_alltoallv_init_locality_ext(const void* sendbuffer, comm, // communicator used in dist_graph_create_adjacent request); - request->sendbuf = sendbuffer; - request->recvbuf = recvbuffer; - MPI_Type_size(recvtype, &(request->recv_size)); - - // Local L Communication - // init_communication(sendbuffer, - init_communication(request->locality->local_L_comm->send_data->buffer, - request->locality->local_L_comm->send_data->num_msgs, - request->locality->local_L_comm->send_data->procs, - request->locality->local_L_comm->send_data->indptr, - sendtype, - request->locality->local_L_comm->recv_data->buffer, - request->locality->local_L_comm->recv_data->num_msgs, - request->locality->local_L_comm->recv_data->procs, - request->locality->local_L_comm->recv_data->indptr, - recvtype, - request->locality->local_L_comm->tag, - comm->local_comm, - &(request->local_L_n_msgs), - &(request->local_L_requests)); - - // Local S Communication - init_communication(request->locality->local_S_comm->send_data->buffer, - request->locality->local_S_comm->send_data->num_msgs, - request->locality->local_S_comm->send_data->procs, - request->locality->local_S_comm->send_data->indptr, - sendtype, - request->locality->local_S_comm->recv_data->buffer, - request->locality->local_S_comm->recv_data->num_msgs, - request->locality->local_S_comm->recv_data->procs, - request->locality->local_S_comm->recv_data->indptr, - recvtype, - request->locality->local_S_comm->tag, - comm->local_comm, - &(request->local_S_n_msgs), - &(request->local_S_requests)); - - // Global Communication - init_communication(request->locality->global_comm->send_data->buffer, - request->locality->global_comm->send_data->num_msgs, - request->locality->global_comm->send_data->procs, - request->locality->global_comm->send_data->indptr, - sendtype, - request->locality->global_comm->recv_data->buffer, - request->locality->global_comm->recv_data->num_msgs, - request->locality->global_comm->recv_data->procs, - request->locality->global_comm->recv_data->indptr, - recvtype, - request->locality->global_comm->tag, - comm->global_comm, - &(request->global_n_msgs), - &(request->global_requests)); - - // Local R Communication - init_communication(request->locality->local_R_comm->send_data->buffer, - request->locality->local_R_comm->send_data->num_msgs, - request->locality->local_R_comm->send_data->procs, - request->locality->local_R_comm->send_data->indptr, - sendtype, - request->locality->local_R_comm->recv_data->buffer, - request->locality->local_R_comm->recv_data->num_msgs, - request->locality->local_R_comm->recv_data->procs, - request->locality->local_R_comm->recv_data->indptr, - recvtype, - request->locality->local_R_comm->tag, - comm->local_comm, - &(request->local_R_n_msgs), - &(request->local_R_requests)); *request_ptr = request; diff --git a/library/source/neighborhood/locality/neighbor_locality.cpp b/library/source/neighborhood/locality/neighbor_locality.cpp index 9a6c58ca7..a287b205f 100644 --- a/library/source/neighborhood/locality/neighbor_locality.cpp +++ b/library/source/neighborhood/locality/neighbor_locality.cpp @@ -2,8 +2,15 @@ #include +#include "locality_aware.h" #include "communicator/MPIL_Comm.hpp" #include "persistent/MPIL_Request.h" +#include "neighborhood/MPIL_Topo.h" +#include "neighborhood/neighborhood_init.h" +#include "neighborhood/alltoall_crs.h" + +void map_indices(CommData* idx_data, std::map& global_map); +void map_indices(CommData* idx_data, const CommData* map_data); /****************************************** **** @@ -20,10 +27,12 @@ void init_locality(const int n_sends, const int* send_procs, const int* send_indptr, const int* sendcounts, + const void* sendbuffer, const int n_recvs, const int* recv_procs, const int* recv_indptr, const int* recvcounts, + void* recvbuffer, const long* global_send_indices, const long* global_recv_indices, const MPI_Datatype sendtype, @@ -31,20 +40,34 @@ void init_locality(const int n_sends, MPIL_Comm* mpil_comm, MPIL_Request* request) { - // Initialize structure - LocalityComm* locality_comm; - init_locality_comm(&locality_comm, mpil_comm, sendtype, recvtype); + // Get MPI Information + int rank, num_procs; + MPI_Comm_rank(mpil_comm->global_comm, &rank); + MPI_Comm_size(mpil_comm->global_comm, &num_procs); + + CommData* local_L_send_data = (CommData*)calloc(1, sizeof(CommData)); + CommData* local_L_recv_data = (CommData*)calloc(1, sizeof(CommData)); + CommData* local_S_send_data = (CommData*)calloc(1, sizeof(CommData)); + CommData* local_S_recv_data = (CommData*)calloc(1, sizeof(CommData)); + CommData* local_R_send_data = (CommData*)calloc(1, sizeof(CommData)); + CommData* local_R_recv_data = (CommData*)calloc(1, sizeof(CommData)); + CommData* global_send_data = (CommData*)calloc(1, sizeof(CommData)); + CommData* global_recv_data = (CommData*)calloc(1, sizeof(CommData)); + + int send_size, recv_size; + MPI_Type_size(sendtype, &(send_size)); + MPI_Type_size(recvtype, &(recv_size)); // Find global send nodes std::vector send_nodes; std::vector send_node_to_local; - map_procs_to_nodes(locality_comm, - n_sends, + map_procs_to_nodes(n_sends, send_procs, sendcounts, send_nodes, send_node_to_local, - true); + true, + mpil_comm); // Form initial send local comm std::vector recv_idx_nodes; @@ -54,30 +77,28 @@ void init_locality(const int n_sends, sendcounts, global_send_indices, send_node_to_local, - locality_comm->local_S_comm->send_data, - locality_comm->local_S_comm->recv_data, - locality_comm->local_L_comm->send_data, + local_S_send_data, + local_S_recv_data, + local_L_send_data, recv_idx_nodes, - locality_comm, - 19483); + mpil_comm); // Form global send data - form_global_comm(locality_comm->local_S_comm->recv_data, - locality_comm->global_comm->send_data, + form_global_comm(local_S_recv_data, + global_send_data, recv_idx_nodes, - mpil_comm, - 93284); + mpil_comm); // Find global recv nodes std::vector recv_nodes; std::vector recv_node_to_local; - map_procs_to_nodes(locality_comm, - n_recvs, + map_procs_to_nodes(n_recvs, recv_procs, recvcounts, recv_nodes, recv_node_to_local, - false); + false, + mpil_comm); // Form final recv local comm std::vector send_idx_nodes; @@ -87,22 +108,22 @@ void init_locality(const int n_sends, recvcounts, global_recv_indices, recv_node_to_local, - locality_comm->local_R_comm->recv_data, - locality_comm->local_R_comm->send_data, - locality_comm->local_L_comm->recv_data, + local_R_recv_data, + local_R_send_data, + local_L_recv_data, send_idx_nodes, - locality_comm, - 32048); + mpil_comm); // Form global recv data - form_global_comm(locality_comm->local_R_comm->send_data, - locality_comm->global_comm->recv_data, + form_global_comm(local_R_send_data, + global_recv_data, send_idx_nodes, - locality_comm->communicators, - 93284); + mpil_comm); // Update procs for global_comm send and recvs - update_global_comm(locality_comm); + update_global_comm(global_send_data, + global_recv_data, + mpil_comm); // Update send and receive indices std::map send_global_to_local; @@ -130,22 +151,209 @@ void init_locality(const int n_sends, } } - update_indices(locality_comm, send_global_to_local, recv_global_to_local); + remove_duplicates(local_S_send_data); + remove_duplicates(local_S_recv_data); + remove_duplicates(local_R_send_data); + remove_duplicates(local_R_recv_data); + remove_duplicates(global_send_data); + remove_duplicates(global_recv_data); + + // Map global indices to usable indices + map_indices(global_send_data, local_S_recv_data); + map_indices(local_R_send_data, global_recv_data); + map_indices(local_S_send_data, send_global_to_local); + map_indices(local_L_send_data, send_global_to_local); + map_indices(local_R_recv_data, recv_global_to_local); + map_indices(local_L_recv_data, recv_global_to_local); - // Initialize final variable (MPI_Request arrays, etc.) - finalize_locality_comm(locality_comm); + // Don't need local_S or global recv indices (just contiguous) + if (local_S_recv_data->indices) + { + free(local_S_recv_data->indices); + local_S_recv_data->indices = NULL; + } + if (global_recv_data->indices) + { + free(global_recv_data->indices); + global_recv_data->indices = NULL; + } + + + // Initialize packing buffers for Local_L + init_packing_buffers(request->local_L_request, + local_L_send_data->size_msgs, + local_L_send_data->indices, + send_size, + sendbuffer, + local_L_recv_data->size_msgs, + local_L_recv_data->indices, + recv_size, + recvbuffer); + + // Initialize packing buffers for Local_S + init_packing_buffers(request->local_S_request, + local_S_send_data->size_msgs, + local_S_send_data->indices, + send_size, + sendbuffer, + local_S_recv_data->size_msgs, + NULL, + send_size, + NULL); + + // Initialize packing buffers for global + init_packing_buffers(request, + global_send_data->size_msgs, + global_send_data->indices, + send_size, + request->local_S_request->tmp_recvbuf, + global_recv_data->size_msgs, + NULL, + recv_size, + NULL); + + // Initialize packing buffers for Local_R + init_packing_buffers(request->local_R_request, + local_R_send_data->size_msgs, + local_R_send_data->indices, + recv_size, + request->tmp_recvbuf, + local_R_recv_data->size_msgs, + local_R_recv_data->indices, + recv_size, + recvbuffer); + + + + + + MPIL_Info* mpil_info; + MPIL_Info_init(&mpil_info); + int tag; + + // Local L Communication + MPIL_Topo* topo_step; + MPIL_Topo_init(local_L_recv_data->num_msgs, + local_L_recv_data->procs, + MPI_UNWEIGHTED, + local_L_send_data->num_msgs, + local_L_send_data->procs, + MPI_UNWEIGHTED, + mpil_info, + &topo_step); + MPIL_Comm_tag(mpil_comm, &tag); + neighbor_alltoallv_init_standard_helper( + request->local_L_request->tmp_sendbuf, + local_L_send_data->counts, + local_L_send_data->indptr, + sendtype, + request->local_L_request->tmp_recvbuf, + local_L_recv_data->counts, + local_L_recv_data->indptr, + recvtype, + topo_step, + mpil_comm->local_comm, + mpil_info, + tag, + request->local_L_request); + MPIL_Topo_free(&topo_step); + + + // Local S Communication + MPIL_Topo_init(local_S_recv_data->num_msgs, + local_S_recv_data->procs, + MPI_UNWEIGHTED, + local_S_send_data->num_msgs, + local_S_send_data->procs, + MPI_UNWEIGHTED, + mpil_info, + &topo_step); + MPIL_Comm_tag(mpil_comm, &tag); + neighbor_alltoallv_init_standard_helper( + request->local_S_request->tmp_sendbuf, + local_S_send_data->counts, + local_S_send_data->indptr, + sendtype, + request->local_S_request->tmp_recvbuf, + local_S_recv_data->counts, + local_S_recv_data->indptr, + recvtype, + topo_step, + mpil_comm->local_comm, + mpil_info, + tag, + request->local_S_request); + MPIL_Topo_free(&topo_step); + + + // Global Communication + MPIL_Topo_init(global_recv_data->num_msgs, + global_recv_data->procs, + MPI_UNWEIGHTED, + global_send_data->num_msgs, + global_send_data->procs, + MPI_UNWEIGHTED, + mpil_info, + &topo_step); + MPIL_Comm_tag(mpil_comm, &tag); + neighbor_alltoallv_init_standard_helper( + request->tmp_sendbuf, + global_send_data->counts, + global_send_data->indptr, + sendtype, + request->tmp_recvbuf, + global_recv_data->counts, + global_recv_data->indptr, + recvtype, + topo_step, + mpil_comm->global_comm, + mpil_info, + tag, + request); + MPIL_Topo_free(&topo_step); + + + // Local R Communication + MPIL_Topo_init(local_R_recv_data->num_msgs, + local_R_recv_data->procs, + MPI_UNWEIGHTED, + local_R_send_data->num_msgs, + local_R_send_data->procs, + MPI_UNWEIGHTED, + mpil_info, + &topo_step); + MPIL_Comm_tag(mpil_comm, &tag); + neighbor_alltoallv_init_standard_helper( + request->local_R_request->tmp_sendbuf, + local_R_send_data->counts, + local_R_send_data->indptr, + sendtype, + request->local_R_request->tmp_recvbuf, + local_R_recv_data->counts, + local_R_recv_data->indptr, + recvtype, + topo_step, + mpil_comm->local_comm, + mpil_info, + tag, + request->local_R_request); + + destroy_comm_data(local_L_send_data); + destroy_comm_data(local_L_recv_data); + destroy_comm_data(local_S_send_data); + destroy_comm_data(local_S_recv_data); + destroy_comm_data(local_R_send_data); + destroy_comm_data(local_R_recv_data); + destroy_comm_data(global_send_data); + destroy_comm_data(global_recv_data); + + MPIL_Info_free(&mpil_info); + MPIL_Topo_free(&topo_step); - // Copy to pointer for return - request->locality = locality_comm; } #ifdef __cplusplus } #endif -// Destroy NAPComm* structure -void destroy_locality(MPIL_Request* request) -{ - destroy_locality_comm(request->locality); -} /****************************************** **** @@ -154,25 +362,24 @@ void destroy_locality(MPIL_Request* request) ******************************************/ // Map original communication processes to nodes on which they lie // And assign local processes to each node -void map_procs_to_nodes(LocalityComm* locality, - const int orig_num_msgs, +void map_procs_to_nodes(const int orig_num_msgs, const int* orig_procs, const int* orig_counts, std::vector& msg_nodes, std::vector& msg_node_to_local, - bool incr) + bool incr, + MPIL_Comm* mpil_comm) { - int local_rank, local_num_procs; - MPI_Comm_rank(locality->communicators->local_comm, &local_rank); - MPI_Comm_size(locality->communicators->local_comm, &local_num_procs); + int local_num_procs; + MPI_Comm_size(mpil_comm->local_comm, &local_num_procs); int proc, size, node; int local_proc; int inc; std::vector node_sizes; - int num_nodes = locality->communicators->num_nodes; - int rank_node = locality->communicators->rank_node; + int num_nodes = mpil_comm->num_nodes; + int rank_node = mpil_comm->rank_node; // Map local msg_procs to local msg_nodes node_sizes.resize(num_nodes, 0); @@ -180,7 +387,7 @@ void map_procs_to_nodes(LocalityComm* locality, { proc = orig_procs[i]; size = orig_counts[i]; - node = get_node(locality->communicators, proc); + node = get_node(mpil_comm, proc); node_sizes[node] += size; } @@ -190,7 +397,7 @@ void map_procs_to_nodes(LocalityComm* locality, num_nodes, MPI_INT, MPI_SUM, - locality->communicators->local_comm); + mpil_comm->local_comm); for (int i = 0; i < num_nodes; i++) { if (node_sizes[i] && i != rank_node) @@ -247,27 +454,21 @@ void form_local_comm(const int orig_num_sends, CommData* recv_data, CommData* local_data, std::vector& recv_idx_nodes, - LocalityComm* locality, - const int tag) + MPIL_Comm* mpil_comm) { // MPI_Information int local_rank, local_num_procs; - MPI_Comm_rank(locality->communicators->local_comm, &local_rank); - MPI_Comm_size(locality->communicators->local_comm, &local_num_procs); + MPI_Comm_rank(mpil_comm->local_comm, &local_rank); + MPI_Comm_size(mpil_comm->local_comm, &local_num_procs); // Declare variables int global_proc, local_proc; - int size, ctr, start_ctr; + int size, ctr; int start, end, node; int idx, proc_idx; - int proc, global_idx; - MPI_Status recv_status; + int global_idx; - std::vector send_buffer; - std::vector send_requests; std::vector send_sizes; - std::vector recv_buffer; - std::vector orig_to_node; std::vector local_idx; @@ -278,19 +479,17 @@ void form_local_comm(const int orig_num_sends, // Allocate sizes init_num_msgs(send_data, local_num_procs); - init_num_msgs(recv_data, local_num_procs); init_num_msgs(local_data, local_num_procs); // Form local_S_comm send_data->num_msgs = 0; local_data->num_msgs = 0; - recv_data->num_msgs = 0; for (int i = 0; i < orig_num_sends; i++) { global_proc = orig_send_procs[i]; size = orig_sendcounts[i]; - node = get_node(locality->communicators, global_proc); - if (locality->communicators->rank_node != node) + node = get_node(mpil_comm, global_proc); + if (mpil_comm->rank_node != node) { local_proc = nodes_to_local[node]; if (send_sizes[local_proc] == 0) @@ -305,7 +504,8 @@ void form_local_comm(const int orig_num_sends, { orig_to_node[i] = -1; local_data->procs[local_data->num_msgs] = - get_local_proc(locality->communicators, global_proc); + get_local_proc(mpil_comm, global_proc); + local_data->counts[local_data->num_msgs] = size; local_data->size_msgs += size; local_data->num_msgs++; local_data->indptr[local_data->num_msgs] = local_data->size_msgs; @@ -316,6 +516,7 @@ void form_local_comm(const int orig_num_sends, for (int i = 0; i < send_data->num_msgs; i++) { local_proc = send_data->procs[i]; + send_data->counts[i] = send_sizes[local_proc]; send_data->indptr[i + 1] = send_data->indptr[i] + send_sizes[local_proc]; send_sizes[local_proc] = 0; } @@ -355,99 +556,68 @@ void form_local_comm(const int orig_num_sends, } // Send 'local_S_comm send' info (to form local_S recv) - MPI_Allreduce(MPI_IN_PLACE, - send_sizes.data(), - local_num_procs, - MPI_INT, - MPI_SUM, - locality->communicators->local_comm); - recv_data->size_msgs = send_sizes[local_rank]; - init_size_msgs(recv_data, recv_data->size_msgs); - recv_idx_nodes.resize(recv_data->size_msgs); - - send_buffer.resize(2 * send_data->size_msgs); - send_requests.resize(send_data->num_msgs); - ctr = 0; - start_ctr = 0; - for (int i = 0; i < send_data->num_msgs; i++) - { - proc = send_data->procs[i]; - start = send_data->indptr[i]; - end = send_data->indptr[i + 1]; - for (int j = start; j < end; j++) - { - send_buffer[ctr++] = send_data->indices[j]; - send_buffer[ctr++] = send_idx_node[j]; - } - MPI_Isend(&send_buffer[start_ctr], - ctr - start_ctr, - MPI_INT, - proc, - tag, - locality->communicators->local_comm, - &send_requests[i]); - start_ctr = ctr; - } + std::vector send_buf(2 * send_data->size_msgs); + for (int i = 0; i < send_data->size_msgs; i++) + { + send_buf[2 * i] = send_data->indices[i]; + send_buf[2 * i + 1] = send_idx_node[i]; + } + + // Dynamic comm uses mpil_comm->global_comm + // So create new one with global_comm set to current local_comm + MPIL_Comm* local_mpil_comm; + MPIL_Comm_init(&local_mpil_comm, mpil_comm->local_comm); + + // Reseting local comm's tag to next available mpil_comm tag + // So that calling this method multiple times doesn't result + // in multiple dynamic comms on same tag + get_tag(mpil_comm, &local_mpil_comm->tag); + + MPIL_Info* local_info; + MPIL_Info_init(&local_info); + + int n_recvs, s_recvs; + int *src, *recvcounts, *rdispls, *recv_buf; + alltoallv_crs_personalized(send_data->num_msgs, + send_data->size_msgs, + send_data->procs, + send_data->counts, + send_data->indptr, + MPI_2INT, + send_buf.data(), + &n_recvs, + &s_recvs, + &src, + &recvcounts, + &rdispls, + MPI_2INT, + (void**)(&recv_buf), + local_info, + local_mpil_comm); + + init_num_msgs(recv_data, n_recvs); + init_size_msgs(recv_data, s_recvs); + memcpy(recv_data->procs, src, n_recvs * sizeof(int)); + memcpy(recv_data->counts, recvcounts, n_recvs * sizeof(int)); + memcpy(recv_data->indptr, rdispls, (n_recvs + 1) * sizeof(int)); + + recv_idx_nodes.resize(s_recvs); + int* recv_buf_int = (int*)recv_buf; + for (int i = 0; i < s_recvs; i++) + { + recv_data->indices[i] = recv_buf_int[2 * i]; + recv_idx_nodes[i] = recv_buf_int[2 * i + 1]; + } + + MPIL_Info_free(&local_info); + MPIL_Comm_free(&local_mpil_comm); + + MPIL_Free(src); + MPIL_Free(recvcounts); + MPIL_Free(rdispls); + MPIL_Free(recv_buf); + - std::vector proc_pos(local_num_procs, -1); - std::vector recv_idx(recv_data->size_msgs); - std::vector tmpnodes(recv_data->size_msgs); - std::vector recvptr(local_num_procs + 1); - recvptr[0] = 0; - ctr = 0; - while (ctr < recv_data->size_msgs) - { - MPI_Probe(MPI_ANY_SOURCE, tag, locality->communicators->local_comm, &recv_status); - proc = recv_status.MPI_SOURCE; - MPI_Get_count(&recv_status, MPI_INT, &size); - if (size > (int)recv_buffer.size()) - { - recv_buffer.resize(size); - } - MPI_Recv(recv_buffer.data(), - size, - MPI_INT, - proc, - tag, - locality->communicators->local_comm, - &recv_status); - proc_pos[proc] = recv_data->num_msgs; - for (int i = 0; i < size; i += 2) - { - recv_idx[ctr] = recv_buffer[i]; - tmpnodes[ctr++] = recv_buffer[i + 1]; - } - recvptr[recv_data->num_msgs + 1] = recvptr[recv_data->num_msgs] + (size / 2); - recv_data->num_msgs++; - } - - // Reorder Recvs - ctr = 0; - int pos, old_start, new_start; - for (int i = 0; i < local_num_procs; i++) - { - if (proc_pos[i] == -1) - { - continue; - } - - recv_data->procs[ctr] = i; - pos = proc_pos[i]; - old_start = recvptr[pos]; - new_start = recv_data->indptr[ctr]; - size = recvptr[pos + 1] - old_start; - recv_data->indptr[++ctr] = new_start + size; - for (int j = 0; j < size; j++) - { - recv_data->indices[new_start + j] = recv_idx[old_start + j]; - recv_idx_nodes[new_start + j] = tmpnodes[old_start + j]; - } - } - - if (send_data->num_msgs) - { - MPI_Waitall(send_data->num_msgs, send_requests.data(), MPI_STATUSES_IGNORE); - } } // Form portion of inter-node communication (data corresponding to @@ -456,17 +626,12 @@ void form_local_comm(const int orig_num_sends, void form_global_comm(CommData* local_data, CommData* global_data, std::vector& local_data_nodes, - const MPIL_Comm* mpil_comm, - int tag) + MPIL_Comm* mpil_comm) { - std::vector tmp_send_indices; std::vector node_sizes; std::vector node_ctr; // Get MPI Information - int local_rank, local_num_procs; - MPI_Comm_rank(mpil_comm->local_comm, &local_rank); - MPI_Comm_size(mpil_comm->local_comm, &local_num_procs); int num_nodes = mpil_comm->num_nodes; int node_idx, node; @@ -493,6 +658,7 @@ void form_global_comm(CommData* local_data, if (node_sizes[i]) { global_data->procs[global_data->num_msgs] = i; + global_data->counts[global_data->num_msgs] = node_sizes[i]; global_data->size_msgs += node_sizes[i]; node_sizes[i] = global_data->num_msgs; global_data->num_msgs++; @@ -516,156 +682,84 @@ void form_global_comm(CommData* local_data, } // Replace send and receive processes with the node id's currently in their place -void update_global_comm(LocalityComm* locality) +void update_global_comm(CommData* global_send_data, + CommData* global_recv_data, + MPIL_Comm* mpil_comm) { - int rank, num_procs; - MPI_Comm_rank(locality->communicators->global_comm, &rank); - MPI_Comm_size(locality->communicators->global_comm, &num_procs); - int local_rank, local_num_procs; - MPI_Comm_rank(locality->communicators->local_comm, &local_rank); - MPI_Comm_size(locality->communicators->local_comm, &local_num_procs); - int num_nodes = locality->communicators->num_nodes; - - int n_sends = locality->global_comm->send_data->num_msgs; - int n_recvs = locality->global_comm->recv_data->num_msgs; - int n_msgs = n_sends + n_recvs; - MPI_Request* requests = NULL; - int* send_buffer = NULL; - int send_tag, recv_tag; - get_tag(locality->communicators, &send_tag); - get_tag(locality->communicators, &recv_tag); - int node, global_proc; - int num_to_recv; - MPI_Status recv_status; - std::vector send_nodes(num_nodes, 0); - std::vector recv_nodes(num_nodes, 0); - if (n_msgs) - { - requests = new MPI_Request[n_msgs]; - send_buffer = new int[n_msgs]; - } - - std::vector comm_procs(num_procs, 0); - for (int i = 0; i < n_sends; i++) - { - node = locality->global_comm->send_data->procs[i]; - global_proc = get_global_proc(locality->communicators, node, local_rank); - comm_procs[global_proc]++; - send_buffer[i] = locality->communicators->rank_node; - MPI_Isend(&(send_buffer[i]), - 1, - MPI_INT, - global_proc, - send_tag, - locality->communicators->global_comm, - &(requests[i])); - } - MPI_Allreduce(MPI_IN_PLACE, - comm_procs.data(), - num_procs, - MPI_INT, - MPI_SUM, - locality->communicators->global_comm); - num_to_recv = comm_procs[rank]; - for (int i = 0; i < num_procs; i++) - { - comm_procs[i] = 0; - } - for (int i = 0; i < num_to_recv; i++) - { - MPI_Probe( - MPI_ANY_SOURCE, send_tag, locality->communicators->global_comm, &recv_status); - global_proc = recv_status.MPI_SOURCE; - MPI_Recv(&node, - 1, - MPI_INT, - global_proc, - send_tag, - locality->communicators->global_comm, - &recv_status); - recv_nodes[node] = global_proc; - } + int local_rank; + MPI_Comm_rank(mpil_comm->local_comm, &local_rank); + int num_nodes = mpil_comm->num_nodes; + std::vector nodes(2*num_nodes, 0); + + MPIL_Info* mpil_info; + MPIL_Info_init(&mpil_info); + + // Initialize send side for dynamic communication + std::vector dest(global_send_data->num_msgs); + std::vector vals(global_send_data->num_msgs, mpil_comm->rank_node); + for (int i = 0; i < global_send_data->num_msgs; i++) + dest[i] = get_global_proc(mpil_comm, global_send_data->procs[i], local_rank); + + int n_recvs; + int *src, *recvbuf; + MPIL_Alltoall_crs(global_send_data->num_msgs, + dest.data(), + 1, + MPI_INT, + vals.data(), + &n_recvs, + &src, + 1, + MPI_INT, + (void**) &recvbuf, + mpil_info, + mpil_comm); for (int i = 0; i < n_recvs; i++) - { - node = locality->global_comm->recv_data->procs[i]; - global_proc = get_global_proc(locality->communicators, node, local_rank); - comm_procs[global_proc]++; - send_buffer[n_sends + i] = locality->communicators->rank_node; - MPI_Isend(&(send_buffer[n_sends + i]), - 1, - MPI_INT, - global_proc, - recv_tag, - locality->communicators->global_comm, - &(requests[n_sends + i])); - } - MPI_Allreduce(MPI_IN_PLACE, - comm_procs.data(), - num_procs, - MPI_INT, - MPI_SUM, - locality->communicators->global_comm); - num_to_recv = comm_procs[rank]; - for (int i = 0; i < num_to_recv; i++) - { - MPI_Probe( - MPI_ANY_SOURCE, recv_tag, locality->communicators->global_comm, &recv_status); - global_proc = recv_status.MPI_SOURCE; - MPI_Recv(&node, - 1, - MPI_INT, - global_proc, - recv_tag, - locality->communicators->global_comm, - &recv_status); - send_nodes[node] = global_proc; - } + nodes[recvbuf[i]] = src[i]; + + MPIL_Free(src); + MPIL_Free(recvbuf); + + dest.resize(global_recv_data->num_msgs); + vals.resize(global_recv_data->num_msgs, mpil_comm->rank_node); + for (int i = 0; i < global_recv_data->num_msgs; i++) + dest[i] = get_global_proc(mpil_comm, global_recv_data->procs[i], local_rank); + MPIL_Alltoall_crs(global_recv_data->num_msgs, + dest.data(), + 1, + MPI_INT, + vals.data(), + &n_recvs, + &src, + 1, + MPI_INT, + (void**) &recvbuf, + mpil_info, + mpil_comm); + for (int i = 0; i < n_recvs; i++) + nodes[num_nodes + recvbuf[i]] = src[i]; + + MPIL_Free(src); + MPIL_Free(recvbuf); - if (n_sends + n_recvs) - { - MPI_Waitall(n_sends + n_recvs, requests, MPI_STATUSES_IGNORE); - } MPI_Allreduce(MPI_IN_PLACE, - send_nodes.data(), - num_nodes, - MPI_INT, - MPI_MAX, - locality->communicators->local_comm); - MPI_Allreduce(MPI_IN_PLACE, - recv_nodes.data(), - num_nodes, + nodes.data(), + 2*num_nodes, MPI_INT, MPI_MAX, - locality->communicators->local_comm); + mpil_comm->local_comm); - for (int i = 0; i < n_sends; i++) - { - node = locality->global_comm->send_data->procs[i]; - locality->global_comm->send_data->procs[i] = send_nodes[node]; - } - for (int i = 0; i < n_recvs; i++) - { - node = locality->global_comm->recv_data->procs[i]; - locality->global_comm->recv_data->procs[i] = recv_nodes[node]; - } + for (int i = 0; i < global_send_data->num_msgs; i++) + global_send_data->procs[i] = nodes[num_nodes + global_send_data->procs[i]]; + for (int i = 0; i < global_recv_data->num_msgs; i++) + global_recv_data->procs[i] = nodes[global_recv_data->procs[i]]; + + MPIL_Info_free(&mpil_info); - if (requests) - { - delete[] requests; - } - if (send_buffer) - { - delete[] send_buffer; - } } -// Update indices: -// 1.) map initial sends to point to positions in original data -// 2.) map internal communication steps to point to correct -// position in previously received data -// 3.) map final receives to points in original recv data void form_global_map(const CommData* map_data, std::map& global_map) { int idx; @@ -694,11 +788,6 @@ void map_indices(CommData* idx_data, const CommData* map_data) map_indices(idx_data, global_map); } -int cmpfunc(const void* a, const void* b) -{ - return (*(int*)a - *(int*)b); -} - void remove_duplicates(CommData* comm_pkg) { int start, end; @@ -732,40 +821,55 @@ void remove_duplicates(CommData* comm_pkg) } start = end; comm_pkg->indptr[i + 1] = comm_pkg->size_msgs; + comm_pkg->counts[i] = comm_pkg->indptr[i+1] - comm_pkg->indptr[i]; } } -void remove_duplicates(CommPkg* data) + + +void init_num_msgs(CommData* data, int num_msgs) { - remove_duplicates(data->send_data); - remove_duplicates(data->recv_data); + data->num_msgs = num_msgs; + if (data->num_msgs) + { + data->procs = (int*)malloc(sizeof(int) * data->num_msgs); + data->counts = (int*)malloc(sizeof(int) * data->num_msgs); + } + data->indptr = (int*)malloc(sizeof(int) * (data->num_msgs + 1)); + data->indptr[0] = 0; } -void remove_duplicates(LocalityComm* locality) +void init_size_msgs(CommData* data, int size_msgs) { - remove_duplicates(locality->local_S_comm); - remove_duplicates(locality->local_R_comm); - remove_duplicates(locality->global_comm); + data->size_msgs = size_msgs; + if (data->size_msgs) + { + data->indices = (int*)malloc(data->size_msgs * sizeof(int)); + } } -void update_indices(LocalityComm* locality, - std::map& send_global_to_local, - std::map& recv_global_to_local) +void destroy_comm_data(CommData* data) { - // Remove duplicates - remove_duplicates(locality); - - // Map global indices to usable indices - map_indices(locality->global_comm->send_data, locality->local_S_comm->recv_data); - map_indices(locality->local_R_comm->send_data, locality->global_comm->recv_data); - map_indices(locality->local_S_comm->send_data, send_global_to_local); - map_indices(locality->local_L_comm->send_data, send_global_to_local); - map_indices(locality->local_R_comm->recv_data, recv_global_to_local); - map_indices(locality->local_L_comm->recv_data, recv_global_to_local); + if (data->procs) + { + free(data->procs); + } + if (data->indptr) + { + free(data->indptr); + } + if (data->counts) + { + free(data->counts); + } + if (data->indices) + { + free(data->indices); + } + if (data->buffer) + { + free(data->buffer); + } - // Don't need local_S or global recv indices (just contiguous) - free(locality->local_S_comm->recv_data->indices); - locality->local_S_comm->recv_data->indices = NULL; - free(locality->global_comm->recv_data->indices); - locality->global_comm->recv_data->indices = NULL; + free(data); } diff --git a/library/source/neighborhood/persistent/init_communication.cpp b/library/source/neighborhood/persistent/init_communication.cpp deleted file mode 100644 index 866f98829..000000000 --- a/library/source/neighborhood/persistent/init_communication.cpp +++ /dev/null @@ -1,63 +0,0 @@ -#include "neighborhood/neighborhood_init.h" -#include "persistent/MPIL_Request.h" - -int init_communication(const void* sendbuffer, - int n_sends, - const int* send_procs, - const int* send_ptr, - MPI_Datatype sendtype, - void* recvbuffer, - int n_recvs, - const int* recv_procs, - const int* recv_ptr, - MPI_Datatype recvtype, - int tag, - MPI_Comm comm, - int* n_request_ptr, - MPI_Request** request_ptr) -{ - int ierr = 0; - int start, size; - int send_size, recv_size; - - char* send_buffer = (char*)sendbuffer; - char* recv_buffer = (char*)recvbuffer; - MPI_Type_size(sendtype, &send_size); - MPI_Type_size(recvtype, &recv_size); - - MPI_Request* requests; - *n_request_ptr = n_recvs + n_sends; - allocate_requests(*n_request_ptr, &requests); - - for (int i = 0; i < n_recvs; i++) - { - start = recv_ptr[i]; - size = recv_ptr[i + 1] - start; - - ierr += MPI_Recv_init(&(recv_buffer[start * recv_size]), - size, - recvtype, - recv_procs[i], - tag, - comm, - &(requests[i])); - } - - for (int i = 0; i < n_sends; i++) - { - start = send_ptr[i]; - size = send_ptr[i + 1] - start; - - ierr += MPI_Send_init(&(send_buffer[start * send_size]), - size, - sendtype, - send_procs[i], - tag, - comm, - &(requests[n_recvs + i])); - } - - *request_ptr = requests; - - return ierr; -} diff --git a/library/source/neighborhood/persistent/init_neighbor_request.cpp b/library/source/neighborhood/persistent/init_neighbor_request.cpp index 4fd049591..f9a17d4ea 100644 --- a/library/source/neighborhood/persistent/init_neighbor_request.cpp +++ b/library/source/neighborhood/persistent/init_neighbor_request.cpp @@ -1,4 +1,6 @@ #include "neighborhood/neighborhood_init.h" +#include +#include void init_neighbor_request(MPIL_Request** request_ptr) { @@ -7,4 +9,38 @@ void init_neighbor_request(MPIL_Request** request_ptr) request->start_function = neighbor_start; request->wait_function = neighbor_wait; -} \ No newline at end of file +} + +void init_packing_buffers(MPIL_Request* request, int size_sends, int* send_indices, + int send_size, const void* _sendbuf, int size_recvs, int* recv_indices, + int recv_size, void* _recvbuf) +{ + if (size_sends) + { + request->size_sends = size_sends; + request->tmp_sendbuf = (char*)malloc(size_sends * send_size); + request->sendbuf = _sendbuf; + request->send_size = send_size; + + if (send_indices) + { + request->send_indices = (int*)malloc(size_sends * sizeof(int)); + memcpy(request->send_indices, send_indices, size_sends*sizeof(int)); + } + + } + + if (size_recvs) + { + request->size_recvs = size_recvs; + request->tmp_recvbuf = (char*)malloc(size_recvs * recv_size); + request->recvbuf = _recvbuf; + request->recv_size = recv_size; + + if (recv_indices) + { + request->recv_indices = (int*)malloc(size_recvs * sizeof(int)); + memcpy(request->recv_indices, recv_indices, size_recvs*sizeof(int)); + } + } +} diff --git a/library/source/neighborhood/persistent/neighbor_start.cpp b/library/source/neighborhood/persistent/neighbor_start.cpp index 6260736a9..52d97341f 100644 --- a/library/source/neighborhood/persistent/neighbor_start.cpp +++ b/library/source/neighborhood/persistent/neighbor_start.cpp @@ -1,5 +1,7 @@ #include // For NULL +#include +#include "locality_aware.h" #include "neighborhood/neighborhood_init.h" #include "persistent/MPIL_Request.h" @@ -13,64 +15,33 @@ int neighbor_start(MPIL_Request* request) int ierr = 0; int idx; - char* send_buffer = NULL; - int recv_size = 0; - if (request->recv_size) - { - send_buffer = (char*)(request->sendbuf); - recv_size = request->recv_size; - } - // Local L sends sendbuf - if (request->local_L_n_msgs) + if (request->local_L_request != NULL) { - for (int i = 0; i < request->locality->local_L_comm->send_data->size_msgs; i++) - { - idx = request->locality->local_L_comm->send_data->indices[i]; - for (int j = 0; j < recv_size; j++) - { - request->locality->local_L_comm->send_data->buffer[i * recv_size + j] = - send_buffer[idx * recv_size + j]; - } - } - ierr += MPI_Startall(request->local_L_n_msgs, request->local_L_requests); + MPIL_Start(request->local_L_request); } // Local S sends sendbuf - if (request->local_S_n_msgs) + if (request->local_S_request != NULL) { - for (int i = 0; i < request->locality->local_S_comm->send_data->size_msgs; i++) - { - idx = request->locality->local_S_comm->send_data->indices[i]; - - for (int j = 0; j < recv_size; j++) - { - request->locality->local_S_comm->send_data->buffer[i * recv_size + j] = - send_buffer[idx * recv_size + j]; - } - } - - ierr += MPI_Startall(request->local_S_n_msgs, request->local_S_requests); - ierr += MPI_Waitall( - request->local_S_n_msgs, request->local_S_requests, MPI_STATUSES_IGNORE); + MPIL_Start(request->local_S_request); + MPIL_Wait(request->local_S_request, MPI_STATUS_IGNORE); + } - // Copy into global->send_data->buffer - for (int i = 0; i < request->locality->global_comm->send_data->size_msgs; i++) + // Global sends buffer in locality, sendbuf in standard + if (request->n_msgs) + { + if (request->size_sends && request->send_indices) { - idx = request->locality->global_comm->send_data->indices[i]; - for (int j = 0; j < recv_size; j++) + for (int i = 0; i < request->size_sends; i++) { - request->locality->global_comm->send_data->buffer[i * recv_size + j] = - request->locality->local_S_comm->recv_data - ->buffer[idx * recv_size + j]; + idx = request->send_indices[i]; + memcpy((char*)(request->tmp_sendbuf) + (i*request->send_size), + (char*)(request->sendbuf) + (idx*request->send_size), + request->send_size); } } - } - - // Global sends buffer in locality, sendbuf in standard - if (request->global_n_msgs) - { - ierr += MPI_Startall(request->global_n_msgs, request->global_requests); + ierr += MPI_Startall(request->n_msgs, request->requests); } return ierr; diff --git a/library/source/neighborhood/persistent/neighbor_wait.cpp b/library/source/neighborhood/persistent/neighbor_wait.cpp index 2df44c364..4a89faccc 100644 --- a/library/source/neighborhood/persistent/neighbor_wait.cpp +++ b/library/source/neighborhood/persistent/neighbor_wait.cpp @@ -1,5 +1,7 @@ #include // For NULL +#include +#include "locality_aware.h" #include "neighborhood/neighborhood_init.h" #include "persistent/MPIL_Request.h" @@ -18,71 +20,36 @@ int neighbor_wait(MPIL_Request* request, MPI_Status* status) int ierr = 0; int idx; - char* recv_buffer = NULL; - int recv_size = 0; - if (request->recv_size) - { - recv_buffer = (char*)(request->recvbuf); - recv_size = request->recv_size; - } - // Global waits for recvs - if (request->global_n_msgs) + if (request->n_msgs) { ierr += MPI_Waitall( - request->global_n_msgs, request->global_requests, MPI_STATUSES_IGNORE); + request->n_msgs, request->requests, MPI_STATUSES_IGNORE); - if (request->local_R_n_msgs) + if (request->size_recvs && request->recv_indices) { - for (int i = 0; i < request->locality->local_R_comm->send_data->size_msgs; - i++) + for (int i = 0; i < request->size_recvs; i++) { - idx = request->locality->local_R_comm->send_data->indices[i]; - for (int j = 0; j < recv_size; j++) - { - request->locality->local_R_comm->send_data - ->buffer[i * recv_size + j] = - request->locality->global_comm->recv_data - ->buffer[idx * recv_size + j]; - } + idx = request->recv_indices[i]; + memcpy((char*)(request->recvbuf) + (idx*request->recv_size), + (char*)(request->tmp_recvbuf) + (i*request->recv_size), + request->recv_size); } } } // Wait for local_R recvs - if (request->local_R_n_msgs) + if (request->local_R_request) { - ierr += MPI_Startall(request->local_R_n_msgs, request->local_R_requests); - ierr += MPI_Waitall( - request->local_R_n_msgs, request->local_R_requests, MPI_STATUSES_IGNORE); - - for (int i = 0; i < request->locality->local_R_comm->recv_data->size_msgs; i++) - { - idx = request->locality->local_R_comm->recv_data->indices[i]; - for (int j = 0; j < recv_size; j++) - { - recv_buffer[idx * recv_size + j] = - request->locality->local_R_comm->recv_data->buffer[i * recv_size + j]; - } - } + MPIL_Start(request->local_R_request); + MPIL_Wait(request->local_R_request, MPI_STATUS_IGNORE); } // Wait for local_L recvs - if (request->local_L_n_msgs) + if (request->local_L_request) { - ierr += MPI_Waitall( - request->local_L_n_msgs, request->local_L_requests, MPI_STATUSES_IGNORE); - - for (int i = 0; i < request->locality->local_L_comm->recv_data->size_msgs; i++) - { - idx = request->locality->local_L_comm->recv_data->indices[i]; - for (int j = 0; j < recv_size; j++) - { - recv_buffer[idx * recv_size + j] = - request->locality->local_L_comm->recv_data->buffer[i * recv_size + j]; - } - } - } + MPIL_Wait(request->local_L_request, MPI_STATUS_IGNORE); + } return ierr; } diff --git a/library/source/neighborhood/sparse_col/alltoall_crs.cpp b/library/source/neighborhood/sparse_col/alltoall_crs.cpp index c6efcf2ed..7127cdfad 100644 --- a/library/source/neighborhood/sparse_col/alltoall_crs.cpp +++ b/library/source/neighborhood/sparse_col/alltoall_crs.cpp @@ -646,6 +646,11 @@ int alltoall_crs_personalized_loc(const int send_nnz, } } + if (n_sends) + { + MPI_Waitall(n_sends, comm->requests, MPI_STATUSES_IGNORE); + } + local_redistribute(node_recv_size, recv_buf, origins, diff --git a/library/source/neighborhood/standard/neighbor_alltoallv_init_standard.cpp b/library/source/neighborhood/standard/neighbor_alltoallv_init_standard.cpp index 7c4373128..db4bd4ecf 100644 --- a/library/source/neighborhood/standard/neighbor_alltoallv_init_standard.cpp +++ b/library/source/neighborhood/standard/neighbor_alltoallv_init_standard.cpp @@ -23,8 +23,32 @@ int neighbor_alltoallv_init_standard(const void* sendbuf, int tag; MPIL_Comm_tag(comm, &tag); - request->global_n_msgs = topo->indegree + topo->outdegree; - allocate_requests(request->global_n_msgs, &(request->global_requests)); + int ierr = neighbor_alltoallv_init_standard_helper(sendbuf, sendcounts, sdispls, + sendtype, recvbuf, recvcounts, rdispls, recvtype, topo, comm->global_comm, + info, tag, request); + + *request_ptr = request; + + return ierr; +} + + + +int neighbor_alltoallv_init_standard_helper(const void* sendbuf, + const int sendcounts[], + const int sdispls[], + MPI_Datatype sendtype, + void* recvbuf, + const int recvcounts[], + const int rdispls[], + MPI_Datatype recvtype, + MPIL_Topo* topo, + MPI_Comm comm, + MPIL_Info* info, + int tag, + MPIL_Request* request) +{ + allocate_requests(topo->indegree + topo->outdegree, request); const char* send_buffer = (const char*)(sendbuf); char* recv_buffer = (char*)(recvbuf); @@ -41,8 +65,8 @@ int neighbor_alltoallv_init_standard(const void* sendbuf, recvtype, topo->sources[i], tag, - comm->global_comm, - &(request->global_requests[i])); + comm, + &(request->requests[i])); } for (int i = 0; i < topo->outdegree; i++) @@ -52,11 +76,9 @@ int neighbor_alltoallv_init_standard(const void* sendbuf, sendtype, topo->destinations[i], tag, - comm->global_comm, - &(request->global_requests[topo->indegree + i])); + comm, + &(request->requests[topo->indegree + i])); } - *request_ptr = request; - return ierr; } diff --git a/library/source/persistent/allocate_requests.cpp b/library/source/persistent/allocate_requests.cpp index f09e7bcaa..ee96a6f43 100644 --- a/library/source/persistent/allocate_requests.cpp +++ b/library/source/persistent/allocate_requests.cpp @@ -2,15 +2,11 @@ #include "persistent/MPIL_Request.h" -void allocate_requests(int n_requests, MPI_Request** request_ptr) +void allocate_requests(int n_requests, MPIL_Request* request) { if (n_requests) { - MPI_Request* request = (MPI_Request*)malloc(sizeof(MPI_Request) * n_requests); - *request_ptr = request; + request->n_msgs = n_requests; + request->requests = (MPI_Request*)malloc(sizeof(MPI_Request) * n_requests); } - else - { - *request_ptr = NULL; - } -} \ No newline at end of file +} diff --git a/library/source/persistent/init_request.cpp b/library/source/persistent/init_request.cpp index 88cb1d032..446500b29 100644 --- a/library/source/persistent/init_request.cpp +++ b/library/source/persistent/init_request.cpp @@ -7,17 +7,24 @@ void init_request(MPIL_Request** request_ptr) { MPIL_Request* request = (MPIL_Request*)malloc(sizeof(MPIL_Request)); - request->locality = NULL; + request->n_msgs = 0; + request->requests = NULL; + request->sendbuf = NULL; + request->recvbuf = NULL; - request->local_L_n_msgs = 0; - request->local_S_n_msgs = 0; - request->local_R_n_msgs = 0; - request->global_n_msgs = 0; + request->size_sends = 0; + request->size_recvs = 0; + request->send_size = 0; + request->recv_size = 0; - request->local_L_requests = NULL; - request->local_S_requests = NULL; - request->local_R_requests = NULL; - request->global_requests = NULL; + request->tmp_sendbuf = NULL; + request->tmp_recvbuf = NULL; + request->send_indices = NULL; + request->recv_indices = NULL; + + request->local_L_request = NULL; + request->local_S_request = NULL; + request->local_R_request = NULL; request->recv_size = 0; request->block_size = 1; @@ -28,4 +35,4 @@ void init_request(MPIL_Request** request_ptr) #endif *request_ptr = request; -} \ No newline at end of file +} diff --git a/library/tests/test_suitesparse_neighbor_alltoallv_init.cpp b/library/tests/test_suitesparse_neighbor_alltoallv_init.cpp index 2d647a7a0..db5641725 100644 --- a/library/tests/test_suitesparse_neighbor_alltoallv_init.cpp +++ b/library/tests/test_suitesparse_neighbor_alltoallv_init.cpp @@ -222,6 +222,9 @@ void test_matrix(const char* filename) xcomm, xinfo, &xrequest); + MPIL_Start(xrequest); + MPIL_Wait(xrequest, &status); + MPIL_Request_free(&xrequest); // Full Locality MPIL_Set_alltoallv_neighbor_init_alogorithm(NEIGHBOR_ALLTOALLV_INIT_LOCALITY); diff --git a/runscript.sh b/runscript.sh new file mode 100644 index 000000000..76e708cd4 --- /dev/null +++ b/runscript.sh @@ -0,0 +1,24 @@ +#!/bin/bash + +#SBATCH --output=reorder_n2.%j.out +#SBATCH --error=reorder_n2.%j.err +#SBATCH --nodes=2 +#SBATCH --ntasks-per-node=112 +#SBATCH --cpus-per-task=1 +#SBATCH --time=00:20:00 +#SBATCH --partition=pbatch + +module load gcc +module load openmpi + +cd ${HOME}/locality_fork/build/benchmarks +folder=/usr/workspace/bienz1/benchmark_mats +for mat in $folder/*.pm; do + echo $mat + for (( nodes = 2; nodes <= 2; nodes*=2 )); + do + procs=$((112*nodes)) + srun -n $procs -N $nodes ./reorder_msgs $mat + done +done +