diff --git a/lib/ldclient-rb/impl/data_system/fdv1.rb b/lib/ldclient-rb/impl/data_system/fdv1.rb index a2fcdd13..1eb3fa25 100644 --- a/lib/ldclient-rb/impl/data_system/fdv1.rb +++ b/lib/ldclient-rb/impl/data_system/fdv1.rb @@ -38,13 +38,22 @@ def initialize(sdk_key, config) @data_store_broadcaster ) - # Wrap the data store with client wrapper (must be created before status provider) + # Preserve the original unwrapped store to avoid nested wrappers on postfork + original_store = @config.feature_store + if original_store.is_a?(LaunchDarkly::Impl::FeatureStoreClientWrapper) + original_store = original_store.instance_variable_get(:@store) + end + + # Wrap the original data store with client wrapper (must be created before status provider) @store_wrapper = LaunchDarkly::Impl::FeatureStoreClientWrapper.new( - @config.feature_store, + original_store, @data_store_update_sink, @config.logger ) + # Update config to use wrapped store so data sources can access it + @config.instance_variable_set(:@feature_store, @store_wrapper) + # Create status provider with store wrapper @data_store_status_provider = LaunchDarkly::Impl::DataStore::StatusProvider.new( @store_wrapper, @@ -83,6 +92,7 @@ def start # (see DataSystem#stop) def stop @update_processor&.stop + @store_wrapper.stop @shared_executor.shutdown end @@ -156,6 +166,8 @@ def target_availability end # Polling processor + @config.logger.info { "Disabling streaming API" } + @config.logger.warn { "You should only disable the streaming API if instructed to do so by LaunchDarkly support" } requestor = LaunchDarkly::Impl::DataSource::Requestor.new(@sdk_key, @config) LaunchDarkly::Impl::DataSource::PollingProcessor.new(@config, requestor) end diff --git a/lib/ldclient-rb/ldclient.rb b/lib/ldclient-rb/ldclient.rb index 825813c4..b521f041 100644 --- a/lib/ldclient-rb/ldclient.rb +++ b/lib/ldclient-rb/ldclient.rb @@ -2,20 +2,15 @@ require "ldclient-rb/impl/broadcaster" require "ldclient-rb/impl/context" require "ldclient-rb/impl/data_source" -require "ldclient-rb/impl/data_source/null_processor" -require "ldclient-rb/impl/data_source/polling" -require "ldclient-rb/impl/data_source/requestor" -require "ldclient-rb/impl/data_source/stream" require "ldclient-rb/impl/data_store" +require "ldclient-rb/impl/data_system/fdv1" require "ldclient-rb/impl/diagnostic_events" require "ldclient-rb/impl/evaluation_with_hook_result" require "ldclient-rb/impl/evaluator" require "ldclient-rb/impl/flag_tracker" require "ldclient-rb/impl/migrations/tracker" -require "ldclient-rb/impl/store_client_wrapper" require "ldclient-rb/impl/util" require "ldclient-rb/events" -require "ldclient-rb/in_memory_store" require "concurrent" require "concurrent/atomics" require "digest/sha1" @@ -57,11 +52,12 @@ def initialize(sdk_key, config = Config.default, wait_for_sec = 5) # Note that sdk_key is normally a required parameter, and a nil value would cause the SDK to # fail in most configurations. However, there are some configurations where it would be OK # (offline = true, *or* we are using LDD mode or the file data source and events are disabled - # so we're not connecting to any LD services) so rather than try to check for all of those - # up front, we will let the constructors for the data source implementations implement this - # fail-fast as appropriate, and just check here for the part regarding events. - if !config.offline? && config.send_events - raise ArgumentError, "sdk_key must not be nil" if sdk_key.nil? + # so we're not connecting to any LD services). + if !config.offline? && sdk_key.nil? + # SDK key can be nil only if using LDD or custom data source with events disabled + if config.send_events || (!config.use_ldd? && config.data_source.nil?) + raise ArgumentError, "sdk_key must not be nil" + end end @sdk_key = sdk_key @@ -89,9 +85,10 @@ def initialize(sdk_key, config = Config.default, wait_for_sec = 5) # @param wait_for_sec [Float] maximum time (in seconds) to wait for initialization # def postfork(wait_for_sec = 5) - @data_source = nil + @data_system = nil @event_processor = nil @big_segment_store_manager = nil + @flag_tracker = nil start_up(wait_for_sec) end @@ -102,32 +99,22 @@ def postfork(wait_for_sec = 5) @hooks = Concurrent::Array.new(@config.hooks + plugin_hooks) - @shared_executor = Concurrent::SingleThreadExecutor.new - - data_store_broadcaster = LaunchDarkly::Impl::Broadcaster.new(@shared_executor, @config.logger) - store_sink = LaunchDarkly::Impl::DataStore::UpdateSink.new(data_store_broadcaster) - - # We need to wrap the feature store object with a FeatureStoreClientWrapper in order to add - # some necessary logic around updates. Unfortunately, we have code elsewhere that accesses - # the feature store through the Config object, so we need to make a new Config that uses - # the wrapped store. - @store = Impl::FeatureStoreClientWrapper.new(@config.feature_store, store_sink, @config.logger) - updated_config = @config.clone - updated_config.instance_variable_set(:@feature_store, @store) - @config = updated_config - - @data_store_status_provider = LaunchDarkly::Impl::DataStore::StatusProvider.new(@store, store_sink) + # Initialize the data system (FDv1 for now, will support FDv2 in the future) + # Note: FDv1 will update @config.feature_store to use its wrapped store + @data_system = Impl::DataSystem::FDv1.new(@sdk_key, @config) + # Components not managed by data system @big_segment_store_manager = Impl::BigSegmentStoreManager.new(@config.big_segments, @config.logger) @big_segment_store_status_provider = @big_segment_store_manager.status_provider - get_flag = lambda { |key| @store.get(Impl::DataStore::FEATURES, key) } - get_segment = lambda { |key| @store.get(Impl::DataStore::SEGMENTS, key) } + get_flag = lambda { |key| @data_system.store.get(Impl::DataStore::FEATURES, key) } + get_segment = lambda { |key| @data_system.store.get(Impl::DataStore::SEGMENTS, key) } get_big_segments_membership = lambda { |key| @big_segment_store_manager.get_context_membership(key) } @evaluator = LaunchDarkly::Impl::Evaluator.new(get_flag, get_segment, get_big_segments_membership, @config.logger) if !@config.offline? && @config.send_events && !@config.diagnostic_opt_out? diagnostic_accumulator = Impl::DiagnosticAccumulator.new(Impl::DiagnosticAccumulator.create_diagnostic_id(@sdk_key)) + @data_system.set_diagnostic_accumulator(diagnostic_accumulator) else diagnostic_accumulator = nil end @@ -138,38 +125,14 @@ def postfork(wait_for_sec = 5) @event_processor = EventProcessor.new(@sdk_key, @config, nil, diagnostic_accumulator) end - if @config.use_ldd? - @config.logger.info { "[LDClient] Started LaunchDarkly Client in LDD mode" } - @data_source = LaunchDarkly::Impl::DataSource::NullUpdateProcessor.new - return # requestor and update processor are not used in this mode - end - - flag_tracker_broadcaster = LaunchDarkly::Impl::Broadcaster.new(@shared_executor, @config.logger) - @flag_tracker = LaunchDarkly::Impl::FlagTracker.new(flag_tracker_broadcaster, lambda { |key, context| variation(key, context, nil) }) - - data_source_broadcaster = LaunchDarkly::Impl::Broadcaster.new(@shared_executor, @config.logger) - - # Make the update sink available on the config so that our data source factory can access the sink with a shared executor. - @config.data_source_update_sink = LaunchDarkly::Impl::DataSource::UpdateSink.new(@store, data_source_broadcaster, flag_tracker_broadcaster) - - @data_source_status_provider = LaunchDarkly::Impl::DataSource::StatusProvider.new(data_source_broadcaster, @config.data_source_update_sink) - - data_source_or_factory = @config.data_source || self.method(:create_default_data_source) - if data_source_or_factory.respond_to? :call - # Currently, data source factories take two parameters unless they need to be aware of diagnostic_accumulator, in - # which case they take three parameters. This will be changed in the future to use a less awkware mechanism. - if data_source_or_factory.arity == 3 - @data_source = data_source_or_factory.call(@sdk_key, @config, diagnostic_accumulator) - else - @data_source = data_source_or_factory.call(@sdk_key, @config) - end - else - @data_source = data_source_or_factory - end + # Create the flag tracker using the broadcaster from the data system + eval_fn = lambda { |key, context| variation(key, context, nil) } + @flag_tracker = Impl::FlagTracker.new(@data_system.flag_change_broadcaster, eval_fn) register_plugins(environment_metadata) - ready = @data_source.start + # Start the data system + ready = @data_system.start return unless wait_for_sec > 0 @@ -180,7 +143,7 @@ def postfork(wait_for_sec = 5) ok = ready.wait(wait_for_sec) if !ok @config.logger.error { "[LDClient] Timeout encountered waiting for LaunchDarkly client initialization" } - elsif !@data_source.initialized? + elsif !initialized? @config.logger.error { "[LDClient] LaunchDarkly client initialization failed" } end end @@ -295,7 +258,7 @@ def secure_mode_hash(context) # @return [Boolean] true if the client has been initialized # def initialized? - @config.offline? || @config.use_ldd? || @data_source.initialized? + @data_system.data_availability == @data_system.target_availability end # @@ -601,7 +564,7 @@ def all_flags_state(context, options={}) return FeatureFlagsState.new(false) if @config.offline? unless initialized? - if @store.initialized? + if @data_system.store.initialized? @config.logger.warn { "Called all_flags_state before client initialization; using last known values from data store" } else @config.logger.warn { "Called all_flags_state before client initialization. Data store not available; returning empty state" } @@ -616,7 +579,7 @@ def all_flags_state(context, options={}) end begin - features = @store.all(Impl::DataStore::FEATURES) + features = @data_system.store.all(Impl::DataStore::FEATURES) rescue => exn Impl::Util.log_exception(@config.logger, "Unable to read flags for all_flags_state", exn) return FeatureFlagsState.new(false) @@ -663,11 +626,9 @@ def all_flags_state(context, options={}) # @return [void] def close @config.logger.info { "[LDClient] Closing LaunchDarkly client..." } - @data_source.stop + @data_system.stop @event_processor.stop @big_segment_store_manager.stop - @store.stop - @shared_executor.shutdown end # @@ -690,7 +651,9 @@ def close # # @return [LaunchDarkly::Interfaces::DataStore::StatusProvider] # - attr_reader :data_store_status_provider + def data_store_status_provider + @data_system.data_store_status_provider + end # # Returns an interface for tracking the status of the data source. @@ -703,7 +666,9 @@ def close # # @return [LaunchDarkly::Interfaces::DataSource::StatusProvider] # - attr_reader :data_source_status_provider + def data_source_status_provider + @data_system.data_source_status_provider + end # # Returns an interface for tracking changes in feature flag configurations. @@ -712,23 +677,8 @@ def close # requesting notifications about feature flag changes using an event # listener model. # - attr_reader :flag_tracker - - private - - def create_default_data_source(sdk_key, config, diagnostic_accumulator) - if config.offline? - return LaunchDarkly::Impl::DataSource::NullUpdateProcessor.new - end - raise ArgumentError, "sdk_key must not be nil" if sdk_key.nil? # see LDClient constructor comment on sdk_key - if config.stream? - Impl::DataSource::StreamProcessor.new(sdk_key, config, diagnostic_accumulator) - else - config.logger.info { "Disabling streaming API" } - config.logger.warn { "You should only disable the streaming API if instructed to do so by LaunchDarkly support" } - requestor = Impl::DataSource::Requestor.new(sdk_key, config) - Impl::DataSource::PollingProcessor.new(config, requestor) - end + def flag_tracker + @flag_tracker end # @@ -738,7 +688,7 @@ def create_default_data_source(sdk_key, config, diagnostic_accumulator) # # @return [Array] # - def variation_with_flag(key, context, default) + private def variation_with_flag(key, context, default) evaluate_internal(key, context, default, false) end @@ -750,7 +700,7 @@ def variation_with_flag(key, context, default) # # @return [Array] # - def evaluate_internal(key, context, default, with_reasons) + private def evaluate_internal(key, context, default, with_reasons) if @config.offline? return Evaluator.error_result(EvaluationReason::ERROR_CLIENT_NOT_READY, default), nil, nil end @@ -768,7 +718,7 @@ def evaluate_internal(key, context, default, with_reasons) end unless initialized? - if @store.initialized? + if @data_system.store.initialized? @config.logger.warn { "[LDClient] Client has not finished initializing; using last known values from feature store" } else @config.logger.error { "[LDClient] Client has not finished initializing; feature store unavailable, returning default value" } @@ -779,7 +729,7 @@ def evaluate_internal(key, context, default, with_reasons) end begin - feature = @store.get(Impl::DataStore::FEATURES, key) + feature = @data_system.store.get(Impl::DataStore::FEATURES, key) rescue # Ignored end diff --git a/spec/impl/data_system/fdv1_spec.rb b/spec/impl/data_system/fdv1_spec.rb index 40f8c349..e76aaf17 100644 --- a/spec/impl/data_system/fdv1_spec.rb +++ b/spec/impl/data_system/fdv1_spec.rb @@ -15,6 +15,33 @@ module DataSystem subject # Force creation of FDv1 instance expect(config.data_source_update_sink).to be_a(LaunchDarkly::Impl::DataSource::UpdateSink) end + + it "wraps the feature store with FeatureStoreClientWrapper" do + original_store = config.feature_store + subject # Force creation of FDv1 instance + + wrapped_store = config.feature_store + expect(wrapped_store).to be_a(LaunchDarkly::Impl::FeatureStoreClientWrapper) + expect(wrapped_store.instance_variable_get(:@store)).to eq(original_store) + end + + it "avoids nested wrappers when config.feature_store is already wrapped" do + # First initialization wraps the store + original_store = config.feature_store + first_fdv1 = FDv1.new(sdk_key, config) + first_wrapper = config.feature_store + expect(first_wrapper).to be_a(LaunchDarkly::Impl::FeatureStoreClientWrapper) + + # Second initialization (simulating postfork) should unwrap and re-wrap the original + second_fdv1 = FDv1.new(sdk_key, config) + second_wrapper = config.feature_store + expect(second_wrapper).to be_a(LaunchDarkly::Impl::FeatureStoreClientWrapper) + + # The inner store should be the original, not the first wrapper + inner_store = second_wrapper.instance_variable_get(:@store) + expect(inner_store).to eq(original_store) + expect(inner_store).not_to be_a(LaunchDarkly::Impl::FeatureStoreClientWrapper) + end end describe "#start" do