-
Notifications
You must be signed in to change notification settings - Fork 64
/
Copy pathpcu_msg.c
243 lines (213 loc) · 6.17 KB
/
pcu_msg.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
/******************************************************************************
Copyright 2011 Scientific Computation Research Center,
Rensselaer Polytechnic Institute. All rights reserved.
This work is open source software, licensed under the terms of the
BSD license as described in the LICENSE file in the top-level directory.
*******************************************************************************/
#include "pcu_msg.h"
#include "noto_malloc.h"
#include "reel.h"
#include <string.h>
/* the pcu_msg algorithm for a communication phase
is as follows:
     1  barrier
     2  pack data to be sent
     3  requests = send all packed data
     4  while (requests not done)
     5      receive and process data
     6  begin barrier
     7  while (barrier not done)
     8      receive and process data
The main goal of this is to detect when no more
messages will be received.
To prove that this works, note the following:
1. messages are sent using a synchronous
non-blocking send: requests are not done
until the message is received at its destination.
2. the barrier is not done on any rank until
after all ranks have begun the barrier
3. all requests of this rank are done before this
rank begins the barrier
From this we can prove:
1. if all requests of this rank are done, all messages sent by
this rank have been received at their destinations
2. when the barrier is done, all messages sent by all ranks
have been received at their destinations, i.e.
no more messages can be received.
Finally, the barrier at line 1 is there because it takes
different amounts of time for each rank to be notified
that the barrier is done, and in that time the rank is
naively processing everything it receives.
If another rank is notified first and quickly goes on to
a new phase, it may be able to send a message that is
received by the slow rank out-of-phase.
*/
/* Values stored in pcu_msg.state. They are listed in the order in which
   the functions in this file move a phase through them. */
enum {
  idle_state = 0,      /* in between phases */
  pack_state = 1,      /* after phase start, before sending */
  send_recv_state = 2, /* starting to receive, sends still going */
  recv_state = 3       /* sends are done, still receiving */
};
/* Sets up the per-phase communication fields of m: the peer tree is
   initialized with pcu_make_aa, the receive message with
   pcu_make_message, and the state becomes idle_state.
   The file and order fields of m are not touched here. */
static void make_comm(pcu_msg* m)
{
  pcu_make_aa(&m->peers);
  pcu_make_message(&m->received);
  m->state = idle_state;
}
/* Initializes a pcu_msg object: the file and order pointers start out
   NULL and the communication fields are set up by make_comm. */
void pcu_make_msg(pcu_msg* m)
{
  m->order = NULL;
  m->file = NULL;
  make_comm(m);
}
/* Recursively releases every peer in the tree rooted at *t.
   Both subtrees are released before the node itself; for each node the
   peer's message is freed with pcu_free_message, the peer storage with
   noto_free, and finally *t is re-initialized with pcu_make_aa so the
   caller is not left holding a pointer to freed memory. */
static void free_peers(pcu_aa_tree* t)
{
  if ( ! pcu_aa_empty(*t))
  {
    pcu_msg_peer* doomed;
    free_peers(&((*t)->left));
    free_peers(&((*t)->right));
    /* the tree node is treated as the pcu_msg_peer that contains it */
    doomed = (pcu_msg_peer*) *t;
    pcu_free_message(&(doomed->message));
    noto_free(doomed);
    pcu_make_aa(t);
  }
}
/* Begins a communication phase.
   Reports an error through reel_fail unless m is in idle_state, then
   runs a barrier and moves m to pack_state. */
void pcu_msg_start(pcu_mpi_t* mpi, pcu_msg* m)
{
  if (m->state != idle_state)
  {
    reel_fail("PCU_Comm_Begin called at the wrong time");
  }
  /* Nobody may start a new superstep while other ranks are still
     receiving in the previous one; this barrier enforces that.
     It is the only blocking call in the pcu_msg system. */
  pcu_barrier(mpi, &m->coll);
  m->state = pack_state;
}
/* Ordering predicate for the peer tree: true when the peer id stored in
   a's message is smaller than the one stored in b's message.
   Both nodes are cast to pcu_msg_peer, so they must be the node members
   of pcu_msg_peer objects (NOTE(review): this relies on the node sitting
   at the start of pcu_msg_peer; confirm in pcu_msg.h). */
static bool peer_less(pcu_aa_node* a, pcu_aa_node* b)
{
  pcu_msg_peer* lhs = (pcu_msg_peer*) a;
  pcu_msg_peer* rhs = (pcu_msg_peer*) b;
  return lhs->message.peer < rhs->message.peer;
}
/* Looks up the peer with the given id in tree t.
   A stack-allocated probe peer carries the id; only its message.peer
   field is set, which is the only field peer_less reads.
   Returns the result of pcu_aa_find cast to pcu_msg_peer*; callers in
   this file treat a null result as "no such peer". */
static pcu_msg_peer* find_peer(pcu_aa_tree t, int id)
{
  pcu_msg_peer probe;
  pcu_msg_peer* found;
  probe.message.peer = id;
  found = (pcu_msg_peer*) pcu_aa_find(&(probe.node), t, peer_less);
  return found;
}
/* Allocates one pcu_msg_peer with NOTO_MALLOC, initializes its message
   with pcu_make_message and then records id as the message's peer.
   The id is assigned after pcu_make_message so the initializer cannot
   overwrite it. The caller owns the returned peer. */
static pcu_msg_peer* make_peer(int id)
{
  pcu_msg_peer* fresh;
  NOTO_MALLOC(fresh,1);
  pcu_make_message(&fresh->message);
  fresh->message.peer = id;
  return fresh;
}
/* Reserves size bytes in the outgoing buffer for peer id.
   Reports an error through reel_fail unless m is in pack_state.
   If no peer with this id exists in m->peers yet, one is created and
   inserted into the tree first.
   Returns the result of pcu_push_buffer on that peer's message buffer
   (presumably a pointer to the reserved space; confirm against the
   buffer API). */
void* pcu_msg_pack(pcu_msg* m, int id, size_t size)
{
  pcu_msg_peer* dest;
  if (m->state != pack_state)
  {
    reel_fail("PCU_Comm_Pack called at the wrong time");
  }
  dest = find_peer(m->peers, id);
  if ( ! dest)
  {
    dest = make_peer(id);
    pcu_aa_insert(&(dest->node), &(m->peers), peer_less);
  }
  return pcu_push_buffer(&(dest->message.buffer), size);
}
/* Returns message.buffer.size of the peer with the given id.
   Reports an error through reel_fail if m is not in pack_state or if
   no peer with this id is found in m->peers. */
size_t pcu_msg_packed(pcu_msg* m, int id)
{
  pcu_msg_peer* dest;
  if (m->state != pack_state)
  {
    reel_fail("PCU_Comm_Packed called at the wrong time");
  }
  dest = find_peer(m->peers, id);
  if ( ! dest)
  {
    reel_fail("PCU_Comm_Packed called but nothing was packed");
  }
  return dest->message.buffer.size;
}
/* Calls pcu_mpi_send on the message of every peer in tree t, using
   mpi->user_comm. The current node is sent first, then the left
   subtree, then the right subtree. */
static void send_peers(pcu_mpi_t* mpi, pcu_aa_tree t)
{
  if ( ! pcu_aa_empty(t))
  {
    /* the tree node is treated as the pcu_msg_peer that contains it */
    pcu_msg_peer* current = (pcu_msg_peer*) t;
    pcu_mpi_send(mpi, &(current->message), mpi->user_comm);
    send_peers(mpi, t->left);
    send_peers(mpi, t->right);
  }
}
/* Ends the packing stage of a phase: sends the messages of all peers
   in m->peers and moves m to send_recv_state.
   Reports an error through reel_fail unless m is in pack_state. */
void pcu_msg_send(pcu_mpi_t* mpi, pcu_msg* m)
{
  if (m->state != pack_state)
  {
    reel_fail("PCU_Comm_Send called at the wrong time");
  }
  send_peers(mpi, m->peers);
  m->state = send_recv_state;
}
/* Returns true when pcu_mpi_done reports true for the message of every
   peer in tree t; an empty tree counts as done.
   The current node is checked first, then the left subtree, then the
   right one, and the walk stops at the first message that is not done. */
static bool done_sending_peers(pcu_mpi_t* mpi, pcu_aa_tree t)
{
  pcu_msg_peer* current;
  if (pcu_aa_empty(t))
    return true;
  current = (pcu_msg_peer*) t;
  if ( ! pcu_mpi_done(mpi, &(current->message)))
    return false;
  if ( ! done_sending_peers(mpi, t->left))
    return false;
  if ( ! done_sending_peers(mpi, t->right))
    return false;
  return true;
}
/* Polls for the next incoming message from any source.
   Returns true as soon as pcu_mpi_receive succeeds for m->received.
   Between unsuccessful polls it advances the phase:
     - in send_recv_state, once all of this rank's sends are done, the
       barrier is begun and m moves to recv_state;
     - in recv_state, once the barrier is done, the function returns
       false (per the algorithm described at the top of this file, no
       more messages can arrive at that point).
   Both checks run in the same iteration, so the barrier can be begun
   and found done without another receive attempt in between. */
static bool receive_global(pcu_mpi_t* mpi, pcu_msg* m)
{
  m->received.peer = MPI_ANY_SOURCE;
  for (;;)
  {
    if (pcu_mpi_receive(mpi, &(m->received), mpi->user_comm))
      return true;
    if ((m->state == send_recv_state) &&
        done_sending_peers(mpi, m->peers))
    {
      pcu_begin_barrier(mpi, &(m->coll));
      m->state = recv_state;
    }
    if ((m->state == recv_state) &&
        pcu_barrier_done(mpi, &(m->coll)))
      return false;
  }
}
/* Releases the per-phase communication data of m: every peer in
   m->peers (via free_peers) and the receive message (via
   pcu_free_message). The state, file and order fields are untouched. */
static void free_comm(pcu_msg* m)
{
  free_peers(&m->peers);
  pcu_free_message(&m->received);
}
/* Tries to receive the next message of the current phase.
   Reports an error through reel_fail if m is in neither send_recv_state
   nor recv_state, or if the previously received message has not been
   fully unpacked.
   Returns true when a message was received; its buffer is then prepared
   for unpacking with pcu_begin_buffer.
   Returns false when the phase is over; m is put back in idle_state and
   its communication data is freed and re-initialized. */
bool pcu_msg_receive(pcu_mpi_t* mpi, pcu_msg* m)
{
  if ( ! ((m->state == send_recv_state) || (m->state == recv_state)))
  {
    reel_fail("PCU_Comm_Receive called at the wrong time");
  }
  if ( ! pcu_msg_unpacked(m))
  {
    reel_fail("PCU_Comm_Receive called before previous message unpacked");
  }
  if ( ! receive_global(mpi, m))
  {
    /* end of phase: reset everything for the next pcu_msg_start */
    m->state = idle_state;
    free_comm(m);
    make_comm(m);
    return false;
  }
  pcu_begin_buffer(&(m->received.buffer));
  return true;
}
/* Advances through the buffer of the most recently received message by
   size bytes and returns the result of pcu_walk_buffer (presumably a
   pointer to those bytes; confirm against the buffer API).
   No state check is performed here. */
void* pcu_msg_unpack(pcu_msg* m, size_t size)
{
  void* chunk = pcu_walk_buffer(&m->received.buffer, size);
  return chunk;
}
/* Reports whether the buffer of the received message has been walked
   completely, as determined by pcu_buffer_walked. */
bool pcu_msg_unpacked(pcu_msg* m)
{
  bool walked = pcu_buffer_walked(&m->received.buffer);
  return walked;
}
/* Returns the peer field of the receive message. receive_global sets it
   to MPI_ANY_SOURCE before polling; presumably pcu_mpi_receive replaces
   it with the actual sender on success (confirm in pcu_mpi). */
int pcu_msg_received_from(pcu_msg* m)
{
  int source = m->received.peer;
  return source;
}
/* Returns the capacity field of the receive message's buffer.
   NOTE(review): this reads capacity rather than size; presumably the
   receive buffer is allocated to exactly the incoming message length,
   so the two agree. Confirm against pcu_mpi_receive. */
size_t pcu_msg_received_size(pcu_msg* m)
{
  size_t bytes = m->received.buffer.capacity;
  return bytes;
}
/* Releases the resources held by a pcu_msg object: the per-phase
   communication data (via free_comm) and, if one is open, the attached
   file.
   After closing, m->file is reset to NULL. A FILE pointer must not be
   used once fclose has been called on it, so clearing the field keeps a
   repeated pcu_free_msg call (or any later `if (m->file)` check) from
   touching the closed stream.
   m->order is not released by this function. */
void pcu_free_msg(pcu_msg* m)
{
  free_comm(m);
  if (m->file)
  {
    fclose(m->file);
    m->file = NULL;
  }
}