diff --git a/cmd/proxy/main.go b/cmd/proxy/main.go index 1212854e..41c803cc 100644 --- a/cmd/proxy/main.go +++ b/cmd/proxy/main.go @@ -13,6 +13,7 @@ import ( "go.uber.org/fx" "github.com/temporalio/s2s-proxy/config" + "github.com/temporalio/s2s-proxy/logging" "github.com/temporalio/s2s-proxy/proto/compat" "github.com/temporalio/s2s-proxy/proxy" ) @@ -95,6 +96,7 @@ func startProxy(c *cli.Context) error { fx.Provide(func() log.Logger { return log.NewZapLogger(log.BuildZapLogger(logCfg)) }), + logging.Module, config.Module, proxy.Module, fx.Populate(&proxyParams), diff --git a/config/config.go b/config/config.go index 3f2b1eb7..f0bbc98e 100644 --- a/config/config.go +++ b/config/config.go @@ -158,13 +158,14 @@ type ( // TODO: Soon to be deprecated! Create an item in ClusterConnections instead ShardCountConfig ShardCountConfig `yaml:"shardCount"` // TODO: Soon to be deprecated! Create an item in ClusterConnections instead - MemberlistConfig *MemberlistConfig `yaml:"memberlist"` - NamespaceNameTranslation NameTranslationConfig `yaml:"namespaceNameTranslation"` - SearchAttributeTranslation SATranslationConfig `yaml:"searchAttributeTranslation"` - Metrics *MetricsConfig `yaml:"metrics"` - ProfilingConfig ProfilingConfig `yaml:"profiling"` - Logging LoggingConfig `yaml:"logging"` - ClusterConnections []ClusterConnConfig `yaml:"clusterConnections"` + MemberlistConfig *MemberlistConfig `yaml:"memberlist"` + NamespaceNameTranslation NameTranslationConfig `yaml:"namespaceNameTranslation"` + SearchAttributeTranslation SATranslationConfig `yaml:"searchAttributeTranslation"` + Metrics *MetricsConfig `yaml:"metrics"` + ProfilingConfig ProfilingConfig `yaml:"profiling"` + Logging LoggingConfig `yaml:"logging"` + LogConfigs map[string]LoggingConfig `yaml:"logConfigs"` + ClusterConnections []ClusterConnConfig `yaml:"clusterConnections"` } SATranslationConfig struct { @@ -221,6 +222,7 @@ type ( LoggingConfig struct { ThrottleMaxRPS float64 `yaml:"throttleMaxRPS"` + Disabled bool `yaml:"disabled"` } MemberlistConfig struct { diff --git a/config/converter.go b/config/converter.go index fb09ce06..d30e81c4 100644 --- a/config/converter.go +++ b/config/converter.go @@ -48,6 +48,7 @@ func ToClusterConnConfig(config S2SProxyConfig) S2SProxyConfig { Metrics: config.Metrics, ProfilingConfig: config.ProfilingConfig, Logging: config.Logging, + LogConfigs: config.LogConfigs, } } diff --git a/config/new_config_test.go b/config/new_config_test.go index 1db59702..bb489729 100644 --- a/config/new_config_test.go +++ b/config/new_config_test.go @@ -68,6 +68,11 @@ func TestConversionWithTLS(t *testing.T) { proxyConfig, err := LoadConfig[S2SProxyConfig](samplePath) require.NoError(t, err) converted := ToClusterConnConfig(proxyConfig) + require.Equal(t, 0.1, converted.LogConfigs["adminservice"].ThrottleMaxRPS) + require.Equal(t, float64(11), converted.LogConfigs["testexample"].ThrottleMaxRPS) + require.Equal(t, 0.12, converted.LogConfigs["adminstreams"].ThrottleMaxRPS) + require.Equal(t, false, converted.LogConfigs["adminstreams"].Disabled) + require.Equal(t, true, converted.LogConfigs["testdisabled"].Disabled) require.Equal(t, 1, len(converted.ClusterConnections)) require.Nil(t, converted.Inbound) require.Nil(t, converted.Outbound) diff --git a/develop/old-config-with-TLS.yaml b/develop/old-config-with-TLS.yaml index 1eadb4bb..37210b77 100644 --- a/develop/old-config-with-TLS.yaml +++ b/develop/old-config-with-TLS.yaml @@ -72,4 +72,13 @@ metrics: prometheus: listenAddress: "0.0.0.0:9090" # This enables profiling with the default 
config and port -profiling: {} \ No newline at end of file +profiling: {} +logConfigs: + adminservice: + throttleMaxRPS: 0.10 + testexample: + throttleMaxRPS: 11 + adminstreams: + throttleMaxRPS: 0.12 + testdisabled: + disabled: true \ No newline at end of file diff --git a/endtoendtest/echo_server.go b/endtoendtest/echo_server.go index 22e9504f..447fe7c0 100644 --- a/endtoendtest/echo_server.go +++ b/endtoendtest/echo_server.go @@ -22,6 +22,7 @@ import ( "github.com/temporalio/s2s-proxy/config" "github.com/temporalio/s2s-proxy/encryption" "github.com/temporalio/s2s-proxy/endtoendtest/testservices" + "github.com/temporalio/s2s-proxy/logging" "github.com/temporalio/s2s-proxy/metrics" s2sproxy "github.com/temporalio/s2s-proxy/proxy" "github.com/temporalio/s2s-proxy/transport/grpcutil" @@ -115,7 +116,7 @@ func NewEchoServer( configProvider := config.NewMockConfigProvider(*localClusterInfo.S2sProxyConfig) proxy = s2sproxy.NewProxy( configProvider, - logger, + logging.NewLoggerProvider(logger, configProvider), ) clientConfig = config.ProxyClientConfig{ diff --git a/logging/fx.go b/logging/fx.go new file mode 100644 index 00000000..8b6498d4 --- /dev/null +++ b/logging/fx.go @@ -0,0 +1,7 @@ +package logging + +import "go.uber.org/fx" + +var Module = fx.Provide( + NewLoggerProvider, +) diff --git a/logging/logger_provider.go b/logging/logger_provider.go new file mode 100644 index 00000000..f8cd2f82 --- /dev/null +++ b/logging/logger_provider.go @@ -0,0 +1,80 @@ +package logging + +import ( + "go.temporal.io/server/common/log" + "go.temporal.io/server/common/log/tag" + + "github.com/temporalio/s2s-proxy/config" +) + +const ( + AdminService = "adminService" + WorkflowService = "workflowService" + ReplicationStreams = "replicationStreams" + ShardManager = "shardManager" + ShardRouting = "shardRouting" +) + +type ( + LogComponentName string + // LoggerProvider provides customized loggers for different components. + // Based on the component name, different throttling levels can be applied. + // Right now, any tags stored with the LoggerProvider with With() will be applied to all loggers returned by Get(). + LoggerProvider interface { + // Get returns a logger for the given component. If there is no custom config, the root logger will be returned. + Get(component LogComponentName) log.Logger + // With returns a new logger provider with the given tags added to all loggers. 
+ With(tags ...tag.Tag) LoggerProvider + } + loggerProvider struct { + root log.Logger + loggers map[LogComponentName]log.Logger + tags []tag.Tag + } +) + +var defaultLoggers = map[LogComponentName]config.LoggingConfig{ + AdminService: {ThrottleMaxRPS: 3.0}, + ReplicationStreams: {ThrottleMaxRPS: 3.0 / 60.0}, + ShardManager: {ThrottleMaxRPS: 3.0}, + ShardRouting: {ThrottleMaxRPS: 3.0 / 60.0}, +} + +func NewLoggerProvider(root log.Logger, config config.ConfigProvider) LoggerProvider { + logConfigs := config.GetS2SProxyConfig().LogConfigs + throttledRootLog := log.NewThrottledLogger(root, config.GetS2SProxyConfig().Logging.GetThrottleMaxRPS) + loggersByComponent := make(map[LogComponentName]log.Logger, max(len(logConfigs), len(defaultLoggers))) + for component, defaultConfig := range defaultLoggers { + loggersByComponent[component] = loggerForConfig(throttledRootLog, defaultConfig) + } + for component, logConfig := range logConfigs { + loggersByComponent[LogComponentName(component)] = loggerForConfig(throttledRootLog, logConfig) + } + return &loggerProvider{ + root: root, + loggers: loggersByComponent, + } +} + +func loggerForConfig(logger log.Logger, config config.LoggingConfig) log.Logger { + if config.Disabled { + return log.NewNoopLogger() + } + return log.NewThrottledLogger(logger, config.GetThrottleMaxRPS) +} + +func (l *loggerProvider) Get(component LogComponentName) log.Logger { + logger, exists := l.loggers[component] + if !exists { + logger = l.root + } + return log.With(logger, l.tags...) +} + +func (l *loggerProvider) With(tags ...tag.Tag) LoggerProvider { + return &loggerProvider{ + root: l.root, + loggers: l.loggers, + tags: append(l.tags, tags...), + } +} diff --git a/proxy/adminservice.go b/proxy/adminservice.go index 86d29d8d..c207fb8b 100644 --- a/proxy/adminservice.go +++ b/proxy/adminservice.go @@ -14,6 +14,7 @@ import ( "github.com/temporalio/s2s-proxy/common" "github.com/temporalio/s2s-proxy/config" + "github.com/temporalio/s2s-proxy/logging" "github.com/temporalio/s2s-proxy/metrics" ) @@ -34,8 +35,7 @@ type ( shardManager ShardManager adminClient adminservice.AdminServiceClient adminClientReverse adminservice.AdminServiceClient - verboseLogger log.Logger - replicationLogger log.Logger + loggers logging.LoggerProvider apiOverrides *config.APIOverridesConfig metricLabelValues []string reportStreamValue func(idx int32, value int32) @@ -57,23 +57,15 @@ func NewAdminServiceProxyServer( shardCountConfig config.ShardCountConfig, lcmParameters LCMParameters, routingParameters RoutingParameters, - logger log.Logger, + logProvider logging.LoggerProvider, shardManager ShardManager, lifetime context.Context, ) adminservice.AdminServiceServer { - // Replication streams / APIs will run many hundreds of times per second. 
Throttle their output - // to 3 / min - replicationLogger := log.NewThrottledLogger(log.With(logger, common.ServiceTag(serviceName)), - func() float64 { return 3.0 / 60.0 }) - // For config operations, allow most logs so that we can see all the info without putting disk at risk - verboseLogger := log.NewThrottledLogger(log.With(logger, common.ServiceTag(serviceName)), - func() float64 { return 3.0 }) return &adminServiceProxyServer{ shardManager: shardManager, adminClient: adminClient, adminClientReverse: adminClientReverse, - verboseLogger: verboseLogger, - replicationLogger: replicationLogger, + loggers: logProvider.With(common.ServiceTag(serviceName)), apiOverrides: apiOverrides, metricLabelValues: metricLabelValues, reportStreamValue: reportStreamValue, @@ -85,7 +77,7 @@ func NewAdminServiceProxyServer( } func (s *adminServiceProxyServer) AddOrUpdateRemoteCluster(ctx context.Context, in0 *adminservice.AddOrUpdateRemoteClusterRequest) (resp *adminservice.AddOrUpdateRemoteClusterResponse, err error) { - s.verboseLogger.Info("Received AddOrUpdateRemoteCluster", tag.Address(in0.FrontendAddress), tag.NewBoolTag("Enabled", in0.GetEnableRemoteClusterConnection()), tag.NewStringsTag("configTags", s.metricLabelValues)) + s.loggers.Get(logging.AdminService).Info("Received AddOrUpdateRemoteCluster", tag.Address(in0.FrontendAddress), tag.NewBoolTag("Enabled", in0.GetEnableRemoteClusterConnection()), tag.NewStringsTag("configTags", s.metricLabelValues)) if !common.IsRequestTranslationDisabled(ctx) && s.apiOverrides != nil { reqOverride := s.apiOverrides.AdminService.AddOrUpdateRemoteCluster if reqOverride != nil && len(reqOverride.Request.FrontendAddress) > 0 { @@ -94,12 +86,12 @@ func (s *adminServiceProxyServer) AddOrUpdateRemoteCluster(ctx context.Context, // from the local temporal server, or the proxy may be deployed behind a load balancer. // Only used in single-proxy scenarios, i.e. 
Temporal <> Proxy <> Temporal in0.FrontendAddress = reqOverride.Request.FrontendAddress - s.verboseLogger.Info("Overwrote outbound address", tag.Address(in0.FrontendAddress), tag.NewStringsTag("configTags", s.metricLabelValues)) + s.loggers.Get(logging.AdminService).Info("Overwrote outbound address", tag.Address(in0.FrontendAddress), tag.NewStringsTag("configTags", s.metricLabelValues)) } } resp, err = s.adminClient.AddOrUpdateRemoteCluster(ctx, in0) if err != nil { - s.verboseLogger.Error("Error when adding remote cluster", tag.Error(err), tag.Operation("AddOrUpdateRemoteCluster"), + s.loggers.Get(logging.AdminService).Error("Error when adding remote cluster", tag.Error(err), tag.Operation("AddOrUpdateRemoteCluster"), tag.NewStringTag("FrontendAddress", in0.GetFrontendAddress())) } return @@ -126,16 +118,16 @@ func (s *adminServiceProxyServer) DeleteWorkflowExecution(ctx context.Context, i } func (s *adminServiceProxyServer) DescribeCluster(ctx context.Context, in0 *adminservice.DescribeClusterRequest) (*adminservice.DescribeClusterResponse, error) { - s.verboseLogger.Info("Received DescribeClusterRequest") + s.loggers.Get(logging.AdminService).Info("Received DescribeClusterRequest") resp, err := s.adminClient.DescribeCluster(ctx, in0) if resp != nil { - s.verboseLogger.Info("Raw DescribeClusterResponse", tag.NewStringTag("clusterID", resp.ClusterId), + s.loggers.Get(logging.AdminService).Info("Raw DescribeClusterResponse", tag.NewStringTag("clusterID", resp.ClusterId), tag.NewStringTag("clusterName", resp.ClusterName), tag.NewStringTag("version", resp.ServerVersion), tag.NewInt64("failoverVersionIncrement", resp.FailoverVersionIncrement), tag.NewInt64("initialFailoverVersion", resp.InitialFailoverVersion), tag.NewBoolTag("isGlobalNamespaceEnabled", resp.IsGlobalNamespaceEnabled), tag.NewStringsTag("configTags", s.metricLabelValues)) } if common.IsRequestTranslationDisabled(ctx) { - s.verboseLogger.Info("Request translation disabled. Returning as-is") + s.loggers.Get(logging.AdminService).Info("Request translation disabled. 
Returning as-is") return resp, err } @@ -154,19 +146,19 @@ func (s *adminServiceProxyServer) DescribeCluster(ctx context.Context, in0 *admi responseOverride := s.apiOverrides.AdminService.DescribeCluster.Response if resp != nil && responseOverride.FailoverVersionIncrement != nil { resp.FailoverVersionIncrement = *responseOverride.FailoverVersionIncrement - s.verboseLogger.Info("Overwrite FailoverVersionIncrement", tag.NewInt64("failoverVersionIncrement", resp.FailoverVersionIncrement), + s.loggers.Get(logging.AdminService).Info("Overwrite FailoverVersionIncrement", tag.NewInt64("failoverVersionIncrement", resp.FailoverVersionIncrement), tag.NewStringsTag("configTags", s.metricLabelValues)) } } if resp != nil { - s.verboseLogger.Info("Translated DescribeClusterResponse", tag.NewStringTag("clusterID", resp.ClusterId), + s.loggers.Get(logging.AdminService).Info("Translated DescribeClusterResponse", tag.NewStringTag("clusterID", resp.ClusterId), tag.NewStringTag("clusterName", resp.ClusterName), tag.NewStringTag("version", resp.ServerVersion), tag.NewInt64("failoverVersionIncrement", resp.FailoverVersionIncrement), tag.NewInt64("initialFailoverVersion", resp.InitialFailoverVersion), tag.NewBoolTag("isGlobalNamespaceEnabled", resp.IsGlobalNamespaceEnabled), tag.NewStringsTag("configTags", s.metricLabelValues)) } if err != nil { - s.verboseLogger.Info("Got error when calling DescribeCluster!", tag.Error(err), tag.NewStringsTag("configTags", s.metricLabelValues)) + s.loggers.Get(logging.AdminService).Info("Got error when calling DescribeCluster!", tag.Error(err), tag.NewStringsTag("configTags", s.metricLabelValues)) } return resp, err } @@ -183,7 +175,7 @@ func (s *adminServiceProxyServer) DescribeMutableState(ctx context.Context, in0 resp, err = s.adminClient.DescribeMutableState(ctx, in0) if err != nil { // This is a duplicate of the grpc client metrics, but not everyone has metrics set up - s.replicationLogger.Error("Failed to describe workflow", + s.loggers.Get(logging.ReplicationStreams).Error("Failed to describe workflow", tag.NewStringTag("WorkflowId", in0.GetExecution().GetWorkflowId()), tag.NewStringTag("RunId", in0.GetExecution().GetRunId()), tag.NewStringTag("Namespace", in0.GetNamespace()), @@ -212,7 +204,7 @@ func (s *adminServiceProxyServer) GetNamespaceReplicationMessages(ctx context.Co resp, err = s.adminClient.GetNamespaceReplicationMessages(ctx, in0) if err != nil { // This is a duplicate of the grpc client metrics, but not everyone has metrics set up - s.replicationLogger.Error("Failed to get namespace replication messages", tag.NewStringTag("Cluster", in0.GetClusterName()), + s.loggers.Get(logging.ReplicationStreams).Error("Failed to get namespace replication messages", tag.NewStringTag("Cluster", in0.GetClusterName()), tag.Error(err), tag.Operation("GetNamespaceReplicationMessages")) } return @@ -221,7 +213,7 @@ func (s *adminServiceProxyServer) GetNamespaceReplicationMessages(ctx context.Co func (s *adminServiceProxyServer) GetReplicationMessages(ctx context.Context, in0 *adminservice.GetReplicationMessagesRequest) (resp *adminservice.GetReplicationMessagesResponse, err error) { resp, err = s.adminClient.GetReplicationMessages(ctx, in0) if err != nil { - s.replicationLogger.Error("Failed to get replication messages", tag.NewStringTag("Cluster", in0.GetClusterName()), + s.loggers.Get(logging.ReplicationStreams).Error("Failed to get replication messages", tag.NewStringTag("Cluster", in0.GetClusterName()), tag.Error(err), tag.Operation("GetReplicationMessages")) } return @@ 
-326,7 +318,7 @@ func ClusterShardIDtoShortString(sd history.ClusterShardID) string { func (s *adminServiceProxyServer) StreamWorkflowReplicationMessages( streamServer adminservice.AdminService_StreamWorkflowReplicationMessagesServer, ) (retError error) { - defer log.CapturePanic(s.replicationLogger, &retError) + defer log.CapturePanic(s.loggers.Get(logging.ReplicationStreams), &retError) targetMetadata, ok := metadata.FromIncomingContext(streamServer.Context()) if !ok { @@ -339,7 +331,7 @@ func (s *adminServiceProxyServer) StreamWorkflowReplicationMessages( return err } - logger := log.With(s.replicationLogger, + logger := log.With(s.loggers.Get(logging.ReplicationStreams), tag.NewStringTag("source", ClusterShardIDtoString(sourceClusterShardID)), tag.NewStringTag("target", ClusterShardIDtoString(targetClusterShardID))) diff --git a/proxy/adminservice_test.go b/proxy/adminservice_test.go index 655e9260..f1ae41b8 100644 --- a/proxy/adminservice_test.go +++ b/proxy/adminservice_test.go @@ -14,6 +14,7 @@ import ( "github.com/temporalio/s2s-proxy/common" "github.com/temporalio/s2s-proxy/config" + "github.com/temporalio/s2s-proxy/logging" clientmock "github.com/temporalio/s2s-proxy/mocks/client" ) @@ -47,7 +48,8 @@ type adminProxyServerInput struct { func (s *adminserviceSuite) newAdminServiceProxyServer(in adminProxyServerInput, observer *ReplicationStreamObserver) adminservice.AdminServiceServer { return NewAdminServiceProxyServer("test-service-name", s.adminClientMock, s.adminClientMock, - in.apiOverrides, in.metricLabels, observer.ReportStreamValue, config.ShardCountConfig{}, LCMParameters{}, RoutingParameters{}, log.NewTestLogger(), nil, context.Background()) + in.apiOverrides, in.metricLabels, observer.ReportStreamValue, config.ShardCountConfig{}, LCMParameters{}, + RoutingParameters{}, logging.NewLoggerProvider(log.NewTestLogger(), config.NewMockConfigProvider(config.S2SProxyConfig{})), nil, context.Background()) } func (s *adminserviceSuite) TestAddOrUpdateRemoteCluster() { diff --git a/proxy/cluster_connection.go b/proxy/cluster_connection.go index a4f49f34..8feba1d9 100644 --- a/proxy/cluster_connection.go +++ b/proxy/cluster_connection.go @@ -24,11 +24,21 @@ import ( "github.com/temporalio/s2s-proxy/config" "github.com/temporalio/s2s-proxy/encryption" "github.com/temporalio/s2s-proxy/interceptor" + "github.com/temporalio/s2s-proxy/logging" "github.com/temporalio/s2s-proxy/metrics" "github.com/temporalio/s2s-proxy/transport/grpcutil" "github.com/temporalio/s2s-proxy/transport/mux" ) +const ( + LogStreamObserver = "streamObserver" + LogMuxManager = "muxManager" + LogTCPServer = "tcpServer" + LogClusterConnection = "clusterConnection" + LogInterceptor = "interceptor" + LogTLSHandshake = "tlsHandshake" +) + type ( // simpleGRPCServer is a self-sufficient package of listener, server, logger, and lifetime. It can be Started, // which creates a GoRoutine that listens on the provided listener using server.Serve until lifetime closes. @@ -60,7 +70,7 @@ type ( inboundObserver *ReplicationStreamObserver outboundObserver *ReplicationStreamObserver shardManager ShardManager - logger log.Logger + loggers logging.LoggerProvider } // contextAwareServer represents a startable gRPC server used to provide the Temporal interface on some connection. // IsUsable and Describe allow the caller to know and log the current state of the server. 
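A minimal, self-contained sketch (not part of this change) of how these component names interact with the new logConfigs map: NewLoggerProvider copies each logConfigs entry into its lookup table verbatim, and Get falls back to the root logger for components with neither a default nor an override, so an override only takes effect when the config key matches the component string exactly (case-sensitive). The example reuses config.NewMockConfigProvider and log.NewTestLogger from the tests in this change purely for illustration; the struct literal stands in for the equivalent logConfigs YAML.

package main

import (
	"go.temporal.io/server/common/log"
	"go.temporal.io/server/common/log/tag"

	"github.com/temporalio/s2s-proxy/config"
	"github.com/temporalio/s2s-proxy/logging"
)

func main() {
	// Equivalent YAML:
	//   logConfigs:
	//     adminService:
	//       throttleMaxRPS: 10
	//     tcpServer:
	//       disabled: true
	cfg := config.S2SProxyConfig{
		LogConfigs: map[string]config.LoggingConfig{
			"adminService": {ThrottleMaxRPS: 10}, // overrides the built-in 3 rps default for logging.AdminService
			"tcpServer":    {Disabled: true},     // matches LogTCPServer above; that component becomes a no-op logger
		},
	}

	provider := logging.NewLoggerProvider(log.NewTestLogger(), config.NewMockConfigProvider(cfg))

	// Tags added via With() are applied to every logger returned by Get().
	loggers := provider.With(tag.NewStringTag("clusterConn", "example"))

	loggers.Get(logging.AdminService).Info("throttled at 10 rps instead of the default 3")
	loggers.Get(logging.ReplicationStreams).Info("still throttled at the built-in 3-per-minute default")
	loggers.Get("noSuchComponent").Info("unknown component: falls back to the root logger")
}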
@@ -93,7 +103,7 @@ type ( nsTranslations collect.StaticBiMap[string, string] saTranslations config.SearchAttributeTranslation shardCountConfig config.ShardCountConfig - logger log.Logger + loggers logging.LoggerProvider shardManager ShardManager lcmParameters LCMParameters @@ -107,12 +117,12 @@ func sanitizeConnectionName(name string) string { } // NewClusterConnection unpacks the connConfig and creates the inbound and outbound clients and servers. -func NewClusterConnection(lifetime context.Context, connConfig config.ClusterConnConfig, logger log.Logger) (*ClusterConnection, error) { +func NewClusterConnection(lifetime context.Context, connConfig config.ClusterConnConfig, logProvider logging.LoggerProvider) (*ClusterConnection, error) { // The name is used in metrics and in the protocol for identifying the multi-client-conn. Sanitize it or else grpc.Dial will be very unhappy. sanitizedConnectionName := sanitizeConnectionName(connConfig.Name) cc := &ClusterConnection{ lifetime: lifetime, - logger: log.With(logger, tag.NewStringTag("clusterConn", sanitizedConnectionName)), + loggers: logProvider.With(tag.NewStringTag("clusterConn", sanitizedConnectionName)), } var err error cc.inboundClient, err = createClient(lifetime, sanitizedConnectionName, connConfig.LocalServer.Connection, "inbound") @@ -132,7 +142,7 @@ func NewClusterConnection(lifetime context.Context, connConfig config.ClusterCon return nil, err } - cc.shardManager = NewShardManager(connConfig.MemberlistConfig, connConfig.ShardCountConfig, connConfig.LocalServer.Connection.TcpClient.TLSConfig, logger) + cc.shardManager = NewShardManager(connConfig.MemberlistConfig, connConfig.ShardCountConfig, connConfig.LocalServer.Connection.TcpClient.TLSConfig, logProvider) getLCMParameters := func(shardCountConfig config.ShardCountConfig, inverse bool) LCMParameters { if shardCountConfig.Mode != config.ShardCountLCM { @@ -177,7 +187,7 @@ func NewClusterConnection(lifetime context.Context, connConfig config.ClusterCon nsTranslations: nsTranslations.Inverse(), saTranslations: saTranslations.Inverse(), shardCountConfig: connConfig.ShardCountConfig, - logger: cc.logger, + loggers: cc.loggers, shardManager: cc.shardManager, lcmParameters: getLCMParameters(connConfig.ShardCountConfig, true), routingParameters: getRoutingParameters(connConfig.ShardCountConfig, true, "inbound"), @@ -195,7 +205,7 @@ func NewClusterConnection(lifetime context.Context, connConfig config.ClusterCon nsTranslations: nsTranslations, saTranslations: saTranslations, shardCountConfig: connConfig.ShardCountConfig, - logger: cc.logger, + loggers: cc.loggers, shardManager: cc.shardManager, lcmParameters: getLCMParameters(connConfig.ShardCountConfig, false), routingParameters: getRoutingParameters(connConfig.ShardCountConfig, false, "outbound"), @@ -226,13 +236,14 @@ func createServer(lifetime context.Context, c serverConfiguration) (contextAware // No special logic required for managedClient return createTCPServer(lifetime, c) case config.ConnTypeMuxClient, config.ConnTypeMuxServer: - observer := NewReplicationStreamObserver(c.logger) + observer := NewReplicationStreamObserver(c.loggers.Get(LogStreamObserver)) grpcServer, err := buildProxyServer(c, c.clusterDefinition.Connection.MuxAddressInfo.TLSConfig, observer.ReportStreamValue, lifetime) if err != nil { return nil, nil, err } // The Mux manager needs to update its associated client - muxMgr, err := mux.NewGRPCMuxManager(lifetime, c.name, c.clusterDefinition, c.managedClient.(*grpcutil.MultiClientConn), grpcServer, c.logger) + 
muxMgr, err := mux.NewGRPCMuxManager(lifetime, c.name, c.clusterDefinition, + c.managedClient.(*grpcutil.MultiClientConn), grpcServer, c.loggers.Get(LogMuxManager)) if err != nil { return nil, nil, err } @@ -243,7 +254,7 @@ } func createTCPServer(lifetime context.Context, c serverConfiguration) (contextAwareServer, *ReplicationStreamObserver, error) { - observer := NewReplicationStreamObserver(c.logger) + observer := NewReplicationStreamObserver(c.loggers.Get(LogStreamObserver)) listener, err := net.Listen("tcp", c.clusterDefinition.Connection.TcpServer.ConnectionString) if err != nil { return nil, nil, fmt.Errorf("invalid configuration for inbound server: %w", err) } @@ -257,7 +268,7 @@ lifetime: lifetime, listener: listener, server: grpcServer, - logger: c.logger, + logger: log.With(c.loggers.Get(LogTCPServer), tag.NewStringTag("direction", c.directionLabel), tag.Address(listener.Addr().String())), } return server, observer, nil } @@ -288,7 +299,7 @@ func (c *ClusterConnection) Start() { if c.shardManager != nil { err := c.shardManager.Start(c.lifetime) if err != nil { - c.logger.Error("Failed to start shard manager", tag.Error(err)) + c.loggers.Get(LogClusterConnection).Error("Failed to start shard manager", tag.Error(err)) } } c.inboundServer.Start() @@ -329,7 +340,7 @@ func buildProxyServer(c serverConfiguration, tlsConfig encryption.TLSConfig, obs c.shardCountConfig, c.lcmParameters, c.routingParameters, - c.logger, + c.loggers, c.shardManager, lifetime, ) @@ -338,7 +349,7 @@ accessControl = auth.NewAccesControl(c.clusterDefinition.ACLPolicy.AllowedNamespaces) } workflowServiceImpl := NewWorkflowServiceProxyServer("inboundWorkflowService", workflowservice.NewWorkflowServiceClient(c.client), - accessControl, c.logger) + accessControl, c.loggers) adminservice.RegisterAdminServiceServer(server, adminServiceImpl) workflowservice.RegisterWorkflowServiceServer(server, workflowServiceImpl) return server, nil } @@ -357,25 +368,28 @@ func makeServerOptions(c serverConfiguration, tlsConfig encryption.TLSConfig) ([ var translators []interceptor.Translator if c.nsTranslations.Len() > 0 { - translators = append(translators, interceptor.NewNamespaceNameTranslator(c.logger, c.nsTranslations.AsMap(), c.nsTranslations.Inverse().AsMap())) + translators = append(translators, interceptor.NewNamespaceNameTranslator(c.loggers.Get(LogInterceptor), + c.nsTranslations.AsMap(), c.nsTranslations.Inverse().AsMap())) } if c.saTranslations.LenNamespaces() > 0 { - c.logger.Info("search attribute translation enabled", tag.NewAnyTag("mappings", c.saTranslations)) + c.loggers.Get(LogClusterConnection).Info("search attribute translation enabled", tag.NewAnyTag("mappings", c.saTranslations)) if c.saTranslations.LenNamespaces() > 1 { panic("multiple namespace search attribute mappings are not supported") } - translators = append(translators, interceptor.NewSearchAttributeTranslator(c.logger, c.saTranslations.FlattenMaps(), c.saTranslations.Inverse().FlattenMaps())) + translators = append(translators, interceptor.NewSearchAttributeTranslator(c.loggers.Get(LogInterceptor), + c.saTranslations.FlattenMaps(), c.saTranslations.Inverse().FlattenMaps())) } if len(translators) > 0 { - tr := interceptor.NewTranslationInterceptor(c.logger, translators) + tr :=
interceptor.NewTranslationInterceptor(c.loggers.Get(LogInterceptor), translators) unaryInterceptors = append(unaryInterceptors, tr.Intercept) streamInterceptors = append(streamInterceptors, tr.InterceptStream) } if c.clusterDefinition.ACLPolicy != nil { - aclInterceptor := interceptor.NewAccessControlInterceptor(c.logger, c.clusterDefinition.ACLPolicy.AllowedMethods.AdminService, c.clusterDefinition.ACLPolicy.AllowedNamespaces) + aclInterceptor := interceptor.NewAccessControlInterceptor(c.loggers.Get(LogInterceptor), + c.clusterDefinition.ACLPolicy.AllowedMethods.AdminService, c.clusterDefinition.ACLPolicy.AllowedNamespaces) unaryInterceptors = append(unaryInterceptors, aclInterceptor.Intercept) streamInterceptors = append(streamInterceptors, aclInterceptor.StreamIntercept) } @@ -386,7 +400,7 @@ func makeServerOptions(c serverConfiguration, tlsConfig encryption.TLSConfig) ([ } if tlsConfig.IsEnabled() { - tlsConfig, err := encryption.GetServerTLSConfig(tlsConfig, c.logger) + tlsConfig, err := encryption.GetServerTLSConfig(tlsConfig, c.loggers.Get(LogTLSHandshake)) if err != nil { return opts, err } @@ -400,11 +414,12 @@ func (s *simpleGRPCServer) Start() { metrics.GRPCServerStarted.WithLabelValues(s.name).Inc() go func() { s.logger.Info("Starting TCP-TLS gRPC server", tag.Name(s.name), tag.Address(s.listener.Addr().String())) + defer s.logger.Info("TCP-TLS gRPC server closed as requested", tag.Name(s.name), tag.Address(s.listener.Addr().String())) for s.lifetime.Err() == nil { err := s.server.Serve(s.listener) if s.lifetime.Err() != nil { // Cluster is closing, just exit. - break + return } if err != nil { s.logger.Warn("GRPC server failed", tag.NewStringTag("direction", "outbound"), @@ -417,7 +432,6 @@ func (s *simpleGRPCServer) Start() { } time.Sleep(1 * time.Second) } - s.logger.Info("TCP-TLS gRPC server closed as requested", tag.Name(s.name), tag.Address(s.listener.Addr().String())) }() // The basic net.Listen, grpc.Server, and ClientConn are not context-aware, so make sure they clean up on context close. 
context.AfterFunc(s.lifetime, func() { diff --git a/proxy/cluster_connection_test.go b/proxy/cluster_connection_test.go index cf45a56d..bbcb5192 100644 --- a/proxy/cluster_connection_test.go +++ b/proxy/cluster_connection_test.go @@ -16,6 +16,7 @@ import ( "github.com/temporalio/s2s-proxy/config" "github.com/temporalio/s2s-proxy/endtoendtest/testservices" + "github.com/temporalio/s2s-proxy/logging" "github.com/temporalio/s2s-proxy/metrics" "github.com/temporalio/s2s-proxy/transport/grpcutil" "github.com/temporalio/s2s-proxy/transport/mux" @@ -159,11 +160,11 @@ func makeEchoServer(name string, listenAddress string, logger log.Logger) *tests nil, listenAddress, logger) } -func newPairedLocalClusterConnection(t *testing.T, isMux bool, logger log.Logger) *pairedLocalClusterConnection { +func newPairedLocalClusterConnection(t *testing.T, isMux bool, loggers logging.LoggerProvider) *pairedLocalClusterConnection { a := getDynamicPlccAddresses(t) - localTemporal := makeEchoServer("local", a.localTemporalAddr, logger) - remoteTemporal := makeEchoServer("remote", a.remoteTemporalAddr, logger) + localTemporal := makeEchoServer("local", a.localTemporalAddr, loggers.Get("root")) + remoteTemporal := makeEchoServer("remote", a.remoteTemporalAddr, loggers.Get("root")) var localCC, remoteCC *ClusterConnection var cancelLocalCC, cancelRemoteCC context.CancelFunc @@ -172,25 +173,25 @@ func newPairedLocalClusterConnection(t *testing.T, isMux bool, logger log.Logger var localCtx context.Context localCtx, cancelLocalCC = context.WithCancel(t.Context()) localCC, err = NewClusterConnection(localCtx, makeTCPClusterConfig("TCP-only Connection Local Proxy", - a.localTemporalAddr, a.localProxyInbound, a.localProxyOutbound, a.remoteProxyInbound), logger) + a.localTemporalAddr, a.localProxyInbound, a.localProxyOutbound, a.remoteProxyInbound), loggers) require.NoError(t, err) var remoteCtx context.Context remoteCtx, cancelRemoteCC = context.WithCancel(t.Context()) remoteCC, err = NewClusterConnection(remoteCtx, makeTCPClusterConfig("TCP-only Connection Remote Proxy", - a.remoteTemporalAddr, a.remoteProxyInbound, a.remoteProxyOutbound, a.localProxyInbound), logger) + a.remoteTemporalAddr, a.remoteProxyInbound, a.remoteProxyOutbound, a.localProxyInbound), loggers) require.NoError(t, err) } else { var localCtx context.Context localCtx, cancelLocalCC = context.WithCancel(t.Context()) localCC, err = NewClusterConnection(localCtx, makeMuxClusterConfig("Mux Connection Local Establishing Proxy", - config.ConnTypeMuxClient, a.localTemporalAddr, a.localProxyOutbound, a.remoteProxyInbound), logger) + config.ConnTypeMuxClient, a.localTemporalAddr, a.localProxyOutbound, a.remoteProxyInbound), loggers) require.NoError(t, err) var remoteCtx context.Context remoteCtx, cancelRemoteCC = context.WithCancel(t.Context()) remoteCC, err = NewClusterConnection(remoteCtx, makeMuxClusterConfig("Mux Connection Remote Receiving Proxy", - config.ConnTypeMuxServer, a.remoteTemporalAddr, a.remoteProxyOutbound, a.remoteProxyInbound), logger) + config.ConnTypeMuxServer, a.remoteTemporalAddr, a.remoteProxyOutbound, a.remoteProxyInbound), loggers) require.NoError(t, err) } clientFromLocal, err := grpc.NewClient(a.localProxyOutbound, grpcutil.MakeDialOptions(nil, metrics.GetStandardGRPCClientInterceptor("outbound-local"))...) 
@@ -211,8 +212,8 @@ func newPairedLocalClusterConnection(t *testing.T, isMux bool, logger log.Logger } func TestTCPClusterConnection(t *testing.T) { - logger := log.NewTestLogger() - plcc := newPairedLocalClusterConnection(t, false, logger) + loggerProvider := logging.NewLoggerProvider(log.NewTestLogger(), config.NewMockConfigProvider(config.S2SProxyConfig{})) + plcc := newPairedLocalClusterConnection(t, false, loggerProvider) plcc.StartAll(t) ctx, cancel := context.WithTimeout(context.Background(), time.Second) @@ -228,8 +229,8 @@ func TestTCPClusterConnection(t *testing.T) { } func TestMuxClusterConnection(t *testing.T) { - logger := log.NewTestLogger() - plcc := newPairedLocalClusterConnection(t, true, logger) + loggerProvider := logging.NewLoggerProvider(log.NewTestLogger(), config.NewMockConfigProvider(config.S2SProxyConfig{})) + plcc := newPairedLocalClusterConnection(t, true, loggerProvider) plcc.StartAll(t) t.Log("Started plcc") @@ -248,8 +249,8 @@ func TestMuxClusterConnection(t *testing.T) { } func TestMuxCCFailover(t *testing.T) { - logger := log.NewTestLogger() - plcc := newPairedLocalClusterConnection(t, true, logger) + loggerProvider := logging.NewLoggerProvider(log.NewTestLogger(), config.NewMockConfigProvider(config.S2SProxyConfig{})) + plcc := newPairedLocalClusterConnection(t, true, loggerProvider) plcc.StartAll(t) plcc.cancelRemoteCC() @@ -259,7 +260,7 @@ func TestMuxCCFailover(t *testing.T) { cancel() newConnection, err := NewClusterConnection(t.Context(), makeMuxClusterConfig("newRemoteMux", config.ConnTypeMuxServer, plcc.addresses.remoteTemporalAddr, plcc.addresses.remoteProxyOutbound, plcc.addresses.remoteProxyInbound, - func(cc *config.ClusterConnConfig) { cc.RemoteServer.Connection.MuxCount = 5 }), logger) + func(cc *config.ClusterConnConfig) { cc.RemoteServer.Connection.MuxCount = 5 }), loggerProvider) require.NoError(t, err) newConnection.Start() // Wait for localCC's client retry... diff --git a/proxy/intra_proxy_router.go b/proxy/intra_proxy_router.go index be4b1a8a..517487cd 100644 --- a/proxy/intra_proxy_router.go +++ b/proxy/intra_proxy_router.go @@ -20,6 +20,7 @@ import ( "github.com/temporalio/s2s-proxy/common" "github.com/temporalio/s2s-proxy/encryption" + "github.com/temporalio/s2s-proxy/logging" "github.com/temporalio/s2s-proxy/metrics" "github.com/temporalio/s2s-proxy/transport/grpcutil" ) @@ -39,7 +40,7 @@ type RoutedMessage struct { // intraProxyManager maintains long-lived intra-proxy streams to peer proxies and // provides simple send helpers (e.g., forwarding ACKs). 
type intraProxyManager struct { - logger log.Logger + loggers logging.LoggerProvider streamsMu sync.RWMutex shardManager ShardManager notifyCh chan struct{} @@ -59,9 +60,9 @@ type peerStreamKey struct { sourceShard history.ClusterShardID } -func newIntraProxyManager(logger log.Logger, shardManager ShardManager) *intraProxyManager { +func newIntraProxyManager(logProvider logging.LoggerProvider, shardManager ShardManager) *intraProxyManager { return &intraProxyManager{ - logger: logger, + loggers: logProvider, shardManager: shardManager, peers: make(map[string]*peerState), notifyCh: make(chan struct{}), @@ -460,7 +461,9 @@ func (m *intraProxyManager) RegisterSender( return } key := peerStreamKey{targetShard: targetShard, sourceShard: sourceShard} - m.logger.Info("RegisterSender", tag.NewStringTag("peerNodeName", peerNodeName), tag.NewStringTag("key", fmt.Sprintf("%v", key)), tag.NewStringTag("sender", sender.streamID)) + m.loggers.Get(logging.ShardRouting).Info("RegisterSender", + tag.NewStringTag("peerNodeName", peerNodeName), tag.NewStringTag("key", fmt.Sprintf("%v", key)), + tag.NewStringTag("sender", sender.streamID)) m.streamsMu.Lock() ps := m.peers[peerNodeName] if ps == nil { @@ -480,7 +483,8 @@ func (m *intraProxyManager) UnregisterSender( sourceShard history.ClusterShardID, ) { key := peerStreamKey{targetShard: targetShard, sourceShard: sourceShard} - m.logger.Info("UnregisterSender", tag.NewStringTag("peerNodeName", peerNodeName), tag.NewStringTag("key", fmt.Sprintf("%v", key))) + m.loggers.Get(logging.ShardRouting).Info("UnregisterSender", tag.NewStringTag("peerNodeName", peerNodeName), + tag.NewStringTag("key", fmt.Sprintf("%v", key))) m.streamsMu.Lock() if ps := m.peers[peerNodeName]; ps != nil && ps.senders != nil { delete(ps.senders, key) @@ -490,7 +494,7 @@ func (m *intraProxyManager) UnregisterSender( // EnsureReceiverForPeerShard ensures a client stream and an ACK aggregator exist for the given peer/shard pair. 
func (m *intraProxyManager) EnsureReceiverForPeerShard(peerNodeName string, targetShard history.ClusterShardID, sourceShard history.ClusterShardID) { - logger := log.With(m.logger, + logger := log.With(m.loggers.Get(logging.ShardRouting), tag.NewStringTag("peerNodeName", peerNodeName), tag.NewStringTag("targetShard", ClusterShardIDtoString(targetShard)), tag.NewStringTag("sourceShard", ClusterShardIDtoString(sourceShard))) @@ -523,7 +527,7 @@ func (m *intraProxyManager) ensurePeer( ctx context.Context, peerNodeName string, ) (*peerState, error) { - logger := log.With(m.logger, tag.NewStringTag("peerNodeName", peerNodeName)) + logger := log.With(m.loggers.Get(logging.ShardRouting), tag.NewStringTag("peerNodeName", peerNodeName)) logger.Debug("ensurePeer started") defer logger.Debug("ensurePeer finished") @@ -626,7 +630,7 @@ func (m *intraProxyManager) ensureStream( // Create receiver and register tracking recv := &intraProxyStreamReceiver{ - logger: log.With(m.logger, + logger: log.With(m.loggers.Get(logging.ShardRouting), tag.NewStringTag("peerNodeName", peerNodeName), tag.NewStringTag("targetShardID", ClusterShardIDtoString(targetShard)), tag.NewStringTag("sourceShardID", ClusterShardIDtoString(sourceShard))), @@ -642,12 +646,12 @@ func (m *intraProxyManager) ensureStream( ps.receivers[key] = recv ps.recvShutdown[key] = recv.shutdown m.streamsMu.Unlock() - m.logger.Debug("intraProxyStreamReceiver added", tag.NewStringTag("peerNodeName", peerNodeName), tag.NewStringTag("key", fmt.Sprintf("%v", key)), tag.NewStringTag("receiver", recv.streamID)) + logger.Debug("intraProxyStreamReceiver added", tag.NewStringTag("peerNodeName", peerNodeName), tag.NewStringTag("key", fmt.Sprintf("%v", key)), tag.NewStringTag("receiver", recv.streamID)) // Let the receiver open stream, register tracking, and start goroutines go func() { if err := recv.Run(ctx, m.shardManager, ps.conn); err != nil { - m.logger.Error("intraProxyStreamReceiver.Run error", tag.Error(err)) + m.loggers.Get(logging.ShardRouting).Error("intraProxyStreamReceiver.Run error", tag.Error(err)) } // remove the receiver from the peer state m.streamsMu.Lock() @@ -672,7 +676,7 @@ func (m *intraProxyManager) sendAck( if ps, ok := m.peers[peerNodeName]; ok && ps != nil { if r, ok2 := ps.receivers[key]; ok2 && r != nil && r.streamClient != nil { if err := r.sendAck(req); err != nil { - m.logger.Error("Failed to send intra-proxy ACK", tag.Error(err)) + m.loggers.Get(logging.ShardRouting).Error("Failed to send intra-proxy ACK", tag.Error(err)) return err } return nil @@ -690,7 +694,7 @@ func (m *intraProxyManager) sendReplicationMessages( resp *adminservice.StreamWorkflowReplicationMessagesResponse, ) error { key := peerStreamKey{targetShard: targetShard, sourceShard: sourceShard} - logger := log.With(m.logger, tag.NewStringTag("task-target-shard", ClusterShardIDtoString(targetShard)), tag.NewStringTag("task-source-shard", ClusterShardIDtoString(sourceShard))) + logger := log.With(m.loggers.Get(logging.ShardRouting), tag.NewStringTag("task-target-shard", ClusterShardIDtoString(targetShard)), tag.NewStringTag("task-source-shard", ClusterShardIDtoString(sourceShard))) logger.Debug("sendReplicationMessages") defer logger.Debug("sendReplicationMessages finished") @@ -744,7 +748,7 @@ func (m *intraProxyManager) closePeerLocked(peer string, ps *peerState) { } // Close client streams (receiver cleanup is handled by its own goroutine) for key := range ps.receivers { - m.logger.Info("intraProxyStreamReceiver deleted", tag.NewStringTag("peerNodeName", peer), 
tag.NewStringTag("key", fmt.Sprintf("%v", key)), tag.NewStringTag("receiver", ps.receivers[key].streamID)) + m.loggers.Get(logging.ShardRouting).Info("intraProxyStreamReceiver deleted", tag.NewStringTag("peerNodeName", peer), tag.NewStringTag("key", fmt.Sprintf("%v", key)), tag.NewStringTag("receiver", ps.receivers[key].streamID)) delete(ps.receivers, key) } // Unregister server-side tracker entries @@ -763,7 +767,7 @@ func (m *intraProxyManager) closePeerLocked(peer string, ps *peerState) { // closePeerShardLocked shuts down and removes resources for a specific peer/shard pair. Caller must hold m.streamsMu. func (m *intraProxyManager) closePeerShardLocked(peer string, ps *peerState, key peerStreamKey) { - m.logger.Info("closePeerShardLocked", tag.NewStringTag("peer", peer), tag.NewStringTag("clientShard", ClusterShardIDtoString(key.targetShard)), tag.NewStringTag("serverShard", ClusterShardIDtoString(key.sourceShard))) + m.loggers.Get(logging.ShardRouting).Info("closePeerShardLocked", tag.NewStringTag("peer", peer), tag.NewStringTag("clientShard", ClusterShardIDtoString(key.targetShard)), tag.NewStringTag("serverShard", ClusterShardIDtoString(key.sourceShard))) if shut, ok := ps.recvShutdown[key]; ok && shut != nil { shut.Shutdown() st := GetGlobalStreamTracker() @@ -779,7 +783,7 @@ func (m *intraProxyManager) closePeerShardLocked(peer string, ps *peerState, key if r.streamClient != nil { _ = r.streamClient.CloseSend() } - m.logger.Info("intraProxyStreamReceiver deleted", tag.NewStringTag("peerNodeName", peer), tag.NewStringTag("key", fmt.Sprintf("%v", key)), tag.NewStringTag("receiver", r.streamID)) + m.loggers.Get(logging.ShardRouting).Info("intraProxyStreamReceiver deleted", tag.NewStringTag("peerNodeName", peer), tag.NewStringTag("key", fmt.Sprintf("%v", key)), tag.NewStringTag("receiver", r.streamID)) delete(ps.receivers, key) } st := GetGlobalStreamTracker() @@ -808,8 +812,8 @@ func (m *intraProxyManager) ClosePeerShard(peer string, clientShard, serverShard } func (m *intraProxyManager) Start() { - m.logger.Info("intraProxyManager starting") - defer m.logger.Info("intraProxyManager started") + m.loggers.Get(logging.ShardRouting).Info("intraProxyManager starting") + defer m.loggers.Get(logging.ShardRouting).Info("intraProxyManager started") go func() { for { // timer @@ -835,16 +839,16 @@ func (m *intraProxyManager) Notify() { // for a given peer and closes any sender/receiver not in the desired set. // This mirrors the Temporal StreamReceiverMonitor approach. 
func (m *intraProxyManager) ReconcilePeerStreams(peerNodeName string) { - m.logger.Debug("ReconcilePeerStreams started", tag.NewStringTag("peerNodeName", peerNodeName)) - defer m.logger.Debug("ReconcilePeerStreams done", tag.NewStringTag("peerNodeName", peerNodeName)) + m.loggers.Get(logging.ShardRouting).Debug("ReconcilePeerStreams started", tag.NewStringTag("peerNodeName", peerNodeName)) + defer m.loggers.Get(logging.ShardRouting).Debug("ReconcilePeerStreams done", tag.NewStringTag("peerNodeName", peerNodeName)) localShards := m.shardManager.GetLocalShards() remoteShards, err := m.shardManager.GetRemoteShardsForPeer(peerNodeName) if err != nil { - m.logger.Error("Failed to get remote shards for peer", tag.Error(err)) + m.loggers.Get(logging.ShardRouting).Error("Failed to get remote shards for peer", tag.Error(err)) return } - m.logger.Debug("ReconcilePeerStreams remote and local shards", + m.loggers.Get(logging.ShardRouting).Debug("ReconcilePeerStreams remote and local shards", tag.NewStringTag("peerNodeName", peerNodeName), tag.NewStringTag("remoteShards", fmt.Sprintf("%v", remoteShards)), tag.NewStringTag("localShards", fmt.Sprintf("%v", localShards)), @@ -877,7 +881,7 @@ func (m *intraProxyManager) ReconcilePeerStreams(peerNodeName string) { } } - m.logger.Debug("ReconcilePeerStreams desired receivers and senders", tag.NewStringTag("desiredReceivers", fmt.Sprintf("%v", desiredReceivers)), tag.NewStringTag("desiredSenders", fmt.Sprintf("%v", desiredSenders))) + m.loggers.Get(logging.ShardRouting).Debug("ReconcilePeerStreams desired receivers and senders", tag.NewStringTag("desiredReceivers", fmt.Sprintf("%v", desiredReceivers)), tag.NewStringTag("desiredSenders", fmt.Sprintf("%v", desiredSenders))) // Ensure all desired receivers exist for key := range desiredReceivers { diff --git a/proxy/proxy.go b/proxy/proxy.go index 4acfacdb..a120fbb2 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -7,10 +7,10 @@ import ( "net/http" "strings" - "go.temporal.io/server/common/log" "go.temporal.io/server/common/log/tag" "github.com/temporalio/s2s-proxy/config" + "github.com/temporalio/s2s-proxy/logging" "github.com/temporalio/s2s-proxy/metrics" ) @@ -31,23 +31,18 @@ type ( inboundHealthCheckServer *http.Server outboundHealthCheckServer *http.Server metricsServer *http.Server - logger log.Logger + logProvider logging.LoggerProvider } ) -func NewProxy(configProvider config.ConfigProvider, logger log.Logger) *Proxy { +func NewProxy(configProvider config.ConfigProvider, logProvider logging.LoggerProvider) *Proxy { s2sConfig := config.ToClusterConnConfig(configProvider.GetS2SProxyConfig()) ctx, cancel := context.WithCancel(context.Background()) proxy := &Proxy{ lifetime: ctx, cancel: cancel, clusterConnections: make(map[migrationId]*ClusterConnection, len(s2sConfig.MuxTransports)), - logger: log.NewThrottledLogger( - logger, - func() float64 { - return s2sConfig.Logging.GetThrottleMaxRPS() - }, - ), + logProvider: logProvider, } if len(s2sConfig.ClusterConnections) == 0 { panic(errors.New("cannot create proxy without inbound and outbound config")) @@ -57,9 +52,9 @@ func NewProxy(configProvider config.ConfigProvider, logger log.Logger) *Proxy { } for _, clusterCfg := range s2sConfig.ClusterConnections { - cc, err := NewClusterConnection(ctx, clusterCfg, logger) + cc, err := NewClusterConnection(ctx, clusterCfg, logProvider) if err != nil { - logger.Fatal("Incorrectly configured Mux cluster connection", tag.Error(err), tag.NewStringTag("name", clusterCfg.Name)) + 
logProvider.Get("init").Fatal("Incorrectly configured Mux cluster connection", tag.Error(err), tag.NewStringTag("name", clusterCfg.Name)) continue } migrationId := migrationId{clusterCfg.Name} @@ -92,9 +87,9 @@ func (s *Proxy) startHealthCheckHandler(lifetime context.Context, healthChecker } go func() { - s.logger.Info("Starting health check server", tag.Address(cfg.ListenAddress)) + s.logProvider.Get("init").Info("Starting health check server", tag.Address(cfg.ListenAddress)) if err := healthCheckServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { - s.logger.Error("Error starting server", tag.Error(err)) + s.logProvider.Get("init").Error("Error starting server", tag.Error(err)) } }() context.AfterFunc(lifetime, func() { @@ -106,16 +101,16 @@ func (s *Proxy) startHealthCheckHandler(lifetime context.Context, healthChecker func (s *Proxy) startMetricsHandler(lifetime context.Context, cfg config.MetricsConfig) error { // Why not use the global ServeMux? So that it can be used in unit tests mux := http.NewServeMux() - mux.Handle("/metrics", metrics.NewMetricsHandler(s.logger)) + mux.Handle("/metrics", metrics.NewMetricsHandler(s.logProvider.Get("metrics"))) s.metricsServer = &http.Server{ Addr: cfg.Prometheus.ListenAddress, Handler: mux, } go func() { - s.logger.Info("Starting metrics server", tag.Address(cfg.Prometheus.ListenAddress)) + s.logProvider.Get("metrics").Info("Starting metrics server", tag.Address(cfg.Prometheus.ListenAddress)) if err := s.metricsServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { - s.logger.Error("Error starting server", tag.Error(err)) + s.logProvider.Get("metrics").Error("Error starting server", tag.Error(err)) } }() context.AfterFunc(lifetime, func() { @@ -131,11 +126,11 @@ func (s *Proxy) Start() error { // TODO: Rethink health checks. The inbound/outbound traffic availability isn't quite right for a health check return true } - if s.inboundHealthCheckServer, err = s.startHealthCheckHandler(s.lifetime, newInboundHealthCheck(healthFn, s.logger), *s.inboundHealthCheckConfig); err != nil { + if s.inboundHealthCheckServer, err = s.startHealthCheckHandler(s.lifetime, newInboundHealthCheck(healthFn, s.logProvider.Get("healthCheck")), *s.inboundHealthCheckConfig); err != nil { return err } } else { - s.logger.Warn("Started up without inbound health check! Double-check the YAML config," + + s.logProvider.Get("init").Warn("Started up without inbound health check! Double-check the YAML config," + " it needs at least the following path: healthCheck.listenAddress") } @@ -145,11 +140,11 @@ func (s *Proxy) Start() error { return true } var err error - if s.outboundHealthCheckServer, err = s.startHealthCheckHandler(s.lifetime, newOutboundHealthCheck(healthFn, s.logger), *s.outboundHealthCheckConfig); err != nil { + if s.outboundHealthCheckServer, err = s.startHealthCheckHandler(s.lifetime, newOutboundHealthCheck(healthFn, s.logProvider.Get("healthCheck")), *s.outboundHealthCheckConfig); err != nil { return err } } else { - s.logger.Warn("Started up without outbound health check! Double-check the YAML config," + + s.logProvider.Get("init").Warn("Started up without outbound health check! Double-check the YAML config," + " it needs at least the following path: outboundHealthCheck.listenAddress") } @@ -158,7 +153,7 @@ func (s *Proxy) Start() error { return err } } else { - s.logger.Warn(`Started up without metrics! Double-check the YAML config,` + + s.logProvider.Get("init").Warn(`Started up without metrics! 
Double-check the YAML config,` + ` it needs at least the following path: metrics.prometheus.listenAddress`) } @@ -166,7 +161,7 @@ func (s *Proxy) Start() error { v.Start() } - s.logger.Info(fmt.Sprintf("Started Proxy with the following config:\n%s", s.Describe())) + s.logProvider.Get("init").Info(fmt.Sprintf("Started Proxy with the following config:\n%s", s.Describe())) return nil } diff --git a/proxy/shard_manager.go b/proxy/shard_manager.go index 61fc34f0..76e29a64 100644 --- a/proxy/shard_manager.go +++ b/proxy/shard_manager.go @@ -17,6 +17,7 @@ import ( "github.com/temporalio/s2s-proxy/config" "github.com/temporalio/s2s-proxy/encryption" + "github.com/temporalio/s2s-proxy/logging" ) type ( @@ -174,14 +175,14 @@ type ( ) // NewShardManager creates a new shard manager instance -func NewShardManager(memberlistConfig *config.MemberlistConfig, shardCountConfig config.ShardCountConfig, intraProxyTLSConfig encryption.TLSConfig, logger log.Logger) ShardManager { +func NewShardManager(memberlistConfig *config.MemberlistConfig, shardCountConfig config.ShardCountConfig, intraProxyTLSConfig encryption.TLSConfig, loggers logging.LoggerProvider) ShardManager { delegate := &shardDelegate{ - logger: logger, + logger: loggers.Get(logging.ShardManager), } sm := &shardManagerImpl{ memberlistConfig: memberlistConfig, - logger: logger, + logger: loggers.Get(logging.ShardManager), delegate: delegate, localShards: make(map[string]ShardInfo), intraMgr: nil, @@ -197,7 +198,7 @@ func NewShardManager(memberlistConfig *config.MemberlistConfig, shardCountConfig delegate.manager = sm if memberlistConfig != nil && shardCountConfig.Mode == config.ShardCountRouting { - sm.intraMgr = newIntraProxyManager(logger, sm) + sm.intraMgr = newIntraProxyManager(loggers, sm) } return sm diff --git a/proxy/test/test_common.go b/proxy/test/test_common.go index f321335d..f96b1d93 100644 --- a/proxy/test/test_common.go +++ b/proxy/test/test_common.go @@ -20,6 +20,7 @@ import ( "go.temporal.io/server/tests/testcore" "github.com/temporalio/s2s-proxy/config" + "github.com/temporalio/s2s-proxy/logging" s2sproxy "github.com/temporalio/s2s-proxy/proxy" ) @@ -366,7 +367,7 @@ func createProxy( } configProvider := &simpleConfigProvider{cfg: *cfg} - proxy := s2sproxy.NewProxy(configProvider, logger) + proxy := s2sproxy.NewProxy(configProvider, logging.NewLoggerProvider(logger, configProvider)) if proxy == nil { t.Fatalf("Failed to create proxy %s", name) } diff --git a/proxy/workflowservice.go b/proxy/workflowservice.go index 1bc73542..da66c410 100644 --- a/proxy/workflowservice.go +++ b/proxy/workflowservice.go @@ -9,6 +9,7 @@ import ( "github.com/temporalio/s2s-proxy/auth" "github.com/temporalio/s2s-proxy/common" + "github.com/temporalio/s2s-proxy/logging" ) const DCRedirectionContextHeaderName = "xdc-redirection" // https://github.com/temporalio/temporal/blob/9a1060c4162ff62576cb899d7e5b1bae179af814/common/rpc/interceptor/redirection.go#L27 @@ -29,9 +30,9 @@ func NewWorkflowServiceProxyServer( serviceName string, workflowServiceClient workflowservice.WorkflowServiceClient, namespaceAccess *auth.AccessControl, - logger log.Logger, + loggers logging.LoggerProvider, ) workflowservice.WorkflowServiceServer { - logger = log.With(logger, common.ServiceTag(serviceName)) + logger := log.With(loggers.Get(logging.WorkflowService), common.ServiceTag(serviceName)) return &workflowServiceProxyServer{ workflowServiceClient: workflowServiceClient, namespaceAccess: namespaceAccess, diff --git a/proxy/workflowservice_test.go b/proxy/workflowservice_test.go 
index b1ab04bf..9858ec2a 100644 --- a/proxy/workflowservice_test.go +++ b/proxy/workflowservice_test.go @@ -17,6 +17,8 @@ import ( "google.golang.org/grpc/metadata" "github.com/temporalio/s2s-proxy/auth" + "github.com/temporalio/s2s-proxy/config" + "github.com/temporalio/s2s-proxy/logging" ) var ( @@ -80,8 +82,9 @@ func (s *workflowServiceTestSuite) SetupTest() { // a predefined set of namespaces to the proxy layer. Then we check to make sure the proxy kept the allowed namespace // and rejected the disallowed namespace. func (s *workflowServiceTestSuite) TestNamespaceFiltering() { + loggerProvider := logging.NewLoggerProvider(log.NewTestLogger(), config.NewMockConfigProvider(config.S2SProxyConfig{})) wfProxy := NewWorkflowServiceProxyServer("My cool test server", s.clientMock, - auth.NewAccesControl([]string{"Bob Ross's Paint Shop"}), log.NewTestLogger()) + auth.NewAccesControl([]string{"Bob Ross's Paint Shop"}), loggerProvider) s.clientMock.EXPECT().ListNamespaces(gomock.Any(), gomock.Any()).Return(listNamespacesResponse, nil) res, _ := wfProxy.ListNamespaces(metadata.NewIncomingContext(context.Background(), metadata.New(map[string]string{})), @@ -105,7 +108,8 @@ func (s *workflowServiceTestSuite) TestNamespaceFiltering() { } func (s *workflowServiceTestSuite) TestPreserveRedirectionHeader() { - wfProxy := NewWorkflowServiceProxyServer("My cool test server", s.clientMock, nil, log.NewTestLogger()) + loggerProvider := logging.NewLoggerProvider(log.NewTestLogger(), config.NewMockConfigProvider(config.S2SProxyConfig{})) + wfProxy := NewWorkflowServiceProxyServer("My cool test server", s.clientMock, nil, loggerProvider) // Client should be called with xdc-redirection=false header for _, headerValue := range []string{"true", "false", ""} {