@@ -103,9 +103,6 @@ def get(kind, key)
103
103
nil
104
104
end
105
105
end
106
- if !f . nil?
107
- put_cache ( kind , key , f )
108
- end
109
106
end
110
107
if f . nil?
111
108
@logger . debug { "RedisFeatureStore: #{ key } not found in '#{ kind [ :namespace ] } '" }
@@ -138,50 +135,36 @@ def all(kind)
138
135
end
139
136
140
137
def delete ( kind , key , version )
141
- with_connection do |redis |
142
- f = get_redis ( kind , redis , key )
143
- if f . nil?
144
- put_redis_and_cache ( kind , redis , key , { deleted : true , version : version } )
145
- else
146
- if f [ :version ] < version
147
- f1 = f . clone
148
- f1 [ :deleted ] = true
149
- f1 [ :version ] = version
150
- put_redis_and_cache ( kind , redis , key , f1 )
151
- else
152
- @logger . warn ( "RedisFeatureStore: attempted to delete #{ key } version: #{ f [ :version ] } \
153
- in '#{ kind [ :namespace ] } ' with a version that is the same or older: #{ version } ")
154
- end
155
- end
156
- end
138
+ update_with_versioning ( kind , { key : key , version : version , deleted : true } )
157
139
end
158
140
159
141
def init ( all_data )
160
142
@cache . clear
161
143
count = 0
162
144
with_connection do |redis |
163
145
all_data . each do |kind , items |
164
- redis . multi do |multi |
165
- multi . del ( items_key ( kind ) )
166
- count = count + items . count
167
- items . each { |k , v | put_redis_and_cache ( kind , multi , k , v ) }
168
- end
146
+ begin
147
+ redis . multi do |multi |
148
+ multi . del ( items_key ( kind ) )
149
+ count = count + items . count
150
+ items . each { |key , item |
151
+ redis . hset ( items_key ( kind ) , key , item . to_json )
152
+ }
153
+ end
154
+ items . each { |key , item |
155
+ put_cache ( kind , key . to_sym , item )
156
+ }
157
+ rescue => e
158
+ @logger . error { "RedisFeatureStore: could not initialize '#{ kind [ :namespace ] } ' in Redis, error: #{ e } " }
159
+ end
169
160
end
170
161
end
171
162
@inited . set ( true )
172
163
@logger . info { "RedisFeatureStore: initialized with #{ count } items" }
173
164
end
174
165
175
166
def upsert ( kind , item )
176
- with_connection do |redis |
177
- redis . watch ( items_key ( kind ) ) do
178
- old = get_redis ( kind , redis , item [ :key ] )
179
- if old . nil? || ( old [ :version ] < item [ :version ] )
180
- put_redis_and_cache ( kind , redis , item [ :key ] , item )
181
- end
182
- redis . unwatch
183
- end
184
- end
167
+ update_with_versioning ( kind , item )
185
168
end
186
169
187
170
def initialized?
@@ -195,13 +178,12 @@ def stop
195
178
end
196
179
end
197
180
181
+ private
182
+
198
183
# exposed for testing
199
- def clear_local_cache ( )
200
- @cache . clear
184
+ def before_update_transaction ( base_key , key )
201
185
end
202
186
203
- private
204
-
205
187
def items_key ( kind )
206
188
@prefix + ":" + kind [ :namespace ]
207
189
end
@@ -217,7 +199,13 @@ def with_connection
217
199
def get_redis ( kind , redis , key )
218
200
begin
219
201
json_item = redis . hget ( items_key ( kind ) , key )
220
- JSON . parse ( json_item , symbolize_names : true ) if json_item
202
+ if json_item
203
+ item = JSON . parse ( json_item , symbolize_names : true )
204
+ put_cache ( kind , key , item )
205
+ item
206
+ else
207
+ nil
208
+ end
221
209
rescue => e
222
210
@logger . error { "RedisFeatureStore: could not retrieve #{ key } from Redis, error: #{ e } " }
223
211
nil
@@ -228,13 +216,39 @@ def put_cache(kind, key, value)
228
216
@cache . store ( cache_key ( kind , key ) , value , expires : @expiration_seconds )
229
217
end
230
218
231
- def put_redis_and_cache ( kind , redis , key , item )
232
- begin
233
- redis . hset ( items_key ( kind ) , key , item . to_json )
234
- rescue => e
235
- @logger . error { "RedisFeatureStore: could not store #{ key } in Redis, error: #{ e } " }
219
+ def update_with_versioning ( kind , new_item )
220
+ base_key = items_key ( kind )
221
+ key = new_item [ :key ]
222
+ try_again = true
223
+ while try_again
224
+ try_again = false
225
+ with_connection do |redis |
226
+ redis . watch ( base_key ) do
227
+ old_item = get_redis ( kind , redis , key )
228
+ before_update_transaction ( base_key , key )
229
+ if old_item . nil? || old_item [ :version ] < new_item [ :version ]
230
+ begin
231
+ result = redis . multi do |multi |
232
+ multi . hset ( base_key , key , new_item . to_json )
233
+ end
234
+ if result . nil?
235
+ @logger . debug { "RedisFeatureStore: concurrent modification detected, retrying" }
236
+ try_again = true
237
+ else
238
+ put_cache ( kind , key . to_sym , new_item )
239
+ end
240
+ rescue => e
241
+ @logger . error { "RedisFeatureStore: could not store #{ key } in Redis, error: #{ e } " }
242
+ end
243
+ else
244
+ action = new_item [ :deleted ] ? "delete" : "update"
245
+ @logger . warn { "RedisFeatureStore: attempted to #{ action } #{ key } version: #{ old_item [ :version ] } \
246
+ in '#{ kind [ :namespace ] } ' with a version that is the same or older: #{ new_item [ :version ] } " }
247
+ end
248
+ redis . unwatch
249
+ end
250
+ end
236
251
end
237
- put_cache ( kind , key . to_sym , item )
238
252
end
239
253
240
254
def query_inited
0 commit comments