@@ -19,29 +19,29 @@ import (
19
19
type NetRPCReqType byte
20
20
21
21
const (
22
- NRpcHas NetRPCReqType = iota
23
- NRpcGet NetRPCReqType = iota
24
- NRpcGetSize NetRPCReqType = iota
25
- NRpcPut NetRPCReqType = iota
26
- NRpcDelete NetRPCReqType = iota
27
- NRpcList NetRPCReqType = iota
22
+ NRpcHas NetRPCReqType = iota
23
+ NRpcGet
24
+ NRpcGetSize
25
+ NRpcPut
26
+ NRpcDelete
27
+ NRpcList
28
28
29
29
// todo cancel req
30
30
)
31
31
32
32
type NetRPCRespType byte
33
33
34
34
const (
35
- NRpcOK NetRPCRespType = iota
36
- NRpcErr NetRPCRespType = iota
37
- NRpcMore NetRPCRespType = iota
35
+ NRpcOK NetRPCRespType = iota
36
+ NRpcErr
37
+ NRpcMore
38
38
)
39
39
40
40
type NetRPCErrType byte
41
41
42
42
const (
43
- NRpcErrGeneric NetRPCErrType = iota
44
- NRpcErrNotFound NetRPCErrType = iota
43
+ NRpcErrGeneric NetRPCErrType = iota
44
+ NRpcErrNotFound
45
45
)
46
46
47
47
type NetRpcReq struct {
@@ -87,7 +87,7 @@ type NetworkStore struct {
87
87
closed chan struct {}
88
88
89
89
closeLk sync.Mutex
90
- onClose func ()
90
+ onClose [] func ()
91
91
}
92
92
93
93
func NewNetworkStore (mss msgio.ReadWriteCloser ) * NetworkStore {
@@ -143,7 +143,7 @@ func (n *NetworkStore) OnClose(cb func()) {
143
143
case <- n .closed :
144
144
cb ()
145
145
default :
146
- n .onClose = cb
146
+ n .onClose = append ( n . onClose , cb )
147
147
}
148
148
}
149
149
@@ -154,7 +154,9 @@ func (n *NetworkStore) receive() {
154
154
155
155
close (n .closed )
156
156
if n .onClose != nil {
157
- n .onClose ()
157
+ for _ , f := range n .onClose {
158
+ f ()
159
+ }
158
160
}
159
161
}()
160
162
@@ -203,6 +205,7 @@ func (n *NetworkStore) sendRpc(rt NetRPCReqType, cids []cid.Cid, data [][]byte)
203
205
204
206
n .respLk .Lock ()
205
207
if n .respMap == nil {
208
+ n .respLk .Unlock ()
206
209
return 0 , nil , xerrors .Errorf ("netstore closed" )
207
210
}
208
211
n .respMap [rid ] = respCh
@@ -218,22 +221,24 @@ func (n *NetworkStore) sendRpc(rt NetRPCReqType, cids []cid.Cid, data [][]byte)
218
221
var rbuf bytes.Buffer // todo buffer pool
219
222
if err := req .MarshalCBOR (& rbuf ); err != nil {
220
223
n .respLk .Lock ()
224
+ defer n .respLk .Unlock ()
225
+
221
226
if n .respMap == nil {
222
227
return 0 , nil , xerrors .Errorf ("netstore closed" )
223
228
}
224
229
delete (n .respMap , rid )
225
- n .respLk .Unlock ()
226
230
227
231
return 0 , nil , err
228
232
}
229
233
230
234
if err := n .msgStream .WriteMsg (rbuf .Bytes ()); err != nil {
231
235
n .respLk .Lock ()
236
+ defer n .respLk .Unlock ()
237
+
232
238
if n .respMap == nil {
233
239
return 0 , nil , xerrors .Errorf ("netstore closed" )
234
240
}
235
241
delete (n .respMap , rid )
236
- n .respLk .Unlock ()
237
242
238
243
return 0 , nil , err
239
244
}
@@ -260,10 +265,10 @@ func (n *NetworkStore) waitResp(ctx context.Context, rch <-chan NetRpcResp, rid
260
265
} else {
261
266
err = xerrors .Errorf ("block not found, but cid was null" )
262
267
}
263
- default :
264
- err = xerrors .Errorf ("unknown error type" )
265
268
case NRpcErrGeneric :
266
269
err = xerrors .Errorf ("generic error" )
270
+ default :
271
+ err = xerrors .Errorf ("unknown error type" )
267
272
}
268
273
269
274
return NetRpcResp {}, xerrors .Errorf ("netstore error response: %s (%w)" , e .Msg , err )
0 commit comments