16 changes: 14 additions & 2 deletions lib/ldclient-rb/impl/data_system/fdv1.rb
@@ -38,13 +38,22 @@ def initialize(sdk_key, config)
@data_store_broadcaster
)

# Wrap the data store with client wrapper (must be created before status provider)
# Preserve the original unwrapped store to avoid nested wrappers on postfork
original_store = @config.feature_store
if original_store.is_a?(LaunchDarkly::Impl::FeatureStoreClientWrapper)
original_store = original_store.instance_variable_get(:@store)
end

# Wrap the original data store with client wrapper (must be created before status provider)
@store_wrapper = LaunchDarkly::Impl::FeatureStoreClientWrapper.new(
@config.feature_store,
original_store,
@data_store_update_sink,
@config.logger
)

# Update config to use wrapped store so data sources can access it
@config.instance_variable_set(:@feature_store, @store_wrapper)

# Create status provider with store wrapper
@data_store_status_provider = LaunchDarkly::Impl::DataStore::StatusProvider.new(
@store_wrapper,
@@ -83,6 +92,7 @@ def start
# (see DataSystem#stop)
def stop
@update_processor&.stop
@store_wrapper.stop
@shared_executor.shutdown
end

@@ -156,6 +166,8 @@ def target_availability
end

# Polling processor
@config.logger.info { "Disabling streaming API" }
@config.logger.warn { "You should only disable the streaming API if instructed to do so by LaunchDarkly support" }
requestor = LaunchDarkly::Impl::DataSource::Requestor.new(@sdk_key, @config)
LaunchDarkly::Impl::DataSource::PollingProcessor.new(@config, requestor)
end
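A minimal sketch of the unwrap-then-rewrap pattern the initialize hunk above introduces, so repeated initialization (for example after postfork) never stacks wrappers. The Wrapper class is an illustrative stand-in, not the SDK's FeatureStoreClientWrapper:

```ruby
# Illustrative stand-in for FeatureStoreClientWrapper.
class Wrapper
  attr_reader :inner

  def initialize(inner)
    @inner = inner
  end
end

# Unwrap first if the configured store is already wrapped, then wrap exactly once.
def wrap_store(current)
  inner = current.is_a?(Wrapper) ? current.inner : current
  Wrapper.new(inner)
end

plain  = Object.new
first  = wrap_store(plain)  # wraps the plain store
second = wrap_store(first)  # re-wraps the original store, not the first wrapper

puts second.inner.equal?(plain)  # => true
puts second.inner.is_a?(Wrapper) # => false
```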
126 changes: 38 additions & 88 deletions lib/ldclient-rb/ldclient.rb
@@ -2,20 +2,15 @@
require "ldclient-rb/impl/broadcaster"
require "ldclient-rb/impl/context"
require "ldclient-rb/impl/data_source"
require "ldclient-rb/impl/data_source/null_processor"
require "ldclient-rb/impl/data_source/polling"
require "ldclient-rb/impl/data_source/requestor"
require "ldclient-rb/impl/data_source/stream"
require "ldclient-rb/impl/data_store"
require "ldclient-rb/impl/data_system/fdv1"
require "ldclient-rb/impl/diagnostic_events"
require "ldclient-rb/impl/evaluation_with_hook_result"
require "ldclient-rb/impl/evaluator"
require "ldclient-rb/impl/flag_tracker"
require "ldclient-rb/impl/migrations/tracker"
require "ldclient-rb/impl/store_client_wrapper"
require "ldclient-rb/impl/util"
require "ldclient-rb/events"
require "ldclient-rb/in_memory_store"
require "concurrent"
require "concurrent/atomics"
require "digest/sha1"
@@ -57,11 +52,12 @@ def initialize(sdk_key, config = Config.default, wait_for_sec = 5)
# Note that sdk_key is normally a required parameter, and a nil value would cause the SDK to
# fail in most configurations. However, there are some configurations where it would be OK
# (offline = true, *or* we are using LDD mode or the file data source and events are disabled
# so we're not connecting to any LD services) so rather than try to check for all of those
# up front, we will let the constructors for the data source implementations implement this
# fail-fast as appropriate, and just check here for the part regarding events.
if !config.offline? && config.send_events
raise ArgumentError, "sdk_key must not be nil" if sdk_key.nil?
# so we're not connecting to any LD services).
if !config.offline? && sdk_key.nil?
# SDK key can be nil only if using LDD or custom data source with events disabled
if config.send_events || (!config.use_ldd? && config.data_source.nil?)
raise ArgumentError, "sdk_key must not be nil"
end
end

@sdk_key = sdk_key
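A small sketch of the nil-key decision above, with the config reduced to the settings the check reads; the helper name is invented for illustration:

```ruby
# Hypothetical helper mirroring the check above: returns true when a nil
# sdk_key should raise ArgumentError.
def sdk_key_required?(offline:, send_events:, use_ldd:, data_source:)
  return false if offline
  # Sending events always needs a key; otherwise a key is only needed when
  # the SDK would talk to LaunchDarkly itself (not LDD mode, no custom source).
  send_events || (!use_ldd && data_source.nil?)
end

puts sdk_key_required?(offline: true,  send_events: true,  use_ldd: false, data_source: nil)   # => false
puts sdk_key_required?(offline: false, send_events: false, use_ldd: true,  data_source: nil)   # => false
puts sdk_key_required?(offline: false, send_events: false, use_ldd: false, data_source: :file) # => false
puts sdk_key_required?(offline: false, send_events: true,  use_ldd: false, data_source: nil)   # => true
```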
@@ -89,9 +85,10 @@ def initialize(sdk_key, config = Config.default, wait_for_sec = 5)
# @param wait_for_sec [Float] maximum time (in seconds) to wait for initialization
#
def postfork(wait_for_sec = 5)
@data_source = nil
@data_system = nil
@event_processor = nil
@big_segment_store_manager = nil
@flag_tracker = nil

start_up(wait_for_sec)
end
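For context, the intended call pattern after a process fork; the client construction is illustrative (offline config so the example does not contact LaunchDarkly):

```ruby
require "ldclient-rb"

# Illustrative setup; a real application would pass its own SDK key and config.
client = LaunchDarkly::LDClient.new("sdk-key", LaunchDarkly::Config.new(offline: true))

pid = Process.fork do
  # Threads and connections do not survive fork, so the child rebuilds them.
  client.postfork(5)
  # ... the child can use `client` normally from here ...
end
Process.wait(pid)
```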
@@ -102,32 +99,22 @@ def postfork(wait_for_sec = 5)

@hooks = Concurrent::Array.new(@config.hooks + plugin_hooks)

@shared_executor = Concurrent::SingleThreadExecutor.new

data_store_broadcaster = LaunchDarkly::Impl::Broadcaster.new(@shared_executor, @config.logger)
store_sink = LaunchDarkly::Impl::DataStore::UpdateSink.new(data_store_broadcaster)

# We need to wrap the feature store object with a FeatureStoreClientWrapper in order to add
# some necessary logic around updates. Unfortunately, we have code elsewhere that accesses
# the feature store through the Config object, so we need to make a new Config that uses
# the wrapped store.
@store = Impl::FeatureStoreClientWrapper.new(@config.feature_store, store_sink, @config.logger)
updated_config = @config.clone
updated_config.instance_variable_set(:@feature_store, @store)
@config = updated_config

@data_store_status_provider = LaunchDarkly::Impl::DataStore::StatusProvider.new(@store, store_sink)
# Initialize the data system (FDv1 for now, will support FDv2 in the future)
# Note: FDv1 will update @config.feature_store to use its wrapped store
@data_system = Impl::DataSystem::FDv1.new(@sdk_key, @config)

# Components not managed by data system
@big_segment_store_manager = Impl::BigSegmentStoreManager.new(@config.big_segments, @config.logger)
@big_segment_store_status_provider = @big_segment_store_manager.status_provider

get_flag = lambda { |key| @store.get(Impl::DataStore::FEATURES, key) }
get_segment = lambda { |key| @store.get(Impl::DataStore::SEGMENTS, key) }
get_flag = lambda { |key| @data_system.store.get(Impl::DataStore::FEATURES, key) }
get_segment = lambda { |key| @data_system.store.get(Impl::DataStore::SEGMENTS, key) }
get_big_segments_membership = lambda { |key| @big_segment_store_manager.get_context_membership(key) }
@evaluator = LaunchDarkly::Impl::Evaluator.new(get_flag, get_segment, get_big_segments_membership, @config.logger)

if !@config.offline? && @config.send_events && !@config.diagnostic_opt_out?
diagnostic_accumulator = Impl::DiagnosticAccumulator.new(Impl::DiagnosticAccumulator.create_diagnostic_id(@sdk_key))
@data_system.set_diagnostic_accumulator(diagnostic_accumulator)
else
diagnostic_accumulator = nil
end
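The evaluator is handed small lambdas rather than the store itself, which keeps it decoupled from wherever the data system keeps its data. A reduced sketch with a toy store and evaluation function:

```ruby
# Toy store and lambda wiring; only the shape matters.
store = { "my-flag" => { "on" => true } }

get_flag = lambda { |key| store[key] }

evaluate = lambda do |flag_key|
  flag = get_flag.call(flag_key)
  flag.nil? ? :flag_not_found : flag["on"]
end

puts evaluate.call("my-flag") # => true
puts evaluate.call("missing") # => flag_not_found
```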
@@ -138,38 +125,14 @@ def postfork(wait_for_sec = 5)
@event_processor = EventProcessor.new(@sdk_key, @config, nil, diagnostic_accumulator)
end

if @config.use_ldd?
@config.logger.info { "[LDClient] Started LaunchDarkly Client in LDD mode" }
@data_source = LaunchDarkly::Impl::DataSource::NullUpdateProcessor.new
return # requestor and update processor are not used in this mode
end

flag_tracker_broadcaster = LaunchDarkly::Impl::Broadcaster.new(@shared_executor, @config.logger)
@flag_tracker = LaunchDarkly::Impl::FlagTracker.new(flag_tracker_broadcaster, lambda { |key, context| variation(key, context, nil) })

data_source_broadcaster = LaunchDarkly::Impl::Broadcaster.new(@shared_executor, @config.logger)

# Make the update sink available on the config so that our data source factory can access the sink with a shared executor.
@config.data_source_update_sink = LaunchDarkly::Impl::DataSource::UpdateSink.new(@store, data_source_broadcaster, flag_tracker_broadcaster)

@data_source_status_provider = LaunchDarkly::Impl::DataSource::StatusProvider.new(data_source_broadcaster, @config.data_source_update_sink)

data_source_or_factory = @config.data_source || self.method(:create_default_data_source)
if data_source_or_factory.respond_to? :call
# Currently, data source factories take two parameters unless they need to be aware of diagnostic_accumulator, in
# which case they take three parameters. This will be changed in the future to use a less awkware mechanism.
if data_source_or_factory.arity == 3
@data_source = data_source_or_factory.call(@sdk_key, @config, diagnostic_accumulator)
else
@data_source = data_source_or_factory.call(@sdk_key, @config)
end
else
@data_source = data_source_or_factory
end
# Create the flag tracker using the broadcaster from the data system
eval_fn = lambda { |key, context| variation(key, context, nil) }
@flag_tracker = Impl::FlagTracker.new(@data_system.flag_change_broadcaster, eval_fn)

register_plugins(environment_metadata)

ready = @data_source.start
# Start the data system
ready = @data_system.start

return unless wait_for_sec > 0

@@ -180,7 +143,7 @@ def postfork(wait_for_sec = 5)
ok = ready.wait(wait_for_sec)
if !ok
@config.logger.error { "[LDClient] Timeout encountered waiting for LaunchDarkly client initialization" }
elsif !@data_source.initialized?
elsif !initialized?
@config.logger.error { "[LDClient] LaunchDarkly client initialization failed" }
end
end
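A minimal sketch of the start-then-wait pattern above, assuming start hands back a Concurrent::Event (from concurrent-ruby) that is set once the first payload has been stored:

```ruby
require "concurrent"

# Stand-in for @data_system.start: another thread sets the event when ready.
ready = Concurrent::Event.new
Thread.new do
  sleep 0.1 # simulate fetching the initial payload
  ready.set
end

ok = ready.wait(5) # true if the event was set within the timeout, false otherwise
puts ok ? "client initialized" : "timeout waiting for initialization"
```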
@@ -295,7 +258,7 @@ def secure_mode_hash(context)
# @return [Boolean] true if the client has been initialized
#
def initialized?
@config.offline? || @config.use_ldd? || @data_source.initialized?
@data_system.data_availability == @data_system.target_availability
end
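A toy sketch of the availability comparison initialized? now relies on; the level names here are illustrative, not the SDK's actual values:

```ruby
# Illustrative data system: "initialized" means the data we actually have
# has reached the level this configuration is aiming for.
class ToyDataSystem
  def initialize(target)
    @target  = target
    @current = :defaults
  end

  def target_availability
    @target
  end

  def data_availability
    @current
  end

  def received!(level)
    @current = level
  end

  def initialized?
    data_availability == target_availability
  end
end

ds = ToyDataSystem.new(:refreshed)
puts ds.initialized? # => false
ds.received!(:refreshed)
puts ds.initialized? # => true
```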

#
@@ -601,7 +564,7 @@ def all_flags_state(context, options={})
return FeatureFlagsState.new(false) if @config.offline?

unless initialized?
if @store.initialized?
if @data_system.store.initialized?
@config.logger.warn { "Called all_flags_state before client initialization; using last known values from data store" }
else
@config.logger.warn { "Called all_flags_state before client initialization. Data store not available; returning empty state" }
@@ -616,7 +579,7 @@ def all_flags_state(context, options={})
end

begin
features = @store.all(Impl::DataStore::FEATURES)
features = @data_system.store.all(Impl::DataStore::FEATURES)
rescue => exn
Impl::Util.log_exception(@config.logger, "Unable to read flags for all_flags_state", exn)
return FeatureFlagsState.new(false)
@@ -663,11 +626,9 @@ def all_flags_state(context, options={})
# @return [void]
def close
@config.logger.info { "[LDClient] Closing LaunchDarkly client..." }
@data_source.stop
@data_system.stop
@event_processor.stop
@big_segment_store_manager.stop
@store.stop
@shared_executor.shutdown
end

#
@@ -690,7 +651,9 @@ def close
#
# @return [LaunchDarkly::Interfaces::DataStore::StatusProvider]
#
attr_reader :data_store_status_provider
def data_store_status_provider
@data_system.data_store_status_provider
end

#
# Returns an interface for tracking the status of the data source.
@@ -703,7 +666,9 @@ def close
#
# @return [LaunchDarkly::Interfaces::DataSource::StatusProvider]
#
attr_reader :data_source_status_provider
def data_source_status_provider
@data_system.data_source_status_provider
end

#
# Returns an interface for tracking changes in feature flag configurations.
@@ -712,23 +677,8 @@ def close
# requesting notifications about feature flag changes using an event
# listener model.
#
attr_reader :flag_tracker

private

def create_default_data_source(sdk_key, config, diagnostic_accumulator)
if config.offline?
return LaunchDarkly::Impl::DataSource::NullUpdateProcessor.new
end
raise ArgumentError, "sdk_key must not be nil" if sdk_key.nil? # see LDClient constructor comment on sdk_key
if config.stream?
Impl::DataSource::StreamProcessor.new(sdk_key, config, diagnostic_accumulator)
else
config.logger.info { "Disabling streaming API" }
config.logger.warn { "You should only disable the streaming API if instructed to do so by LaunchDarkly support" }
requestor = Impl::DataSource::Requestor.new(sdk_key, config)
Impl::DataSource::PollingProcessor.new(config, requestor)
end
def flag_tracker
@flag_tracker
end
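Why the attr_reader declarations above became plain methods: these objects now live on the data system, so the client no longer holds matching instance variables and has to forward calls instead. A reduced sketch with toy classes:

```ruby
# Toy classes; only the delegation shape matters.
class DataSystemStub
  attr_reader :data_source_status_provider

  def initialize(provider)
    @data_source_status_provider = provider
  end
end

class ClientStub
  def initialize(data_system)
    @data_system = data_system
  end

  # attr_reader :data_source_status_provider would always return nil here,
  # because the client itself never assigns that instance variable.
  def data_source_status_provider
    @data_system.data_source_status_provider
  end
end

client = ClientStub.new(DataSystemStub.new(:status_provider))
puts client.data_source_status_provider # => status_provider
```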

#
@@ -738,7 +688,7 @@ def create_default_data_source(sdk_key, config, diagnostic_accumulator)
#
# @return [Array<EvaluationDetail, [LaunchDarkly::Impl::Model::FeatureFlag, nil], [String, nil]>]
#
def variation_with_flag(key, context, default)
private def variation_with_flag(key, context, default)
Author comment: The file used a mix of per-method `private` and the section style where all methods after the `private` declaration (line 717 of the original file) are private. This change moves to declaring visibility per method; a short sketch of the two styles follows this method.

evaluate_internal(key, context, default, false)
end
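A small sketch of the two visibility styles the author comment above refers to (illustrative class):

```ruby
class VisibilityExample
  def public_api; end

  # Per-method style adopted in this change: visibility travels with the definition.
  private def helper_one; end

  # Section style used previously: everything defined after `private` is private.
  private

  def helper_two; end
end

p VisibilityExample.private_instance_methods(false).sort # => [:helper_one, :helper_two]
p VisibilityExample.public_instance_methods(false)       # => [:public_api]
```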

@@ -750,7 +700,7 @@ def variation_with_flag(key, context, default)
#
# @return [Array<EvaluationDetail, [LaunchDarkly::Impl::Model::FeatureFlag, nil], [String, nil]>]
#
def evaluate_internal(key, context, default, with_reasons)
private def evaluate_internal(key, context, default, with_reasons)
if @config.offline?
return Evaluator.error_result(EvaluationReason::ERROR_CLIENT_NOT_READY, default), nil, nil
end
@@ -768,7 +718,7 @@ def evaluate_internal(key, context, default, with_reasons)
end

unless initialized?
if @store.initialized?
if @data_system.store.initialized?
@config.logger.warn { "[LDClient] Client has not finished initializing; using last known values from feature store" }
else
@config.logger.error { "[LDClient] Client has not finished initializing; feature store unavailable, returning default value" }
Expand All @@ -779,7 +729,7 @@ def evaluate_internal(key, context, default, with_reasons)
end

begin
feature = @store.get(Impl::DataStore::FEATURES, key)
feature = @data_system.store.get(Impl::DataStore::FEATURES, key)
rescue
# Ignored
end
27 changes: 27 additions & 0 deletions spec/impl/data_system/fdv1_spec.rb
@@ -15,6 +15,33 @@ module DataSystem
subject # Force creation of FDv1 instance
expect(config.data_source_update_sink).to be_a(LaunchDarkly::Impl::DataSource::UpdateSink)
end

it "wraps the feature store with FeatureStoreClientWrapper" do
original_store = config.feature_store
subject # Force creation of FDv1 instance

wrapped_store = config.feature_store
expect(wrapped_store).to be_a(LaunchDarkly::Impl::FeatureStoreClientWrapper)
expect(wrapped_store.instance_variable_get(:@store)).to eq(original_store)
end

it "avoids nested wrappers when config.feature_store is already wrapped" do
# First initialization wraps the store
original_store = config.feature_store
first_fdv1 = FDv1.new(sdk_key, config)
first_wrapper = config.feature_store
expect(first_wrapper).to be_a(LaunchDarkly::Impl::FeatureStoreClientWrapper)

# Second initialization (simulating postfork) should unwrap and re-wrap the original
second_fdv1 = FDv1.new(sdk_key, config)
second_wrapper = config.feature_store
expect(second_wrapper).to be_a(LaunchDarkly::Impl::FeatureStoreClientWrapper)

# The inner store should be the original, not the first wrapper
inner_store = second_wrapper.instance_variable_get(:@store)
expect(inner_store).to eq(original_store)
expect(inner_store).not_to be_a(LaunchDarkly::Impl::FeatureStoreClientWrapper)
end
end

describe "#start" do