48 changes: 48 additions & 0 deletions pkg/webservice/handlers.go
@@ -19,6 +19,8 @@
package webservice

import (
	"bytes"
	"compress/gzip"
	"encoding/json"
	"fmt"
	"io"
@@ -153,6 +155,10 @@ func writeHeaders(w http.ResponseWriter) {
	w.Header().Set("Access-Control-Allow-Headers", "X-Requested-With,Content-Type,Accept,Origin")
}

func writeHeader(w http.ResponseWriter, key, val string) {
	w.Header().Set(key, val)
}

func buildJSONErrorResponse(w http.ResponseWriter, detail string, code int) {
	w.WriteHeader(code)
	errorInfo := dao.NewYAPIError(nil, code, detail)
@@ -746,6 +752,11 @@ func getQueueApplications(w http.ResponseWriter, r *http.Request) {
		appsDao = append(appsDao, getApplicationDAO(app))
	}

	if checkHeader(r.Header, "Content-Encoding", "gzip") {
Review thread:

Member: This PR aims to enhance only getQueueApplications. However, I'd like to open the room to discuss whether we should bring this enhancement to all RESTful APIs. @targetoee WDYT?

Contributor (author): That's a good idea. It could be an optional choice for users, providing more flexibility. Some APIs typically don't return large amounts of data in common cases, so we may need to discuss which ones require this functionality.

Member:

> it may be necessary to discuss which ones require this functionality.

The improvement you propose is a kind of infrastructure for our RESTful APIs, so I would prefer to bring this benefit to all APIs if there is no obvious side effect or cost.

Contributor: That is exactly why I would make the two functions, so we can easily apply it to any REST endpoint. Anything that sends more than a single IP packet as the response can benefit. The streaming API, which uses very small messages that fit in a single IP packet, might not be a candidate, since compressing could cost more overhead than it gains; everything else is.
		compress(w, appsDao)
		return
	}

	if err := json.NewEncoder(w).Encode(appsDao); err != nil {
		buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
	}
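For context on what the new branch means for callers: a client that negotiates gzip by hand must also decompress the body itself. The following is a minimal standard-library sketch, not YuniKorn code — the in-process server is a stand-in for the endpoint, and `newGzipServer`/`fetchApps` are illustrative names. Note that Go's `http.Transport` only decompresses transparently when it added the Accept-Encoding header itself; setting the header explicitly, as below, hands the raw gzip stream to the caller.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
)

// newGzipServer is a stand-in for the endpoint above: it marshals a payload,
// gzips it, and labels the response with Content-Encoding: gzip.
func newGzipServer() *httptest.Server {
	return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		payload, _ := json.Marshal([]string{"app-01", "app-02"})
		var buf bytes.Buffer
		gz := gzip.NewWriter(&buf)
		_, _ = gz.Write(payload)
		_ = gz.Close()
		w.Header().Set("Content-Encoding", "gzip")
		_, _ = w.Write(buf.Bytes())
	}))
}

// fetchApps fetches and decompresses the JSON body by hand, as a client must
// when its HTTP transport does not undo gzip transparently.
func fetchApps(url string) ([]string, error) {
	req, err := http.NewRequest(http.MethodGet, url, nil)
	if err != nil {
		return nil, err
	}
	// Setting Accept-Encoding manually disables Go's transparent gzip handling.
	req.Header.Set("Accept-Encoding", "gzip")
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	gr, err := gzip.NewReader(resp.Body)
	if err != nil {
		return nil, err
	}
	defer gr.Close()

	body, err := io.ReadAll(gr)
	if err != nil {
		return nil, err
	}
	var apps []string
	return apps, json.Unmarshal(body, &apps)
}

func main() {
	srv := newGzipServer()
	defer srv.Close()
	apps, err := fetchApps(srv.URL)
	if err != nil {
		panic(err)
	}
	fmt.Println(apps) // [app-01 app-02]
}
```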
@@ -1216,3 +1227,40 @@ func getStream(w http.ResponseWriter, r *http.Request) {
		}
	}
}

func checkHeader(h http.Header, key string, value string) bool {
Review thread:

Member: Should we return an error if users request unsupported compression?

Contributor (author): This may not be necessary. Skipping it is one way to deal with unsupported compression types in requests. Do you think users really need this?

Member: Please take a look at https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/406

It seems to me that following the standard error can avoid misunderstanding in the future.

Contributor:

> It seems to me that following the standard error can avoid misunderstanding in the future.

No. We should never return an error to the client if it requests an encoding we don't understand. Accept-Encoding: identity is the default, which means the identity encoding is always allowed. Therefore, if an unacceptable encoding is requested, we simply send the response uncompressed and without a Content-Encoding: gzip header.

Member:

> No. We should never return an error to the client if it requests an encoding we don't understand. [...]

That is an acceptable way to me, but I'd like to have more discussion for my own education :)

Should we support the full representation of Accept-Encoding (weight and coding)? If yes, we need to consider Accept-Encoding: gzip;q=1.0, identity;q=0.

Or we ignore the weight and only check for the existence of gzip in Accept-Encoding. This is the solution adopted by this PR.

Contributor: There's a common saying in network protocol design: be liberal in what you accept, strict in what you produce. In other words, we can get away with just checking for the substring gzip in Accept-Encoding, and producing exactly Content-Encoding: gzip in that case. If we choose not to compress due to size, or gzip was not requested, then we use the standard identity version. Weights are not really necessary; yes, they are part of the spec, but the client is only stating its preference, and we do not have to honor it.

Contributor: I'd like to see a very simple function like GetCompressedWriter(headers, writer) (writer) that checks for the gzip header and wraps the given writer with a gzip-compressing one, else returns the original writer. Then in any endpoint we want to (potentially) compress, we just replace our writer with that one.

Contributor (@wilfred-s), Mar 20, 2024: That will cause a leak, as the gzip writer must be closed for it not to leak. Calling close on the http.ResponseWriter is not possible, so we need more. Probably the easiest solution is to use the same approach as the loggingHandler(): we wrap the compression choice in a handler function, which then gets wrapped in the logging handler. That means we have it all in one place and can expand on it, with compressor pooling or other things, in the future.

Example code, which is not complete but gives some idea of how we can close the compressor. It could be expanded to use a sync.Pool so that the gzip writer is not recreated each time, only reset before use.

type gzipResponseWriter struct {
	io.Writer
	http.ResponseWriter
}

func (w gzipResponseWriter) Write(b []byte) (int, error) {
	return w.Writer.Write(b)
}

func makeGzipHandler(fn http.HandlerFunc) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		if !strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") {
			fn(w, r)
			return
		}
		w.Header().Set("Content-Encoding", "gzip")
		w.Header().Del("Content-Length")
		gz := gzip.NewWriter(w)
		defer gz.Close()
		gzr := gzipResponseWriter{Writer: gz, ResponseWriter: w}
		fn(gzr, r)
	}
}

Contributor (author): Thanks to everyone for all the advice. I have some questions about what @wilfred-s said. In the provided example, the compression handler would be wrapped within the logging handler. So we first see which API is being called and then decide whether to use the gzip handler, since we haven't decided to compress all APIs yet? Is there any misunderstanding?
	values := h.Values(key)
	for _, v := range values {
		if v == value {
			return true
		}
	}
	return false
}
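The weight question raised in the thread can be made concrete. A hedged sketch of what honouring q-values would involve — `acceptsGzip` is an illustrative name, not part of this PR, which deliberately checks only for the presence of gzip:

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// acceptsGzip reports whether an Accept-Encoding header value allows gzip,
// honouring q-values. Per the HTTP spec, a coding listed without a q-value
// defaults to weight 1; a weight of 0 means "not acceptable".
func acceptsGzip(acceptEncoding string) bool {
	for _, part := range strings.Split(acceptEncoding, ",") {
		fields := strings.Split(strings.TrimSpace(part), ";")
		if strings.TrimSpace(fields[0]) != "gzip" {
			continue
		}
		q := 1.0
		for _, p := range fields[1:] {
			p = strings.TrimSpace(p)
			if strings.HasPrefix(p, "q=") {
				if v, err := strconv.ParseFloat(p[2:], 64); err == nil {
					q = v
				}
			}
		}
		return q > 0
	}
	return false
}

func main() {
	fmt.Println(acceptsGzip("gzip;q=1.0, identity; q=0")) // true
	fmt.Println(acceptsGzip("gzip;q=0, identity"))        // false
	fmt.Println(acceptsGzip("br, deflate"))               // false
}
```

The substring check adopted by the PR treats the first two inputs identically; the q-value form distinguishes them.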

func compress(w http.ResponseWriter, data any) {
	response, err := json.Marshal(data)
	if err != nil {
		buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
		return
	}

	var compressedData bytes.Buffer
	writer := gzip.NewWriter(&compressedData)
	_, err = writer.Write(response)
	if err != nil {
		buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
		return
	}

	err = writer.Close()
	if err != nil {
		buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
		return
	}

	writeHeader(w, "Content-Encoding", "gzip")
	if _, err = w.Write(compressedData.Bytes()); err != nil {
		buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
	}
}
36 changes: 36 additions & 0 deletions pkg/webservice/handlers_test.go
@@ -19,10 +19,13 @@
package webservice

import (
	"bytes"
	"compress/gzip"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"reflect"
@@ -2600,3 +2603,36 @@ func NewResponseRecorderWithDeadline() *ResponseRecorderWithDeadline {
		ResponseRecorder: httptest.NewRecorder(),
	}
}

func TestCompressQueueApplications(t *testing.T) {
	var appsDao []*dao.ApplicationDAOInfo
	app := newApplication("app-01", "part-01", "queue-1", rmID, security.UserGroup{})
	app2 := newApplication("app-02", "part-01", "queue-1", rmID, security.UserGroup{})
	appsDao = append(appsDao, getApplicationDAO(app))
	appsDao = append(appsDao, getApplicationDAO(app2))

	resp := &MockResponseWriter{}
	compress(resp, appsDao)

	buf := bytes.NewBuffer(resp.outputBytes)
	gzipReader, err := gzip.NewReader(buf)
	assert.NilError(t, err, "Error while decompressing data.")

	uncompressedData, err := io.ReadAll(gzipReader)
	assert.NilError(t, err, "Error when reading decompressed data.")
	// Close only after the reader has been fully drained.
	err = gzipReader.Close()
	assert.NilError(t, err, "Error when closing gzip reader.")

	var decodedData []*dao.ApplicationDAOInfo
	err = json.Unmarshal(uncompressedData, &decodedData)
	assert.NilError(t, err, "Error when unmarshalling decoded data.")

	for i := range decodedData {
		assert.Equal(t, appsDao[i].ApplicationID, decodedData[i].ApplicationID)
		assert.Equal(t, appsDao[i].Partition, decodedData[i].Partition)
		assert.Equal(t, appsDao[i].QueueName, decodedData[i].QueueName)
		assert.Equal(t, appsDao[i].SubmissionTime, decodedData[i].SubmissionTime)
		assert.Equal(t, appsDao[i].User, decodedData[i].User)
		assert.Equal(t, appsDao[i].Groups[0], decodedData[i].Groups[0])
	}
}