Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions providers/flagd/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ provider, err := flagd.NewProvider(flagd.WithInProcessResolver())
openfeature.SetProvider(provider)
```

In the above example, in-process handlers attempt to connect to a sync service on address `localhost:8013` to obtain [flag definitions](https://github.com/open-feature/schemas/blob/main/json/flagd-definitions.json).
In the above example, in-process handlers attempt to connect to a sync service on address `localhost:8015` to obtain [flag definitions](https://github.com/open-feature/schemas/blob/main/json/flagd-definitions.json).

#### Custom sync provider

Expand Down Expand Up @@ -95,7 +95,7 @@ Configuration can be provided as constructor options or as environment variables
| Option name | Environment variable name | Type & supported value | Default | Compatible resolver |
|----------------------------------------------------------|--------------------------------|-----------------------------|-----------|---------------------|
| WithHost | FLAGD_HOST | string | localhost | rpc & in-process |
| WithPort | FLAGD_PORT | number | 8013 | rpc & in-process |
| WithPort | FLAGD_PORT (rpc), FLAGD_SYNC_PORT or FLAGD_PORT (in-process) | number | 8013 (rpc), 8015 (in-process) | rpc & in-process |
| WithTargetUri | FLAGD_TARGET_URI | string | "" | in-process |
| WithTLS | FLAGD_TLS | boolean | false | rpc & in-process |
| WithSocketPath | FLAGD_SOCKET_PATH | string | "" | rpc & in-process |
Expand All @@ -106,6 +106,8 @@ Configuration can be provided as constructor options or as environment variables
| WithProviderID | FLAGD_SOURCE_PROVIDER_ID | string | "" | in-process |
| WithSelector | FLAGD_SOURCE_SELECTOR | string | "" | in-process |

> **Note:** For the in-process resolver, `FLAGD_SYNC_PORT` takes priority over `FLAGD_PORT`. The `FLAGD_PORT` environment variable is still supported for backwards compatibility.

### Overriding behavior

By default, the flagd provider will read non-empty environment variables to set its own configuration with the lowest priority.
Expand Down
49 changes: 35 additions & 14 deletions providers/flagd/pkg/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ const (

flagdHostEnvironmentVariableName = "FLAGD_HOST"
flagdPortEnvironmentVariableName = "FLAGD_PORT"
flagdSyncPortEnvironmentVariableName = "FLAGD_SYNC_PORT"
flagdTLSEnvironmentVariableName = "FLAGD_TLS"
flagdSocketPathEnvironmentVariableName = "FLAGD_SOCKET_PATH"
flagdServerCertPathEnvironmentVariableName = "FLAGD_SERVER_CERT_PATH"
Expand Down Expand Up @@ -130,20 +131,6 @@ func validateProviderConfiguration(p *ProviderConfiguration) error {

// updateFromEnvVar is a utility to update configurations based on current environment variables
func (cfg *ProviderConfiguration) updateFromEnvVar() {
portS := os.Getenv(flagdPortEnvironmentVariableName)
if portS != "" {
port, err := strconv.Atoi(portS)
if err != nil {
cfg.log.Error(err,
fmt.Sprintf(
"invalid env config for %s provided, using default value: %d or %d depending on resolver",
flagdPortEnvironmentVariableName, defaultRpcPort, defaultInProcessPort,
))
} else {
cfg.Port = uint16(port)
}
}

if host := os.Getenv(flagdHostEnvironmentVariableName); host != "" {
cfg.Host = host
}
Expand Down Expand Up @@ -212,6 +199,9 @@ func (cfg *ProviderConfiguration) updateFromEnvVar() {
}
}

// Parse port after resolver is known to handle FLAGD_SYNC_PORT for in-process resolver
cfg.updatePortFromEnvVar()

if offlinePath := os.Getenv(flagdOfflinePathEnvironmentVariableName); offlinePath != "" {
cfg.OfflineFlagSourcePath = offlinePath
}
Expand All @@ -238,6 +228,37 @@ func (cfg *ProviderConfiguration) updateFromEnvVar() {

}

// updatePortFromEnvVar updates the port configuration from environment variables.
// For in-process resolver, FLAGD_SYNC_PORT takes priority over FLAGD_PORT (backwards compatibility).
// For rpc resolver, only FLAGD_PORT is used.
func (cfg *ProviderConfiguration) updatePortFromEnvVar() {
var portS string
var envVarName string

if cfg.Resolver == inProcess {
portS = os.Getenv(flagdSyncPortEnvironmentVariableName)
envVarName = flagdSyncPortEnvironmentVariableName
}

if portS == "" {
portS = os.Getenv(flagdPortEnvironmentVariableName)
envVarName = flagdPortEnvironmentVariableName
}

if portS != "" {
port, err := strconv.Atoi(portS)
if err != nil {
cfg.log.Error(err,
fmt.Sprintf(
"invalid env config for %s provided, using default value: %d or %d depending on resolver",
envVarName, defaultRpcPort, defaultInProcessPort,
))
} else {
cfg.Port = uint16(port)
}
}
}

// ProviderOptions

type ProviderOption func(*ProviderConfiguration)
Expand Down
100 changes: 100 additions & 0 deletions providers/flagd/pkg/configuration_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package flagd

import (
"os"
"testing"

"github.com/go-logr/logr"
)

func TestConfigureProviderConfigurationInProcessWithOfflineFile(t *testing.T) {
Expand Down Expand Up @@ -64,3 +67,100 @@
t.Errorf("Error expected but check succeeded")
}
}

func TestUpdatePortFromEnvVarInProcessWithSyncPort(t *testing.T) {
// given
os.Setenv("FLAGD_SYNC_PORT", "9999")

Check failure on line 73 in providers/flagd/pkg/configuration_test.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `os.Setenv` is not checked (errcheck)
defer os.Unsetenv("FLAGD_SYNC_PORT")

Check failure on line 74 in providers/flagd/pkg/configuration_test.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `os.Unsetenv` is not checked (errcheck)

providerConfiguration := &ProviderConfiguration{
Resolver: inProcess,
log: logr.Discard(),
}

// when
providerConfiguration.updatePortFromEnvVar()

// then
if providerConfiguration.Port != 9999 {
t.Errorf("incorrect Port, expected %v, got %v", 9999, providerConfiguration.Port)
}
}

func TestUpdatePortFromEnvVarInProcessWithLegacyPort(t *testing.T) {
// given - for backwards compatibility, FLAGD_PORT should work for in-process
os.Setenv("FLAGD_PORT", "8888")

Check failure on line 92 in providers/flagd/pkg/configuration_test.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `os.Setenv` is not checked (errcheck)
defer os.Unsetenv("FLAGD_PORT")

Check failure on line 93 in providers/flagd/pkg/configuration_test.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `os.Unsetenv` is not checked (errcheck)

providerConfiguration := &ProviderConfiguration{
Resolver: inProcess,
log: logr.Discard(),
}

// when
providerConfiguration.updatePortFromEnvVar()

// then
if providerConfiguration.Port != 8888 {
t.Errorf("incorrect Port, expected %v, got %v", 8888, providerConfiguration.Port)
}
}

func TestUpdatePortFromEnvVarInProcessSyncPortPriority(t *testing.T) {
// given - FLAGD_SYNC_PORT takes priority over FLAGD_PORT
os.Setenv("FLAGD_SYNC_PORT", "9999")

Check failure on line 111 in providers/flagd/pkg/configuration_test.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `os.Setenv` is not checked (errcheck)
os.Setenv("FLAGD_PORT", "8888")

Check failure on line 112 in providers/flagd/pkg/configuration_test.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `os.Setenv` is not checked (errcheck)
defer os.Unsetenv("FLAGD_SYNC_PORT")

Check failure on line 113 in providers/flagd/pkg/configuration_test.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `os.Unsetenv` is not checked (errcheck)
defer os.Unsetenv("FLAGD_PORT")

Check failure on line 114 in providers/flagd/pkg/configuration_test.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `os.Unsetenv` is not checked (errcheck)

providerConfiguration := &ProviderConfiguration{
Resolver: inProcess,
log: logr.Discard(),
}

// when
providerConfiguration.updatePortFromEnvVar()

// then
if providerConfiguration.Port != 9999 {
t.Errorf("incorrect Port, expected %v, got %v", 9999, providerConfiguration.Port)
}
}

func TestUpdatePortFromEnvVarRpcWithPort(t *testing.T) {
// given - RPC resolver uses FLAGD_PORT
os.Setenv("FLAGD_PORT", "8888")

Check failure on line 132 in providers/flagd/pkg/configuration_test.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `os.Setenv` is not checked (errcheck)
defer os.Unsetenv("FLAGD_PORT")

Check failure on line 133 in providers/flagd/pkg/configuration_test.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `os.Unsetenv` is not checked (errcheck)

providerConfiguration := &ProviderConfiguration{
Resolver: rpc,
log: logr.Discard(),
}

// when
providerConfiguration.updatePortFromEnvVar()

// then
if providerConfiguration.Port != 8888 {
t.Errorf("incorrect Port, expected %v, got %v", 8888, providerConfiguration.Port)
}
}

func TestUpdatePortFromEnvVarRpcIgnoresSyncPort(t *testing.T) {
// given - RPC resolver should NOT use FLAGD_SYNC_PORT
os.Setenv("FLAGD_SYNC_PORT", "9999")
defer os.Unsetenv("FLAGD_SYNC_PORT")

providerConfiguration := &ProviderConfiguration{
Resolver: rpc,
log: logr.Discard(),
}

// when
providerConfiguration.updatePortFromEnvVar()

// then - port should remain 0 (unset) since FLAGD_PORT is not set
if providerConfiguration.Port != 0 {
t.Errorf("incorrect Port, expected %v, got %v", 0, providerConfiguration.Port)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func TestInProcessProviderEvaluationEnvoy(t *testing.T) {
}

inProcessService := NewInProcessService(Configuration{
TargetUri: "envoy://localhost:9211/foo.service",
TargetUri: "envoy://localhost:9211/foo.service",
Selector: scope,
TLSEnabled: false,
})
Expand Down Expand Up @@ -201,7 +201,6 @@ func TestInProcessProviderEvaluationEnvoy(t *testing.T) {
}
}


// bufferedServer - a mock grpc service backed by buffered connection
type bufferedServer struct {
listener net.Listener
Expand Down
8 changes: 4 additions & 4 deletions providers/flagd/pkg/service/rpc/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ func (s *Service) EventChannel() <-chan of.Event {
// If retrying is exhausted, an event with openfeature.ProviderError will be emitted.
func (s *Service) startEventStream(ctx context.Context) {
streamReadySignaled := false

// wraps connection with retry attempts
for s.retryCounter.retry() {
s.logger.V(logger.Debug).Info("connecting to event stream")
Expand Down Expand Up @@ -488,12 +488,12 @@ func (s *Service) startEventStream(ctx context.Context) {
// retry attempts exhausted. Disable cache and emit error event
s.cache.Disable()
connErr := fmt.Errorf("grpc connection establishment failed")

// Signal error if we haven't signaled success yet
if !streamReadySignaled {
s.signalStreamReady(connErr)
}

s.sendEvent(ctx, of.Event{
ProviderName: "flagd",
EventType: of.ProviderError,
Expand Down Expand Up @@ -521,7 +521,7 @@ func (s *Service) streamClient(ctx context.Context, streamReadySignaled *bool) e
}

s.logger.V(logger.Info).Info("connected to event stream")

// Signal successful connection to Init() - stream is now ready
if !*streamReadySignaled {
s.signalStreamReady(nil) // nil means success
Expand Down
Loading