From 17d8074d532d039958263d6d781d93ed722513e5 Mon Sep 17 00:00:00 2001 From: Koopa Date: Fri, 21 Mar 2025 13:42:09 +0800 Subject: [PATCH 1/6] feat(server): Add metadata to registry nodes in entrypoint implementations Fixes go-orb/go-orb#21 --- client/orb/client.go | 2 +- server/drpc/drpc.go | 7 ++++++- server/grpc/grpc.go | 7 ++++++- server/http/entrypoint.go | 7 ++++++- server/memory/memory.go | 5 +++++ 5 files changed, 24 insertions(+), 4 deletions(-) diff --git a/client/orb/client.go b/client/orb/client.go index d6b99bd9..99bd6c27 100644 --- a/client/orb/client.go +++ b/client/orb/client.go @@ -119,7 +119,7 @@ func (c *Client) selectNode(ctx context.Context, service string, opts *client.Ca } // Run the configured Selector to get a node from the resolved nodes. - node, err := opts.Selector(ctx, service, nodes, opts.PreferredTransports, opts.AnyTransport) + node, err := opts.Selector(ctx, service, nodes, opts.PreferredTransports, opts.AnyTransport, opts.Metadata) if err != nil { c.Logger().Error("Failed to resolve service", "error", err, "service", service) return "", "", err diff --git a/server/drpc/drpc.go b/server/drpc/drpc.go index 5eba7ba2..8fbbb7c0 100644 --- a/server/drpc/drpc.go +++ b/server/drpc/drpc.go @@ -139,6 +139,11 @@ func (s *Server) Transport() string { return "drpc" } +// Metadata returns the metadata of this entrypoint. +func (s *Server) Metadata() map[string]string { + return s.config.Metadata +} + // EntrypointID returns the id of this entrypoint (node) in the registry. func (s *Server) EntrypointID() string { return s.registry.ServiceName() + types.DefaultSeparator + s.config.Name @@ -189,7 +194,7 @@ func (s *Server) registryService() *registry.Service { ID: s.EntrypointID(), Address: s.Address(), Transport: s.Transport(), - Metadata: make(map[string]string), + Metadata: s.Metadata(), } return ®istry.Service{ diff --git a/server/grpc/grpc.go b/server/grpc/grpc.go index fc001e42..771597bd 100644 --- a/server/grpc/grpc.go +++ b/server/grpc/grpc.go @@ -249,6 +249,11 @@ func (s *Server) Transport() string { return "grpc" } +// Metadata returns the metadata of this entrypoint. +func (s *Server) Metadata() map[string]string { + return s.config.Metadata +} + // EntrypointID returns the id of this entrypoint (node) in the registry. func (s *Server) EntrypointID() string { return s.registry.ServiceName() + types.DefaultSeparator + s.config.Name @@ -339,7 +344,7 @@ func (s *Server) registryService() *registry.Service { ID: s.EntrypointID(), Address: s.Address(), Transport: s.Transport(), - Metadata: make(map[string]string), + Metadata: s.Metadata(), } eps := s.getEndpoints() diff --git a/server/http/entrypoint.go b/server/http/entrypoint.go index f582ee39..8e18f56c 100644 --- a/server/http/entrypoint.go +++ b/server/http/entrypoint.go @@ -309,6 +309,11 @@ func (s *Server) Transport() string { return "http" } +// Metadata returns the metadata of this entrypoint. +func (s *Server) Metadata() map[string]string { + return s.config.Metadata +} + // EntrypointID returns the id (configured name) of this entrypoint in the registry. func (s *Server) EntrypointID() string { return s.config.Name @@ -384,7 +389,7 @@ func (s *Server) registryService() (*registry.Service, error) { ID: s.EntrypointID(), Address: s.Address(), Transport: s.Transport(), - Metadata: make(map[string]string), + Metadata: s.Metadata(), } eps, err := s.getEndpoints() diff --git a/server/memory/memory.go b/server/memory/memory.go index 5c3b92e0..94cc699b 100644 --- a/server/memory/memory.go +++ b/server/memory/memory.go @@ -131,6 +131,11 @@ func (s *Server) EntrypointID() string { return s.config.Name } +// Metadata returns the metadata of this entrypoint. +func (s *Server) Metadata() map[string]string { + return s.config.Metadata +} + // String returns the entrypoint type. func (s *Server) String() string { return s.Type() From 5d89427d102df2e940a2b042549c2f577fc22b91 Mon Sep 17 00:00:00 2001 From: Koopa Date: Fri, 21 Mar 2025 15:47:12 +0800 Subject: [PATCH 2/6] test: A complete end-to-end test, with for example a "region --- client/orb_transport/drpc/drpc_test.go | 23 ++++--- client/orb_transport/grpc/grpc_test.go | 9 ++- client/orb_transport/http/http_test.go | 4 +- client/tests/tests.go | 84 ++++++++++++++++++++++++-- 4 files changed, 104 insertions(+), 16 deletions(-) diff --git a/client/orb_transport/drpc/drpc_test.go b/client/orb_transport/drpc/drpc_test.go index 6dab7d33..04004e7d 100644 --- a/client/orb_transport/drpc/drpc_test.go +++ b/client/orb_transport/drpc/drpc_test.go @@ -5,13 +5,13 @@ import ( "os" "testing" + "github.com/stretchr/testify/suite" + "github.com/go-orb/go-orb/log" "github.com/go-orb/go-orb/registry" "github.com/go-orb/go-orb/server" "github.com/go-orb/go-orb/types" "github.com/go-orb/plugins/client/tests" - "github.com/stretchr/testify/suite" - "github.com/go-orb/plugins/server/drpc" echohandler "github.com/go-orb/plugins/client/tests/handler/echo" @@ -28,24 +28,20 @@ import ( _ "github.com/go-orb/plugins/log/slog" ) -func setupServer(sn string) (*tests.SetupData, error) { +func setupServer(sn string, metadata map[string]string) (*tests.SetupData, error) { ctx, cancel := context.WithCancel(context.Background()) - setupData := &tests.SetupData{} - sv := "" logger, err := log.New() if err != nil { cancel() - return nil, err } reg, err := registry.New(sn, sv, nil, &types.Components{}, logger) if err != nil { cancel() - return nil, err } @@ -55,10 +51,19 @@ func setupServer(sn string) (*tests.SetupData, error) { fileHInstance := new(filehandler.Handler) fileHRegister := fileproto.RegisterFileServiceHandler(fileHInstance) - ep, err := drpc.New(drpc.NewConfig(drpc.WithHandlers(echoHRegister, fileHRegister)), logger, reg) + // 建立配置選項 + options := []server.Option{ + drpc.WithHandlers(echoHRegister, fileHRegister), + } + + // 如果提供了 metadata,添加到選項中 + if metadata != nil { + options = append(options, server.WithEntrypointMetadata(metadata)) + } + + ep, err := drpc.New(drpc.NewConfig(options...), logger, reg) if err != nil { cancel() - return nil, err } diff --git a/client/orb_transport/grpc/grpc_test.go b/client/orb_transport/grpc/grpc_test.go index 66425bc0..386ac0d5 100644 --- a/client/orb_transport/grpc/grpc_test.go +++ b/client/orb_transport/grpc/grpc_test.go @@ -28,7 +28,7 @@ import ( _ "github.com/go-orb/plugins/log/slog" ) -func setupServer(sn string) (*tests.SetupData, error) { +func setupServer(sn string, metadata map[string]string) (*tests.SetupData, error) { ctx, cancel := context.WithCancel(context.Background()) setupData := &tests.SetupData{} @@ -59,6 +59,7 @@ func setupServer(sn string) (*tests.SetupData, error) { grpc.NewConfig(server.WithEntrypointName("grpc"), grpc.WithHandlers(hRegister, fileHRegister), grpc.WithInsecure(), + server.WithEntrypointMetadata(metadata), ), logger, reg) if err != nil { cancel() @@ -66,7 +67,11 @@ func setupServer(sn string) (*tests.SetupData, error) { return nil, err } - ep2, err := grpc.New(grpc.NewConfig(server.WithEntrypointName("grpcs"), grpc.WithHandlers(hRegister, fileHRegister)), logger, reg) + ep2, err := grpc.New( + grpc.NewConfig(server.WithEntrypointName("grpcs"), + grpc.WithHandlers(hRegister, fileHRegister), + server.WithEntrypointMetadata(metadata), + ), logger, reg) if err != nil { cancel() diff --git a/client/orb_transport/http/http_test.go b/client/orb_transport/http/http_test.go index 8a11c388..81d37fa1 100644 --- a/client/orb_transport/http/http_test.go +++ b/client/orb_transport/http/http_test.go @@ -25,7 +25,7 @@ import ( _ "github.com/go-orb/plugins/log/slog" ) -func setupServer(sn string) (*tests.SetupData, error) { +func setupServer(sn string, metadata map[string]string) (*tests.SetupData, error) { ctx, cancel := context.WithCancel(context.Background()) setupData := &tests.SetupData{} @@ -54,6 +54,7 @@ func setupServer(sn string) (*tests.SetupData, error) { server.WithEntrypointName("http"), http.WithHandlers(hRegister), http.WithInsecure(), + server.WithEntrypointMetadata(metadata), ), logger, reg, @@ -70,6 +71,7 @@ func setupServer(sn string) (*tests.SetupData, error) { http.WithHandlers(hRegister), http.WithInsecure(), http.WithAllowH2C(), + server.WithEntrypointMetadata(metadata), ), logger, reg, diff --git a/client/tests/tests.go b/client/tests/tests.go index 9bc1046d..2b419586 100644 --- a/client/tests/tests.go +++ b/client/tests/tests.go @@ -6,6 +6,7 @@ import ( "crypto/rand" "errors" "fmt" + "time" "github.com/go-orb/go-orb/client" "github.com/go-orb/go-orb/codecs" @@ -133,7 +134,7 @@ type TestSuite struct { entrypoints []server.Entrypoint ctx context.Context - setupServer func(service string) (*SetupData, error) + setupServer func(service string, metadata map[string]string) (*SetupData, error) stopServer context.CancelFunc // To create more clients in Benchmarks. @@ -141,7 +142,8 @@ type TestSuite struct { } // NewSuite creates a new test suite. -func NewSuite(setupServer func(service string) (*SetupData, error), transports []string, requests ...TestRequest) *TestSuite { +func NewSuite(setupServer func(service string, metadata map[string]string) (*SetupData, error), + transports []string, requests ...TestRequest) *TestSuite { s := new(TestSuite) s.Transports = transports @@ -174,7 +176,7 @@ type TestRequest struct { func (s *TestSuite) SetupSuite() { var err error - setupData, err := s.setupServer(ServiceName) + setupData, err := s.setupServer(ServiceName, nil) if err != nil { s.Require().NoError(err, "while setting up the server") } @@ -218,7 +220,7 @@ func (s *TestSuite) TearDownSuite() { } func (s *TestSuite) doRequest(ctx context.Context, req *TestRequest, clientWire client.Type, transport string) { - opts := []client.CallOption{} + var opts []client.CallOption if req.ContentType != "" { opts = append(opts, client.WithContentType(req.ContentType)) } @@ -318,6 +320,80 @@ func (s *TestSuite) TestMetadata() { } } +// TestMetadataFilter tests if metadata gets filtered. +func (s *TestSuite) TestMetadataFilter() { + regions := []string{"as-1", "eu-1", "us-1"} + + setupDatas := make([]*SetupData, 0, len(regions)) + clientTypes := make([]client.Type, 0, len(regions)) + + defer func() { + for _, cli := range clientTypes { + if stopErr := cli.Stop(context.Background()); stopErr != nil { + s.T().Logf("Error stopping client: %v", stopErr) + } + } + + for _, setup := range setupDatas { + setup.Stop() + } + }() + + for _, region := range regions { + serverName := "metadata-server-" + region + setupData, err := s.setupServer(serverName, map[string]string{"region": region}) + s.Require().NoError(err, "Server setup failed for region "+region) + + s.Require().NoError(setupData.Registry.Start(setupData.Ctx), + "Registry start failed for region "+region) + + for _, ep := range setupData.Entrypoints { + s.Require().NoError(ep.Start(setupData.Ctx), + "Entrypoint start failed for region "+region) + } + + setupDatas = append(setupDatas, setupData) + + cli, err := client.New(nil, &types.Components{}, setupData.Logger, setupData.Registry) + s.Require().NoError(err, "Client creation failed for region "+region) + + s.Require().NoError(cli.Start(setupData.Ctx), + "Client start failed for region "+region) + + clientTypes = append(clientTypes, cli) + } + + time.Sleep(time.Second) + + for i, region := range regions { + serverName := "metadata-server-" + region + echoClient := echo.NewStreamsClient(clientTypes[i]) + + s.Run("Matching region "+region, func() { + resp, err := echoClient.Call( + context.Background(), + serverName, + &echo.CallRequest{Name: "test"}, + client.WithMetadata(map[string]string{"region": region}), + ) + + s.Require().NoError(err, "Request with matching region should succeed") + s.Require().Equal("Hello test", resp.GetMsg(), "Unexpected response message") + }) + + s.Run("Non-matching region for "+region, func() { + _, err := echoClient.Call( + context.Background(), + serverName, + &echo.CallRequest{Name: "test"}, + client.WithMetadata(map[string]string{"region": "wrong-region"}), + ) + + s.Require().Error(err, "Request with non-matching region should fail") + }) + } +} + // TestFileUpload tests the client streaming functionality for file uploads. func (s *TestSuite) TestFileUpload() { // Create a file service client From 588615cb9d3745fc02415d452b900fb729f6dcd3 Mon Sep 17 00:00:00 2001 From: Koopa Date: Fri, 21 Mar 2025 23:30:55 +0800 Subject: [PATCH 3/6] feat(tests): Use common server name for metadata filtering tests Fixes go-orb/go-orb#21 --- client/orb_transport/drpc/drpc_test.go | 3 --- client/tests/tests.go | 15 ++++++++------- registry/tests/tests.go | 2 +- 3 files changed, 9 insertions(+), 11 deletions(-) diff --git a/client/orb_transport/drpc/drpc_test.go b/client/orb_transport/drpc/drpc_test.go index 04004e7d..ec6c4f5e 100644 --- a/client/orb_transport/drpc/drpc_test.go +++ b/client/orb_transport/drpc/drpc_test.go @@ -51,12 +51,9 @@ func setupServer(sn string, metadata map[string]string) (*tests.SetupData, error fileHInstance := new(filehandler.Handler) fileHRegister := fileproto.RegisterFileServiceHandler(fileHInstance) - // 建立配置選項 options := []server.Option{ drpc.WithHandlers(echoHRegister, fileHRegister), } - - // 如果提供了 metadata,添加到選項中 if metadata != nil { options = append(options, server.WithEntrypointMetadata(metadata)) } diff --git a/client/tests/tests.go b/client/tests/tests.go index 2b419586..82c5c5d8 100644 --- a/client/tests/tests.go +++ b/client/tests/tests.go @@ -324,6 +324,8 @@ func (s *TestSuite) TestMetadata() { func (s *TestSuite) TestMetadataFilter() { regions := []string{"as-1", "eu-1", "us-1"} + const commonServerName = "metadata-server" + setupDatas := make([]*SetupData, 0, len(regions)) clientTypes := make([]client.Type, 0, len(regions)) @@ -340,8 +342,7 @@ func (s *TestSuite) TestMetadataFilter() { }() for _, region := range regions { - serverName := "metadata-server-" + region - setupData, err := s.setupServer(serverName, map[string]string{"region": region}) + setupData, err := s.setupServer(commonServerName, map[string]string{"region": region}) s.Require().NoError(err, "Server setup failed for region "+region) s.Require().NoError(setupData.Registry.Start(setupData.Ctx), @@ -365,14 +366,14 @@ func (s *TestSuite) TestMetadataFilter() { time.Sleep(time.Second) - for i, region := range regions { - serverName := "metadata-server-" + region - echoClient := echo.NewStreamsClient(clientTypes[i]) + mainClient := clientTypes[0] + echoClient := echo.NewStreamsClient(mainClient) + for _, region := range regions { s.Run("Matching region "+region, func() { resp, err := echoClient.Call( context.Background(), - serverName, + commonServerName, &echo.CallRequest{Name: "test"}, client.WithMetadata(map[string]string{"region": region}), ) @@ -384,7 +385,7 @@ func (s *TestSuite) TestMetadataFilter() { s.Run("Non-matching region for "+region, func() { _, err := echoClient.Call( context.Background(), - serverName, + commonServerName, &echo.CallRequest{Name: "test"}, client.WithMetadata(map[string]string{"region": "wrong-region"}), ) diff --git a/registry/tests/tests.go b/registry/tests/tests.go index e43fdf68..d3e01529 100644 --- a/registry/tests/tests.go +++ b/registry/tests/tests.go @@ -793,7 +793,7 @@ func (r *TestSuite) TestMetadataFiltering() { r.Require().NoError(err) // Manually filter for production services - prodServices := []*registry.Service{} + var prodServices []*registry.Service for _, svc := range allServices { // Skip non-test services (those not starting with orb.test.filter) From 04cf619ab85eca3fea45103ceaaa89ac7cb358e1 Mon Sep 17 00:00:00 2001 From: Koopa Date: Sat, 22 Mar 2025 15:17:50 +0800 Subject: [PATCH 4/6] git commit -m "feat(tests): Use common server name for metadata filtering tests Fixes go-orb/go-orb#21" --- client/orb_transport/drpc/drpc_test.go | 3 --- client/tests/tests.go | 29 +++++++------------------- registry/tests/tests.go | 2 +- 3 files changed, 8 insertions(+), 26 deletions(-) diff --git a/client/orb_transport/drpc/drpc_test.go b/client/orb_transport/drpc/drpc_test.go index 04004e7d..ec6c4f5e 100644 --- a/client/orb_transport/drpc/drpc_test.go +++ b/client/orb_transport/drpc/drpc_test.go @@ -51,12 +51,9 @@ func setupServer(sn string, metadata map[string]string) (*tests.SetupData, error fileHInstance := new(filehandler.Handler) fileHRegister := fileproto.RegisterFileServiceHandler(fileHInstance) - // 建立配置選項 options := []server.Option{ drpc.WithHandlers(echoHRegister, fileHRegister), } - - // 如果提供了 metadata,添加到選項中 if metadata != nil { options = append(options, server.WithEntrypointMetadata(metadata)) } diff --git a/client/tests/tests.go b/client/tests/tests.go index 2b419586..bb219b9a 100644 --- a/client/tests/tests.go +++ b/client/tests/tests.go @@ -324,24 +324,18 @@ func (s *TestSuite) TestMetadata() { func (s *TestSuite) TestMetadataFilter() { regions := []string{"as-1", "eu-1", "us-1"} + const commonServerName = "metadata-server" + setupDatas := make([]*SetupData, 0, len(regions)) - clientTypes := make([]client.Type, 0, len(regions)) defer func() { - for _, cli := range clientTypes { - if stopErr := cli.Stop(context.Background()); stopErr != nil { - s.T().Logf("Error stopping client: %v", stopErr) - } - } - for _, setup := range setupDatas { setup.Stop() } }() for _, region := range regions { - serverName := "metadata-server-" + region - setupData, err := s.setupServer(serverName, map[string]string{"region": region}) + setupData, err := s.setupServer(commonServerName, map[string]string{"region": region}) s.Require().NoError(err, "Server setup failed for region "+region) s.Require().NoError(setupData.Registry.Start(setupData.Ctx), @@ -353,26 +347,17 @@ func (s *TestSuite) TestMetadataFilter() { } setupDatas = append(setupDatas, setupData) - - cli, err := client.New(nil, &types.Components{}, setupData.Logger, setupData.Registry) - s.Require().NoError(err, "Client creation failed for region "+region) - - s.Require().NoError(cli.Start(setupData.Ctx), - "Client start failed for region "+region) - - clientTypes = append(clientTypes, cli) } time.Sleep(time.Second) - for i, region := range regions { - serverName := "metadata-server-" + region - echoClient := echo.NewStreamsClient(clientTypes[i]) + echoClient := echo.NewStreamsClient(s.client) + for _, region := range regions { s.Run("Matching region "+region, func() { resp, err := echoClient.Call( context.Background(), - serverName, + commonServerName, &echo.CallRequest{Name: "test"}, client.WithMetadata(map[string]string{"region": region}), ) @@ -384,7 +369,7 @@ func (s *TestSuite) TestMetadataFilter() { s.Run("Non-matching region for "+region, func() { _, err := echoClient.Call( context.Background(), - serverName, + commonServerName, &echo.CallRequest{Name: "test"}, client.WithMetadata(map[string]string{"region": "wrong-region"}), ) diff --git a/registry/tests/tests.go b/registry/tests/tests.go index e43fdf68..d3e01529 100644 --- a/registry/tests/tests.go +++ b/registry/tests/tests.go @@ -793,7 +793,7 @@ func (r *TestSuite) TestMetadataFiltering() { r.Require().NoError(err) // Manually filter for production services - prodServices := []*registry.Service{} + var prodServices []*registry.Service for _, svc := range allServices { // Skip non-test services (those not starting with orb.test.filter) From 86b7fba031f6a016aa10bb41a35e825c41307817 Mon Sep 17 00:00:00 2001 From: Koopa Date: Fri, 21 Mar 2025 23:30:55 +0800 Subject: [PATCH 5/6] feat(tests): Use common server name for metadata filtering tests Fixes go-orb/go-orb#21 # Conflicts: # client/tests/tests.go --- client/tests/tests.go | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/client/tests/tests.go b/client/tests/tests.go index bb219b9a..82c5c5d8 100644 --- a/client/tests/tests.go +++ b/client/tests/tests.go @@ -327,8 +327,15 @@ func (s *TestSuite) TestMetadataFilter() { const commonServerName = "metadata-server" setupDatas := make([]*SetupData, 0, len(regions)) + clientTypes := make([]client.Type, 0, len(regions)) defer func() { + for _, cli := range clientTypes { + if stopErr := cli.Stop(context.Background()); stopErr != nil { + s.T().Logf("Error stopping client: %v", stopErr) + } + } + for _, setup := range setupDatas { setup.Stop() } @@ -347,11 +354,20 @@ func (s *TestSuite) TestMetadataFilter() { } setupDatas = append(setupDatas, setupData) + + cli, err := client.New(nil, &types.Components{}, setupData.Logger, setupData.Registry) + s.Require().NoError(err, "Client creation failed for region "+region) + + s.Require().NoError(cli.Start(setupData.Ctx), + "Client start failed for region "+region) + + clientTypes = append(clientTypes, cli) } time.Sleep(time.Second) - echoClient := echo.NewStreamsClient(s.client) + mainClient := clientTypes[0] + echoClient := echo.NewStreamsClient(mainClient) for _, region := range regions { s.Run("Matching region "+region, func() { From 12415a0b193505a857ba107ba7a05ca0c914f3b6 Mon Sep 17 00:00:00 2001 From: Koopa Date: Sat, 22 Mar 2025 17:01:24 +0800 Subject: [PATCH 6/6] test(client): Update TestMetadataFilter to use WithRegistryMetadata Update test cases to validate the new WithRegistryMetadata functionality replacing previous WithMetadata implementation. This ensures proper testing of the metadata-based server selection feature. --- client/tests/tests.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/client/tests/tests.go b/client/tests/tests.go index 82c5c5d8..f86db4e6 100644 --- a/client/tests/tests.go +++ b/client/tests/tests.go @@ -375,7 +375,7 @@ func (s *TestSuite) TestMetadataFilter() { context.Background(), commonServerName, &echo.CallRequest{Name: "test"}, - client.WithMetadata(map[string]string{"region": region}), + client.WithRegistryMetadata("region", region), ) s.Require().NoError(err, "Request with matching region should succeed") @@ -387,7 +387,7 @@ func (s *TestSuite) TestMetadataFilter() { context.Background(), commonServerName, &echo.CallRequest{Name: "test"}, - client.WithMetadata(map[string]string{"region": "wrong-region"}), + client.WithRegistryMetadata("region", "wrong-region"), ) s.Require().Error(err, "Request with non-matching region should fail")