Skip to content

Commit 0ee04c7

Browse files
Merge pull request #2483 from rh-roman/backport-api-stream-418
[release-4.18] CNTRLPLANE-1610: Backport StreamingCollectionEncoding for JSON and protobuf
2 parents e2e5d62 + 2c61e8e commit 0ee04c7

File tree

18 files changed

+2368
-34
lines changed

18 files changed

+2368
-34
lines changed

pkg/features/kube_features.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -817,6 +817,14 @@ const (
817817
// Enables support for the StorageVersionMigrator controller.
818818
StorageVersionMigrator featuregate.Feature = "StorageVersionMigrator"
819819

820+
// owner: @serathius
821+
// Allow API server JSON encoder to encode collections item by item, instead of all at once.
822+
StreamingCollectionEncodingToJSON featuregate.Feature = "StreamingCollectionEncodingToJSON"
823+
824+
// owner: serathius
825+
// Allow API server Protobuf encoder to encode collections item by item, instead of all at once.
826+
StreamingCollectionEncodingToProtobuf featuregate.Feature = "StreamingCollectionEncodingToProtobuf"
827+
820828
// owner: @robscott
821829
// kep: https://kep.k8s.io/2433
822830
// alpha: v1.21

pkg/features/versioned_kube_features.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package features
1818

1919
import (
20+
"k8s.io/apimachinery/pkg/util/version"
2021
"k8s.io/component-base/featuregate"
2122
)
2223

@@ -31,4 +32,11 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate
3132
// genericfeatures.EmulationVersion: {
3233
// {Version: version.MustParse("1.30"), Default: false, PreRelease: featuregate.Alpha},
3334
// },
35+
StreamingCollectionEncodingToJSON: {
36+
{Version: version.MustParse("1.31"), Default: false, PreRelease: featuregate.Beta},
37+
},
38+
39+
StreamingCollectionEncodingToProtobuf: {
40+
{Version: version.MustParse("1.31"), Default: false, PreRelease: featuregate.Beta},
41+
},
3442
}

pkg/registry/core/rest/storage_core_generic.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@ import (
3333
"k8s.io/client-go/informers"
3434
restclient "k8s.io/client-go/rest"
3535

36+
"k8s.io/apimachinery/pkg/runtime/serializer"
37+
"k8s.io/apiserver/pkg/features"
38+
utilfeature "k8s.io/apiserver/pkg/util/feature"
3639
"k8s.io/kubernetes/pkg/api/legacyscheme"
3740
api "k8s.io/kubernetes/pkg/apis/core"
3841
configmapstore "k8s.io/kubernetes/pkg/registry/core/configmap/storage"
@@ -69,6 +72,17 @@ func (c *GenericConfig) NewRESTStorage(apiResourceConfigSource serverstorage.API
6972
NegotiatedSerializer: legacyscheme.Codecs,
7073
}
7174

75+
opts := []serializer.CodecFactoryOptionsMutator{}
76+
if utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToJSON) {
77+
opts = append(opts, serializer.WithStreamingCollectionEncodingToJSON())
78+
}
79+
if utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToProtobuf) {
80+
opts = append(opts, serializer.WithStreamingCollectionEncodingToProtobuf())
81+
}
82+
if len(opts) != 0 {
83+
apiGroupInfo.NegotiatedSerializer = serializer.NewCodecFactory(legacyscheme.Scheme, opts...)
84+
}
85+
7286
eventStorage, err := eventstore.NewREST(restOptionsGetter, uint64(c.EventTTL.Seconds()))
7387
if err != nil {
7488
return genericapiserver.APIGroupInfo{}, err

staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/customresource_handler.go

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,10 @@ import (
6969
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
7070
"k8s.io/apiserver/pkg/endpoints/metrics"
7171
apirequest "k8s.io/apiserver/pkg/endpoints/request"
72+
"k8s.io/apiserver/pkg/features"
7273
"k8s.io/apiserver/pkg/registry/generic"
7374
genericfilters "k8s.io/apiserver/pkg/server/filters"
75+
utilfeature "k8s.io/apiserver/pkg/util/feature"
7476
"k8s.io/apiserver/pkg/util/webhook"
7577
"k8s.io/apiserver/pkg/warning"
7678
"k8s.io/client-go/scale"
@@ -848,6 +850,7 @@ func (r *crdHandler) getOrCreateServingInfoFor(uid types.UID, name string) (*crd
848850
clusterScoped := crd.Spec.Scope == apiextensionsv1.ClusterScoped
849851

850852
// CRDs explicitly do not support protobuf, but some objects returned by the API server do
853+
streamingCollections := utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToJSON)
851854
negotiatedSerializer := unstructuredNegotiatedSerializer{
852855
typer: typer,
853856
creator: creator,
@@ -861,10 +864,11 @@ func (r *crdHandler) getOrCreateServingInfoFor(uid types.UID, name string) (*crd
861864
MediaTypeType: "application",
862865
MediaTypeSubType: "json",
863866
EncodesAsText: true,
864-
Serializer: json.NewSerializer(json.DefaultMetaFactory, creator, typer, false),
865-
PrettySerializer: json.NewSerializer(json.DefaultMetaFactory, creator, typer, true),
867+
Serializer: json.NewSerializerWithOptions(json.DefaultMetaFactory, creator, typer, json.SerializerOptions{StreamingCollectionsEncoding: streamingCollections}),
868+
PrettySerializer: json.NewSerializerWithOptions(json.DefaultMetaFactory, creator, typer, json.SerializerOptions{Pretty: true}),
866869
StrictSerializer: json.NewSerializerWithOptions(json.DefaultMetaFactory, creator, typer, json.SerializerOptions{
867-
Strict: true,
870+
Strict: true,
871+
StreamingCollectionsEncoding: streamingCollections,
868872
}),
869873
StreamSerializer: &runtime.StreamSerializerInfo{
870874
EncodesAsText: true,
@@ -887,7 +891,9 @@ func (r *crdHandler) getOrCreateServingInfoFor(uid types.UID, name string) (*crd
887891
MediaType: "application/vnd.kubernetes.protobuf",
888892
MediaTypeType: "application",
889893
MediaTypeSubType: "vnd.kubernetes.protobuf",
890-
Serializer: protobuf.NewSerializer(creator, typer),
894+
Serializer: protobuf.NewSerializerWithOptions(creator, typer, protobuf.SerializerOptions{
895+
StreamingCollectionsEncoding: utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToProtobuf),
896+
}),
891897
StreamSerializer: &runtime.StreamSerializerInfo{
892898
Serializer: protobuf.NewRawSerializer(creator, typer),
893899
Framer: protobuf.LengthDelimitedFramer,
@@ -958,7 +964,14 @@ func (r *crdHandler) getOrCreateServingInfoFor(uid types.UID, name string) (*crd
958964
scaleScope := *requestScopes[v.Name]
959965
scaleConverter := scale.NewScaleConverter()
960966
scaleScope.Subresource = "scale"
961-
scaleScope.Serializer = serializer.NewCodecFactory(scaleConverter.Scheme())
967+
var opts []serializer.CodecFactoryOptionsMutator
968+
if utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToJSON) {
969+
opts = append(opts, serializer.WithStreamingCollectionEncodingToJSON())
970+
}
971+
if utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToProtobuf) {
972+
opts = append(opts, serializer.WithStreamingCollectionEncodingToProtobuf())
973+
}
974+
scaleScope.Serializer = serializer.NewCodecFactory(scaleConverter.Scheme(), opts...)
962975
scaleScope.Kind = autoscalingv1.SchemeGroupVersion.WithKind("Scale")
963976
scaleScope.Namer = handlers.ContextBasedNaming{
964977
Namer: meta.NewAccessor(),

staging/src/k8s.io/apimachinery/pkg/api/meta/help.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,9 @@ func extractList(obj runtime.Object, allocNew bool) ([]runtime.Object, error) {
221221
if err != nil {
222222
return nil, err
223223
}
224+
if items.IsNil() {
225+
return nil, nil
226+
}
224227
list := make([]runtime.Object, items.Len())
225228
if len(list) == 0 {
226229
return list, nil

staging/src/k8s.io/apimachinery/pkg/runtime/serializer/codec_factory.go

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ type serializerType struct {
5252
func newSerializersForScheme(scheme *runtime.Scheme, mf json.MetaFactory, options CodecFactoryOptions) []serializerType {
5353
jsonSerializer := json.NewSerializerWithOptions(
5454
mf, scheme, scheme,
55-
json.SerializerOptions{Yaml: false, Pretty: false, Strict: options.Strict},
55+
json.SerializerOptions{Yaml: false, Pretty: false, Strict: options.Strict, StreamingCollectionsEncoding: options.StreamingCollectionsEncodingToJSON},
5656
)
5757
jsonSerializerType := serializerType{
5858
AcceptContentTypes: []string{runtime.ContentTypeJSON},
@@ -73,7 +73,7 @@ func newSerializersForScheme(scheme *runtime.Scheme, mf json.MetaFactory, option
7373

7474
strictJSONSerializer := json.NewSerializerWithOptions(
7575
mf, scheme, scheme,
76-
json.SerializerOptions{Yaml: false, Pretty: false, Strict: true},
76+
json.SerializerOptions{Yaml: false, Pretty: false, Strict: true, StreamingCollectionsEncoding: options.StreamingCollectionsEncodingToJSON},
7777
)
7878
jsonSerializerType.StrictSerializer = strictJSONSerializer
7979

@@ -85,7 +85,9 @@ func newSerializersForScheme(scheme *runtime.Scheme, mf json.MetaFactory, option
8585
mf, scheme, scheme,
8686
json.SerializerOptions{Yaml: true, Pretty: false, Strict: true},
8787
)
88-
protoSerializer := protobuf.NewSerializer(scheme, scheme)
88+
protoSerializer := protobuf.NewSerializerWithOptions(scheme, scheme, protobuf.SerializerOptions{
89+
StreamingCollectionsEncoding: options.StreamingCollectionsEncodingToProtobuf,
90+
})
8991
protoRawSerializer := protobuf.NewRawSerializer(scheme, scheme)
9092

9193
serializers := []serializerType{
@@ -136,6 +138,9 @@ type CodecFactoryOptions struct {
136138
Strict bool
137139
// Pretty includes a pretty serializer along with the non-pretty one
138140
Pretty bool
141+
142+
StreamingCollectionsEncodingToJSON bool
143+
StreamingCollectionsEncodingToProtobuf bool
139144
}
140145

141146
// CodecFactoryOptionsMutator takes a pointer to an options struct and then modifies it.
@@ -162,6 +167,18 @@ func DisableStrict(options *CodecFactoryOptions) {
162167
options.Strict = false
163168
}
164169

170+
func WithStreamingCollectionEncodingToJSON() CodecFactoryOptionsMutator {
171+
return func(options *CodecFactoryOptions) {
172+
options.StreamingCollectionsEncodingToJSON = true
173+
}
174+
}
175+
176+
func WithStreamingCollectionEncodingToProtobuf() CodecFactoryOptionsMutator {
177+
return func(options *CodecFactoryOptions) {
178+
options.StreamingCollectionsEncodingToProtobuf = true
179+
}
180+
}
181+
165182
// NewCodecFactory provides methods for retrieving serializers for the supported wire formats
166183
// and conversion wrappers to define preferred internal and external versions. In the future,
167184
// as the internal version is used less, callers may instead use a defaulting serializer and

0 commit comments

Comments
 (0)