@@ -5,21 +5,19 @@ package stencil
5
5
6
6
import (
7
7
"encoding/json"
8
- "io"
9
8
"time"
10
9
10
+ "github.com/goburrow/cache"
11
11
"github.com/pkg/errors"
12
- "go.uber.org/multierr"
13
12
"google.golang.org/protobuf/encoding/protojson"
14
13
"google.golang.org/protobuf/proto"
15
14
"google.golang.org/protobuf/reflect/protoreflect"
16
- "google.golang.org/protobuf/types/dynamicpb"
17
15
)
18
16
19
17
var (
20
18
//ErrNotFound default sentinel error if proto not found
21
19
ErrNotFound = errors .New ("not found" )
22
- //ErrNotFound is for when descriptor does not match the message
20
+ //ErrInvalidDescriptor is for when descriptor does not match the message
23
21
ErrInvalidDescriptor = errors .New ("invalid descriptor" )
24
22
)
25
23
@@ -29,23 +27,17 @@ type Client interface {
29
27
// Parse parses protobuf message from wire format to protoreflect.ProtoMessage given fully qualified name of proto message.
30
28
// Returns ErrNotFound error if given class name is not found
31
29
Parse (string , []byte ) (protoreflect.ProtoMessage , error )
32
- // ParseWithRefresh parses protobuf message from wire format to `protoreflect.ProtoMessage` given fully qualified name of proto message.
33
- // Refreshes proto definitions if parsed message has unknown fields and parses the message again.
34
- // Returns ErrNotFound error if given class name is not found.
35
- ParseWithRefresh (string , []byte ) (protoreflect.ProtoMessage , error )
36
30
// Serialize serializes data to bytes given fully qualified name of proto message.
37
31
// Returns ErrNotFound error if given class name is not found
38
32
Serialize (string , interface {}) ([]byte , error )
39
- // SerializeWithRefresh serializes data to bytes given fully qualified name of proto message.
40
- // Refreshes proto definitions if message has unknown fields and serialized the message again.
41
- // Returns ErrNotFound error if given class name is not found.
42
- SerializeWithRefresh (string , interface {}) ([]byte , error )
43
33
// GetDescriptor returns protoreflect.MessageDescriptor given fully qualified proto java class name
44
34
GetDescriptor (string ) (protoreflect.MessageDescriptor , error )
45
35
// Close stops background refresh if configured.
46
36
Close ()
47
- // Refresh downloads latest proto definitions
48
- Refresh () error
37
+ // Refresh loads new values from specified url. If the schema is already fetched, the previous value
38
+ // will continue to be used by Parse methods while the new value is loading.
39
+ // If schemas not loaded, then this function will block until the value is loaded.
40
+ Refresh ()
49
41
}
50
42
51
43
// HTTPOptions options for http client
@@ -66,6 +58,11 @@ type Options struct {
66
58
RefreshInterval time.Duration
67
59
// HTTPOptions options for http client
68
60
HTTPOptions
61
+ // RefreshStrategy refresh strategy to use while fetching schema.
62
+ // Default strategy set to `stencil.LongPollingRefresh` strategy
63
+ RefreshStrategy
64
+ // Logger is the interface used to get logging from stencil internals.
65
+ Logger
69
66
}
70
67
71
68
func (o * Options ) setDefaults () {
@@ -77,50 +74,56 @@ func (o *Options) setDefaults() {
77
74
}
78
75
}
79
76
77
+ // NewClient creates stencil client. Downloads proto descriptor file from given url and stores the definitions.
78
+ // It will throw error if download fails or downloaded file is not fully contained descriptor file
79
+ func NewClient (urls []string , options Options ) (Client , error ) {
80
+ cacheOptions := []cache.Option {cache .WithMaximumSize (len (urls ))}
81
+ options .setDefaults ()
82
+ if options .AutoRefresh {
83
+ cacheOptions = append (cacheOptions , cache .WithRefreshAfterWrite (options .RefreshInterval ), cache .WithExpireAfterWrite (options .RefreshInterval ))
84
+ }
85
+ newCache := cache .NewLoadingCache (options .RefreshStrategy .getLoader (options ), cacheOptions ... )
86
+ s , err := newStore (urls , newCache )
87
+ if err != nil {
88
+ return nil , err
89
+ }
90
+ return & stencilClient {urls : urls , store : s , options : options }, nil
91
+ }
92
+
80
93
type stencilClient struct {
81
- timer io.Closer
82
94
urls []string
83
- store * descriptorStore
95
+ store * store
84
96
options Options
85
97
}
86
98
87
99
func (s * stencilClient ) Parse (className string , data []byte ) (protoreflect.ProtoMessage , error ) {
88
- desc , ok := s .store . get (className )
100
+ resolver , ok := s .getMatchingResolver (className )
89
101
if ! ok {
90
102
return nil , ErrNotFound
91
103
}
92
- m := dynamicpb .NewMessage (desc ).New ().Interface ()
93
- err := proto.UnmarshalOptions {Resolver : s .store .extensionResolver }.Unmarshal (data , m )
104
+ messageType , _ := resolver .Get (className )
105
+ m := messageType .New ().Interface ()
106
+ err := proto.UnmarshalOptions {Resolver : resolver .GetTypeResolver ()}.Unmarshal (data , m )
94
107
return m , err
95
108
}
96
109
97
- func (s * stencilClient ) ParseWithRefresh (className string , data []byte ) (protoreflect.ProtoMessage , error ) {
98
- m , err := s .Parse (className , data )
99
- if err != nil || m .ProtoReflect ().GetUnknown () == nil {
100
- return m , err
101
- }
102
- if err = s .Refresh (); err != nil {
103
- return m , err
104
- }
105
- return s .Parse (className , data )
106
- }
107
-
108
110
func (s * stencilClient ) Serialize (className string , data interface {}) (bytes []byte , err error ) {
109
111
// message to json
110
112
jsonBytes , err := json .Marshal (data )
111
113
if err != nil {
112
114
return
113
115
}
114
116
115
- // get descriptor
116
- desc , err := s .GetDescriptor (className )
117
- if err != nil {
118
- return
117
+ resolver , ok := s .getMatchingResolver (className )
118
+ if ! ok {
119
+ return nil , ErrNotFound
119
120
}
120
121
122
+ // get descriptor
123
+ messageType , _ := resolver .Get (className )
121
124
// construct proto message
122
- m := dynamicpb . NewMessage ( desc ) .New ().Interface ()
123
- err = protojson.UnmarshalOptions {Resolver : s . store . extensionResolver }.Unmarshal (jsonBytes , m )
125
+ m := messageType .New ().Interface ()
126
+ err = protojson.UnmarshalOptions {Resolver : resolver . GetTypeResolver () }.Unmarshal (jsonBytes , m )
124
127
if err != nil {
125
128
return bytes , ErrInvalidDescriptor
126
129
}
@@ -129,63 +132,37 @@ func (s *stencilClient) Serialize(className string, data interface{}) (bytes []b
129
132
return proto .Marshal (m )
130
133
}
131
134
132
- func (s * stencilClient ) SerializeWithRefresh (className string , data interface {}) (bytes []byte , err error ) {
133
- bytes , err = s .Serialize (className , data )
134
- if err == nil || (err != ErrNotFound && err != ErrInvalidDescriptor ) {
135
- return
136
- }
137
-
138
- if err = s .Refresh (); err != nil {
139
- return bytes , errors .Wrap (err , "error refreshing descriptor" )
135
+ func (s * stencilClient ) getMatchingResolver (className string ) (* Resolver , bool ) {
136
+ for _ , url := range s .urls {
137
+ resolver , ok := s .store .getResolver (url )
138
+ if ! ok {
139
+ return nil , false
140
+ }
141
+ _ , ok = resolver .Get (className )
142
+ if ok {
143
+ return resolver , ok
144
+ }
140
145
}
141
-
142
- return s .Serialize (className , data )
146
+ return nil , false
143
147
}
144
148
145
149
func (s * stencilClient ) GetDescriptor (className string ) (protoreflect.MessageDescriptor , error ) {
146
- desc , ok := s .store . get (className )
150
+ resolver , ok := s .getMatchingResolver (className )
147
151
if ! ok {
148
152
return nil , ErrNotFound
149
153
}
150
- return desc , nil
154
+ desc , _ := resolver .Get (className )
155
+ return desc .Descriptor (), nil
151
156
}
152
157
153
158
func (s * stencilClient ) Close () {
154
- if s .timer != nil {
155
- s .timer .Close ()
159
+ if s .store != nil {
160
+ s .store .Close ()
156
161
}
157
162
}
158
163
159
- func (s * stencilClient ) Refresh () error {
160
- var err error
164
+ func (s * stencilClient ) Refresh () {
161
165
for _ , url := range s .urls {
162
- err = multierr . Combine ( err , s .store .loadFromURI (url , s . options ) )
166
+ s .store .Refresh (url )
163
167
}
164
- return err
165
- }
166
-
167
- func (s * stencilClient ) load () error {
168
- s .options .setDefaults ()
169
- if s .options .AutoRefresh {
170
- s .timer = setInterval (s .options .RefreshInterval , func () { s .Refresh () })
171
- }
172
- err := s .Refresh ()
173
- return err
174
- }
175
-
176
- // NewClient creates stencil client. Downloads proto descriptor file from given url and stores the definitions.
177
- // It will throw error if download fails or downloaded file is not fully contained descriptor file
178
- func NewClient (url string , options Options ) (Client , error ) {
179
- s := & stencilClient {store : newStore (), urls : []string {url }, options : options }
180
- err := s .load ()
181
- return s , err
182
- }
183
-
184
- // NewMultiURLClient creates stencil client with multiple urls. Downloads proto descriptor file from given urls and stores the definitions.
185
- // If descriptor files from multiple urls has different schema definitions with same name, last downloaded proto descriptor will override previous entries.
186
- // It will throw error if any of the download fails or any downloaded file is not fully contained descriptor file
187
- func NewMultiURLClient (urls []string , options Options ) (Client , error ) {
188
- s := & stencilClient {store : newStore (), urls : urls , options : options }
189
- err := s .load ()
190
- return s , err
191
168
}
0 commit comments