Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions lib/ldclient-rb/impl/data_system/fdv1.rb
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ def initialize(sdk_key, config)
@config.logger
)

# Update config to use wrapped store so data sources can access it
@config.instance_variable_set(:@feature_store, @store_wrapper)

# Create status provider with store wrapper
@data_store_status_provider = LaunchDarkly::Impl::DataStore::StatusProvider.new(
@store_wrapper,
Expand Down Expand Up @@ -156,6 +159,8 @@ def target_availability
end

# Polling processor
@config.logger.info { "Disabling streaming API" }
@config.logger.warn { "You should only disable the streaming API if instructed to do so by LaunchDarkly support" }
requestor = LaunchDarkly::Impl::DataSource::Requestor.new(@sdk_key, @config)
LaunchDarkly::Impl::DataSource::PollingProcessor.new(@config, requestor)
end
Expand Down
126 changes: 38 additions & 88 deletions lib/ldclient-rb/ldclient.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,15 @@
require "ldclient-rb/impl/broadcaster"
require "ldclient-rb/impl/context"
require "ldclient-rb/impl/data_source"
require "ldclient-rb/impl/data_source/null_processor"
require "ldclient-rb/impl/data_source/polling"
require "ldclient-rb/impl/data_source/requestor"
require "ldclient-rb/impl/data_source/stream"
require "ldclient-rb/impl/data_store"
require "ldclient-rb/impl/data_system/fdv1"
require "ldclient-rb/impl/diagnostic_events"
require "ldclient-rb/impl/evaluation_with_hook_result"
require "ldclient-rb/impl/evaluator"
require "ldclient-rb/impl/flag_tracker"
require "ldclient-rb/impl/migrations/tracker"
require "ldclient-rb/impl/store_client_wrapper"
require "ldclient-rb/impl/util"
require "ldclient-rb/events"
require "ldclient-rb/in_memory_store"
require "concurrent"
require "concurrent/atomics"
require "digest/sha1"
Expand Down Expand Up @@ -57,11 +52,12 @@ def initialize(sdk_key, config = Config.default, wait_for_sec = 5)
# Note that sdk_key is normally a required parameter, and a nil value would cause the SDK to
# fail in most configurations. However, there are some configurations where it would be OK
# (offline = true, *or* we are using LDD mode or the file data source and events are disabled
# so we're not connecting to any LD services) so rather than try to check for all of those
# up front, we will let the constructors for the data source implementations implement this
# fail-fast as appropriate, and just check here for the part regarding events.
if !config.offline? && config.send_events
raise ArgumentError, "sdk_key must not be nil" if sdk_key.nil?
# so we're not connecting to any LD services).
if !config.offline? && sdk_key.nil?
# SDK key can be nil only if using LDD or custom data source with events disabled
if config.send_events || (!config.use_ldd? && config.data_source.nil?)
raise ArgumentError, "sdk_key must not be nil"
end
end

@sdk_key = sdk_key
Expand Down Expand Up @@ -89,9 +85,10 @@ def initialize(sdk_key, config = Config.default, wait_for_sec = 5)
# @param wait_for_sec [Float] maximum time (in seconds) to wait for initialization
#
def postfork(wait_for_sec = 5)
@data_source = nil
@data_system = nil
@event_processor = nil
@big_segment_store_manager = nil
@flag_tracker = nil

start_up(wait_for_sec)
end
Expand All @@ -102,32 +99,22 @@ def postfork(wait_for_sec = 5)

@hooks = Concurrent::Array.new(@config.hooks + plugin_hooks)

@shared_executor = Concurrent::SingleThreadExecutor.new

data_store_broadcaster = LaunchDarkly::Impl::Broadcaster.new(@shared_executor, @config.logger)
store_sink = LaunchDarkly::Impl::DataStore::UpdateSink.new(data_store_broadcaster)

# We need to wrap the feature store object with a FeatureStoreClientWrapper in order to add
# some necessary logic around updates. Unfortunately, we have code elsewhere that accesses
# the feature store through the Config object, so we need to make a new Config that uses
# the wrapped store.
@store = Impl::FeatureStoreClientWrapper.new(@config.feature_store, store_sink, @config.logger)
updated_config = @config.clone
updated_config.instance_variable_set(:@feature_store, @store)
@config = updated_config

@data_store_status_provider = LaunchDarkly::Impl::DataStore::StatusProvider.new(@store, store_sink)
# Initialize the data system (FDv1 for now, will support FDv2 in the future)
# Note: FDv1 will update @config.feature_store to use its wrapped store
@data_system = Impl::DataSystem::FDv1.new(@sdk_key, @config)

# Components not managed by data system
@big_segment_store_manager = Impl::BigSegmentStoreManager.new(@config.big_segments, @config.logger)
@big_segment_store_status_provider = @big_segment_store_manager.status_provider

get_flag = lambda { |key| @store.get(Impl::DataStore::FEATURES, key) }
get_segment = lambda { |key| @store.get(Impl::DataStore::SEGMENTS, key) }
get_flag = lambda { |key| @data_system.store.get(Impl::DataStore::FEATURES, key) }
get_segment = lambda { |key| @data_system.store.get(Impl::DataStore::SEGMENTS, key) }
get_big_segments_membership = lambda { |key| @big_segment_store_manager.get_context_membership(key) }
@evaluator = LaunchDarkly::Impl::Evaluator.new(get_flag, get_segment, get_big_segments_membership, @config.logger)

if [email protected]? && @config.send_events && [email protected]_opt_out?
diagnostic_accumulator = Impl::DiagnosticAccumulator.new(Impl::DiagnosticAccumulator.create_diagnostic_id(@sdk_key))
@data_system.set_diagnostic_accumulator(diagnostic_accumulator)
else
diagnostic_accumulator = nil
end
Expand All @@ -138,38 +125,14 @@ def postfork(wait_for_sec = 5)
@event_processor = EventProcessor.new(@sdk_key, @config, nil, diagnostic_accumulator)
end

if @config.use_ldd?
@config.logger.info { "[LDClient] Started LaunchDarkly Client in LDD mode" }
@data_source = LaunchDarkly::Impl::DataSource::NullUpdateProcessor.new
return # requestor and update processor are not used in this mode
end

flag_tracker_broadcaster = LaunchDarkly::Impl::Broadcaster.new(@shared_executor, @config.logger)
@flag_tracker = LaunchDarkly::Impl::FlagTracker.new(flag_tracker_broadcaster, lambda { |key, context| variation(key, context, nil) })

data_source_broadcaster = LaunchDarkly::Impl::Broadcaster.new(@shared_executor, @config.logger)

# Make the update sink available on the config so that our data source factory can access the sink with a shared executor.
@config.data_source_update_sink = LaunchDarkly::Impl::DataSource::UpdateSink.new(@store, data_source_broadcaster, flag_tracker_broadcaster)

@data_source_status_provider = LaunchDarkly::Impl::DataSource::StatusProvider.new(data_source_broadcaster, @config.data_source_update_sink)

data_source_or_factory = @config.data_source || self.method(:create_default_data_source)
if data_source_or_factory.respond_to? :call
# Currently, data source factories take two parameters unless they need to be aware of diagnostic_accumulator, in
# which case they take three parameters. This will be changed in the future to use a less awkware mechanism.
if data_source_or_factory.arity == 3
@data_source = data_source_or_factory.call(@sdk_key, @config, diagnostic_accumulator)
else
@data_source = data_source_or_factory.call(@sdk_key, @config)
end
else
@data_source = data_source_or_factory
end
# Create the flag tracker using the broadcaster from the data system
eval_fn = lambda { |key, context| variation(key, context, nil) }
@flag_tracker = Impl::FlagTracker.new(@data_system.flag_change_broadcaster, eval_fn)

register_plugins(environment_metadata)

ready = @data_source.start
# Start the data system
ready = @data_system.start

return unless wait_for_sec > 0

Expand All @@ -180,7 +143,7 @@ def postfork(wait_for_sec = 5)
ok = ready.wait(wait_for_sec)
if !ok
@config.logger.error { "[LDClient] Timeout encountered waiting for LaunchDarkly client initialization" }
elsif !@data_source.initialized?
elsif !initialized?
@config.logger.error { "[LDClient] LaunchDarkly client initialization failed" }
end
end
Expand Down Expand Up @@ -295,7 +258,7 @@ def secure_mode_hash(context)
# @return [Boolean] true if the client has been initialized
#
def initialized?
@config.offline? || @config.use_ldd? || @data_source.initialized?
@data_system.data_availability == @data_system.target_availability
end

#
Expand Down Expand Up @@ -601,7 +564,7 @@ def all_flags_state(context, options={})
return FeatureFlagsState.new(false) if @config.offline?

unless initialized?
if @store.initialized?
if @data_system.store.initialized?
@config.logger.warn { "Called all_flags_state before client initialization; using last known values from data store" }
else
@config.logger.warn { "Called all_flags_state before client initialization. Data store not available; returning empty state" }
Expand All @@ -616,7 +579,7 @@ def all_flags_state(context, options={})
end

begin
features = @store.all(Impl::DataStore::FEATURES)
features = @data_system.store.all(Impl::DataStore::FEATURES)
rescue => exn
Impl::Util.log_exception(@config.logger, "Unable to read flags for all_flags_state", exn)
return FeatureFlagsState.new(false)
Expand Down Expand Up @@ -663,11 +626,9 @@ def all_flags_state(context, options={})
# @return [void]
def close
@config.logger.info { "[LDClient] Closing LaunchDarkly client..." }
@data_source.stop
@data_system.stop
@event_processor.stop
@big_segment_store_manager.stop
@store.stop
@shared_executor.shutdown
end

#
Expand All @@ -690,7 +651,9 @@ def close
#
# @return [LaunchDarkly::Interfaces::DataStore::StatusProvider]
#
attr_reader :data_store_status_provider
def data_store_status_provider
@data_system.data_store_status_provider
end

#
# Returns an interface for tracking the status of the data source.
Expand All @@ -703,7 +666,9 @@ def close
#
# @return [LaunchDarkly::Interfaces::DataSource::StatusProvider]
#
attr_reader :data_source_status_provider
def data_source_status_provider
@data_system.data_source_status_provider
end

#
# Returns an interface for tracking changes in feature flag configurations.
Expand All @@ -712,23 +677,8 @@ def close
# requesting notifications about feature flag changes using an event
# listener model.
#
attr_reader :flag_tracker

private

def create_default_data_source(sdk_key, config, diagnostic_accumulator)
if config.offline?
return LaunchDarkly::Impl::DataSource::NullUpdateProcessor.new
end
raise ArgumentError, "sdk_key must not be nil" if sdk_key.nil? # see LDClient constructor comment on sdk_key
if config.stream?
Impl::DataSource::StreamProcessor.new(sdk_key, config, diagnostic_accumulator)
else
config.logger.info { "Disabling streaming API" }
config.logger.warn { "You should only disable the streaming API if instructed to do so by LaunchDarkly support" }
requestor = Impl::DataSource::Requestor.new(sdk_key, config)
Impl::DataSource::PollingProcessor.new(config, requestor)
end
def flag_tracker
@flag_tracker
end

#
Expand All @@ -738,7 +688,7 @@ def create_default_data_source(sdk_key, config, diagnostic_accumulator)
#
# @return [Array<EvaluationDetail, [LaunchDarkly::Impl::Model::FeatureFlag, nil], [String, nil]>]
#
def variation_with_flag(key, context, default)
private def variation_with_flag(key, context, default)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The file used a mix of per method private and all methods after the declaration on line 717 of the original file. Moving to defining per method.

evaluate_internal(key, context, default, false)
end

Expand All @@ -750,7 +700,7 @@ def variation_with_flag(key, context, default)
#
# @return [Array<EvaluationDetail, [LaunchDarkly::Impl::Model::FeatureFlag, nil], [String, nil]>]
#
def evaluate_internal(key, context, default, with_reasons)
private def evaluate_internal(key, context, default, with_reasons)
if @config.offline?
return Evaluator.error_result(EvaluationReason::ERROR_CLIENT_NOT_READY, default), nil, nil
end
Expand All @@ -768,7 +718,7 @@ def evaluate_internal(key, context, default, with_reasons)
end

unless initialized?
if @store.initialized?
if @data_system.store.initialized?
@config.logger.warn { "[LDClient] Client has not finished initializing; using last known values from feature store" }
else
@config.logger.error { "[LDClient] Client has not finished initializing; feature store unavailable, returning default value" }
Expand All @@ -779,7 +729,7 @@ def evaluate_internal(key, context, default, with_reasons)
end

begin
feature = @store.get(Impl::DataStore::FEATURES, key)
feature = @data_system.store.get(Impl::DataStore::FEATURES, key)
rescue
# Ignored
end
Expand Down