@@ -30,9 +30,11 @@ import (
3030)
3131
3232const (
33- clusterID = "fake-cluster"
34- topicID = "fake-topic"
35- consumerGroupID = "fake-consumergroup"
33+ clusterID = "fake-cluster"
34+ topicID = "fake-topic"
35+ consumerGroupID = "fake-consumergroup"
36+ connectClusterID = "fake-connect-cluster"
37+ connectorID = "fake-connector"
3638)
3739
3840// The reason why we have a fake server is because testing end-to-end will exceed the deadline of 10 minutes.
@@ -41,14 +43,20 @@ type fakeManagedKafkaServer struct {
4143 managedkafkapb.UnimplementedManagedKafkaServer
4244}
4345
46+ type fakeManagedKafkaConnectServer struct {
47+ managedkafkapb.UnimplementedManagedKafkaConnectServer
48+ }
49+
4450func Options (t * testing.T ) []option.ClientOption {
4551 server := & fakeManagedKafkaServer {}
52+ connectServer := & fakeManagedKafkaConnectServer {}
4653 listener , err := net .Listen ("tcp" , "localhost:0" )
4754 if err != nil {
4855 t .Fatal (err )
4956 }
5057 gsrv := grpc .NewServer ()
5158 managedkafkapb .RegisterManagedKafkaServer (gsrv , server )
59+ managedkafkapb .RegisterManagedKafkaConnectServer (gsrv , connectServer )
5260 fakeServerAddr := listener .Addr ().String ()
5361 go func () {
5462 if err := gsrv .Serve (listener ); err != nil {
@@ -165,3 +173,104 @@ func (f *fakeManagedKafkaServer) UpdateConsumerGroup(ctx context.Context, req *m
165173 Name : consumerGroupID ,
166174 }, nil
167175}
176+
177+ // Connect server methods
178+ func (f * fakeManagedKafkaConnectServer ) CreateConnectCluster (ctx context.Context , req * managedkafkapb.CreateConnectClusterRequest ) (* longrunningpb.Operation , error ) {
179+ anypb := & anypb.Any {}
180+ err := anypb .MarshalFrom (req .ConnectCluster )
181+ if err != nil {
182+ return nil , fmt .Errorf ("anypb.MarshalFrom got err: %w" , err )
183+ }
184+ return & longrunningpb.Operation {
185+ Done : true ,
186+ Result : & longrunningpb.Operation_Response {
187+ Response : anypb ,
188+ },
189+ }, nil
190+ }
191+
192+ func (f * fakeManagedKafkaConnectServer ) DeleteConnectCluster (ctx context.Context , req * managedkafkapb.DeleteConnectClusterRequest ) (* longrunningpb.Operation , error ) {
193+ return & longrunningpb.Operation {
194+ Done : true ,
195+ Result : & longrunningpb.Operation_Response {
196+ Response : & anypb.Any {},
197+ },
198+ }, nil
199+ }
200+
201+ func (f * fakeManagedKafkaConnectServer ) GetConnectCluster (ctx context.Context , req * managedkafkapb.GetConnectClusterRequest ) (* managedkafkapb.ConnectCluster , error ) {
202+ return & managedkafkapb.ConnectCluster {
203+ Name : connectClusterID ,
204+ }, nil
205+ }
206+
207+ func (f * fakeManagedKafkaConnectServer ) ListConnectClusters (ctx context.Context , req * managedkafkapb.ListConnectClustersRequest ) (* managedkafkapb.ListConnectClustersResponse , error ) {
208+ return & managedkafkapb.ListConnectClustersResponse {
209+ ConnectClusters : []* managedkafkapb.ConnectCluster {{
210+ Name : connectClusterID ,
211+ }},
212+ }, nil
213+ }
214+
215+ func (f * fakeManagedKafkaConnectServer ) UpdateConnectCluster (ctx context.Context , req * managedkafkapb.UpdateConnectClusterRequest ) (* longrunningpb.Operation , error ) {
216+ anypb := & anypb.Any {}
217+ err := anypb .MarshalFrom (req .ConnectCluster )
218+ if err != nil {
219+ return nil , fmt .Errorf ("anypb.MarshalFrom got err: %w" , err )
220+ }
221+ return & longrunningpb.Operation {
222+ Done : true ,
223+ Result : & longrunningpb.Operation_Response {
224+ Response : anypb ,
225+ },
226+ }, nil
227+ }
228+
229+ // Connector methods
230+ func (f * fakeManagedKafkaConnectServer ) CreateConnector (ctx context.Context , req * managedkafkapb.CreateConnectorRequest ) (* managedkafkapb.Connector , error ) {
231+ return req .Connector , nil
232+ }
233+
234+ func (f * fakeManagedKafkaConnectServer ) GetConnector (ctx context.Context , req * managedkafkapb.GetConnectorRequest ) (* managedkafkapb.Connector , error ) {
235+ return & managedkafkapb.Connector {
236+ Name : connectorID ,
237+ Configs : map [string ]string {
238+ "connector.class" : "test.connector" ,
239+ },
240+ }, nil
241+ }
242+
243+ func (f * fakeManagedKafkaConnectServer ) ListConnectors (ctx context.Context , req * managedkafkapb.ListConnectorsRequest ) (* managedkafkapb.ListConnectorsResponse , error ) {
244+ return & managedkafkapb.ListConnectorsResponse {
245+ Connectors : []* managedkafkapb.Connector {{
246+ Name : connectorID ,
247+ Configs : map [string ]string {
248+ "connector.class" : "test.connector" ,
249+ },
250+ }},
251+ }, nil
252+ }
253+
254+ func (f * fakeManagedKafkaConnectServer ) UpdateConnector (ctx context.Context , req * managedkafkapb.UpdateConnectorRequest ) (* managedkafkapb.Connector , error ) {
255+ return req .Connector , nil
256+ }
257+
258+ func (f * fakeManagedKafkaConnectServer ) DeleteConnector (ctx context.Context , req * managedkafkapb.DeleteConnectorRequest ) (* emptypb.Empty , error ) {
259+ return & emptypb.Empty {}, nil
260+ }
261+
262+ func (f * fakeManagedKafkaConnectServer ) PauseConnector (ctx context.Context , req * managedkafkapb.PauseConnectorRequest ) (* managedkafkapb.PauseConnectorResponse , error ) {
263+ return & managedkafkapb.PauseConnectorResponse {}, nil
264+ }
265+
266+ func (f * fakeManagedKafkaConnectServer ) ResumeConnector (ctx context.Context , req * managedkafkapb.ResumeConnectorRequest ) (* managedkafkapb.ResumeConnectorResponse , error ) {
267+ return & managedkafkapb.ResumeConnectorResponse {}, nil
268+ }
269+
270+ func (f * fakeManagedKafkaConnectServer ) StopConnector (ctx context.Context , req * managedkafkapb.StopConnectorRequest ) (* managedkafkapb.StopConnectorResponse , error ) {
271+ return & managedkafkapb.StopConnectorResponse {}, nil
272+ }
273+
274+ func (f * fakeManagedKafkaConnectServer ) RestartConnector (ctx context.Context , req * managedkafkapb.RestartConnectorRequest ) (* managedkafkapb.RestartConnectorResponse , error ) {
275+ return & managedkafkapb.RestartConnectorResponse {}, nil
276+ }
0 commit comments