@@ -103,9 +103,6 @@ def get(kind, key)
103
103
nil
104
104
end
105
105
end
106
- if !f . nil?
107
- put_cache ( kind , key , f )
108
- end
109
106
end
110
107
if f . nil?
111
108
@logger . debug { "RedisFeatureStore: #{ key } not found in '#{ kind [ :namespace ] } '" }
@@ -138,50 +135,36 @@ def all(kind)
138
135
end
139
136
140
137
def delete ( kind , key , version )
141
- with_connection do |redis |
142
- f = get_redis ( kind , redis , key )
143
- if f . nil?
144
- put_redis_and_cache ( kind , redis , key , { deleted : true , version : version } )
145
- else
146
- if f [ :version ] < version
147
- f1 = f . clone
148
- f1 [ :deleted ] = true
149
- f1 [ :version ] = version
150
- put_redis_and_cache ( kind , redis , key , f1 )
151
- else
152
- @logger . warn ( "RedisFeatureStore: attempted to delete #{ key } version: #{ f [ :version ] } \
153
- in '#{ kind [ :namespace ] } ' with a version that is the same or older: #{ version } ")
154
- end
155
- end
156
- end
138
+ update_with_versioning ( kind , { key : key , version : version , deleted : true } )
157
139
end
158
140
159
141
def init ( all_data )
160
142
@cache . clear
161
143
count = 0
162
144
with_connection do |redis |
163
145
all_data . each do |kind , items |
164
- redis . multi do |multi |
165
- multi . del ( items_key ( kind ) )
166
- count = count + items . count
167
- items . each { |k , v | put_redis_and_cache ( kind , multi , k , v ) }
168
- end
146
+ begin
147
+ redis . multi do |multi |
148
+ multi . del ( items_key ( kind ) )
149
+ count = count + items . count
150
+ items . each { |key , item |
151
+ redis . hset ( items_key ( kind ) , key , item . to_json )
152
+ }
153
+ end
154
+ items . each { |key , item |
155
+ put_cache ( kind , key . to_sym , item )
156
+ }
157
+ rescue => e
158
+ @logger . error { "RedisFeatureStore: could not initialize '#{ kind [ :namespace ] } ' in Redis, error: #{ e } " }
159
+ end
169
160
end
170
161
end
171
162
@inited . set ( true )
172
163
@logger . info { "RedisFeatureStore: initialized with #{ count } items" }
173
164
end
174
165
175
166
def upsert ( kind , item )
176
- with_connection do |redis |
177
- redis . watch ( items_key ( kind ) ) do
178
- old = get_redis ( kind , redis , item [ :key ] )
179
- if old . nil? || ( old [ :version ] < item [ :version ] )
180
- put_redis_and_cache ( kind , redis , item [ :key ] , item )
181
- end
182
- redis . unwatch
183
- end
184
- end
167
+ update_with_versioning ( kind , item )
185
168
end
186
169
187
170
def initialized?
@@ -200,6 +183,11 @@ def clear_local_cache()
200
183
@cache . clear
201
184
end
202
185
186
+ # exposed for testing
187
+ def set_transaction_hook ( proc )
188
+ @transaction_hook = proc
189
+ end
190
+
203
191
private
204
192
205
193
def items_key ( kind )
@@ -217,7 +205,13 @@ def with_connection
217
205
def get_redis ( kind , redis , key )
218
206
begin
219
207
json_item = redis . hget ( items_key ( kind ) , key )
220
- JSON . parse ( json_item , symbolize_names : true ) if json_item
208
+ if json_item
209
+ item = JSON . parse ( json_item , symbolize_names : true )
210
+ put_cache ( kind , key , item )
211
+ item
212
+ else
213
+ nil
214
+ end
221
215
rescue => e
222
216
@logger . error { "RedisFeatureStore: could not retrieve #{ key } from Redis, error: #{ e } " }
223
217
nil
@@ -228,13 +222,41 @@ def put_cache(kind, key, value)
228
222
@cache . store ( cache_key ( kind , key ) , value , expires : @expiration_seconds )
229
223
end
230
224
231
- def put_redis_and_cache ( kind , redis , key , item )
232
- begin
233
- redis . hset ( items_key ( kind ) , key , item . to_json )
234
- rescue => e
235
- @logger . error { "RedisFeatureStore: could not store #{ key } in Redis, error: #{ e } " }
225
+ def update_with_versioning ( kind , new_item )
226
+ base_key = items_key ( kind )
227
+ key = new_item [ :key ]
228
+ try_again = true
229
+ while try_again
230
+ try_again = false
231
+ with_connection do |redis |
232
+ redis . watch ( base_key ) do
233
+ old_item = get_redis ( kind , redis , key )
234
+ if @transaction_hook
235
+ @transaction_hook . call ( base_key , key )
236
+ end
237
+ if old_item . nil? || old_item [ :version ] < new_item [ :version ]
238
+ begin
239
+ result = redis . multi do |multi |
240
+ multi . hset ( base_key , key , new_item . to_json )
241
+ end
242
+ if result . nil?
243
+ @logger . debug { "RedisFeatureStore: concurrent modification detected, retrying" }
244
+ try_again = true
245
+ else
246
+ put_cache ( kind , key . to_sym , new_item )
247
+ end
248
+ rescue => e
249
+ @logger . error { "RedisFeatureStore: could not store #{ key } in Redis, error: #{ e } " }
250
+ end
251
+ else
252
+ action = new_item [ :deleted ] ? "delete" : "update"
253
+ @logger . warn { "RedisFeatureStore: attempted to #{ action } #{ key } version: #{ old_item [ :version ] } \
254
+ in '#{ kind [ :namespace ] } ' with a version that is the same or older: #{ new_item [ :version ] } " }
255
+ end
256
+ redis . unwatch
257
+ end
258
+ end
236
259
end
237
- put_cache ( kind , key . to_sym , item )
238
260
end
239
261
240
262
def query_inited
0 commit comments