diff --git a/proxy/protocol/http/sidecar.go b/proxy/protocol/http/sidecar.go index d782657..2851f8c 100755 --- a/proxy/protocol/http/sidecar.go +++ b/proxy/protocol/http/sidecar.go @@ -18,6 +18,7 @@ package http import ( + "bufio" "context" "encoding/json" "errors" @@ -54,6 +55,8 @@ var sr = resolver.GetSourceResolver() const ( XForwardedPort = "X-Forwarded-Port" XForwardedHost = "X-Forwarded-Host" + SSEHeaderKey = "Content-Type" + SSEHeaderValue = "text/event-stream" ) var ( @@ -215,6 +218,33 @@ func RemoteRequestHandler(w http.ResponseWriter, r *http.Request) { } } +func copySSEChassisResp2HttpResp(w http.ResponseWriter, resp *http.Response) { + defer func() { + if err := resp.Body.Close(); err != null { + openlog.Error("Http sse response close error: " + err.Error()) + } + }() + + reader := bufio.NewReader(resp.Body) + for { + line, err := reader.ReadString('\n') + if err != nil { + if err == io.EOF { + break + } + openlog.Error("Error reading response line: " + err.Error()) + return + } + + if _, err = w.Write([]byte(line)); err != nil { + openlog.Error("Error reading response line: " + err.Error()) + return + } + + w.(http.Flusher).Flush() + } +} + func copyChassisResp2HttpResp(w http.ResponseWriter, resp *http.Response) { if resp == nil { openlog.Warn("response is nil because of unknown reason") @@ -224,6 +254,11 @@ func copyChassisResp2HttpResp(w http.ResponseWriter, resp *http.Response) { copyHeader(w.Header(), resp.Header) w.WriteHeader(resp.StatusCode) + if isSSEResponse(resp.Header) { + copySSEChassisResp2HttpResp(w, resp) + return + } + _, err := io.Copy(w, resp.Body) if err != nil { openlog.Error("can not copy: " + err.Error()) @@ -328,7 +363,11 @@ func prepareRequest(req *http.Request) { func copyHeader(dst, src http.Header) { for k, vs := range src { for _, v := range vs { - dst.Add(k, v) + if SSEHeaderKey == k && strings.Contains(v, SSEHeaderValue) { + dts.Add(SSEHeaderKey, SSEHeaderValue) + } else { + dst.Add(k, v) + } } } } @@ -336,3 +375,10 @@ func copyHeader(dst, src http.Header) { func postProcessResponse(rsp *http.Response) { rsp.Header.Del("Connection") } + +func isSSEResponse(header http.Header) bool { + if header == nil { + return false + } + return SSEHeaderValue == header.Get(SSEHeaderKey) +}