Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 38 additions & 7 deletions connector/topologyconnector/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,18 @@ import (
)

const (
// cache defaults
defaultCacheSize = 5000
maxCacheSize = 1000_000
minValidTTL = 100 * time.Nanosecond // to avoid hashicorp/golang-lru ticker panic
)

// Config defines the configuration options for topologyconnector.
type Config struct {
ExpressionCacheSettings CacheSettings `mapstructure:"expression_cache_settings"`
TagRegexCacheSettings CacheSettings `mapstructure:"tag_regex_cache_settings"`
TagTemplateCacheSettings CacheSettings `mapstructure:"tag_template_cache_settings"`
ExpressionCache CacheSettings `mapstructure:"expression_cache"`
TagRegexCache CacheSettings `mapstructure:"tag_regex_cache"`
TagTemplateCache CacheSettings `mapstructure:"tag_template_cache"`
Deduplication DeduplicationSettings `mapstructure:"deduplication"`
}

type CacheSettings struct {
Expand All @@ -40,14 +42,25 @@ func (c *CacheSettings) ToMetered(
}
}

// DeduplicationSettings controls pre-mapping CacheTTL-aware deduplication.
type DeduplicationSettings struct {
// Enabled toggles deduplication entirely.
Enabled bool `mapstructure:"enabled"`
// RefreshFraction determines the fraction of CacheTTL under which an already-sent element is considered
// "recent" and therefore skipped. For example 0.5 means: if last-sent < CacheTTL/2 ago, skip/suppress.
// Acceptable range: 0.1 .. 0.9. If 0, defaults to 0.5.
RefreshFraction float64 `mapstructure:"refresh_fraction"`
Cache CacheSettings `mapstructure:"cache"`
}

func (c *CacheSettings) Validate(cacheName string) error {
if c.Size <= 0 {
c.Size = defaultCacheSize
} else if c.Size > maxCacheSize {
return fmt.Errorf("%s.size: must not exceed %d", cacheName, maxCacheSize)
}

// TTL = 0 means "no expiration", so allow it
// CacheTTL = 0 means "no expiration", so allow it
if c.TTL < 0 {
return fmt.Errorf("%s.ttl: must not be negative (0 disables expiration)", cacheName)
} else if c.TTL > 0 && c.TTL <= minValidTTL {
Expand All @@ -57,14 +70,32 @@ func (c *CacheSettings) Validate(cacheName string) error {
return nil
}

func (d *DeduplicationSettings) Validate() error {
// Normalise deduplication refresh fraction
if d.RefreshFraction <= 0 {
d.RefreshFraction = 0.5
} else if d.RefreshFraction > 1 {
d.RefreshFraction = 1
}

if err := d.Cache.Validate("deduplication_cache"); err != nil {
return err
}

return nil
}

func (cfg *Config) Validate() error {
if err := cfg.ExpressionCacheSettings.Validate("expression_cache"); err != nil {
if err := cfg.ExpressionCache.Validate("expression_cache"); err != nil {
return err
}
if err := cfg.TagRegexCache.Validate("tag_regex_cache"); err != nil {
return err
}
if err := cfg.TagRegexCacheSettings.Validate("tag_regex_cache"); err != nil {
if err := cfg.TagTemplateCache.Validate("tag_template_cache"); err != nil {
return err
}
if err := cfg.TagTemplateCacheSettings.Validate("tag_template_cache"); err != nil {
if err := cfg.Deduplication.Validate(); err != nil {
return err
}
return nil
Expand Down
93 changes: 85 additions & 8 deletions connector/topologyconnector/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func TestConfig_Validation(t *testing.T) {
{
name: "valid config",
cfg: &Config{
ExpressionCacheSettings: CacheSettings{
ExpressionCache: CacheSettings{
Size: 1000,
TTL: 10 * time.Minute,
},
Expand All @@ -31,7 +31,7 @@ func TestConfig_Validation(t *testing.T) {
{
name: "apply default size if not set",
cfg: &Config{
ExpressionCacheSettings: CacheSettings{
ExpressionCache: CacheSettings{
Size: 0,
TTL: 10 * time.Minute,
},
Expand All @@ -41,7 +41,7 @@ func TestConfig_Validation(t *testing.T) {
{
name: "return error if size exceeds max",
cfg: &Config{
ExpressionCacheSettings: CacheSettings{
ExpressionCache: CacheSettings{
Size: maxCacheSize + 1,
TTL: 10 * time.Minute,
},
Expand All @@ -51,7 +51,7 @@ func TestConfig_Validation(t *testing.T) {
{
name: "allow ttl = 0 (no expiration)",
cfg: &Config{
ExpressionCacheSettings: CacheSettings{
ExpressionCache: CacheSettings{
Size: 100,
TTL: 0,
},
Expand All @@ -60,7 +60,7 @@ func TestConfig_Validation(t *testing.T) {
{
name: "return error if ttl is too small",
cfg: &Config{
ExpressionCacheSettings: CacheSettings{
ExpressionCache: CacheSettings{
Size: 100,
TTL: 50 * time.Nanosecond, // <= minValidTTL
},
Expand All @@ -70,7 +70,7 @@ func TestConfig_Validation(t *testing.T) {
{
name: "return error if ttl is negative",
cfg: &Config{
ExpressionCacheSettings: CacheSettings{
ExpressionCache: CacheSettings{
Size: 100,
TTL: -1 * time.Second,
},
Expand All @@ -92,10 +92,87 @@ func TestConfig_Validation(t *testing.T) {
require.NoError(t, err)

if tt.wantSize > 0 {
assert.Equal(t, tt.wantSize, tt.cfg.ExpressionCacheSettings.Size)
assert.Equal(t, tt.wantSize, tt.cfg.ExpressionCache.Size)
}
if tt.wantTTLMin > 0 {
assert.Equal(t, tt.wantTTLMin, tt.cfg.ExpressionCacheSettings.TTL)
assert.Equal(t, tt.wantTTLMin, tt.cfg.ExpressionCache.TTL)
}
})
}
}

func TestDeduplicationSettings_Validate(t *testing.T) {
tests := []struct {
name string
dedup DeduplicationSettings
wantErr string
wantRefreshFraction float64
wantCacheSize int
}{
{
name: "valid deduplication settings",
dedup: DeduplicationSettings{
Enabled: true,
RefreshFraction: 0.4,
Cache: CacheSettings{
Size: 100,
TTL: 5 * time.Minute,
},
},
wantRefreshFraction: 0.4,
},
{
name: "refresh_fraction < 0 defaults to 0.5",
dedup: DeduplicationSettings{
RefreshFraction: -1,
Cache: CacheSettings{
Size: 100,
TTL: 1 * time.Minute,
},
},
wantRefreshFraction: 0.5,
},
{
name: "refresh_fraction == 0 defaults to 0.5",
dedup: DeduplicationSettings{
RefreshFraction: 0,
Cache: CacheSettings{
Size: 100,
TTL: 1 * time.Minute,
},
},
wantRefreshFraction: 0.5,
},
{
name: "refresh_fraction > 1 clamps to 1",
dedup: DeduplicationSettings{
RefreshFraction: 1.5,
Cache: CacheSettings{
Size: 100,
TTL: 1 * time.Minute,
},
},
wantRefreshFraction: 1,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := tt.dedup.Validate()

if tt.wantErr != "" {
require.Error(t, err)
assert.EqualError(t, err, tt.wantErr)
return
}

require.NoError(t, err)

if tt.wantRefreshFraction > 0 {
assert.Equal(t, tt.wantRefreshFraction, tt.dedup.RefreshFraction)
}
if tt.wantCacheSize > 0 {
assert.Equal(t, tt.wantCacheSize, tt.dedup.Cache.Size)
}
})
}
Expand Down
48 changes: 27 additions & 21 deletions connector/topologyconnector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,17 @@ import (
)

type connectorImpl struct {
cfg *Config
logger *zap.Logger
logsConsumer consumer.Logs
settingsProvider stsSettingsApi.StsSettingsProvider
snapshotManager *SnapshotManager
eval *internal.CelEvaluator
mapper *internal.Mapper
metricsRecorder metrics.ConnectorMetricsRecorder
supportedSignal settings.OtelInputSignal
cfg *Config
logger *zap.Logger
logsConsumer consumer.Logs
settingsProvider stsSettingsApi.StsSettingsProvider
snapshotManager *SnapshotManager
expressionRefManager ExpressionRefManager
eval internal.ExpressionEvaluator
deduplicator internal.Deduplicator
mapper *internal.Mapper
metricsRecorder metrics.ConnectorMetricsRecorder
supportedSignal settings.OtelInputSignal
}

func newConnector(
Expand All @@ -38,20 +40,24 @@ func newConnector(
telemetrySettings component.TelemetrySettings,
nextConsumer consumer.Logs,
snapshotManager *SnapshotManager,
eval *internal.CelEvaluator,
expressionRefManager ExpressionRefManager,
eval internal.ExpressionEvaluator,
deduplicator internal.Deduplicator,
mapper *internal.Mapper,
supportedSignal settings.OtelInputSignal,
) *connectorImpl {
logger.Info("Building topology connector")
return &connectorImpl{
cfg: &cfg,
logger: logger,
logsConsumer: nextConsumer,
eval: eval,
mapper: mapper,
snapshotManager: snapshotManager,
metricsRecorder: metrics.NewConnectorMetrics(Type.String(), telemetrySettings),
supportedSignal: supportedSignal,
cfg: &cfg,
logger: logger,
logsConsumer: nextConsumer,
eval: eval,
deduplicator: deduplicator,
mapper: mapper,
snapshotManager: snapshotManager,
expressionRefManager: expressionRefManager,
metricsRecorder: metrics.NewConnectorMetrics(Type.String(), telemetrySettings),
supportedSignal: supportedSignal,
}
}

Expand Down Expand Up @@ -93,14 +99,14 @@ func (p *connectorImpl) Capabilities() consumer.Capabilities {

func (p *connectorImpl) ConsumeMetrics(ctx context.Context, metrics pmetric.Metrics) error {
start := time.Now()

collectionTimestampMs := time.Now().UnixMilli()
componentMappings, relationMappings := p.snapshotManager.Current(p.supportedSignal)

componentMappings, relationMappings := p.snapshotManager.Current(p.supportedSignal)
messagesWithKeys := internal.ConvertMetricsToTopologyStreamMessage(
ctx,
p.logger,
p.eval,
p.deduplicator,
p.mapper,
metrics,
componentMappings,
Expand Down Expand Up @@ -129,6 +135,7 @@ func (p *connectorImpl) ConsumeTraces(ctx context.Context, traceData ptrace.Trac
ctx,
p.logger,
p.eval,
p.deduplicator,
p.mapper,
traceData,
componentMappings,
Expand All @@ -138,7 +145,6 @@ func (p *connectorImpl) ConsumeTraces(ctx context.Context, traceData ptrace.Trac
)

duration := time.Since(start)

p.publishMessagesAsLogs(ctx, messages)

p.metricsRecorder.RecordRequestDuration(
Expand Down
7 changes: 6 additions & 1 deletion connector/topologyconnector/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,17 @@ func newConnectorEnv(t *testing.T, signal settings.OtelInputSignal) *connectorTe
logsConsumer := &consumertest.LogsSink{}
logger := zaptest.NewLogger(t)

snapshotManager := NewSnapshotManager(logger, []settings.OtelInputSignal{settings.TRACES, settings.METRICS})
celEvaluator, _ := internal.NewCELEvaluator(
ctx,
metrics.MeteredCacheSettings{
Name: "expression_cache",
EnableMetrics: false,
},
)
expressionRefManager := NewExpressionRefManager(logger, celEvaluator)
snapshotManager := NewSnapshotManager(
logger, []settings.OtelInputSignal{settings.TRACES, settings.METRICS}, expressionRefManager,
)
mapper := internal.NewMapper(
ctx,
metrics.MeteredCacheSettings{
Expand All @@ -107,7 +110,9 @@ func newConnectorEnv(t *testing.T, signal settings.OtelInputSignal) *connectorTe
componenttest.NewNopTelemetrySettings(),
logsConsumer,
snapshotManager,
expressionRefManager,
celEvaluator,
internal.NewNoopDeduplicator(),
mapper,
signal,
)
Expand Down
Loading
Loading