Skip to content
5 changes: 5 additions & 0 deletions providers/flagd/e2e/inprocess_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ package e2e
import (
"testing"

flagd "github.com/open-feature/go-sdk-contrib/providers/flagd/pkg"

"github.com/open-feature/go-sdk-contrib/tests/flagd/testframework"
)

Expand All @@ -17,6 +19,9 @@ func TestInProcessProviderE2E(t *testing.T) {
runner := testframework.NewTestbedRunner(testframework.TestbedConfig{
ResolverType: testframework.InProcess,
TestbedConfig: "default",
ExtraOptions: []flagd.ProviderOption{
flagd.WithRetryBackoffMaxMs(3000),
},
})
defer runner.Cleanup()

Expand Down
32 changes: 26 additions & 6 deletions providers/flagd/pkg/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ const (
defaultHost = "localhost"
defaultResolver = rpc
defaultGracePeriod = 5
defaultRetryBackoffMs = 1000
defaultRetryBackoffMaxMs = 120000
defaultFatalStatusCodes = ""

rpc ResolverType = "rpc"
Expand All @@ -47,6 +49,8 @@ const (
flagdOfflinePathEnvironmentVariableName = "FLAGD_OFFLINE_FLAG_SOURCE_PATH"
flagdTargetUriEnvironmentVariableName = "FLAGD_TARGET_URI"
flagdGracePeriodVariableName = "FLAGD_RETRY_GRACE_PERIOD"
flagdRetryBackoffMsVariableName = "FLAGD_RETRY_BACKOFF_MS"
flagdRetryBackoffMaxMsVariableName = "FLAGD_RETRY_BACKOFF_MAX_MS"
flagdFatalStatusCodesVariableName = "FLAGD_FATAL_STATUS_CODES"
)

Expand All @@ -69,6 +73,8 @@ type ProviderConfiguration struct {
CustomSyncProviderUri string
GrpcDialOptionsOverride []grpc.DialOption
RetryGracePeriod int
RetryBackoffMs int
RetryBackoffMaxMs int
FatalStatusCodes []string

log logr.Logger
Expand All @@ -84,6 +90,8 @@ func newDefaultConfiguration(log logr.Logger) *ProviderConfiguration {
Resolver: defaultResolver,
Tls: defaultTLS,
RetryGracePeriod: defaultGracePeriod,
RetryBackoffMs: defaultRetryBackoffMs,
RetryBackoffMaxMs: defaultRetryBackoffMaxMs,
}

p.updateFromEnvVar()
Expand Down Expand Up @@ -212,12 +220,10 @@ func (cfg *ProviderConfiguration) updateFromEnvVar() {
if targetUri := os.Getenv(flagdTargetUriEnvironmentVariableName); targetUri != "" {
cfg.TargetUri = targetUri
}
if gracePeriod := os.Getenv(flagdGracePeriodVariableName); gracePeriod != "" {
if seconds, err := strconv.Atoi(gracePeriod); err == nil {
cfg.RetryGracePeriod = seconds
cfg.RetryGracePeriod = getIntFromEnvVarOrDefault(flagdGracePeriodVariableName, defaultGracePeriod, cfg.log)
}
}

cfg.RetryGracePeriod = getIntFromEnvVarOrDefault(flagdGracePeriodVariableName, defaultGracePeriod, cfg.log)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use time.ParseDuration to parse durations.

cfg.RetryBackoffMs = getIntFromEnvVarOrDefault(flagdRetryBackoffMsVariableName, defaultRetryBackoffMs, cfg.log)
cfg.RetryBackoffMaxMs = getIntFromEnvVarOrDefault(flagdRetryBackoffMaxMsVariableName, defaultRetryBackoffMaxMs, cfg.log)

var fatalStatusCodes string
if envVal := os.Getenv(flagdFatalStatusCodesVariableName); envVal != "" {
Expand Down Expand Up @@ -431,6 +437,20 @@ func WithRetryGracePeriod(gracePeriod int) ProviderOption {
}
}

// WithRetryBackoffMs sets the initial backoff duration (in milliseconds) for retrying failed connections
func WithRetryBackoffMs(retryBackoffMs int) ProviderOption {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A parameter that represents a duration should be of type time.Duration. The name of the parameter should be changed to remove any reference to a specific unit like milliseconds.

return func(p *ProviderConfiguration) {
p.RetryBackoffMs = retryBackoffMs
}
}

// WithRetryBackoffMaxMs returns a ProviderOption that records the maximum
// backoff duration, expressed in milliseconds, to use when retrying failed
// connections. The value is stored as-is in ProviderConfiguration.RetryBackoffMaxMs.
func WithRetryBackoffMaxMs(retryBackoffMaxMs int) ProviderOption {
	return func(cfg *ProviderConfiguration) {
		cfg.RetryBackoffMaxMs = retryBackoffMaxMs
	}
}

// WithFatalStatusCodes allows to set a list of gRPC status codes, which will cause streams to give up
// and put the provider in a PROVIDER_FATAL state
func WithFatalStatusCodes(fatalStatusCodes []string) ProviderOption {
Expand Down
2 changes: 2 additions & 0 deletions providers/flagd/pkg/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ func NewProvider(opts ...ProviderOption) (*Provider, error) {
CustomSyncProviderUri: provider.providerConfiguration.CustomSyncProviderUri,
GrpcDialOptionsOverride: provider.providerConfiguration.GrpcDialOptionsOverride,
RetryGracePeriod: provider.providerConfiguration.RetryGracePeriod,
RetryBackOffMs: provider.providerConfiguration.RetryBackoffMs,
RetryBackOffMaxMs: provider.providerConfiguration.RetryBackoffMaxMs,
FatalStatusCodes: provider.providerConfiguration.FatalStatusCodes,
})
default:
Expand Down
4 changes: 2 additions & 2 deletions providers/flagd/pkg/service/in_process/grpc_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ func (g *Sync) buildRetryPolicy() string {
},
"retryPolicy": RetryPolicy{
MaxAttempts: 3,
InitialBackoff: "1s",
MaxBackoff: "5s",
InitialBackoff: (time.Duration(g.RetryBackOffMs) * time.Millisecond).String(),
MaxBackoff: (time.Duration(g.RetryBackOffMaxMs) * time.Millisecond).String(),
BackoffMultiplier: 2.0,
RetryableStatusCodes: []string{"UNKNOWN", "UNAVAILABLE"},
},
Expand Down
62 changes: 62 additions & 0 deletions providers/flagd/pkg/service/in_process/grpc_config_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,74 @@
package process

import (
"encoding/json"
"github.com/open-feature/flagd/core/pkg/logger"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"strings"
"testing"
)

// TestBuildRetryPolicy verifies that buildRetryPolicy renders the backoff
// settings of a Sync into the retry policy of the generated JSON config.
//
// RetryBackOffMs and RetryBackOffMaxMs are set to 100 and 500 and are expected
// to show up as the duration strings "100ms" and "500ms". Every type assertion
// on the decoded JSON is checked, so an unexpected shape is reported as a test
// failure instead of a panic.
func TestBuildRetryPolicy(t *testing.T) {
	g := &Sync{
		RetryBackOffMs:    100,
		RetryBackOffMaxMs: 500,
	}

	result := g.buildRetryPolicy()

	// Decode into a generic map so the structure can be inspected key by key.
	var policy map[string]interface{}
	if err := json.Unmarshal([]byte(result), &policy); err != nil {
		t.Fatalf("Failed to unmarshal result: %v", err)
	}

	methodConfig, ok := policy["methodConfig"].([]interface{})
	if !ok || len(methodConfig) == 0 {
		t.Fatalf("methodConfig missing or empty")
	}

	config, ok := methodConfig[0].(map[string]interface{})
	if !ok {
		t.Fatalf("methodConfig[0] has unexpected type %T", methodConfig[0])
	}
	retryPolicy, ok := config["retryPolicy"].(map[string]interface{})
	if !ok {
		t.Fatalf("retryPolicy missing")
	}

	// encoding/json decodes JSON numbers into float64 when the target is interface{}.
	if got, ok := retryPolicy["MaxAttempts"].(float64); !ok || got != 3 {
		t.Errorf("MaxAttempts = %v; want 3", retryPolicy["MaxAttempts"])
	}
	if got, ok := retryPolicy["InitialBackoff"].(string); !ok || got != "100ms" {
		t.Errorf("InitialBackoff = %v; want 100ms", retryPolicy["InitialBackoff"])
	}
	if got, ok := retryPolicy["MaxBackoff"].(string); !ok || got != "500ms" {
		t.Errorf("MaxBackoff = %v; want 500ms", retryPolicy["MaxBackoff"])
	}
	if got, ok := retryPolicy["BackoffMultiplier"].(float64); !ok || got != 2.0 {
		t.Errorf("BackoffMultiplier = %v; want 2.0", retryPolicy["BackoffMultiplier"])
	}

	// Named gotCodes (not codes) to avoid shadowing the imported grpc codes package.
	gotCodes, ok := retryPolicy["RetryableStatusCodes"].([]interface{})
	if !ok {
		t.Fatalf("RetryableStatusCodes missing or not a list: %v", retryPolicy["RetryableStatusCodes"])
	}
	expectedCodes := []string{"UNKNOWN", "UNAVAILABLE"}
	// Compare lengths first: indexing below would panic on a shorter list and
	// silently accept a longer one.
	if len(gotCodes) != len(expectedCodes) {
		t.Fatalf("RetryableStatusCodes = %v; want %v", gotCodes, expectedCodes)
	}
	for i, want := range expectedCodes {
		if got, ok := gotCodes[i].(string); !ok || got != want {
			t.Errorf("RetryableStatusCodes[%d] = %v; want %v", i, gotCodes[i], want)
		}
	}

	// Also check the raw serialized form for the expected key/value substrings.
	if !strings.Contains(result, `"MaxAttempts":3`) {
		t.Error("Result does not contain MaxAttempts")
	}
	if !strings.Contains(result, `"InitialBackoff":"100ms"`) {
		t.Error("Result does not contain InitialBackoff")
	}
	if !strings.Contains(result, `"MaxBackoff":"500ms"`) {
		t.Error("Result does not contain MaxBackoff")
	}
	if !strings.Contains(result, `"RetryableStatusCodes":["UNKNOWN","UNAVAILABLE"]`) {
		t.Error("Result does not contain RetryableStatusCodes")
	}
}

func TestSync_initNonRetryableStatusCodesSet(t *testing.T) {
tests := []struct {
name string
Expand Down
24 changes: 18 additions & 6 deletions providers/flagd/pkg/service/in_process/grpc_sync.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package process

import (
"buf.build/gen/go/open-feature/flagd/grpc/go/flagd/sync/v1/syncv1grpc"
v1 "buf.build/gen/go/open-feature/flagd/protocolbuffers/go/flagd/sync/v1"
"context"
"fmt"
msync "sync"
"time"

"buf.build/gen/go/open-feature/flagd/grpc/go/flagd/sync/v1/syncv1grpc"
v1 "buf.build/gen/go/open-feature/flagd/protocolbuffers/go/flagd/sync/v1"
"github.com/open-feature/flagd/core/pkg/logger"
"github.com/open-feature/flagd/core/pkg/sync"
grpccredential "github.com/open-feature/flagd/core/pkg/sync/grpc/credentials"
Expand All @@ -13,8 +16,6 @@ import (
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/status"
msync "sync"
"time"
)

// FlagSyncServiceClient Type aliases for interfaces required by this component - needed for mock generation with gomock
Expand All @@ -38,6 +39,9 @@ type Sync struct {
Selector string
URI string
MaxMsgSize int
RetryGracePeriod int
RetryBackOffMs int
RetryBackOffMaxMs int
FatalStatusCodes []string

// Runtime state
Expand Down Expand Up @@ -190,13 +194,21 @@ func (g *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error {
}
}
}

g.Logger.Warn(fmt.Sprintf("sync cycle failed: %v, retrying...", err))
g.sendEvent(ctx, SyncEvent{event: of.ProviderError})

if ctx.Err() != nil {
return ctx.Err()
}

// Backoff before retrying.
// This covers unusual error scenarios in which the normal gRPC retry/backoff policy (which only works at the connection level) is bypassed because the error occurs only at the stream (application) level; it helps avoid tight retry loops in that situation.
g.Logger.Warn(fmt.Sprintf("sync cycle failed: %v, retrying after %d backoff...", err, g.RetryBackOffMaxMs))
select {
case <-time.After(time.Duration(g.RetryBackOffMaxMs) * time.Millisecond):
// Backoff completed
case <-ctx.Done():
return ctx.Err()
}
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions providers/flagd/pkg/service/in_process/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ type Configuration struct {
GrpcDialOptionsOverride []googlegrpc.DialOption
CertificatePath string
RetryGracePeriod int
RetryBackOffMs int
RetryBackOffMaxMs int
FatalStatusCodes []string
}

Expand Down Expand Up @@ -571,6 +573,8 @@ func createSyncProvider(cfg Configuration, log *logger.Logger) (isync.ISync, str
Selector: cfg.Selector,
URI: uri,
FatalStatusCodes: cfg.FatalStatusCodes,
RetryBackOffMaxMs: cfg.RetryBackOffMaxMs,
RetryBackOffMs: cfg.RetryBackOffMs,
}, uri
}

Expand Down
19 changes: 11 additions & 8 deletions providers/flagd/pkg/service/in_process/service_grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,12 @@ func TestInProcessProviderEvaluation(t *testing.T) {
}

inProcessService := NewInProcessService(Configuration{
Host: host,
Port: port,
Selector: scope,
TLSEnabled: false,
Host: host,
Port: port,
Selector: scope,
TLSEnabled: false,
RetryBackOffMaxMs: 5000,
RetryBackOffMs: 1000,
})

// when
Expand Down Expand Up @@ -138,9 +140,11 @@ func TestInProcessProviderEvaluationEnvoy(t *testing.T) {
}

inProcessService := NewInProcessService(Configuration{
TargetUri: "envoy://localhost:9211/foo.service",
Selector: scope,
TLSEnabled: false,
TargetUri: "envoy://localhost:9211/foo.service",
Selector: scope,
TLSEnabled: false,
RetryBackOffMaxMs: 5000,
RetryBackOffMs: 1000,
})

// when
Expand Down Expand Up @@ -201,7 +205,6 @@ func TestInProcessProviderEvaluationEnvoy(t *testing.T) {
}
}


// bufferedServer - a mock grpc service backed by buffered connection
type bufferedServer struct {
listener net.Listener
Expand Down
2 changes: 0 additions & 2 deletions tests/flagd/testframework/config_steps.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ var ignoredOptions = []string{
"deadlineMs",
"streamDeadlineMs",
"keepAliveTime",
"retryBackoffMs",
"retryBackoffMaxMs",
"offlinePollIntervalMs",
}

Expand Down
Loading