From ef40354ca2e58687f81a8e0b325ba38efb6dcdad Mon Sep 17 00:00:00 2001 From: Yunwen Bai Date: Thu, 9 Jun 2022 21:27:44 -0700 Subject: [PATCH 1/3] use local serialization lib --- resource-management/go.mod | 1 + .../pkg/common-lib/framer/framer.go | 171 +++++++++++++++ .../pkg/common-lib/framer/framer_test.go | 177 +++++++++++++++ .../pkg/common-lib/serializer/interfaces.go | 201 ++++++++++++++++++ .../pkg/common-lib/serializer/json/json.go | 130 +++++++++++ .../serializer/protobuf/protobuf.go | 118 ++++++++++ .../pkg/service-api/endpoints/installer.go | 76 ++++--- .../service-api/endpoints/installer_test.go | 13 +- 8 files changed, 855 insertions(+), 32 deletions(-) create mode 100644 resource-management/pkg/common-lib/framer/framer.go create mode 100644 resource-management/pkg/common-lib/framer/framer_test.go create mode 100644 resource-management/pkg/common-lib/serializer/interfaces.go create mode 100644 resource-management/pkg/common-lib/serializer/json/json.go create mode 100644 resource-management/pkg/common-lib/serializer/protobuf/protobuf.go diff --git a/resource-management/go.mod b/resource-management/go.mod index 81728631..7e578e84 100644 --- a/resource-management/go.mod +++ b/resource-management/go.mod @@ -4,6 +4,7 @@ go 1.13 require ( github.com/go-redis/redis/v8 v8.11.5 + github.com/golang/protobuf v1.5.2 github.com/google/uuid v1.0.0 github.com/gorilla/mux v1.8.0 github.com/stretchr/testify v1.7.1 diff --git a/resource-management/pkg/common-lib/framer/framer.go b/resource-management/pkg/common-lib/framer/framer.go new file mode 100644 index 00000000..24c7f300 --- /dev/null +++ b/resource-management/pkg/common-lib/framer/framer.go @@ -0,0 +1,171 @@ +/* +Copyright 2015 The Kubernetes Authors. +Copyright 2022 Authors of Arktos - file modified. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package framer implements simple frame decoding techniques for an io.ReadCloser +package framer + +import ( + "encoding/binary" + "encoding/json" + "io" +) + +type lengthDelimitedFrameWriter struct { + w io.Writer + h [4]byte +} + +func NewLengthDelimitedFrameWriter(w io.Writer) io.Writer { + return &lengthDelimitedFrameWriter{w: w} +} + +// Write writes a single frame to the nested writer, prepending it with the length in +// in bytes of data (as a 4 byte, bigendian uint32). +func (w *lengthDelimitedFrameWriter) Write(data []byte) (int, error) { + binary.BigEndian.PutUint32(w.h[:], uint32(len(data))) + n, err := w.w.Write(w.h[:]) + if err != nil { + return 0, err + } + if n != len(w.h) { + return 0, io.ErrShortWrite + } + return w.w.Write(data) +} + +type lengthDelimitedFrameReader struct { + r io.ReadCloser + remaining int +} + +// NewLengthDelimitedFrameReader returns an io.Reader that will decode length-prefixed +// frames off of a stream. +// +// The protocol is: +// +// stream: message ... 
+// message: prefix body +// prefix: 4 byte uint32 in BigEndian order, denotes length of body +// body: bytes (0..prefix) +// +// If the buffer passed to Read is not long enough to contain an entire frame, io.ErrShortRead +// will be returned along with the number of bytes read. +func NewLengthDelimitedFrameReader(r io.ReadCloser) io.ReadCloser { + return &lengthDelimitedFrameReader{r: r} +} + +// Read attempts to read an entire frame into data. If that is not possible, io.ErrShortBuffer +// is returned and subsequent calls will attempt to read the last frame. A frame is complete when +// err is nil. +func (r *lengthDelimitedFrameReader) Read(data []byte) (int, error) { + if r.remaining <= 0 { + header := [4]byte{} + n, err := io.ReadAtLeast(r.r, header[:4], 4) + if err != nil { + return 0, err + } + if n != 4 { + return 0, io.ErrUnexpectedEOF + } + frameLength := int(binary.BigEndian.Uint32(header[:])) + r.remaining = frameLength + } + + expect := r.remaining + max := expect + if max > len(data) { + max = len(data) + } + n, err := io.ReadAtLeast(r.r, data[:max], int(max)) + r.remaining -= n + if err == io.ErrShortBuffer || r.remaining > 0 { + return n, io.ErrShortBuffer + } + if err != nil { + return n, err + } + if n != expect { + return n, io.ErrUnexpectedEOF + } + + return n, nil +} + +func (r *lengthDelimitedFrameReader) Close() error { + return r.r.Close() +} + +type jsonFrameReader struct { + r io.ReadCloser + decoder *json.Decoder + remaining []byte +} + +// NewJSONFramedReader returns an io.Reader that will decode individual JSON objects off +// of a wire. +// +// The boundaries between each frame are valid JSON objects. A JSON parsing error will terminate +// the read. +func NewJSONFramedReader(r io.ReadCloser) io.ReadCloser { + return &jsonFrameReader{ + r: r, + decoder: json.NewDecoder(r), + } +} + +// ReadFrame decodes the next JSON object in the stream, or returns an error. The returned +// byte slice will be modified the next time ReadFrame is invoked and should not be altered. +func (r *jsonFrameReader) Read(data []byte) (int, error) { + // Return whatever remaining data exists from an in progress frame + if n := len(r.remaining); n > 0 { + if n <= len(data) { + //nolint:staticcheck // SA4006,SA4010 underlying array of data is modified here. + data = append(data[0:0], r.remaining...) + r.remaining = nil + return n, nil + } + + n = len(data) + //nolint:staticcheck // SA4006,SA4010 underlying array of data is modified here. + data = append(data[0:0], r.remaining[:n]...) + r.remaining = r.remaining[n:] + return n, io.ErrShortBuffer + } + + // RawMessage#Unmarshal appends to data - we reset the slice down to 0 and will either see + // data written to data, or be larger than data and a different array. + n := len(data) + m := json.RawMessage(data[:0]) + if err := r.decoder.Decode(&m); err != nil { + return 0, err + } + + // If capacity of data is less than length of the message, decoder will allocate a new slice + // and set m to it, which means we need to copy the partial result back into data and preserve + // the remaining result for subsequent reads. + if len(m) > n { + //nolint:staticcheck // SA4006,SA4010 underlying array of data is modified here. + data = append(data[0:0], m[:n]...) 
+ r.remaining = m[n:] + return n, io.ErrShortBuffer + } + return len(m), nil +} + +func (r *jsonFrameReader) Close() error { + return r.r.Close() +} diff --git a/resource-management/pkg/common-lib/framer/framer_test.go b/resource-management/pkg/common-lib/framer/framer_test.go new file mode 100644 index 00000000..bf42c15f --- /dev/null +++ b/resource-management/pkg/common-lib/framer/framer_test.go @@ -0,0 +1,177 @@ +/* +Copyright 2016 The Kubernetes Authors. +Copyright 2022 Authors of Arktos - file modified. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package framer + +import ( + "bytes" + "io" + "io/ioutil" + "testing" +) + +func TestRead(t *testing.T) { + data := []byte{ + 0x00, 0x00, 0x00, 0x04, + 0x01, 0x02, 0x03, 0x04, + 0x00, 0x00, 0x00, 0x03, + 0x05, 0x06, 0x07, + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x01, + 0x08, + } + b := bytes.NewBuffer(data) + r := NewLengthDelimitedFrameReader(ioutil.NopCloser(b)) + buf := make([]byte, 1) + if n, err := r.Read(buf); err != io.ErrShortBuffer && n != 1 && bytes.Equal(buf, []byte{0x01}) { + t.Fatalf("unexpected: %v %d %v", err, n, buf) + } + if n, err := r.Read(buf); err != io.ErrShortBuffer && n != 1 && bytes.Equal(buf, []byte{0x02}) { + t.Fatalf("unexpected: %v %d %v", err, n, buf) + } + // read the remaining frame + buf = make([]byte, 2) + if n, err := r.Read(buf); err != nil && n != 2 && bytes.Equal(buf, []byte{0x03, 0x04}) { + t.Fatalf("unexpected: %v %d %v", err, n, buf) + } + // read with buffer equal to frame + buf = make([]byte, 3) + if n, err := r.Read(buf); err != nil && n != 3 && bytes.Equal(buf, []byte{0x05, 0x06, 0x07}) { + t.Fatalf("unexpected: %v %d %v", err, n, buf) + } + // read empty frame + buf = make([]byte, 3) + if n, err := r.Read(buf); err != nil && n != 0 && bytes.Equal(buf, []byte{}) { + t.Fatalf("unexpected: %v %d %v", err, n, buf) + } + // read with larger buffer than frame + buf = make([]byte, 3) + if n, err := r.Read(buf); err != nil && n != 1 && bytes.Equal(buf, []byte{0x08}) { + t.Fatalf("unexpected: %v %d %v", err, n, buf) + } + // read EOF + if n, err := r.Read(buf); err != io.EOF && n != 0 { + t.Fatalf("unexpected: %v %d", err, n) + } +} + +func TestReadLarge(t *testing.T) { + data := []byte{ + 0x00, 0x00, 0x00, 0x04, + 0x01, 0x02, 0x03, 0x04, + 0x00, 0x00, 0x00, 0x03, + 0x05, 0x06, 0x07, + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x01, + 0x08, + } + b := bytes.NewBuffer(data) + r := NewLengthDelimitedFrameReader(ioutil.NopCloser(b)) + buf := make([]byte, 40) + if n, err := r.Read(buf); err != nil && n != 4 && bytes.Equal(buf, []byte{0x01, 0x02, 0x03, 0x04}) { + t.Fatalf("unexpected: %v %d %v", err, n, buf) + } + if n, err := r.Read(buf); err != nil && n != 3 && bytes.Equal(buf, []byte{0x05, 0x06, 0x7}) { + t.Fatalf("unexpected: %v %d %v", err, n, buf) + } + if n, err := r.Read(buf); err != nil && n != 0 && bytes.Equal(buf, []byte{}) { + t.Fatalf("unexpected: %v %d %v", err, n, buf) + } + if n, err := r.Read(buf); err != nil && n != 1 && bytes.Equal(buf, []byte{0x08}) { + t.Fatalf("unexpected: %v %d %v", err, n, 
buf) + } + // read EOF + if n, err := r.Read(buf); err != io.EOF && n != 0 { + t.Fatalf("unexpected: %v %d", err, n) + } +} +func TestReadInvalidFrame(t *testing.T) { + data := []byte{ + 0x00, 0x00, 0x00, 0x04, + 0x01, 0x02, + } + b := bytes.NewBuffer(data) + r := NewLengthDelimitedFrameReader(ioutil.NopCloser(b)) + buf := make([]byte, 1) + if n, err := r.Read(buf); err != io.ErrShortBuffer && n != 1 && bytes.Equal(buf, []byte{0x01}) { + t.Fatalf("unexpected: %v %d %v", err, n, buf) + } + // read the remaining frame + buf = make([]byte, 3) + if n, err := r.Read(buf); err != io.ErrUnexpectedEOF && n != 1 && bytes.Equal(buf, []byte{0x02}) { + t.Fatalf("unexpected: %v %d %v", err, n, buf) + } + // read EOF + if n, err := r.Read(buf); err != io.EOF && n != 0 { + t.Fatalf("unexpected: %v %d", err, n) + } +} + +func TestJSONFrameReader(t *testing.T) { + b := bytes.NewBufferString("{\"test\":true}\n1\n[\"a\"]") + r := NewJSONFramedReader(ioutil.NopCloser(b)) + buf := make([]byte, 20) + if n, err := r.Read(buf); err != nil || n != 13 || string(buf[:n]) != `{"test":true}` { + t.Fatalf("unexpected: %v %d %q", err, n, buf) + } + if n, err := r.Read(buf); err != nil || n != 1 || string(buf[:n]) != `1` { + t.Fatalf("unexpected: %v %d %q", err, n, buf) + } + if n, err := r.Read(buf); err != nil || n != 5 || string(buf[:n]) != `["a"]` { + t.Fatalf("unexpected: %v %d %q", err, n, buf) + } + if n, err := r.Read(buf); err != io.EOF || n != 0 { + t.Fatalf("unexpected: %v %d %q", err, n, buf) + } +} + +func TestJSONFrameReaderShortBuffer(t *testing.T) { + b := bytes.NewBufferString("{\"test\":true}\n1\n[\"a\"]") + r := NewJSONFramedReader(ioutil.NopCloser(b)) + buf := make([]byte, 3) + + if n, err := r.Read(buf); err != io.ErrShortBuffer || n != 3 || string(buf[:n]) != `{"t` { + t.Fatalf("unexpected: %v %d %q", err, n, buf) + } + if n, err := r.Read(buf); err != io.ErrShortBuffer || n != 3 || string(buf[:n]) != `est` { + t.Fatalf("unexpected: %v %d %q", err, n, buf) + } + if n, err := r.Read(buf); err != io.ErrShortBuffer || n != 3 || string(buf[:n]) != `":t` { + t.Fatalf("unexpected: %v %d %q", err, n, buf) + } + if n, err := r.Read(buf); err != io.ErrShortBuffer || n != 3 || string(buf[:n]) != `rue` { + t.Fatalf("unexpected: %v %d %q", err, n, buf) + } + if n, err := r.Read(buf); err != nil || n != 1 || string(buf[:n]) != `}` { + t.Fatalf("unexpected: %v %d %q", err, n, buf) + } + + if n, err := r.Read(buf); err != nil || n != 1 || string(buf[:n]) != `1` { + t.Fatalf("unexpected: %v %d %q", err, n, buf) + } + + if n, err := r.Read(buf); err != io.ErrShortBuffer || n != 3 || string(buf[:n]) != `["a` { + t.Fatalf("unexpected: %v %d %q", err, n, buf) + } + if n, err := r.Read(buf); err != nil || n != 2 || string(buf[:n]) != `"]` { + t.Fatalf("unexpected: %v %d %q", err, n, buf) + } + + if n, err := r.Read(buf); err != io.EOF || n != 0 { + t.Fatalf("unexpected: %v %d %q", err, n, buf) + } +} diff --git a/resource-management/pkg/common-lib/serializer/interfaces.go b/resource-management/pkg/common-lib/serializer/interfaces.go new file mode 100644 index 00000000..703a0baf --- /dev/null +++ b/resource-management/pkg/common-lib/serializer/interfaces.go @@ -0,0 +1,201 @@ +/* +Copyright 2014 The Kubernetes Authors. +Copyright 2022 Authors of Arktos - file modified. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package serializer + +import ( + "io" +) + +// MemoryAllocator is responsible for allocating memory. +// By encapsulating memory allocation into its own interface, we can reuse the memory +// across many operations in places we know it can significantly improve the performance. +type MemoryAllocator interface { + // Allocate reserves memory for n bytes. + // Note that implementations of this method are not required to zero the returned array. + // It is the caller's responsibility to clean the memory if needed. + Allocate(n uint64) []byte +} + +// SimpleAllocator a wrapper around make([]byte) +// conforms to the MemoryAllocator interface +type SimpleAllocator struct{} + +var _ MemoryAllocator = &SimpleAllocator{} + +func (sa *SimpleAllocator) Allocate(n uint64) []byte { + return make([]byte, n, n) +} + +type ObjectTyper string + +// Identifier represents an identifier. +// Identitier of two different objects should be equal if and only if for every +// input the output they produce is exactly the same. +type Identifier string + +// Encoder writes objects to a serialized form +type Encoder interface { + Encode(interface{}, io.Writer) error + Identifier() Identifier +} + +//// MemoryAllocator is responsible for allocating memory. +//// By encapsulating memory allocation into its own interface, we can reuse the memory +//// across many operations in places we know it can significantly improve the performance. +//type MemoryAllocator interface { +// // Allocate reserves memory for n bytes. +// // Note that implementations of this method are not required to zero the returned array. +// // It is the caller's responsibility to clean the memory if needed. +// Allocate(n uint64) []byte +//} +// +//// EncoderWithAllocator serializes objects in a way that allows callers to manage any additional memory allocations. +//type EncoderWithAllocator interface { +// Encoder +// // EncodeWithAllocator writes an object to a stream as Encode does. +// // In addition, it allows for providing a memory allocator for efficient memory usage during object serialization +// EncodeWithAllocator(obj interface{}, w io.Writer, memAlloc MemoryAllocator) error +//} + +// Decoder attempts to load an object from data. +type Decoder interface { + Decode([]byte, interface{}) (interface{}, error) +} + +// Serializer is the core interface for transforming objects into a serialized format and back. +// Implementations may choose to perform conversion of the object, but no assumptions should be made. +type Serializer interface { + Encoder + Decoder +} + +// Codec is a Serializer that deals with the details of versioning objects. It offers the same +// interface as Serializer, so this is a marker to consumers that care about the version of the objects +// they receive. +// type Codec Serializer + +//// ParameterCodec defines methods for serializing and deserializing API objects to url.Values and +//// performing any necessary conversion. Unlike the normal Codec, query parameters are not self describing +//// and the desired version must be specified. 
+//type ParameterCodec interface { +// // DecodeParameters takes the given url.Values in the specified group version and decodes them +// // into the provided object, or returns an error. +// DecodeParameters(parameters url.Values, into interface{}) error +// // EncodeParameters encodes the provided object as query parameters or returns an error. +// EncodeParameters(obj interface{}) (url.Values, error) +//} + +// Framer is a factory for creating readers and writers that obey a particular framing pattern. +type Framer interface { + NewFrameReader(r io.ReadCloser) io.ReadCloser + NewFrameWriter(w io.Writer) io.Writer +} + +//// SerializerInfo contains information about a specific serialization format +//type SerializerInfo struct { +// // MediaType is the value that represents this serializer over the wire. +// MediaType string +// // MediaTypeType is the first part of the MediaType ("application" in "application/json"). +// MediaTypeType string +// // MediaTypeSubType is the second part of the MediaType ("json" in "application/json"). +// MediaTypeSubType string +// // EncodesAsText indicates this serializer can be encoded to UTF-8 safely. +// EncodesAsText bool +// // Serializer is the individual object serializer for this media type. +// Serializer Serializer +// // PrettySerializer, if set, can serialize this object in a form biased towards +// // readability. +// PrettySerializer Serializer +// // StrictSerializer, if set, deserializes this object strictly, +// // erring on unknown fields. +// StrictSerializer Serializer +// // StreamSerializer, if set, describes the streaming serialization format +// // for this media type. +// StreamSerializer *StreamSerializerInfo +//} +// +// StreamSerializerInfo contains information about a specific stream serialization format +//type StreamSerializerInfo struct { +// // EncodesAsText indicates this serializer can be encoded to UTF-8 safely. +// EncodesAsText bool +// // Serializer is the top level object serializer for this type when streaming +// Serializer +// // Framer is the factory for retrieving streams that separate objects on the wire +// Framer +//} + +//// NegotiatedSerializer is an interface used for obtaining encoders, decoders, and serializers +//// for multiple supported media types. This would commonly be accepted by a server component +//// that performs HTTP content negotiation to accept multiple formats. +//type NegotiatedSerializer interface { +// // SupportedMediaTypes is the media types supported for reading and writing single objects. +// SupportedMediaTypes() []SerializerInfo +// +// // EncoderForVersion returns an encoder that ensures objects being written to the provided +// // serializer are in the provided group version. +// EncoderForVersion(serializer Encoder) Encoder +// // DecoderToVersion returns a decoder that ensures objects being read by the provided +// // serializer are in the provided group version by default. +// DecoderToVersion(serializer Decoder) Decoder +//} + +//// ClientNegotiator handles turning an HTTP content type into the appropriate encoder. +//// Use NewClientNegotiator or NewVersionedClientNegotiator to create this interface from +//// a NegotiatedSerializer. +//type ClientNegotiator interface { +// // Encoder returns the appropriate encoder for the provided contentType (e.g. application/json) +// // and any optional mediaType parameters (e.g. pretty=1), or an error. If no serializer is found +// // a NegotiateError will be returned. 
The current client implementations consider params to be +// // optional modifiers to the contentType and will ignore unrecognized parameters. +// Encoder(contentType string, params map[string]string) (Encoder, error) +// // Decoder returns the appropriate decoder for the provided contentType (e.g. application/json) +// // and any optional mediaType parameters (e.g. pretty=1), or an error. If no serializer is found +// // a NegotiateError will be returned. The current client implementations consider params to be +// // optional modifiers to the contentType and will ignore unrecognized parameters. +// Decoder(contentType string, params map[string]string) (Decoder, error) +// // StreamDecoder returns the appropriate stream decoder for the provided contentType (e.g. +// // application/json) and any optional mediaType parameters (e.g. pretty=1), or an error. If no +// // serializer is found a NegotiateError will be returned. The Serializer and Framer will always +// // be returned if a Decoder is returned. The current client implementations consider params to be +// // optional modifiers to the contentType and will ignore unrecognized parameters. +// StreamDecoder(contentType string, params map[string]string) (Decoder, Serializer, Framer, error) +//} + +//// StorageSerializer is an interface used for obtaining encoders, decoders, and serializers +//// that can read and write data at rest. This would commonly be used by client tools that must +//// read files, or server side storage interfaces that persist restful objects. +//type StorageSerializer interface { +// // SupportedMediaTypes are the media types supported for reading and writing objects. +// SupportedMediaTypes() []SerializerInfo +// +// // UniversalDeserializer returns a Serializer that can read objects in multiple supported formats +// // by introspecting the data at rest. +// UniversalDeserializer() Decoder +// +// // EncoderForVersion returns an encoder that ensures objects being written to the provided +// // serializer are in the provided group version. +// EncoderForVersion(serializer Encoder) Encoder +// // DecoderForVersion returns a decoder that ensures objects being read by the provided +// // serializer are in the provided group version by default. +// DecoderToVersion(serializer Decoder) Decoder +//} + +//// for POC, simply without versioning support of the node object +//type ObjectCreater interface { +// New() (out interface{}, err error) +//} diff --git a/resource-management/pkg/common-lib/serializer/json/json.go b/resource-management/pkg/common-lib/serializer/json/json.go new file mode 100644 index 00000000..57a21038 --- /dev/null +++ b/resource-management/pkg/common-lib/serializer/json/json.go @@ -0,0 +1,130 @@ +/* +Copyright 2015 The Kubernetes Authors. +Copyright 2022 Authors of Arktos - file modified. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package json + +import ( + "encoding/json" + "io" + "strconv" + + "k8s.io/klog/v2" + + "global-resource-service/resource-management/pkg/common-lib/serializer" +) + +func NewSerializer(typer serializer.ObjectTyper, pretty bool) *Serializer { + return NewSerializerWithOptions(typer, SerializerOptions{false, false}) +} + +// NewSerializerWithOptions creates a JSON/YAML serializer that handles encoding versioned objects into the proper JSON/YAML +// form. If typer is not nil, the object has the group, version, and kind fields set. Options are copied into the Serializer +// and are immutable. +func NewSerializerWithOptions(typer serializer.ObjectTyper, options SerializerOptions) *Serializer { + return &Serializer{ + typer: typer, + options: options, + identifier: identifier(options), + } +} + +// identifier computes Identifier of Encoder based on the given options. +func identifier(options SerializerOptions) serializer.Identifier { + result := map[string]string{ + "name": "json", + "pretty": strconv.FormatBool(options.Pretty), + "strict": strconv.FormatBool(options.Strict), + } + identifier, err := json.Marshal(result) + if err != nil { + klog.Fatalf("Failed marshaling identifier for json Serializer: %v", err) + } + return serializer.Identifier(identifier) +} + +type SerializerOptions struct { + // Pretty: configures a JSON enabled Serializer(`Yaml: false`) to produce human-readable output. + Pretty bool + + // Strict: configures the Serializer to return strictDecodingError's when duplicate fields are present decoding JSON or YAML. + // Note that enabling this option is not as performant as the non-strict variant, and should not be used in fast paths. + Strict bool +} + +// Serializer handles encoding versioned objects into the proper JSON form +type Serializer struct { + options SerializerOptions + typer serializer.ObjectTyper + identifier serializer.Identifier +} + +func (s *Serializer) Decode(data []byte, into interface{}) (interface{}, error) { + + err := s.Unmarshal(data, &into) + + return into, err +} + +// Encode serializes the provided object to the given writer. +func (s *Serializer) Encode(obj interface{}, w io.Writer) error { + return s.doEncode(obj, w) +} + +func (s *Serializer) doEncode(obj interface{}, w io.Writer) error { + if s.options.Pretty { + data, err := json.MarshalIndent(obj, "", " ") + if err != nil { + return err + } + _, err = w.Write(data) + return err + } + encoder := json.NewEncoder(w) + return encoder.Encode(obj) +} + +// IsStrict indicates whether the serializer +// uses strict decoding or not +func (s *Serializer) IsStrict() bool { + return s.options.Strict +} + +func (s *Serializer) Unmarshal(data []byte, into *interface{}) (err error) { + return json.Unmarshal(data, into) +} + +// Identifier implements serializer.Encoder interface. +func (s *Serializer) Identifier() serializer.Identifier { + return s.identifier +} + +//// Framer is the default JSON framing behavior, with newlines delimiting individual objects. 
+//var Framer = jsonFramer{} +// +//type jsonFramer struct{} +// +//// NewFrameWriter implements stream framing for this serializer +//func (jsonFramer) NewFrameWriter(w io.Writer) io.Writer { +// // we can write JSON objects directly to the writer, because they are self-framing +// return w +//} +// +//// NewFrameReader implements stream framing for this serializer +//func (jsonFramer) NewFrameReader(r io.ReadCloser) io.ReadCloser { +// // we need to extract the JSON chunks of data to pass to Decode() +// return framer.NewJSONFramedReader(r) +//} diff --git a/resource-management/pkg/common-lib/serializer/protobuf/protobuf.go b/resource-management/pkg/common-lib/serializer/protobuf/protobuf.go new file mode 100644 index 00000000..649d5cdf --- /dev/null +++ b/resource-management/pkg/common-lib/serializer/protobuf/protobuf.go @@ -0,0 +1,118 @@ +/* +Copyright 2015 The Kubernetes Authors. +Copyright 2022 Authors of Arktos - file modified. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package protobuf + +import ( + "fmt" + "io" + "reflect" + + "github.com/golang/protobuf/proto" + "k8s.io/klog/v2" + + "global-resource-service/resource-management/pkg/common-lib/serializer" +) + +type errNotMarshalable struct { + t reflect.Type +} + +func (e errNotMarshalable) Error() string { + return fmt.Sprintf("object %v does not implement the protobuf marshalling interface and cannot be encoded to a protobuf message", e.t) +} + +// IsNotMarshalable checks the type of error, returns a boolean true if error is not nil and not marshalable false otherwise +func IsNotMarshalable(err error) bool { + _, ok := err.(errNotMarshalable) + return err != nil && ok +} + +// NewSerializer creates a Protobuf serializer that handles encoding versioned objects into the proper wire form. If a typer +// is passed, the encoded object will have group, version, and kind fields set. If typer is nil, the objects will be written +// as-is (any type info passed with the object will be used). +func NewSerializer(typer serializer.ObjectTyper) *Serializer { + return &Serializer{ + typer: typer, + } +} + +// Serializer handles encoding versioned objects into the proper wire form +type Serializer struct { + prefix []byte + typer serializer.ObjectTyper +} + +var _ serializer.Serializer = &Serializer{} + +const serializerIdentifier serializer.Identifier = "protobuf" + +// Decode attempts to convert the provided data into a protobuf message, extract the stored schema kind, apply the provided default +// gvk, and then load that data into an object matching the desired schema kind or the provided into. If into is *serializer.Unknown, +// the raw data will be extracted and no decoding will be performed. If into is not registered with the typer, then the object will +// be straight decoded using normal protobuf unmarshalling (the MarshalTo interface). If into is provided and the original data is +// not fully qualified with kind/version/group, the type of the into will be used to alter the returned gvk. 
On success or most +// errors, the method will return the calculated schema kind. +func (s *Serializer) Decode(data []byte, into interface{}) (interface{}, error) { + + err := proto.Unmarshal(data, into.(proto.Message)) + + if err != nil { + return nil, fmt.Errorf("failed to unmarshal data") + } + + return into, nil +} + +// Encode serializes the provided object to the given writer. +func (s *Serializer) Encode(obj interface{}, w io.Writer) error { + b, err := proto.Marshal(obj.(proto.Message)) + + if err != nil { + klog.Errorf("failed to marshal object, error %v", err) + return err + } + + _, err = w.Write(b) + if err != nil { + klog.Errorf("failed to write response, error %v", err) + return err + } + + return nil +} + +// Identifier implements serializer.Encoder interface. +func (s *Serializer) Identifier() serializer.Identifier { + return serializerIdentifier +} + +//// LengthDelimitedFramer is exported variable of type lengthDelimitedFramer +//var LengthDelimitedFramer = lengthDelimitedFramer{} +// +//// Provides length delimited frame reader and writer methods +//type lengthDelimitedFramer struct{} +// +//// NewFrameWriter implements stream framing for this serializer +//func (lengthDelimitedFramer) NewFrameWriter(w io.Writer) io.Writer { +// return framer.NewLengthDelimitedFrameWriter(w) +//} +// +//// NewFrameReader implements stream framing for this serializer +//func (lengthDelimitedFramer) NewFrameReader(r io.ReadCloser) io.ReadCloser { +// return framer.NewLengthDelimitedFrameReader(r) +//} diff --git a/resource-management/pkg/service-api/endpoints/installer.go b/resource-management/pkg/service-api/endpoints/installer.go index 23b47813..df53fcfa 100644 --- a/resource-management/pkg/service-api/endpoints/installer.go +++ b/resource-management/pkg/service-api/endpoints/installer.go @@ -1,7 +1,6 @@ package endpoints import ( - "encoding/json" "fmt" "github.com/google/uuid" "io/ioutil" @@ -9,7 +8,9 @@ import ( "net/http" di "global-resource-service/resource-management/pkg/common-lib/interfaces/distributor" - store "global-resource-service/resource-management/pkg/common-lib/interfaces/store" + "global-resource-service/resource-management/pkg/common-lib/interfaces/store" + "global-resource-service/resource-management/pkg/common-lib/serializer" + "global-resource-service/resource-management/pkg/common-lib/serializer/json" "global-resource-service/resource-management/pkg/common-lib/types" "global-resource-service/resource-management/pkg/common-lib/types/event" apiTypes "global-resource-service/resource-management/pkg/service-api/types" @@ -17,21 +18,33 @@ import ( type Installer struct { dist di.Interface + js serializer.Serializer + ps serializer.Serializer } func NewInstaller(d di.Interface) *Installer { - return &Installer{d} + s := json.NewSerializer("placeHolder", false) + return &Installer{d, s, s} } func (i *Installer) ClientAdministrationHandler(resp http.ResponseWriter, req *http.Request) { klog.V(3).Infof("handle /client. 
URL path: %s", req.URL.Path) + // TODO: consider to add multiple handler for different serializer request, could avoid a bit perf impact for this set + var desiredSerializer serializer.Serializer + content := req.Header.Get("Content-Type") + if content == "application/json" { + desiredSerializer = i.js + } else { + desiredSerializer = i.ps + } + switch req.Method { case http.MethodPost: - i.handleClientRegistration(resp, req) + i.handleClientRegistration(resp, req, desiredSerializer) return case http.MethodDelete: - i.handleClientUnRegistration(resp, req) + i.handleClientUnRegistration(resp, req, desiredSerializer) return default: resp.WriteHeader(http.StatusMethodNotAllowed) @@ -41,7 +54,7 @@ func (i *Installer) ClientAdministrationHandler(resp http.ResponseWriter, req *h } // TODO: error handling function -func (i *Installer) handleClientRegistration(resp http.ResponseWriter, req *http.Request) { +func (i *Installer) handleClientRegistration(resp http.ResponseWriter, req *http.Request, desiredSerializer serializer.Serializer) { body, err := ioutil.ReadAll(req.Body) if err != nil { klog.V(3).Infof("error read request. error %v", err) @@ -51,12 +64,13 @@ func (i *Installer) handleClientRegistration(resp http.ResponseWriter, req *http clientReq := apiTypes.ClientRegistrationRequest{} - err = json.Unmarshal(body, &clientReq) + r, err := desiredSerializer.Decode(body, clientReq) if err != nil { klog.V(3).Infof("error unmarshal request body. error %v", err) resp.WriteHeader(http.StatusInternalServerError) return } + clientReq = r.(apiTypes.ClientRegistrationRequest) requestedMachines := clientReq.InitialRequestedResource.TotalMachines if requestedMachines > types.MaxTotalMachinesPerRequest || requestedMachines < types.MinTotalMachinesPerRequest { @@ -80,14 +94,7 @@ func (i *Installer) handleClientRegistration(resp http.ResponseWriter, req *http // for 630, request of initial resource request with client registration is either denied or granted in full ret := apiTypes.ClientRegistrationResponse{ClientId: client.ClientId, GrantedResource: client.Resource} - b, err := json.Marshal(ret) - if err != nil { - klog.V(3).Infof("error marshal client response. error %v", err) - resp.WriteHeader(http.StatusInternalServerError) - return - } - - _, err = resp.Write(b) + err = desiredSerializer.Encode(ret, resp) if err != nil { klog.V(3).Infof("error write response. error %v", err) resp.WriteHeader(http.StatusInternalServerError) @@ -97,7 +104,7 @@ func (i *Installer) handleClientRegistration(resp http.ResponseWriter, req *http return } -func (i *Installer) handleClientUnRegistration(resp http.ResponseWriter, req *http.Request) { +func (i *Installer) handleClientUnRegistration(resp http.ResponseWriter, req *http.Request, desiredSerializer serializer.Serializer) { klog.V(3).Infof("not implemented") resp.WriteHeader(http.StatusNotImplemented) return @@ -106,16 +113,24 @@ func (i *Installer) handleClientUnRegistration(resp http.ResponseWriter, req *ht func (i *Installer) ResourceHandler(resp http.ResponseWriter, req *http.Request) { klog.V(3).Infof("handle /resource. 
URL path: %s", req.URL.Path) + // TODO: consider to add multiple handler for different serializer request, could avoid a bit perf impact for this set + var desiredSerializer serializer.Serializer + content := req.Header.Get("Content-Type") + if content == "application/json" { + desiredSerializer = i.js + } else { + desiredSerializer = i.ps + } + switch req.Method { case http.MethodGet: ctx := req.Context() clientId := ctx.Value("clientid").(string) if req.URL.Query().Get(WatchParameter) == WatchParameterTrue { - i.serverWatch(resp, req, clientId) + i.serverWatch(resp, req, clientId, desiredSerializer) return } - resp.WriteHeader(http.StatusOK) resp.Header().Set("Content-Type", "text/plain") @@ -125,7 +140,7 @@ func (i *Installer) ResourceHandler(resp http.ResponseWriter, req *http.Request) resp.WriteHeader(http.StatusInternalServerError) return } - i.handleResponseTrunked(resp, nodes) + i.handleResponseTrunked(resp, nodes, desiredSerializer) case http.MethodPut: resp.WriteHeader(http.StatusMethodNotAllowed) return @@ -141,7 +156,7 @@ func (i *Installer) ResourceHandler(resp http.ResponseWriter, req *http.Request) // TODO: with serialization options // TODO: error code and string definition // -func (i *Installer) serverWatch(resp http.ResponseWriter, req *http.Request, clientId string) { +func (i *Installer) serverWatch(resp http.ResponseWriter, req *http.Request, clientId string, desiredSerializer serializer.Serializer) { klog.V(3).Infof("Serving watch for client: %s", clientId) // For 630 distributor impl, watchChannel and stopChannel passed into the Watch routine from API layer @@ -152,7 +167,7 @@ func (i *Installer) serverWatch(resp http.ResponseWriter, req *http.Request, cli defer stopWatch(stopCh) // read request body and get the crv - crvMap, err := getResourceVersionsMap(req) + crvMap, err := getResourceVersionsMap(req, desiredSerializer) if err != nil { klog.Errorf("uUable to get the resource versions. Error %v", err) resp.WriteHeader(http.StatusInternalServerError) @@ -191,7 +206,8 @@ func (i *Installer) serverWatch(resp http.ResponseWriter, req *http.Request, cli return } - if err := json.NewEncoder(resp).Encode(*record.Node); err != nil { + if err := desiredSerializer.Encode(*record.Node, resp); err != nil { + // if err := json.NewEncoder(resp).Encode(*record.Node); err != nil { klog.V(3).Infof("encoding record failed. error %v", err) resp.WriteHeader(http.StatusInternalServerError) return @@ -208,7 +224,7 @@ func stopWatch(stopCh chan struct{}) { stopCh <- struct{}{} } -func getResourceVersionsMap(req *http.Request) (types.ResourceVersionMap, error) { +func getResourceVersionsMap(req *http.Request, s serializer.Serializer) (types.ResourceVersionMap, error) { body, err := ioutil.ReadAll(req.Body) if err != nil { @@ -217,30 +233,30 @@ func getResourceVersionsMap(req *http.Request) (types.ResourceVersionMap, error) wr := apiTypes.WatchRequest{} - err = json.Unmarshal(body, wr) + r, err := s.Decode(body, wr) + //err = json.Unmarshal(body, wr) if err != nil { return nil, err } + wr = r.(apiTypes.WatchRequest) return wr.ResourceVersions, nil } -func (i *Installer) handleResponseTrunked(resp http.ResponseWriter, nodes []*types.LogicalNode) { +func (i *Installer) handleResponseTrunked(resp http.ResponseWriter, nodes []*types.LogicalNode, desiredSerializer serializer.Serializer) { var nodesLen = len(nodes) if nodesLen <= ResponseTrunkSize { - ret, err := json.Marshal(nodes) + err := desiredSerializer.Encode(nodes, resp) if err != nil { klog.Errorf("error read get node list. 
error %v", err) resp.WriteHeader(http.StatusInternalServerError) return } - resp.Write(ret) } else { flusher, ok := resp.(http.Flusher) if !ok { klog.Errorf("expected http.ResponseWriter to be an http.Flusher") } - resp.Header().Set("Connection", "Keep-Alive") resp.Header().Set("X-Content-Type-Options", "nosniff") //TODO: handle network disconnect or similar cases. @@ -253,14 +269,12 @@ func (i *Installer) handleResponseTrunked(resp http.ResponseWriter, nodes []*typ } else { chunkedNodes = nodes[start:nodesLen] } - - ret, err := json.Marshal(chunkedNodes) + err := desiredSerializer.Encode(chunkedNodes, resp) if err != nil { klog.Errorf("error read get node list. error %v", err) resp.WriteHeader(http.StatusInternalServerError) return } - resp.Write(ret) flusher.Flush() start = end } diff --git a/resource-management/pkg/service-api/endpoints/installer_test.go b/resource-management/pkg/service-api/endpoints/installer_test.go index 5351a398..402ff453 100644 --- a/resource-management/pkg/service-api/endpoints/installer_test.go +++ b/resource-management/pkg/service-api/endpoints/installer_test.go @@ -74,7 +74,7 @@ func TestHttpGet(t *testing.T) { distributor.ProcessEvents(eventsAdd) //register client - client := types.Client{ClientId: "12345", Resource: types.ResourceRequest{TotalMachines: 500}, ClientInfo: types.ClientInfoType{}} + client := types.Client{ClientId: "12345", Resource: types.ResourceRequest{TotalMachines: 5000}, ClientInfo: types.ClientInfoType{}} err := distributor.RegisterClient(&client) clientId := client.ClientId @@ -103,18 +103,29 @@ func TestHttpGet(t *testing.T) { dec := json.NewDecoder(recorder.Body) + chunks := 0 + expectedChunks := 10 for dec.More() { err := dec.Decode(&decNodes) if err != nil { klog.Errorf("decode nodes error: %v\n", err) } + chunks++ + klog.Infof("decNode length: %v", len(decNodes)) actualNodes = append(actualNodes, decNodes...) 
} + klog.Infof("total nodes length: %v", len(actualNodes)) + assert.Equal(t, http.StatusOK, recorder.Code) assert.Equal(t, len(expectedNodes), len(actualNodes)) + // expectedChunks +/- 1 as the range of the expected node length can vary a bit in 630 + if chunks < expectedChunks-1 || chunks > expectedChunks+1 { + t.Fatal("Pagination page count is not correct") + } + // Node list is not ordered, so have to do a one by one comparison for _, n := range expectedNodes { if findNodeInList(n, actualNodes) == false { From 9eddca5e6fbd492f9e6ef310cb44e5585c4035d6 Mon Sep 17 00:00:00 2001 From: Yunwen Bai Date: Fri, 10 Jun 2022 00:08:23 -0700 Subject: [PATCH 2/3] ut update --- .../service-api/endpoints/installer_test.go | 24 +++++++++++++------ 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/resource-management/pkg/service-api/endpoints/installer_test.go b/resource-management/pkg/service-api/endpoints/installer_test.go index 402ff453..8979cef4 100644 --- a/resource-management/pkg/service-api/endpoints/installer_test.go +++ b/resource-management/pkg/service-api/endpoints/installer_test.go @@ -3,11 +3,13 @@ package endpoints import ( "context" "encoding/json" + "math/rand" "net/http" "net/http/httptest" "strconv" "sync" "testing" + "time" "k8s.io/klog/v2" @@ -38,12 +40,15 @@ func tearDown(resourceDistributor *distributor.ResourceDistributor) { func createRandomNode(rv int) *types.LogicalNode { id := uuid.New() + r := types.RegionName(rand.Intn(5)) + rp := types.ResourcePartitionName(rand.Intn(10)) + return &types.LogicalNode{ Id: id.String(), ResourceVersion: strconv.Itoa(rv), GeoInfo: types.NodeGeoInfo{ - Region: 0, - ResourcePartition: 0, + Region: r, + ResourcePartition: rp, }, } } @@ -70,11 +75,16 @@ func TestHttpGet(t *testing.T) { installer := NewInstaller(distributor) // initialize node store with 10K nodes - eventsAdd := generateAddNodeEvent(10000) + eventsAdd := generateAddNodeEvent(1000000) + + klog.Infof("start process [%v] events: %v", len(eventsAdd), time.Now().String()) distributor.ProcessEvents(eventsAdd) + klog.Infof("end process [%v] events: %v", len(eventsAdd), time.Now().String()) + + requestMachines := 25000 //register client - client := types.Client{ClientId: "12345", Resource: types.ResourceRequest{TotalMachines: 5000}, ClientInfo: types.ClientInfoType{}} + client := types.Client{ClientId: "12345", Resource: types.ResourceRequest{TotalMachines: requestMachines}, ClientInfo: types.ClientInfoType{}} err := distributor.RegisterClient(&client) clientId := client.ClientId @@ -92,6 +102,7 @@ func TestHttpGet(t *testing.T) { t.Fatal(err) } + klog.Infof("start list [%v] events: %v", len(expectedNodes), time.Now().String()) recorder := httptest.NewRecorder() ctx := context.WithValue(req.Context(), "clientid", clientId) @@ -104,7 +115,7 @@ func TestHttpGet(t *testing.T) { dec := json.NewDecoder(recorder.Body) chunks := 0 - expectedChunks := 10 + expectedChunks := requestMachines / 500 for dec.More() { err := dec.Decode(&decNodes) if err != nil { @@ -112,10 +123,9 @@ func TestHttpGet(t *testing.T) { } chunks++ - klog.Infof("decNode length: %v", len(decNodes)) actualNodes = append(actualNodes, decNodes...) 
} - + klog.Infof("end list [%v] events: %v", len(expectedNodes), time.Now().String()) klog.Infof("total nodes length: %v", len(actualNodes)) assert.Equal(t, http.StatusOK, recorder.Code) From e0cc97b819abdbabd828b0ef073459fc7d847ef8 Mon Sep 17 00:00:00 2001 From: Yunwen Bai Date: Fri, 10 Jun 2022 10:34:58 -0700 Subject: [PATCH 3/3] minor update --- resource-management/pkg/service-api/endpoints/installer.go | 2 -- .../pkg/service-api/endpoints/installer_test.go | 4 ++-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/resource-management/pkg/service-api/endpoints/installer.go b/resource-management/pkg/service-api/endpoints/installer.go index df53fcfa..89f5006b 100644 --- a/resource-management/pkg/service-api/endpoints/installer.go +++ b/resource-management/pkg/service-api/endpoints/installer.go @@ -207,7 +207,6 @@ func (i *Installer) serverWatch(resp http.ResponseWriter, req *http.Request, cli } if err := desiredSerializer.Encode(*record.Node, resp); err != nil { - // if err := json.NewEncoder(resp).Encode(*record.Node); err != nil { klog.V(3).Infof("encoding record failed. error %v", err) resp.WriteHeader(http.StatusInternalServerError) return @@ -234,7 +233,6 @@ func getResourceVersionsMap(req *http.Request, s serializer.Serializer) (types.R wr := apiTypes.WatchRequest{} r, err := s.Decode(body, wr) - //err = json.Unmarshal(body, wr) if err != nil { return nil, err } diff --git a/resource-management/pkg/service-api/endpoints/installer_test.go b/resource-management/pkg/service-api/endpoints/installer_test.go index 8979cef4..b73f96b3 100644 --- a/resource-management/pkg/service-api/endpoints/installer_test.go +++ b/resource-management/pkg/service-api/endpoints/installer_test.go @@ -74,7 +74,7 @@ func TestHttpGet(t *testing.T) { distributor.SetPersistHelper(fakeStorage) installer := NewInstaller(distributor) - // initialize node store with 10K nodes + // initialize node store with 1m nodes eventsAdd := generateAddNodeEvent(1000000) klog.Infof("start process [%v] events: %v", len(eventsAdd), time.Now().String()) @@ -115,7 +115,7 @@ func TestHttpGet(t *testing.T) { dec := json.NewDecoder(recorder.Body) chunks := 0 - expectedChunks := requestMachines / 500 + expectedChunks := requestMachines / 500 // default chunk size is 500 for dec.More() { err := dec.Decode(&decNodes) if err != nil {
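
Usage sketch (illustrative only, not part of the patch series above): with these changes, ClientAdministrationHandler and ResourceHandler pick the JSON serializer only when the request's Content-Type header is exactly "application/json" and otherwise fall back to the protobuf serializer. A minimal client exercising that negotiation could look like the following; the host/port, the "/client" route, and the JSON field names are assumptions for illustration, not confirmed by the patch.

// usage_sketch.go -- illustrative only; not part of the patches above.
// Assumptions: the service listens on localhost:8080 and client
// registration is served at "/client"; adjust both for a real deployment.
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

func main() {
	// Field names mirror apiTypes.ClientRegistrationRequest as referenced in
	// the patch; the actual JSON tags may differ, so treat this payload as a
	// placeholder.
	payload, err := json.Marshal(map[string]interface{}{
		"InitialRequestedResource": map[string]int{"TotalMachines": 25000},
	})
	if err != nil {
		panic(err)
	}

	req, err := http.NewRequest(http.MethodPost, "http://localhost:8080/client", bytes.NewReader(payload))
	if err != nil {
		panic(err)
	}
	// ClientAdministrationHandler selects the JSON serializer only when
	// Content-Type is exactly "application/json"; any other value falls back
	// to the protobuf serializer.
	req.Header.Set("Content-Type", "application/json")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println("registration status:", resp.Status)
}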