Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/proxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"go.uber.org/fx"

"github.com/temporalio/s2s-proxy/config"
"github.com/temporalio/s2s-proxy/logging"
"github.com/temporalio/s2s-proxy/proto/compat"
"github.com/temporalio/s2s-proxy/proxy"
)
Expand Down Expand Up @@ -95,6 +96,7 @@ func startProxy(c *cli.Context) error {
fx.Provide(func() log.Logger {
return log.NewZapLogger(log.BuildZapLogger(logCfg))
}),
logging.Module,
config.Module,
proxy.Module,
fx.Populate(&proxyParams),
Expand Down
16 changes: 9 additions & 7 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,13 +158,14 @@ type (
// TODO: Soon to be deprecated! Create an item in ClusterConnections instead
ShardCountConfig ShardCountConfig `yaml:"shardCount"`
// TODO: Soon to be deprecated! Create an item in ClusterConnections instead
MemberlistConfig *MemberlistConfig `yaml:"memberlist"`
NamespaceNameTranslation NameTranslationConfig `yaml:"namespaceNameTranslation"`
SearchAttributeTranslation SATranslationConfig `yaml:"searchAttributeTranslation"`
Metrics *MetricsConfig `yaml:"metrics"`
ProfilingConfig ProfilingConfig `yaml:"profiling"`
Logging LoggingConfig `yaml:"logging"`
ClusterConnections []ClusterConnConfig `yaml:"clusterConnections"`
MemberlistConfig *MemberlistConfig `yaml:"memberlist"`
NamespaceNameTranslation NameTranslationConfig `yaml:"namespaceNameTranslation"`
SearchAttributeTranslation SATranslationConfig `yaml:"searchAttributeTranslation"`
Metrics *MetricsConfig `yaml:"metrics"`
ProfilingConfig ProfilingConfig `yaml:"profiling"`
Logging LoggingConfig `yaml:"logging"`
LogConfigs map[string]LoggingConfig `yaml:"logConfigs"`
ClusterConnections []ClusterConnConfig `yaml:"clusterConnections"`
}

SATranslationConfig struct {
Expand Down Expand Up @@ -221,6 +222,7 @@ type (

// LoggingConfig holds logger settings. It is the type of both the global `logging` block and each
// per-component entry under `logConfigs`.
LoggingConfig struct {
// ThrottleMaxRPS is the rate limit handed to the throttled logger.
// NOTE(review): presumably log lines per second, and fractional values (< 1) are allowed — confirm.
ThrottleMaxRPS float64 `yaml:"throttleMaxRPS"`
// Disabled, when set on a `logConfigs` entry, makes that component use a no-op logger.
Disabled bool `yaml:"disabled"`
}

MemberlistConfig struct {
Expand Down
1 change: 1 addition & 0 deletions config/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func ToClusterConnConfig(config S2SProxyConfig) S2SProxyConfig {
Metrics: config.Metrics,
ProfilingConfig: config.ProfilingConfig,
Logging: config.Logging,
LogConfigs: config.LogConfigs,
}
}

Expand Down
5 changes: 5 additions & 0 deletions config/new_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ func TestConversionWithTLS(t *testing.T) {
proxyConfig, err := LoadConfig[S2SProxyConfig](samplePath)
require.NoError(t, err)
converted := ToClusterConnConfig(proxyConfig)
require.Equal(t, float64(10), converted.LogConfigs["adminservice"].ThrottleMaxRPS)
require.Equal(t, float64(11), converted.LogConfigs["testexample"].ThrottleMaxRPS)
require.Equal(t, float64(12), converted.LogConfigs["adminstreams"].ThrottleMaxRPS)
require.Equal(t, false, converted.LogConfigs["adminstreams"].Disabled)
require.Equal(t, true, converted.LogConfigs["testdisabled"].Disabled)
require.Equal(t, 1, len(converted.ClusterConnections))
require.Nil(t, converted.Inbound)
require.Nil(t, converted.Outbound)
Expand Down
11 changes: 10 additions & 1 deletion develop/old-config-with-TLS.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,13 @@ metrics:
prometheus:
listenAddress: "0.0.0.0:9090"
# This enables profiling with the default config and port
profiling: {}
profiling: {}
logConfigs:
adminservice:
throttleMaxRPS: 10
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can RPS be < 1?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, will update to demonstrate it

testexample:
throttleMaxRPS: 11
adminstreams:
throttleMaxRPS: 12
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we always configure adminservice and adminstreams by default and treat logConfigs as overrides? That would avoid having to add these configs to every YAML file.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, yeah some defaults based on the old values is probably a good idea. I'll add a block in logging

testdisabled:
disabled: true
3 changes: 2 additions & 1 deletion endtoendtest/echo_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/temporalio/s2s-proxy/config"
"github.com/temporalio/s2s-proxy/encryption"
"github.com/temporalio/s2s-proxy/endtoendtest/testservices"
"github.com/temporalio/s2s-proxy/logging"
"github.com/temporalio/s2s-proxy/metrics"
s2sproxy "github.com/temporalio/s2s-proxy/proxy"
"github.com/temporalio/s2s-proxy/transport/grpcutil"
Expand Down Expand Up @@ -115,7 +116,7 @@ func NewEchoServer(
configProvider := config.NewMockConfigProvider(*localClusterInfo.S2sProxyConfig)
proxy = s2sproxy.NewProxy(
configProvider,
logger,
logging.NewLoggerProvider(logger, configProvider),
)

clientConfig = config.ProxyClientConfig{
Expand Down
7 changes: 7 additions & 0 deletions logging/fx.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package logging

import "go.uber.org/fx"

// Module registers NewLoggerProvider as an fx constructor so a LoggerProvider can be injected.
var Module = fx.Provide(NewLoggerProvider)
59 changes: 59 additions & 0 deletions logging/logger_provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package logging

import (
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"

"github.com/temporalio/s2s-proxy/config"
)

// LogComponentName identifies a component whose logger can be configured on its own.
type LogComponentName string

// LoggerProvider hands out loggers on a per-component basis, which allows a different
// throttling level to be applied depending on the component name.
// Tags attached through With() are currently applied to every logger returned by Get().
type LoggerProvider interface {
	// Get returns the logger for the given component. Components without a custom config
	// receive the provider's root logger.
	Get(component LogComponentName) log.Logger
	// With returns a new logger provider whose loggers all carry the given tags in addition
	// to the tags already present on this provider.
	With(tags ...tag.Tag) LoggerProvider
}

// loggerProvider is the LoggerProvider implementation built by NewLoggerProvider.
type loggerProvider struct {
	// root is the fallback logger used by Get for components that have no entry in loggers.
	root log.Logger
	// loggers maps a component name to its dedicated logger.
	loggers map[LogComponentName]log.Logger
	// tags are attached to every logger returned by Get.
	tags []tag.Tag
}

// NewLoggerProvider builds a LoggerProvider from the proxy configuration.
//
// The root logger is first wrapped in a throttle driven by the global Logging config. Every entry in
// LogConfigs then gets its own logger: a no-op logger when the entry is disabled, otherwise a second
// throttle (driven by the entry's own config) layered on top of the globally throttled root.
//
// Components without an entry fall back to the globally throttled root rather than the raw root
// logger, so the global Logging throttle applies to every logger this provider hands out.
func NewLoggerProvider(root log.Logger, config config.ConfigProvider) LoggerProvider {
	// Read the config once so both the global and per-component settings come from the same snapshot.
	proxyConfig := config.GetS2SProxyConfig()
	throttledRoot := log.NewThrottledLogger(root, proxyConfig.Logging.GetThrottleMaxRPS)

	loggersByComponent := make(map[LogComponentName]log.Logger, len(proxyConfig.LogConfigs))
	for component, logConfig := range proxyConfig.LogConfigs {
		name := LogComponentName(component)
		if logConfig.Disabled {
			loggersByComponent[name] = log.NewNoopLogger()
			continue
		}
		loggersByComponent[name] = log.NewThrottledLogger(throttledRoot, logConfig.GetThrottleMaxRPS)
	}

	return &loggerProvider{
		// Previously the unthrottled root was stored here, so unknown components bypassed the
		// global throttle that was constructed above.
		root:    throttledRoot,
		loggers: loggersByComponent,
	}
}

// Get returns the logger registered for component, or the provider's root logger when the component
// has no entry. The tags stored on this provider are attached to the returned logger via log.With.
func (l *loggerProvider) Get(component LogComponentName) log.Logger {
logger, exists := l.loggers[component]
if !exists {
logger = l.root
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so if component doesn't exist, then the default behavior is to use root which means always log right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, though the default logger can be set with a throttle as well.

}
return log.With(logger, l.tags...)
}

// With returns a new LoggerProvider that shares this provider's root and component loggers and carries
// the receiver's tags followed by the given tags. The receiver itself is left unchanged.
func (l *loggerProvider) With(tags ...tag.Tag) LoggerProvider {
	// Build the combined tag list in a fresh backing array. Appending directly to l.tags could write
	// into spare capacity shared with the receiver, letting two providers derived from the same
	// parent overwrite each other's tags.
	combined := make([]tag.Tag, 0, len(l.tags)+len(tags))
	combined = append(combined, l.tags...)
	combined = append(combined, tags...)
	return &loggerProvider{
		root:    l.root,
		loggers: l.loggers,
		tags:    combined,
	}
}
49 changes: 23 additions & 26 deletions proxy/adminservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,15 @@ import (

"github.com/temporalio/s2s-proxy/common"
"github.com/temporalio/s2s-proxy/config"
"github.com/temporalio/s2s-proxy/logging"
"github.com/temporalio/s2s-proxy/metrics"
)

// Component names passed to LoggerProvider.Get by the admin service proxy.
// NOTE(review): the provider looks names up by exact map key, while the sample YAML in this change
// uses the keys "adminservice" and "adminstreams" — confirm which spelling operators should use.
const (
// LogAdminService is the component name used for the cluster-configuration style admin calls
// (e.g. AddOrUpdateRemoteCluster, DescribeCluster).
LogAdminService = "adminService"
// LogReplicationStreams is the component name used for the replication APIs and streams
// (e.g. GetReplicationMessages, StreamWorkflowReplicationMessages).
LogReplicationStreams = "replicationStreams"
)

type (
LCMParameters struct {
LCM int32
Expand All @@ -34,8 +40,7 @@ type (
shardManager ShardManager
adminClient adminservice.AdminServiceClient
adminClientReverse adminservice.AdminServiceClient
verboseLogger log.Logger
replicationLogger log.Logger
loggers logging.LoggerProvider
apiOverrides *config.APIOverridesConfig
metricLabelValues []string
reportStreamValue func(idx int32, value int32)
Expand All @@ -57,23 +62,15 @@ func NewAdminServiceProxyServer(
shardCountConfig config.ShardCountConfig,
lcmParameters LCMParameters,
routingParameters RoutingParameters,
logger log.Logger,
logProvider logging.LoggerProvider,
shardManager ShardManager,
lifetime context.Context,
) adminservice.AdminServiceServer {
// Replication streams / APIs will run many hundreds of times per second. Throttle their output
// to 3 / min
replicationLogger := log.NewThrottledLogger(log.With(logger, common.ServiceTag(serviceName)),
func() float64 { return 3.0 / 60.0 })
// For config operations, allow most logs so that we can see all the info without putting disk at risk
verboseLogger := log.NewThrottledLogger(log.With(logger, common.ServiceTag(serviceName)),
func() float64 { return 3.0 })
return &adminServiceProxyServer{
shardManager: shardManager,
adminClient: adminClient,
adminClientReverse: adminClientReverse,
verboseLogger: verboseLogger,
replicationLogger: replicationLogger,
loggers: logProvider.With(common.ServiceTag(serviceName)),
apiOverrides: apiOverrides,
metricLabelValues: metricLabelValues,
reportStreamValue: reportStreamValue,
Expand All @@ -85,7 +82,7 @@ func NewAdminServiceProxyServer(
}

func (s *adminServiceProxyServer) AddOrUpdateRemoteCluster(ctx context.Context, in0 *adminservice.AddOrUpdateRemoteClusterRequest) (resp *adminservice.AddOrUpdateRemoteClusterResponse, err error) {
s.verboseLogger.Info("Received AddOrUpdateRemoteCluster", tag.Address(in0.FrontendAddress), tag.NewBoolTag("Enabled", in0.GetEnableRemoteClusterConnection()), tag.NewStringsTag("configTags", s.metricLabelValues))
s.loggers.Get(LogAdminService).Info("Received AddOrUpdateRemoteCluster", tag.Address(in0.FrontendAddress), tag.NewBoolTag("Enabled", in0.GetEnableRemoteClusterConnection()), tag.NewStringsTag("configTags", s.metricLabelValues))
if !common.IsRequestTranslationDisabled(ctx) && s.apiOverrides != nil {
reqOverride := s.apiOverrides.AdminService.AddOrUpdateRemoteCluster
if reqOverride != nil && len(reqOverride.Request.FrontendAddress) > 0 {
Expand All @@ -94,12 +91,12 @@ func (s *adminServiceProxyServer) AddOrUpdateRemoteCluster(ctx context.Context,
// from the local temporal server, or the proxy may be deployed behind a load balancer.
// Only used in single-proxy scenarios, i.e. Temporal <> Proxy <> Temporal
in0.FrontendAddress = reqOverride.Request.FrontendAddress
s.verboseLogger.Info("Overwrote outbound address", tag.Address(in0.FrontendAddress), tag.NewStringsTag("configTags", s.metricLabelValues))
s.loggers.Get(LogAdminService).Info("Overwrote outbound address", tag.Address(in0.FrontendAddress), tag.NewStringsTag("configTags", s.metricLabelValues))
}
}
resp, err = s.adminClient.AddOrUpdateRemoteCluster(ctx, in0)
if err != nil {
s.verboseLogger.Error("Error when adding remote cluster", tag.Error(err), tag.Operation("AddOrUpdateRemoteCluster"),
s.loggers.Get(LogAdminService).Error("Error when adding remote cluster", tag.Error(err), tag.Operation("AddOrUpdateRemoteCluster"),
tag.NewStringTag("FrontendAddress", in0.GetFrontendAddress()))
}
return
Expand All @@ -126,16 +123,16 @@ func (s *adminServiceProxyServer) DeleteWorkflowExecution(ctx context.Context, i
}

func (s *adminServiceProxyServer) DescribeCluster(ctx context.Context, in0 *adminservice.DescribeClusterRequest) (*adminservice.DescribeClusterResponse, error) {
s.verboseLogger.Info("Received DescribeClusterRequest")
s.loggers.Get(LogAdminService).Info("Received DescribeClusterRequest")
resp, err := s.adminClient.DescribeCluster(ctx, in0)
if resp != nil {
s.verboseLogger.Info("Raw DescribeClusterResponse", tag.NewStringTag("clusterID", resp.ClusterId),
s.loggers.Get(LogAdminService).Info("Raw DescribeClusterResponse", tag.NewStringTag("clusterID", resp.ClusterId),
tag.NewStringTag("clusterName", resp.ClusterName), tag.NewStringTag("version", resp.ServerVersion),
tag.NewInt64("failoverVersionIncrement", resp.FailoverVersionIncrement), tag.NewInt64("initialFailoverVersion", resp.InitialFailoverVersion),
tag.NewBoolTag("isGlobalNamespaceEnabled", resp.IsGlobalNamespaceEnabled), tag.NewStringsTag("configTags", s.metricLabelValues))
}
if common.IsRequestTranslationDisabled(ctx) {
s.verboseLogger.Info("Request translation disabled. Returning as-is")
s.loggers.Get(LogAdminService).Info("Request translation disabled. Returning as-is")
return resp, err
}

Expand All @@ -154,19 +151,19 @@ func (s *adminServiceProxyServer) DescribeCluster(ctx context.Context, in0 *admi
responseOverride := s.apiOverrides.AdminService.DescribeCluster.Response
if resp != nil && responseOverride.FailoverVersionIncrement != nil {
resp.FailoverVersionIncrement = *responseOverride.FailoverVersionIncrement
s.verboseLogger.Info("Overwrite FailoverVersionIncrement", tag.NewInt64("failoverVersionIncrement", resp.FailoverVersionIncrement),
s.loggers.Get(LogAdminService).Info("Overwrite FailoverVersionIncrement", tag.NewInt64("failoverVersionIncrement", resp.FailoverVersionIncrement),
tag.NewStringsTag("configTags", s.metricLabelValues))
}
}

if resp != nil {
s.verboseLogger.Info("Translated DescribeClusterResponse", tag.NewStringTag("clusterID", resp.ClusterId),
s.loggers.Get(LogAdminService).Info("Translated DescribeClusterResponse", tag.NewStringTag("clusterID", resp.ClusterId),
tag.NewStringTag("clusterName", resp.ClusterName), tag.NewStringTag("version", resp.ServerVersion),
tag.NewInt64("failoverVersionIncrement", resp.FailoverVersionIncrement), tag.NewInt64("initialFailoverVersion", resp.InitialFailoverVersion),
tag.NewBoolTag("isGlobalNamespaceEnabled", resp.IsGlobalNamespaceEnabled), tag.NewStringsTag("configTags", s.metricLabelValues))
}
if err != nil {
s.verboseLogger.Info("Got error when calling DescribeCluster!", tag.Error(err), tag.NewStringsTag("configTags", s.metricLabelValues))
s.loggers.Get(LogAdminService).Info("Got error when calling DescribeCluster!", tag.Error(err), tag.NewStringsTag("configTags", s.metricLabelValues))
}
return resp, err
}
Expand All @@ -183,7 +180,7 @@ func (s *adminServiceProxyServer) DescribeMutableState(ctx context.Context, in0
resp, err = s.adminClient.DescribeMutableState(ctx, in0)
if err != nil {
// This is a duplicate of the grpc client metrics, but not everyone has metrics set up
s.replicationLogger.Error("Failed to describe workflow",
s.loggers.Get(LogReplicationStreams).Error("Failed to describe workflow",
tag.NewStringTag("WorkflowId", in0.GetExecution().GetWorkflowId()),
tag.NewStringTag("RunId", in0.GetExecution().GetRunId()),
tag.NewStringTag("Namespace", in0.GetNamespace()),
Expand Down Expand Up @@ -212,7 +209,7 @@ func (s *adminServiceProxyServer) GetNamespaceReplicationMessages(ctx context.Co
resp, err = s.adminClient.GetNamespaceReplicationMessages(ctx, in0)
if err != nil {
// This is a duplicate of the grpc client metrics, but not everyone has metrics set up
s.replicationLogger.Error("Failed to get namespace replication messages", tag.NewStringTag("Cluster", in0.GetClusterName()),
s.loggers.Get(LogReplicationStreams).Error("Failed to get namespace replication messages", tag.NewStringTag("Cluster", in0.GetClusterName()),
tag.Error(err), tag.Operation("GetNamespaceReplicationMessages"))
}
return
Expand All @@ -221,7 +218,7 @@ func (s *adminServiceProxyServer) GetNamespaceReplicationMessages(ctx context.Co
func (s *adminServiceProxyServer) GetReplicationMessages(ctx context.Context, in0 *adminservice.GetReplicationMessagesRequest) (resp *adminservice.GetReplicationMessagesResponse, err error) {
resp, err = s.adminClient.GetReplicationMessages(ctx, in0)
if err != nil {
s.replicationLogger.Error("Failed to get replication messages", tag.NewStringTag("Cluster", in0.GetClusterName()),
s.loggers.Get(LogReplicationStreams).Error("Failed to get replication messages", tag.NewStringTag("Cluster", in0.GetClusterName()),
tag.Error(err), tag.Operation("GetReplicationMessages"))
}
return
Expand Down Expand Up @@ -326,7 +323,7 @@ func ClusterShardIDtoShortString(sd history.ClusterShardID) string {
func (s *adminServiceProxyServer) StreamWorkflowReplicationMessages(
streamServer adminservice.AdminService_StreamWorkflowReplicationMessagesServer,
) (retError error) {
defer log.CapturePanic(s.replicationLogger, &retError)
defer log.CapturePanic(s.loggers.Get(LogReplicationStreams), &retError)

targetMetadata, ok := metadata.FromIncomingContext(streamServer.Context())
if !ok {
Expand All @@ -339,7 +336,7 @@ func (s *adminServiceProxyServer) StreamWorkflowReplicationMessages(
return err
}

logger := log.With(s.replicationLogger,
logger := log.With(s.loggers.Get(LogReplicationStreams),
tag.NewStringTag("source", ClusterShardIDtoString(sourceClusterShardID)),
tag.NewStringTag("target", ClusterShardIDtoString(targetClusterShardID)))

Expand Down
4 changes: 3 additions & 1 deletion proxy/adminservice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/temporalio/s2s-proxy/common"
"github.com/temporalio/s2s-proxy/config"
"github.com/temporalio/s2s-proxy/logging"
clientmock "github.com/temporalio/s2s-proxy/mocks/client"
)

Expand Down Expand Up @@ -47,7 +48,8 @@ type adminProxyServerInput struct {
func (s *adminserviceSuite) newAdminServiceProxyServer(in adminProxyServerInput, observer *ReplicationStreamObserver) adminservice.AdminServiceServer {
return NewAdminServiceProxyServer("test-service-name", s.adminClientMock,
s.adminClientMock,
in.apiOverrides, in.metricLabels, observer.ReportStreamValue, config.ShardCountConfig{}, LCMParameters{}, RoutingParameters{}, log.NewTestLogger(), nil, context.Background())
in.apiOverrides, in.metricLabels, observer.ReportStreamValue, config.ShardCountConfig{}, LCMParameters{},
RoutingParameters{}, logging.NewLoggerProvider(log.NewTestLogger(), config.NewMockConfigProvider(config.S2SProxyConfig{})), nil, context.Background())
}

func (s *adminserviceSuite) TestAddOrUpdateRemoteCluster() {
Expand Down
Loading
Loading