Skip to content

Commit

Permalink
Protect the Kafka settings behind a mutex (jaegertracing#2308)
Browse files Browse the repository at this point in the history
Signed-off-by: Israel Blancas <[email protected]>
  • Loading branch information
iblancasa authored Sep 6, 2023
1 parent 2517ef3 commit 0273379
Show file tree
Hide file tree
Showing 7 changed files with 75 additions and 28 deletions.
7 changes: 2 additions & 5 deletions apis/v1/jaeger_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,8 @@ const (
// FlagProvisionKafkaAuto represents the 'auto' value for the 'kafka-provision' flag
FlagProvisionKafkaAuto = "auto"

// FlagProvisionKafkaYes represents the value 'yes' for the 'kafka-provision' flag
FlagProvisionKafkaYes = "yes"

// FlagProvisionKafkaNo represents the value 'no' for the 'kafka-provision' flag
FlagProvisionKafkaNo = "no"
// FlagKafkaProvision represents the 'kafka-provision' flag.
FlagKafkaProvision = "kafka-provision"

// IngressSecurityNone disables any form of security for ingress objects (default)
IngressSecurityNone IngressSecurityType = ""
Expand Down
12 changes: 6 additions & 6 deletions pkg/autodetect/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,28 +321,28 @@ func (b *Background) detectElasticsearch(ctx context.Context, apiList []*metav1.

// detectKafka checks whether the Kafka Operator is available
func (b *Background) detectKafka(_ context.Context, apiList []*metav1.APIResourceList) {
currentKafkaProvision := viper.GetString("kafka-provision")
currentKafkaProvision := OperatorConfiguration.GetKafkaIntegration()
if !b.retryDetectKafka {
log.Log.V(-1).Info(
"The 'kafka-provision' option is explicitly set",
"kafka-provision", currentKafkaProvision,
"kafka-provision", currentKafkaProvision.String(),
)
return
}

log.Log.V(-1).Info("Determining whether we should enable the Kafka Operator integration")

kafkaProvision := v1.FlagProvisionKafkaNo
kafkaProvision := KafkaOperatorIntegrationNo
if isKafkaOperatorAvailable(apiList) {
kafkaProvision = v1.FlagProvisionKafkaYes
kafkaProvision = KafkaOperatorIntegrationYes
}

if currentKafkaProvision != kafkaProvision {
log.Log.Info(
"Automatically adjusted the 'kafka-provision' flag",
"kafka-provision", kafkaProvision,
"kafka-provision", kafkaProvision.String(),
)
viper.Set("kafka-provision", kafkaProvision)
OperatorConfiguration.SetKafkaIntegration(kafkaProvision)
}
}

Expand Down
16 changes: 8 additions & 8 deletions pkg/autodetect/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ func TestAutoDetectKafkaProvisionNoKafkaOperator(t *testing.T) {
b.autoDetectCapabilities()

// verify
assert.Equal(t, v1.FlagProvisionKafkaNo, viper.GetString("kafka-provision"))
assert.False(t, OperatorConfiguration.IsKafkaOperatorIntegrationEnabled())
}

func TestAutoDetectKafkaProvisionWithKafkaOperator(t *testing.T) {
Expand All @@ -344,12 +344,12 @@ func TestAutoDetectKafkaProvisionWithKafkaOperator(t *testing.T) {
b.autoDetectCapabilities()

// verify
assert.Equal(t, v1.FlagProvisionKafkaYes, viper.GetString("kafka-provision"))
assert.True(t, OperatorConfiguration.IsKafkaOperatorIntegrationEnabled())
}

func TestAutoDetectKafkaExplicitYes(t *testing.T) {
// prepare
viper.Set("kafka-provision", v1.FlagProvisionKafkaYes)
OperatorConfiguration.SetKafkaIntegration(KafkaOperatorIntegrationYes)
defer viper.Reset()

dcl := &fakeDiscoveryClient{}
Expand All @@ -360,12 +360,12 @@ func TestAutoDetectKafkaExplicitYes(t *testing.T) {
b.autoDetectCapabilities()

// verify
assert.Equal(t, v1.FlagProvisionKafkaYes, viper.GetString("kafka-provision"))
assert.True(t, OperatorConfiguration.IsKafkaOperatorIntegrationEnabled())
}

func TestAutoDetectKafkaExplicitNo(t *testing.T) {
// prepare
viper.Set("kafka-provision", v1.FlagProvisionKafkaNo)
OperatorConfiguration.SetKafkaIntegration(KafkaOperatorIntegrationNo)
defer viper.Reset()

dcl := &fakeDiscoveryClient{}
Expand All @@ -376,7 +376,7 @@ func TestAutoDetectKafkaExplicitNo(t *testing.T) {
b.autoDetectCapabilities()

// verify
assert.Equal(t, v1.FlagProvisionKafkaNo, viper.GetString("kafka-provision"))
assert.False(t, OperatorConfiguration.IsKafkaOperatorIntegrationEnabled())
}

func TestAutoDetectKafkaDefaultNoOperator(t *testing.T) {
Expand All @@ -392,7 +392,7 @@ func TestAutoDetectKafkaDefaultNoOperator(t *testing.T) {
b.autoDetectCapabilities()

// verify
assert.Equal(t, v1.FlagProvisionKafkaNo, viper.GetString("kafka-provision"))
assert.False(t, OperatorConfiguration.IsKafkaOperatorIntegrationEnabled())
}

func TestAutoDetectKafkaDefaultWithOperator(t *testing.T) {
Expand All @@ -417,7 +417,7 @@ func TestAutoDetectKafkaDefaultWithOperator(t *testing.T) {
b.autoDetectCapabilities()

// verify
assert.Equal(t, v1.FlagProvisionKafkaYes, viper.GetString("kafka-provision"))
assert.True(t, OperatorConfiguration.IsKafkaOperatorIntegrationEnabled())
}

func TestAutoDetectCronJobsVersion(t *testing.T) {
Expand Down
48 changes: 48 additions & 0 deletions pkg/autodetect/operatorconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,21 @@ func (p ESOperatorIntegration) String() string {
return [...]string{"Yes", "No"}[p]
}

// KafkaOperatorIntegration holds the if the Kafka Operator integration is enabled.
type KafkaOperatorIntegration int

const (
// KafkaOperatorIntegrationYes represents the Kafka Operator integration is enabled.
KafkaOperatorIntegrationYes KafkaOperatorIntegration = iota

// KafkaOperatorIntegrationNo represents the Kafka Operator integration is disabled.
KafkaOperatorIntegrationNo
)

func (p KafkaOperatorIntegration) String() string {
return [...]string{"Yes", "No"}[p]
}

var OperatorConfiguration operatorConfigurationWrapper

type operatorConfigurationWrapper struct {
Expand Down Expand Up @@ -112,3 +127,36 @@ func (c *operatorConfigurationWrapper) GetESPIntegration() ESOperatorIntegration
func (c *operatorConfigurationWrapper) IsESOperatorIntegrationEnabled() bool {
return c.GetESPIntegration() == ESOperatorIntegrationYes
}

func (c *operatorConfigurationWrapper) SetKafkaIntegration(e interface{}) {
var integration string
switch v := e.(type) {
case string:
integration = v
case KafkaOperatorIntegration:
integration = v.String()
default:
integration = KafkaOperatorIntegrationNo.String()
}

c.mu.Lock()
viper.Set(v1.FlagKafkaProvision, integration)
c.mu.Unlock()
}

func (c *operatorConfigurationWrapper) GetKafkaIntegration() KafkaOperatorIntegration {
c.mu.RLock()
e := viper.GetString(v1.FlagKafkaProvision)
c.mu.RUnlock()

if strings.ToLower(e) == "yes" {
return KafkaOperatorIntegrationYes
}
return KafkaOperatorIntegrationNo
}

// IsKafkaOperatorIntegrationEnabled returns true if the integration with the
// Kafaka Operator is enabled
func (c *operatorConfigurationWrapper) IsKafkaOperatorIntegrationEnabled() bool {
return c.GetKafkaIntegration() == KafkaOperatorIntegrationYes
}
2 changes: 1 addition & 1 deletion pkg/controller/jaeger/jaeger_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ func (r *ReconcileJaeger) apply(ctx context.Context, jaeger v1.Jaeger, str strat

kafkas := str.Kafkas()
kafkaUsers := str.KafkaUsers()
if strings.EqualFold(viper.GetString("kafka-provision"), v1.FlagProvisionKafkaYes) {
if autodetect.OperatorConfiguration.IsKafkaOperatorIntegrationEnabled() {
if err := r.applyKafkas(ctx, jaeger, kafkas); err != nil {
return jaeger, tracing.HandleError(err, span)
}
Expand Down
9 changes: 5 additions & 4 deletions pkg/controller/jaeger/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,15 @@ import (
"sigs.k8s.io/controller-runtime/pkg/reconcile"

v1 "github.com/jaegertracing/jaeger-operator/apis/v1"
"github.com/jaegertracing/jaeger-operator/pkg/autodetect"
"github.com/jaegertracing/jaeger-operator/pkg/kafka/v1beta2"
kafkav1beta2 "github.com/jaegertracing/jaeger-operator/pkg/kafka/v1beta2"
"github.com/jaegertracing/jaeger-operator/pkg/strategy"
)

func TestKafkaCreate(t *testing.T) {
// prepare
viper.SetDefault("kafka-provision", v1.FlagProvisionKafkaYes)
autodetect.OperatorConfiguration.SetKafkaIntegration(autodetect.KafkaOperatorIntegrationYes)
defer viper.Reset()

nsn := types.NamespacedName{
Expand Down Expand Up @@ -75,7 +76,7 @@ func TestKafkaCreate(t *testing.T) {

func TestKafkaUpdate(t *testing.T) {
// prepare
viper.SetDefault("kafka-provision", v1.FlagProvisionKafkaYes)
autodetect.OperatorConfiguration.SetKafkaIntegration(autodetect.KafkaOperatorIntegrationYes)
defer viper.Reset()

nsn := types.NamespacedName{
Expand Down Expand Up @@ -149,7 +150,7 @@ func TestKafkaUpdate(t *testing.T) {

func TestKafkaDelete(t *testing.T) {
// prepare
viper.SetDefault("kafka-provision", v1.FlagProvisionKafkaYes)
autodetect.OperatorConfiguration.SetKafkaIntegration(autodetect.KafkaOperatorIntegrationYes)
defer viper.Reset()

nsn := types.NamespacedName{
Expand Down Expand Up @@ -195,7 +196,7 @@ func TestKafkaDelete(t *testing.T) {

func TestKafkaCreateExistingNameInAnotherNamespace(t *testing.T) {
// prepare
viper.SetDefault("kafka-provision", v1.FlagProvisionKafkaYes)
autodetect.OperatorConfiguration.SetKafkaIntegration(autodetect.KafkaOperatorIntegrationYes)
defer viper.Reset()

nsn := types.NamespacedName{
Expand Down
9 changes: 5 additions & 4 deletions pkg/controller/jaeger/kafkauser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,15 @@ import (
"sigs.k8s.io/controller-runtime/pkg/reconcile"

v1 "github.com/jaegertracing/jaeger-operator/apis/v1"
"github.com/jaegertracing/jaeger-operator/pkg/autodetect"
"github.com/jaegertracing/jaeger-operator/pkg/kafka/v1beta2"
kafkav1beta2 "github.com/jaegertracing/jaeger-operator/pkg/kafka/v1beta2"
"github.com/jaegertracing/jaeger-operator/pkg/strategy"
)

func TestKafkaUserCreate(t *testing.T) {
// prepare
viper.SetDefault("kafka-provision", v1.FlagProvisionKafkaYes)
autodetect.OperatorConfiguration.SetKafkaIntegration(autodetect.KafkaOperatorIntegrationYes)
defer viper.Reset()

nsn := types.NamespacedName{
Expand Down Expand Up @@ -75,7 +76,7 @@ func TestKafkaUserCreate(t *testing.T) {

func TestKafkaUserUpdate(t *testing.T) {
// prepare
viper.SetDefault("kafka-provision", v1.FlagProvisionKafkaYes)
autodetect.OperatorConfiguration.SetKafkaIntegration(autodetect.KafkaOperatorIntegrationYes)
defer viper.Reset()

nsn := types.NamespacedName{
Expand Down Expand Up @@ -149,7 +150,7 @@ func TestKafkaUserUpdate(t *testing.T) {

func TestKafkaUserDelete(t *testing.T) {
// prepare
viper.SetDefault("kafka-provision", v1.FlagProvisionKafkaYes)
autodetect.OperatorConfiguration.SetKafkaIntegration(autodetect.KafkaOperatorIntegrationYes)
defer viper.Reset()

nsn := types.NamespacedName{
Expand Down Expand Up @@ -194,7 +195,7 @@ func TestKafkaUserDelete(t *testing.T) {

func TestKafkaUserCreateExistingNameInAnotherNamespace(t *testing.T) {
// prepare
viper.SetDefault("kafka-provision", v1.FlagProvisionKafkaYes)
autodetect.OperatorConfiguration.SetKafkaIntegration(autodetect.KafkaOperatorIntegrationYes)
defer viper.Reset()

nsn := types.NamespacedName{
Expand Down

0 comments on commit 0273379

Please sign in to comment.