23
23
from cacheme .interfaces import DoorKeeper , Metrics , Serializer , Node
24
24
from cacheme .models import (
25
25
Cache ,
26
- CachedAwaitable ,
27
26
DynamicNode ,
28
27
Fetcher ,
29
28
_add_node ,
@@ -46,9 +45,7 @@ def __init__(self):
46
45
self .value = None
47
46
48
47
49
- # temp storage for futures which are loading from source now,
50
- # will removed automatically when loading done
51
- _awaits : Dict [str , CachedAwaitable ] = {}
48
+ _awaits : Dict [str , Future ] = {}
52
49
53
50
54
51
def _awaits_len ():
@@ -100,14 +97,25 @@ async def get(node: Node, load_fn=None):
100
97
# remote storages are slow and asynchronous, use tmp cached awaitables to avoid thundering herd
101
98
if result is sentinel :
102
99
key = node .full_key ()
103
- awaitable = _awaits .get (key , None )
104
- if awaitable is None :
105
- awaitable = CachedAwaitable (
106
- _load_from_caches (node , remote_caches , miss , load_fn ), metrics
107
- )
108
- _awaits [node .full_key ()] = awaitable
109
- # wait
110
- result = await awaitable
100
+ future = _awaits .get (key , None )
101
+ if future is None :
102
+ metrics ._miss_count += 1
103
+ future = Future ()
104
+ _awaits [node .full_key ()] = future
105
+ now = time_ns ()
106
+ try :
107
+ result = await _load_from_caches (node , remote_caches , miss , load_fn )
108
+ except Exception as e :
109
+ metrics ._load_failure_count += 1
110
+ metrics ._total_load_time += time_ns () - now
111
+ _awaits .pop (node .full_key (), None )
112
+ raise (e )
113
+ metrics ._load_success_count += 1
114
+ metrics ._total_load_time += time_ns () - now
115
+ future .set_result (result )
116
+ else :
117
+ metrics ._hit_count += 1
118
+ result = await future
111
119
112
120
# fill missing caches
113
121
for cache in miss :
@@ -180,35 +188,32 @@ async def get_all(nodes: Sequence[Node[R]]) -> List[R]:
180
188
fetch : Dict [str , Node ] = {} # missing nodes, need to load from source
181
189
if len (pending ) > 0 :
182
190
wait : List [
183
- Tuple [str , CachedAwaitable ]
191
+ Tuple [str , Future ]
184
192
] = [] # nodes already loading by others, only need to wait here
185
193
for node in pending .values ():
186
- awaitable = _awaits .get (node .full_key (), None )
187
- if awaitable is None :
194
+ future = _awaits .get (node .full_key (), None )
195
+ if future is None :
188
196
fetch [node .full_key ()] = node
189
197
else :
190
- wait .append ((node .full_key (), awaitable ))
198
+ wait .append ((node .full_key (), future ))
191
199
192
200
# update metrics
193
201
metrics ._miss_count += len (fetch )
194
202
metrics ._hit_count += len (nodes ) - len (fetch )
195
203
196
204
if len (fetch ) > 0 :
197
205
fetcher = Fetcher ()
198
- aws : List [Tuple [str , CachedAwaitable ]] = []
206
+ aws : List [Tuple [str , Future ]] = []
199
207
for key , node in fetch .items ():
200
- awaitable = CachedAwaitable (Future (), metrics )
201
- # set event directly
202
- awaitable .event = Event ()
203
- _awaits [key ] = awaitable
204
- aws .append ((key , awaitable ))
208
+ future = Future ()
209
+ _awaits [key ] = future
210
+ aws .append ((key , future ))
205
211
fetcher .data = await _get_multi (
206
212
nodes [0 ], remote_caches , fetch , missing , metrics
207
213
)
208
214
# load done, set all events and results
209
215
for aw in aws :
210
- cast (Event , aw [1 ].event ).set ()
211
- aw [1 ].result = fetcher .data [aw [0 ]]
216
+ aw [1 ].set_result (fetcher .data [aw [0 ]])
212
217
for ks , vs in fetcher .data .items ():
213
218
results [ks ] = vs
214
219
for w in wait :
0 commit comments