Skip to content

Commit ff11f75

Browse files
committed
Add StreamSSE method and unit tests to CLI client
Adds StreamSSE(ctx) to the Client type, opening GET /events with Accept: text/event-stream, signalling on each data: line, and distinguishing clean context cancellation from unexpected EOF or non-2xx status. Both returned channels are buffered. Unit tests cover 3-event EOF, context cancel, and non-2xx cases using httptest. Built with Raymond (Agent Orchestrator)
1 parent 54776b1 commit ff11f75

File tree

2 files changed

+218
-0
lines changed

2 files changed

+218
-0
lines changed

internal/cli/client.go

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
package cli
22

33
import (
4+
"bufio"
45
"bytes"
6+
"context"
57
"encoding/json"
68
"fmt"
79
"io"
810
"net/http"
11+
"strings"
912
)
1013

1114
const defaultURL = "http://localhost:9999"
@@ -86,6 +89,66 @@ func (c *Client) Do(method, path string, body any) (json.RawMessage, error) {
8689
return json.RawMessage(respBody), nil
8790
}
8891

92+
// StreamSSE opens a connection to the /events SSE endpoint and returns two
93+
// buffered channels: a signal channel that receives a value for each SSE event,
94+
// and an error channel that receives nil on clean context cancellation or a
95+
// non-nil error on unexpected connection failure.
96+
func (c *Client) StreamSSE(ctx context.Context) (<-chan struct{}, <-chan error) {
97+
signals := make(chan struct{}, 16)
98+
errs := make(chan error, 1)
99+
100+
go func() {
101+
defer close(errs)
102+
defer close(signals)
103+
104+
req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.BaseURL+"/events", nil)
105+
if err != nil {
106+
errs <- fmt.Errorf("creating SSE request: %w", err)
107+
return
108+
}
109+
req.Header.Set("Accept", "text/event-stream")
110+
if c.Token != "" {
111+
req.Header.Set("Authorization", "Bearer "+c.Token)
112+
}
113+
114+
resp, err := c.HTTPClient.Do(req)
115+
if err != nil {
116+
if ctx.Err() != nil {
117+
errs <- nil
118+
} else {
119+
errs <- fmt.Errorf("SSE connect: %w", err)
120+
}
121+
return
122+
}
123+
defer resp.Body.Close()
124+
125+
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
126+
errs <- fmt.Errorf("SSE HTTP %d", resp.StatusCode)
127+
return
128+
}
129+
130+
scanner := bufio.NewScanner(resp.Body)
131+
for scanner.Scan() {
132+
line := scanner.Text()
133+
if strings.HasPrefix(line, "data:") {
134+
signals <- struct{}{}
135+
}
136+
}
137+
138+
if ctx.Err() != nil {
139+
errs <- nil
140+
} else {
141+
scanErr := scanner.Err()
142+
if scanErr == nil {
143+
scanErr = io.EOF
144+
}
145+
errs <- fmt.Errorf("SSE stream ended: %w", scanErr)
146+
}
147+
}()
148+
149+
return signals, errs
150+
}
151+
89152
// prettyJSON formats a json.RawMessage with 2-space indentation.
90153
func prettyJSON(data json.RawMessage) (string, error) {
91154
var buf bytes.Buffer

internal/cli/client_sse_test.go

Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
package cli
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"net/http"
7+
"net/http/httptest"
8+
"testing"
9+
"time"
10+
)
11+
12+
func newTestClient(serverURL string) *Client {
13+
return &Client{
14+
BaseURL: serverURL,
15+
HTTPClient: http.DefaultClient,
16+
}
17+
}
18+
19+
// TestStreamSSE_ServerSendsEvents verifies that 3 SSE events produce 3 signals,
20+
// then the closed body causes a non-nil error.
21+
func TestStreamSSE_ServerSendsEvents(t *testing.T) {
22+
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
23+
w.Header().Set("Content-Type", "text/event-stream")
24+
w.WriteHeader(http.StatusOK)
25+
flusher, ok := w.(http.Flusher)
26+
if !ok {
27+
t.Error("ResponseWriter does not support Flush")
28+
return
29+
}
30+
for i := 0; i < 3; i++ {
31+
fmt.Fprintf(w, "data: update\n\n")
32+
flusher.Flush()
33+
}
34+
// Close the connection by returning — body EOF triggers error path.
35+
}))
36+
defer srv.Close()
37+
38+
client := newTestClient(srv.URL)
39+
ctx := context.Background()
40+
signals, errs := client.StreamSSE(ctx)
41+
42+
count := 0
43+
timeout := time.After(5 * time.Second)
44+
loop:
45+
for {
46+
select {
47+
case _, ok := <-signals:
48+
if !ok {
49+
break loop
50+
}
51+
count++
52+
case <-timeout:
53+
t.Fatal("timed out waiting for signals")
54+
}
55+
}
56+
57+
if count != 3 {
58+
t.Fatalf("expected 3 signals, got %d", count)
59+
}
60+
61+
err, ok := <-errs
62+
if !ok {
63+
t.Fatal("errs channel closed without a value")
64+
}
65+
if err == nil {
66+
t.Fatal("expected non-nil error for unexpected EOF, got nil")
67+
}
68+
}
69+
70+
// TestStreamSSE_ContextCancel verifies that cancelling the context causes a nil error.
71+
func TestStreamSSE_ContextCancel(t *testing.T) {
72+
// Server blocks indefinitely until the client disconnects.
73+
ready := make(chan struct{})
74+
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
75+
w.Header().Set("Content-Type", "text/event-stream")
76+
w.WriteHeader(http.StatusOK)
77+
if f, ok := w.(http.Flusher); ok {
78+
f.Flush()
79+
}
80+
close(ready)
81+
<-r.Context().Done()
82+
}))
83+
defer srv.Close()
84+
85+
client := newTestClient(srv.URL)
86+
ctx, cancel := context.WithCancel(context.Background())
87+
signals, errs := client.StreamSSE(ctx)
88+
89+
// Wait until the server has accepted the connection, then cancel.
90+
select {
91+
case <-ready:
92+
case <-time.After(5 * time.Second):
93+
t.Fatal("server never became ready")
94+
}
95+
cancel()
96+
97+
// Drain signals.
98+
timeout := time.After(5 * time.Second)
99+
for {
100+
select {
101+
case _, ok := <-signals:
102+
if !ok {
103+
goto checkErr
104+
}
105+
case <-timeout:
106+
t.Fatal("timed out waiting for signal channel to close")
107+
}
108+
}
109+
checkErr:
110+
select {
111+
case err, ok := <-errs:
112+
if ok && err != nil {
113+
t.Fatalf("expected nil error on context cancel, got %v", err)
114+
}
115+
case <-time.After(5 * time.Second):
116+
t.Fatal("timed out waiting for error channel")
117+
}
118+
}
119+
120+
// TestStreamSSE_NonTwoxxStatus verifies that a non-2xx response yields a non-nil error.
121+
func TestStreamSSE_NonTwoxxStatus(t *testing.T) {
122+
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
123+
http.Error(w, "forbidden", http.StatusForbidden)
124+
}))
125+
defer srv.Close()
126+
127+
client := newTestClient(srv.URL)
128+
ctx := context.Background()
129+
signals, errs := client.StreamSSE(ctx)
130+
131+
// Drain signal channel.
132+
timeout := time.After(5 * time.Second)
133+
for {
134+
select {
135+
case _, ok := <-signals:
136+
if !ok {
137+
goto checkErr
138+
}
139+
case <-timeout:
140+
t.Fatal("timed out waiting for signal channel to close")
141+
}
142+
}
143+
checkErr:
144+
select {
145+
case err, ok := <-errs:
146+
if !ok {
147+
t.Fatal("errs channel closed without a value")
148+
}
149+
if err == nil {
150+
t.Fatal("expected non-nil error for non-2xx status, got nil")
151+
}
152+
case <-time.After(5 * time.Second):
153+
t.Fatal("timed out waiting for error")
154+
}
155+
}

0 commit comments

Comments
 (0)