Skip to content

Commit 9c2fa14

Browse files
committed
chore: Use FDv1 DataSystem in the ldclient
1 parent 2532600 commit 9c2fa14

File tree

2 files changed

+43
-82
lines changed

2 files changed

+43
-82
lines changed

lib/ldclient-rb/impl/data_system/fdv1.rb

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@ def initialize(sdk_key, config)
4545
@config.logger
4646
)
4747

48+
# Update config to use wrapped store so data sources can access it
49+
@config.instance_variable_set(:@feature_store, @store_wrapper)
50+
4851
# Create status provider with store wrapper
4952
@data_store_status_provider = LaunchDarkly::Impl::DataStore::StatusProvider.new(
5053
@store_wrapper,
@@ -156,6 +159,8 @@ def target_availability
156159
end
157160

158161
# Polling processor
162+
@config.logger.info { "Disabling streaming API" }
163+
@config.logger.warn { "You should only disable the streaming API if instructed to do so by LaunchDarkly support" }
159164
requestor = LaunchDarkly::Impl::DataSource::Requestor.new(@sdk_key, @config)
160165
LaunchDarkly::Impl::DataSource::PollingProcessor.new(@config, requestor)
161166
end

lib/ldclient-rb/ldclient.rb

Lines changed: 38 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
require "ldclient-rb/impl/data_source/requestor"
88
require "ldclient-rb/impl/data_source/stream"
99
require "ldclient-rb/impl/data_store"
10+
require "ldclient-rb/impl/data_system/fdv1"
1011
require "ldclient-rb/impl/diagnostic_events"
1112
require "ldclient-rb/impl/evaluation_with_hook_result"
1213
require "ldclient-rb/impl/evaluator"
@@ -57,11 +58,12 @@ def initialize(sdk_key, config = Config.default, wait_for_sec = 5)
5758
# Note that sdk_key is normally a required parameter, and a nil value would cause the SDK to
5859
# fail in most configurations. However, there are some configurations where it would be OK
5960
# (offline = true, *or* we are using LDD mode or the file data source and events are disabled
60-
# so we're not connecting to any LD services) so rather than try to check for all of those
61-
# up front, we will let the constructors for the data source implementations implement this
62-
# fail-fast as appropriate, and just check here for the part regarding events.
63-
if !config.offline? && config.send_events
64-
raise ArgumentError, "sdk_key must not be nil" if sdk_key.nil?
61+
# so we're not connecting to any LD services).
62+
if !config.offline? && sdk_key.nil?
63+
# SDK key can be nil only if using LDD or custom data source with events disabled
64+
if config.send_events || (!config.use_ldd? && config.data_source.nil?)
65+
raise ArgumentError, "sdk_key must not be nil"
66+
end
6567
end
6668

6769
@sdk_key = sdk_key
@@ -89,9 +91,10 @@ def initialize(sdk_key, config = Config.default, wait_for_sec = 5)
8991
# @param wait_for_sec [Float] maximum time (in seconds) to wait for initialization
9092
#
9193
def postfork(wait_for_sec = 5)
92-
@data_source = nil
94+
@data_system = nil
9395
@event_processor = nil
9496
@big_segment_store_manager = nil
97+
@flag_tracker = nil
9598

9699
start_up(wait_for_sec)
97100
end
@@ -102,32 +105,22 @@ def postfork(wait_for_sec = 5)
102105

103106
@hooks = Concurrent::Array.new(@config.hooks + plugin_hooks)
104107

105-
@shared_executor = Concurrent::SingleThreadExecutor.new
106-
107-
data_store_broadcaster = LaunchDarkly::Impl::Broadcaster.new(@shared_executor, @config.logger)
108-
store_sink = LaunchDarkly::Impl::DataStore::UpdateSink.new(data_store_broadcaster)
109-
110-
# We need to wrap the feature store object with a FeatureStoreClientWrapper in order to add
111-
# some necessary logic around updates. Unfortunately, we have code elsewhere that accesses
112-
# the feature store through the Config object, so we need to make a new Config that uses
113-
# the wrapped store.
114-
@store = Impl::FeatureStoreClientWrapper.new(@config.feature_store, store_sink, @config.logger)
115-
updated_config = @config.clone
116-
updated_config.instance_variable_set(:@feature_store, @store)
117-
@config = updated_config
118-
119-
@data_store_status_provider = LaunchDarkly::Impl::DataStore::StatusProvider.new(@store, store_sink)
108+
# Initialize the data system (FDv1 for now, will support FDv2 in the future)
109+
# Note: FDv1 will update @config.feature_store to use its wrapped store
110+
@data_system = Impl::DataSystem::FDv1.new(@sdk_key, @config)
120111

112+
# Components not managed by data system
121113
@big_segment_store_manager = Impl::BigSegmentStoreManager.new(@config.big_segments, @config.logger)
122114
@big_segment_store_status_provider = @big_segment_store_manager.status_provider
123115

124-
get_flag = lambda { |key| @store.get(Impl::DataStore::FEATURES, key) }
125-
get_segment = lambda { |key| @store.get(Impl::DataStore::SEGMENTS, key) }
116+
get_flag = lambda { |key| @data_system.store.get(Impl::DataStore::FEATURES, key) }
117+
get_segment = lambda { |key| @data_system.store.get(Impl::DataStore::SEGMENTS, key) }
126118
get_big_segments_membership = lambda { |key| @big_segment_store_manager.get_context_membership(key) }
127119
@evaluator = LaunchDarkly::Impl::Evaluator.new(get_flag, get_segment, get_big_segments_membership, @config.logger)
128120

129121
if !@config.offline? && @config.send_events && !@config.diagnostic_opt_out?
130122
diagnostic_accumulator = Impl::DiagnosticAccumulator.new(Impl::DiagnosticAccumulator.create_diagnostic_id(@sdk_key))
123+
@data_system.set_diagnostic_accumulator(diagnostic_accumulator)
131124
else
132125
diagnostic_accumulator = nil
133126
end
@@ -138,38 +131,14 @@ def postfork(wait_for_sec = 5)
138131
@event_processor = EventProcessor.new(@sdk_key, @config, nil, diagnostic_accumulator)
139132
end
140133

141-
if @config.use_ldd?
142-
@config.logger.info { "[LDClient] Started LaunchDarkly Client in LDD mode" }
143-
@data_source = LaunchDarkly::Impl::DataSource::NullUpdateProcessor.new
144-
return # requestor and update processor are not used in this mode
145-
end
146-
147-
flag_tracker_broadcaster = LaunchDarkly::Impl::Broadcaster.new(@shared_executor, @config.logger)
148-
@flag_tracker = LaunchDarkly::Impl::FlagTracker.new(flag_tracker_broadcaster, lambda { |key, context| variation(key, context, nil) })
149-
150-
data_source_broadcaster = LaunchDarkly::Impl::Broadcaster.new(@shared_executor, @config.logger)
151-
152-
# Make the update sink available on the config so that our data source factory can access the sink with a shared executor.
153-
@config.data_source_update_sink = LaunchDarkly::Impl::DataSource::UpdateSink.new(@store, data_source_broadcaster, flag_tracker_broadcaster)
154-
155-
@data_source_status_provider = LaunchDarkly::Impl::DataSource::StatusProvider.new(data_source_broadcaster, @config.data_source_update_sink)
156-
157-
data_source_or_factory = @config.data_source || self.method(:create_default_data_source)
158-
if data_source_or_factory.respond_to? :call
159-
# Currently, data source factories take two parameters unless they need to be aware of diagnostic_accumulator, in
160-
# which case they take three parameters. This will be changed in the future to use a less awkware mechanism.
161-
if data_source_or_factory.arity == 3
162-
@data_source = data_source_or_factory.call(@sdk_key, @config, diagnostic_accumulator)
163-
else
164-
@data_source = data_source_or_factory.call(@sdk_key, @config)
165-
end
166-
else
167-
@data_source = data_source_or_factory
168-
end
134+
# Create the flag tracker using the broadcaster from the data system
135+
eval_fn = lambda { |key, context| variation(key, context, nil) }
136+
@flag_tracker = Impl::FlagTracker.new(@data_system.flag_change_broadcaster, eval_fn)
169137

170138
register_plugins(environment_metadata)
171139

172-
ready = @data_source.start
140+
# Start the data system
141+
ready = @data_system.start
173142

174143
return unless wait_for_sec > 0
175144

@@ -180,7 +149,7 @@ def postfork(wait_for_sec = 5)
180149
ok = ready.wait(wait_for_sec)
181150
if !ok
182151
@config.logger.error { "[LDClient] Timeout encountered waiting for LaunchDarkly client initialization" }
183-
elsif !@data_source.initialized?
152+
elsif !initialized?
184153
@config.logger.error { "[LDClient] LaunchDarkly client initialization failed" }
185154
end
186155
end
@@ -295,7 +264,7 @@ def secure_mode_hash(context)
295264
# @return [Boolean] true if the client has been initialized
296265
#
297266
def initialized?
298-
@config.offline? || @config.use_ldd? || @data_source.initialized?
267+
@data_system.data_availability != Impl::DataSystem::DataAvailability::DEFAULTS
299268
end
300269

301270
#
@@ -601,7 +570,7 @@ def all_flags_state(context, options={})
601570
return FeatureFlagsState.new(false) if @config.offline?
602571

603572
unless initialized?
604-
if @store.initialized?
573+
if @data_system.store.initialized?
605574
@config.logger.warn { "Called all_flags_state before client initialization; using last known values from data store" }
606575
else
607576
@config.logger.warn { "Called all_flags_state before client initialization. Data store not available; returning empty state" }
@@ -616,7 +585,7 @@ def all_flags_state(context, options={})
616585
end
617586

618587
begin
619-
features = @store.all(Impl::DataStore::FEATURES)
588+
features = @data_system.store.all(Impl::DataStore::FEATURES)
620589
rescue => exn
621590
Impl::Util.log_exception(@config.logger, "Unable to read flags for all_flags_state", exn)
622591
return FeatureFlagsState.new(false)
@@ -663,11 +632,9 @@ def all_flags_state(context, options={})
663632
# @return [void]
664633
def close
665634
@config.logger.info { "[LDClient] Closing LaunchDarkly client..." }
666-
@data_source.stop
635+
@data_system.stop
667636
@event_processor.stop
668637
@big_segment_store_manager.stop
669-
@store.stop
670-
@shared_executor.shutdown
671638
end
672639

673640
#
@@ -690,7 +657,9 @@ def close
690657
#
691658
# @return [LaunchDarkly::Interfaces::DataStore::StatusProvider]
692659
#
693-
attr_reader :data_store_status_provider
660+
def data_store_status_provider
661+
@data_system.data_store_status_provider
662+
end
694663

695664
#
696665
# Returns an interface for tracking the status of the data source.
@@ -703,7 +672,9 @@ def close
703672
#
704673
# @return [LaunchDarkly::Interfaces::DataSource::StatusProvider]
705674
#
706-
attr_reader :data_source_status_provider
675+
def data_source_status_provider
676+
@data_system.data_source_status_provider
677+
end
707678

708679
#
709680
# Returns an interface for tracking changes in feature flag configurations.
@@ -712,23 +683,8 @@ def close
712683
# requesting notifications about feature flag changes using an event
713684
# listener model.
714685
#
715-
attr_reader :flag_tracker
716-
717-
private
718-
719-
def create_default_data_source(sdk_key, config, diagnostic_accumulator)
720-
if config.offline?
721-
return LaunchDarkly::Impl::DataSource::NullUpdateProcessor.new
722-
end
723-
raise ArgumentError, "sdk_key must not be nil" if sdk_key.nil? # see LDClient constructor comment on sdk_key
724-
if config.stream?
725-
Impl::DataSource::StreamProcessor.new(sdk_key, config, diagnostic_accumulator)
726-
else
727-
config.logger.info { "Disabling streaming API" }
728-
config.logger.warn { "You should only disable the streaming API if instructed to do so by LaunchDarkly support" }
729-
requestor = Impl::DataSource::Requestor.new(sdk_key, config)
730-
Impl::DataSource::PollingProcessor.new(config, requestor)
731-
end
686+
def flag_tracker
687+
@flag_tracker
732688
end
733689

734690
#
@@ -738,7 +694,7 @@ def create_default_data_source(sdk_key, config, diagnostic_accumulator)
738694
#
739695
# @return [Array<EvaluationDetail, [LaunchDarkly::Impl::Model::FeatureFlag, nil], [String, nil]>]
740696
#
741-
def variation_with_flag(key, context, default)
697+
private def variation_with_flag(key, context, default)
742698
evaluate_internal(key, context, default, false)
743699
end
744700

@@ -750,7 +706,7 @@ def variation_with_flag(key, context, default)
750706
#
751707
# @return [Array<EvaluationDetail, [LaunchDarkly::Impl::Model::FeatureFlag, nil], [String, nil]>]
752708
#
753-
def evaluate_internal(key, context, default, with_reasons)
709+
private def evaluate_internal(key, context, default, with_reasons)
754710
if @config.offline?
755711
return Evaluator.error_result(EvaluationReason::ERROR_CLIENT_NOT_READY, default), nil, nil
756712
end
@@ -768,7 +724,7 @@ def evaluate_internal(key, context, default, with_reasons)
768724
end
769725

770726
unless initialized?
771-
if @store.initialized?
727+
if @data_system.store.initialized?
772728
@config.logger.warn { "[LDClient] Client has not finished initializing; using last known values from feature store" }
773729
else
774730
@config.logger.error { "[LDClient] Client has not finished initializing; feature store unavailable, returning default value" }
@@ -779,7 +735,7 @@ def evaluate_internal(key, context, default, with_reasons)
779735
end
780736

781737
begin
782-
feature = @store.get(Impl::DataStore::FEATURES, key)
738+
feature = @data_system.store.get(Impl::DataStore::FEATURES, key)
783739
rescue
784740
# Ignored
785741
end

0 commit comments

Comments
 (0)