-
Notifications
You must be signed in to change notification settings - Fork 64
/
Copy pathpcu_pmpi.c
105 lines (92 loc) · 2.52 KB
/
pcu_pmpi.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
/******************************************************************************
Copyright 2011 Scientific Computation Research Center,
Rensselaer Polytechnic Institute. All rights reserved.
This work is open source software, licensed under the terms of the
BSD license as described in the LICENSE file in the top-level directory.
*******************************************************************************/
#include "pcu_pmpi.h"
#include "pcu_buffer.h"
#include <stdio.h>
#include <stdlib.h>
#include <limits.h>
void pcu_pmpi_send2(const pcu_mpi_t* self, pcu_message* m, int tag, MPI_Comm comm);
bool pcu_pmpi_receive2(const pcu_mpi_t*, pcu_message* m, int tag, MPI_Comm comm);
void pcu_pmpi_init(MPI_Comm comm, pcu_mpi_t* self)
{
self->original_comm = comm;
MPI_Comm_dup(comm,&(self->user_comm));
MPI_Comm_dup(comm,&(self->coll_comm));
MPI_Comm_size(comm,&(self->size));
MPI_Comm_rank(comm,&(self->rank));
}
void pcu_pmpi_finalize(pcu_mpi_t* self)
{
MPI_Comm_free(&(self->user_comm));
MPI_Comm_free(&(self->coll_comm));
}
int pcu_pmpi_size(const pcu_mpi_t* self)
{
return self->size;
}
int pcu_pmpi_rank(const pcu_mpi_t* self)
{
return self->rank;
}
void pcu_pmpi_send(const pcu_mpi_t* self, pcu_message* m, MPI_Comm comm)
{
pcu_pmpi_send2(self, m,0,comm);
}
void pcu_pmpi_send2(const pcu_mpi_t* self, pcu_message* m, int tag, MPI_Comm comm)
{
// silence warning
(void)self;
if( m->buffer.size > (size_t)INT_MAX ) {
fprintf(stderr, "ERROR PCU message size exceeds INT_MAX... exiting\n");
abort();
}
MPI_Issend(
m->buffer.start,
(int)(m->buffer.size),
MPI_BYTE,
m->peer,
tag,
comm,
&(m->request));
}
bool pcu_pmpi_done(const pcu_mpi_t* self, pcu_message* m)
{
// silence warning
(void)self;
int flag;
MPI_Test(&(m->request),&flag,MPI_STATUS_IGNORE);
return flag;
}
bool pcu_pmpi_receive(const pcu_mpi_t* self, pcu_message* m, MPI_Comm comm)
{
// silence warning
(void)self;
return pcu_pmpi_receive2(self, m,0,comm);
}
bool pcu_pmpi_receive2(const pcu_mpi_t* self, pcu_message* m, int tag, MPI_Comm comm)
{
// silence warning
(void)self;
MPI_Status status;
int flag;
MPI_Iprobe(m->peer,tag,comm,&flag,&status);
if (!flag)
return false;
m->peer = status.MPI_SOURCE;
int count;
MPI_Get_count(&status,MPI_BYTE,&count);
pcu_resize_buffer(&(m->buffer),(size_t)count);
MPI_Recv(
m->buffer.start,
(int)(m->buffer.size),
MPI_BYTE,
m->peer,
tag,
comm,
MPI_STATUS_IGNORE);
return true;
}