|
4 | 4 | package jsonrpc2
|
5 | 5 |
|
6 | 6 | import (
|
| 7 | + "bytes" |
7 | 8 | "context"
|
| 9 | + "encoding/json" |
8 | 10 | "fmt"
|
9 | 11 | "sync"
|
10 | 12 | "sync/atomic"
|
@@ -75,7 +77,74 @@ func NewConn(s Stream) Conn {
|
75 | 77 | return conn
|
76 | 78 | }
|
77 | 79 |
|
78 |
| -// Notify implemens Conn. |
| 80 | +// Call implements Conn. |
| 81 | +func (c *conn) Call(ctx context.Context, method string, params, result interface{}) (_ ID, err error) { |
| 82 | + // generate a new request identifier |
| 83 | + id := ID{number: atomic.AddInt64(&c.seq, 1)} |
| 84 | + call, err := NewCall(id, method, params) |
| 85 | + if err != nil { |
| 86 | + return id, fmt.Errorf("marshaling call parameters: %w", err) |
| 87 | + } |
| 88 | + |
| 89 | + ctx, done := event.Start(ctx, method, |
| 90 | + tag.Method.Of(method), |
| 91 | + tag.RPCDirection.Of(tag.Outbound), |
| 92 | + tag.RPCID.Of(fmt.Sprintf("%q", id)), |
| 93 | + ) |
| 94 | + defer func() { |
| 95 | + recordStatus(ctx, err) |
| 96 | + done() |
| 97 | + }() |
| 98 | + event.Metric(ctx, tag.Started.Of(1)) |
| 99 | + |
| 100 | + // We have to add ourselves to the pending map before we send, otherwise we |
| 101 | + // are racing the response. Also add a buffer to rchan, so that if we get a |
| 102 | + // wire response between the time this call is cancelled and id is deleted |
| 103 | + // from c.pending, the send to rchan will not block. |
| 104 | + rchan := make(chan *Response, 1) |
| 105 | + |
| 106 | + c.pendingMu.Lock() |
| 107 | + c.pending[id] = rchan |
| 108 | + c.pendingMu.Unlock() |
| 109 | + |
| 110 | + defer func() { |
| 111 | + c.pendingMu.Lock() |
| 112 | + delete(c.pending, id) |
| 113 | + c.pendingMu.Unlock() |
| 114 | + }() |
| 115 | + |
| 116 | + // now we are ready to send |
| 117 | + n, err := c.write(ctx, call) |
| 118 | + event.Metric(ctx, tag.SentBytes.Of(n)) |
| 119 | + if err != nil { |
| 120 | + // sending failed, we will never get a response, so don't leave it pending |
| 121 | + return id, err |
| 122 | + } |
| 123 | + |
| 124 | + // now wait for the response |
| 125 | + select { |
| 126 | + case response := <-rchan: |
| 127 | + switch { |
| 128 | + case response.err != nil: // is it an error response? |
| 129 | + return id, response.err |
| 130 | + |
| 131 | + case result == nil || len(response.result) == 0: |
| 132 | + return id, nil |
| 133 | + |
| 134 | + default: |
| 135 | + dec := json.NewDecoder(bytes.NewReader(response.result)) |
| 136 | + if err := dec.Decode(result); err != nil { |
| 137 | + return id, fmt.Errorf("unmarshaling result: %w", err) |
| 138 | + } |
| 139 | + return id, nil |
| 140 | + } |
| 141 | + |
| 142 | + case <-ctx.Done(): |
| 143 | + return id, ctx.Err() |
| 144 | + } |
| 145 | +} |
| 146 | + |
| 147 | +// Notify implements Conn. |
79 | 148 | func (c *conn) Notify(ctx context.Context, method string, params interface{}) (err error) {
|
80 | 149 | notify, err := NewNotification(method, params)
|
81 | 150 | if err != nil {
|
@@ -128,7 +197,7 @@ func (c *conn) write(ctx context.Context, msg Message) (int64, error) {
|
128 | 197 | return c.stream.Write(ctx, msg)
|
129 | 198 | }
|
130 | 199 |
|
131 |
| -// Go implemens Conn. |
| 200 | +// Go implements Conn. |
132 | 201 | func (c *conn) Go(ctx context.Context, handler Handler) {
|
133 | 202 | go c.run(ctx, handler)
|
134 | 203 | }
|
@@ -176,17 +245,17 @@ func (c *conn) run(ctx context.Context, handler Handler) {
|
176 | 245 | }
|
177 | 246 | }
|
178 | 247 |
|
179 |
| -// Close implemens Conn. |
| 248 | +// Close implements Conn. |
180 | 249 | func (c *conn) Close() error {
|
181 | 250 | return c.stream.Close()
|
182 | 251 | }
|
183 | 252 |
|
184 |
| -// Done implemens Conn. |
| 253 | +// Done implements Conn. |
185 | 254 | func (c *conn) Done() <-chan struct{} {
|
186 | 255 | return c.done
|
187 | 256 | }
|
188 | 257 |
|
189 |
| -// Err implemens Conn. |
| 258 | +// Err implements Conn. |
190 | 259 | func (c *conn) Err() error {
|
191 | 260 | if err := c.err.Load(); err != nil {
|
192 | 261 | return err.(error)
|
|
0 commit comments