
Commit 9a80dac: Merge pull request #6 from C-Pro/feature/tx

Locker wrapper for atomic cache operations

2 parents: d2b00cd + 637fbf8

8 files changed, +440 -36 lines

.github/workflows/test.yml (+1 -1)

@@ -18,7 +18,7 @@ jobs:
         go-version: "1.20"

     - name: Test
-      run: go test -v -bench . -covermode=atomic -coverprofile=coverage.out -race ./...
+      run: go test -v -covermode=atomic -coverprofile=coverage.out -race ./...

     - name: Upload coverage reports to Codecov
       uses: codecov/codecov-action@v3
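With this change benchmarks no longer run in CI; they can still be run locally, for example with the same invocation the updated README uses:

```shell
$ go test -bench=. -benchmem -benchtime=10s .
```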

README.md (+82 -35)
@@ -99,6 +99,9 @@ if err != nil {
 fmt.Println(v)
 ```

+`Updater` provides a `ListByPrefix` function, but it can only be used if the underlying cache supports it (is a `KV` wrapper).
+Otherwise it will panic.
+
 ### Sharding

 If you intend to use the cache in a *highly* concurrent manner (16+ cores and 100k+ RPS), it may make sense to shard it.
@@ -140,7 +143,7 @@ Internally `KV` maintains trie structure to store keys to be able to quickly fin
 This wrapper has some limitations:
 * `KV` only supports keys of type `string`.
 * Lexicographical order is maintained on the byte level, so it will work as expected for ASCII strings, but may not work for other encodings.
-* If you wrap `KV` with another wrapper you can't use `ListByPrefix`. Don't do it!
+* The `Updater` and `Locker` wrappers provide a `ListByPrefix` function that calls the underlying `KV` implementation. But if you wrap `KV` with the `Sharded` wrapper, you will lose this functionality; in other words, it does not make sense to wrap `KV` with `Sharded`.

 ```go
 cache := NewMapCache[string, string]()
@@ -156,6 +159,42 @@ This wrapper has some limitations:
 // Output: [bar bar1 bar2 bar3]
 ```

+### Locker
+
+This wrapper is useful when you need to perform several operations on the cache atomically. For example, you store account balances in the cache and want to transfer some amount from one account to another:
+
+```go
+locker := NewLocker[int, int](NewMapCache[int, int]())
+// Acquire RW lock "transaction".
+tx := locker.Lock()
+
+balA, _ := tx.Get(accA)
+balB, _ := tx.Get(accB)
+
+amount := 100
+
+balA += amount
+balB -= amount
+
+tx.Set(accA, balA)
+tx.Set(accB, balB)
+
+// Unlock the cache.
+tx.Unlock()
+```
+
+The `Locker` itself does not implement the `Geche` interface, but the `Tx` object returned by the `Lock` or `RLock` method does.
+Be careful to follow these rules (breaking them will lead to panics):
+* do not use `Set` and `Del` on a read-only `Tx` acquired with `RLock`;
+* do not use a `Tx` after the `Unlock` call;
+* do not `Unlock` a `Tx` that was already unlocked.
+And do not forget to `Unlock` the `Tx` object, otherwise the lock will be held forever.
+
+The returned `Tx` object is not a transaction in the full sense: it does not allow rollback, but it does provide atomicity and isolation guarantees.
+
+`Locker` provides a `ListByPrefix` function, but it can only be used if the underlying cache implementation supports it (is a `KV` wrapper). Otherwise it will panic.
+
 ## Benchmarks

 Test suite contains a couple of benchmarks to compare the speed difference between old-school generic implementation using `interface{}` or `any` to hold cache values versus using generics.
@@ -166,6 +205,8 @@ There are two types of benchmarks:
 * `BenchmarkSet` only times the `Set` operation that allocates all the memory, and usually is the most resource intensive.
 * `BenchmarkEverything` repeatedly does one of three operations (Get/Set/Del). The probability for each type of operation to be executed is 0.9/0.05/0.05 respectively. Each operation is executed on randomly generated key, there are totally 1 million distinct keys, so total cache size will be limited too.

+Another benchmark, `BenchmarkKVListByPrefix`, times the `KV` wrapper's `ListByPrefix` operation: getting all values matching a particular prefix in a cache with 1 million keys. The benchmark is arranged so that each call returns 10 records.
+
 Benchmarking four simple cache implementations shows that generic cache (`MapCache`) is faster than cache that uses an empty interface to store any type of values (`AnyCache`), but slower than implementations that use concrete types (`StringCache`) and skip on thread safety (`UnsafeCache`).
 Generic `MapTTLCache` is on par with `AnyCache` but it is to be expected as it does more work keeping linked list for fast invalidation. `RingBuffer` performs the best because all the space it needs is preallocated during the initialization, and actual cache size is limited.
@@ -174,24 +215,27 @@ Note that `stringCache`, `unsafeCache`, `anyCache` implementations are unexporte
 The results below are not to be treated as absolute values. Actual cache operation latency will depend on many variables such as CPU speed, key cardinality, number of concurrent operations, whether the allocation happen during the operation or underlying structure already has the allocated space and so on.

 ```shell
-$ go test -bench . -benchmem -benchtime=30s
-goos: darwin
-goarch: arm64
+$ go test -bench=. -benchmem -benchtime=10s .
+goos: linux
+goarch: amd64
 pkg: github.com/c-pro/geche
-BenchmarkSet/MapCache-10                238400880    151.4 ns/op    7 B/op    0 allocs/op
-BenchmarkSet/StringCache-10             240488745    149.7 ns/op    7 B/op    0 allocs/op
-BenchmarkSet/UnsafeCache-10             348324978    102.9 ns/op    7 B/op    0 allocs/op
-BenchmarkSet/MapTTLCache-10              89931351    338.2 ns/op    7 B/op    0 allocs/op
-BenchmarkSet/RingBuffer-10              215545424    166.1 ns/op    7 B/op    0 allocs/op
-BenchmarkSet/AnyCache-10                241277830    149.4 ns/op    8 B/op    1 allocs/op
-BenchmarkEverything/MapCache-10         333596707    110.7 ns/op    0 B/op    0 allocs/op
-BenchmarkEverything/StringCache-10      327069014    112.7 ns/op    0 B/op    0 allocs/op
-BenchmarkEverything/UnsafeCache-10      535376823    68.41 ns/op    0 B/op    0 allocs/op
-BenchmarkEverything/MapTTLCache-10      222688748    166.0 ns/op    0 B/op    0 allocs/op
-BenchmarkEverything/RingBuffer-10       671402931    53.76 ns/op    0 B/op    0 allocs/op
-BenchmarkEverything/AnyCache-10         195838436    195.8 ns/op    8 B/op    1 allocs/op
-PASS
-ok      github.com/c-pro/geche  577.123s
+cpu: Intel(R) Xeon(R) Platinum 8358 CPU @ 2.60GHz
+BenchmarkSet/MapCache-32                          41473179     284.4 ns/op      1 B/op    0 allocs/op
+BenchmarkSet/StringCache-32                       64817786     182.5 ns/op      1 B/op    0 allocs/op
+BenchmarkSet/UnsafeCache-32                       80224212     125.2 ns/op      1 B/op    0 allocs/op
+BenchmarkSet/MapTTLCache-32                       14296934     758.3 ns/op     15 B/op    0 allocs/op
+BenchmarkSet/RingBuffer-32                        64152157     244.9 ns/op      0 B/op    0 allocs/op
+BenchmarkSet/KVMapCache-32                        10701508    1152 ns/op       10 B/op    0 allocs/op
+BenchmarkSet/AnyCache-32                          67699846     288.9 ns/op      2 B/op    0 allocs/op
+BenchmarkEverything/MapCache-32                  100000000     106.7 ns/op      0 B/op    0 allocs/op
+BenchmarkEverything/StringCache-32               100000000     100.3 ns/op      0 B/op    0 allocs/op
+BenchmarkEverything/UnsafeCache-32               135556000      87.31 ns/op     0 B/op    0 allocs/op
+BenchmarkEverything/MapTTLCache-32               100000000     175.6 ns/op      0 B/op    0 allocs/op
+BenchmarkEverything/RingBuffer-32                121507983      94.82 ns/op     0 B/op    0 allocs/op
+BenchmarkEverything/ShardedRingBufferUpdater-32   32976999     371.6 ns/op     18 B/op    0 allocs/op
+BenchmarkEverything/KVMapCache-32                 90192560     199.9 ns/op      1 B/op    0 allocs/op
+BenchmarkEverything/AnyCache-32                  100000000     231.1 ns/op      8 B/op    1 allocs/op
+BenchmarkKVListByPrefix-32                         3167788    3720 ns/op      131 B/op    3 allocs/op
 ```

 # Parallel benchmarks
@@ -201,27 +245,31 @@ ok github.com/c-pro/geche 577.123s
201245
I implemented sharding anyway because why not. But it is a separate wrapper, so does not complicate existing codebase.
202246

203247
```shell
204-
$ go test -benchtime=30s -benchmem -bench .
205-
goos: darwin
206-
goarch: arm64
248+
$ go test -benchtime=10s -benchmem -bench .
249+
goos: linux
250+
goarch: amd64
207251
pkg: cache_bench
208-
BenchmarkEverythingParallel/MapCache-10 332130052 133.3 ns/op 0 B/op 0 allocs/op
209-
BenchmarkEverythingParallel/MapTTLCache-10 234690624 205.4 ns/op 0 B/op 0 allocs/op
210-
BenchmarkEverythingParallel/RingBuffer-10 441694302 86.82 ns/op 0 B/op 0 allocs/op
211-
BenchmarkEverythingParallel/github.com/Code-Hex/go-generics-cache-10 191366336 198.8 ns/op 7 B/op 0 allocs/op
212-
BenchmarkEverythingParallel/github.com/Yiling-J/theine-go-10 367538067 100.7 ns/op 0 B/op 0 allocs/op
213-
BenchmarkEverythingParallel/github.com/jellydator/ttlcache-10 136785907 262.4 ns/op 43 B/op 0 allocs/op
214-
BenchmarkEverythingParallel/github.com/erni27/imcache-10 226084180 179.2 ns/op 2 B/op 0 allocs/op
215-
BenchmarkEverythingParallel/github.com/dgraph-io/ristretto-10 466729495 80.03 ns/op 30 B/op 1 allocs/op
216-
BenchmarkEverythingParallel/github.com/hashicorp/golang-lru/v2-10 193697901 216.5 ns/op 0 B/op 0 allocs/op
217-
PASS
218-
ok cache_bench 496.390s
252+
cpu: Intel(R) Xeon(R) Platinum 8358 CPU @ 2.60GHz
253+
BenchmarkEverythingParallel/MapCache-32 100000000 170.1 ns/op 0 B/op 0 allocs/op
254+
BenchmarkEverythingParallel/MapTTLCache-32 90510988 198.9 ns/op 0 B/op 0 allocs/op
255+
BenchmarkEverythingParallel/RingBuffer-32 85731428 196.8 ns/op 0 B/op 0 allocs/op
256+
BenchmarkEverythingParallel/ShardedMapCache-32 273706551 43.51 ns/op 0 B/op 0 allocs/op
257+
BenchmarkEverythingParallel/ShardedMapTTLCache-32 282491904 44.37 ns/op 0 B/op 0 allocs/op
258+
BenchmarkEverythingParallel/ShardedRingBuffer-32 284756061 40.78 ns/op 0 B/op 0 allocs/op
259+
BenchmarkEverythingParallel/github.com/Code-Hex/go-generics-cache-32 43165059 294.2 ns/op 7 B/op 0 allocs/op
260+
BenchmarkEverythingParallel/github.com/Yiling-J/theine-go-32 186976719 64.51 ns/op 0 B/op 0 allocs/op
261+
BenchmarkEverythingParallel/github.com/jellydator/ttlcache-32 29943469 376.3 ns/op 43 B/op 0 allocs/op
262+
BenchmarkEverythingParallel/github.com/erni27/imcache-32 531496862 23.35 ns/op 50 B/op 1 allocs/op
263+
BenchmarkEverythingParallel/github.com/dgraph-io/ristretto-32 100000000 108.5 ns/op 27 B/op 1 allocs/op
264+
BenchmarkEverythingParallel/github.com/hashicorp/golang-lru/v2-32 43857675 307.1 ns/op 0 B/op 0 allocs/op
265+
BenchmarkEverythingParallel/github.com/egregors/kesh-32 33866130 428.7 ns/op 83 B/op 2 allocs/op
266+
BenchmarkEverythingParallel/KVMapCache-32 43328151 401.2 ns/op 112 B/op 0 allocs/op
219267
```
220268

221269
And now on 32 CPU machine we clearly see performance degradation due to lock contention. Sharded implementations are about 4 times faster.
222-
Notice the Imcache result. Is too good to be true 😅
270+
Notice the Imcache result. Crazy fast! 😅
223271

224-
KV wrapper result is worse then other caches, but it is expected as it keeps key level ordering on insert and does extra work to cleanup the key in trie on delete.
272+
KV wrapper result is worse then other caches, but it is expected as it keeps key index allowing prefix search with deterministic order, that other caches do not allow. It updates trie structure on `Set` and does extra work to cleanup the key on `Del`.
225273

226274
```shell
227275
$ go test -benchtime=10s -benchmem -bench .
@@ -246,5 +294,4 @@ BenchmarkEverythingParallel/KVMapCache-32 33
246294
PASS
247295
```
248296

249-
250297
Concurrent comparison benchmark is located in a [separate repository](https://github.com/C-Pro/cache-benchmarks) to avoid pulling unnecessary dependencies in the library.
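As a complement to the write-lock example in the README diff above, here is a minimal sketch (not part of the commit) of the read-only path, using the same `NewLocker` and `NewMapCache` constructors; the keys and values are illustrative:

```go
locker := NewLocker[int, int](NewMapCache[int, int]())

// Seed balances under a write lock.
wtx := locker.Lock()
wtx.Set(1, 500)
wtx.Set(2, 200)
wtx.Unlock()

// Multiple readers can hold RLock concurrently.
// Calling Set or Del on this read-only Tx would panic.
rtx := locker.RLock()
balA, _ := rtx.Get(1)
balB, _ := rtx.Get(2)
fmt.Println(balA + balB) // Consistent view of both balances: 700.
rtx.Unlock()
```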

bench_test.go (+4)

@@ -158,6 +158,10 @@ func BenchmarkEverything(b *testing.B) {
 			"KVMapCache",
 			NewKV[string](NewMapCache[string, string]()),
 		},
+		{
+			"LockerMapCache",
+			NewLocker[string, string](NewMapCache[string, string]()).Lock(),
+		},
 	}

 	data := genTestData(10_000_000)

common_test.go (+3)

@@ -159,6 +159,9 @@ func TestCommon(t *testing.T) {
 		{"MapTTLCache", func() Geche[string, string] { return NewMapTTLCache[string, string](ctx, time.Minute, time.Minute) }},
 		{"RingBuffer", func() Geche[string, string] { return NewRingBuffer[string, string](100) }},
 		{"KVMapCache", func() Geche[string, string] { return NewKV[string](NewMapCache[string, string]()) }},
+		{"LockerMapCache", func() Geche[string, string] {
+			return NewLocker[string, string](NewMapCache[string, string]()).Lock()
+		}},
 		{
 			"ShardedMapCache", func() Geche[string, string] {
 				return NewSharded[string](
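Both test files can register a locked `Tx` in their `Geche[string, string]` tables because `*Tx` implements the interface; a compile-time assertion (a sketch, not part of the commit) makes this explicit:

```go
// Static assertion: *Tx[string, string] satisfies Geche[string, string],
// so the common tests and benchmarks can treat it like any other cache.
var _ Geche[string, string] = (*Tx[string, string])(nil)
```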

locker.go (+133)

@@ -0,0 +1,133 @@
+package geche
+
+import (
+	"sync"
+	"sync/atomic"
+)
+
+// Locker is a wrapper for any Geche interface implementation
+// that provides Lock() and RLock() methods returning a Tx object
+// implementing the Geche interface.
+// The returned object is not a transaction in the sense that it does not
+// allow commit/rollback or an isolation level higher than READ COMMITTED.
+// It only provides a way to do multiple cache operations atomically.
+type Locker[K comparable, V any] struct {
+	cache Geche[K, V]
+	mux   *sync.RWMutex
+}
+
+// NewLocker creates a new Locker instance.
+func NewLocker[K comparable, V any](
+	cache Geche[K, V],
+) *Locker[K, V] {
+	t := Locker[K, V]{
+		cache: cache,
+		mux:   &sync.RWMutex{},
+	}
+
+	return &t
+}
+
+// Tx is a "transaction" object returned by Locker.Lock() and Locker.RLock() methods.
+// See Locker for more details.
+type Tx[K comparable, V any] struct {
+	cache    Geche[K, V]
+	mux      *sync.RWMutex
+	writable bool
+	unlocked int32
+}
+
+// Lock returns a read/write-locked cache object.
+func (t *Locker[K, V]) Lock() *Tx[K, V] {
+	t.mux.Lock()
+	return &Tx[K, V]{
+		cache:    t.cache,
+		mux:      t.mux,
+		writable: true,
+	}
+}
+
+// RLock returns a read-only locked cache object.
+func (t *Locker[K, V]) RLock() *Tx[K, V] {
+	t.mux.RLock()
+	return &Tx[K, V]{
+		cache:    t.cache,
+		mux:      t.mux,
+		writable: false,
+	}
+}
+
+// Unlock the underlying cache.
+func (tx *Tx[K, V]) Unlock() {
+	if atomic.LoadInt32(&tx.unlocked) == 1 {
+		panic("unlocking already unlocked transaction")
+	}
+	atomic.StoreInt32(&tx.unlocked, 1)
+	if tx.writable {
+		tx.mux.Unlock()
+		return
+	}
+	tx.mux.RUnlock()
+}
+
+// Set key-value pair in the underlying locked cache.
+// Will panic if called on an RLocked Tx.
+func (tx *Tx[K, V]) Set(key K, value V) {
+	if atomic.LoadInt32(&tx.unlocked) == 1 {
+		panic("cannot use unlocked transaction")
+	}
+	if !tx.writable {
+		panic("cannot set in read-only transaction")
+	}
+	tx.cache.Set(key, value)
+}
+
+// Get value by key from the underlying locked cache.
+func (tx *Tx[K, V]) Get(key K) (V, error) {
+	if atomic.LoadInt32(&tx.unlocked) == 1 {
+		panic("cannot use unlocked transaction")
+	}
+	return tx.cache.Get(key)
+}
+
+// Del key from the underlying locked cache.
+// Will panic if called on an RLocked Tx.
+func (tx *Tx[K, V]) Del(key K) error {
+	if atomic.LoadInt32(&tx.unlocked) == 1 {
+		panic("cannot use unlocked transaction")
+	}
+	if !tx.writable {
+		panic("cannot del in read-only transaction")
+	}
+	return tx.cache.Del(key)
+}
+
+// Snapshot returns a shallow copy of the cache data.
+func (tx *Tx[K, V]) Snapshot() map[K]V {
+	if atomic.LoadInt32(&tx.unlocked) == 1 {
+		panic("cannot use unlocked transaction")
+	}
+	return tx.cache.Snapshot()
+}
+
+// Len returns total number of elements in the cache.
+func (tx *Tx[K, V]) Len() int {
+	if atomic.LoadInt32(&tx.unlocked) == 1 {
+		panic("cannot use unlocked transaction")
+	}
+	return tx.cache.Len()
+}
+
+// ListByPrefix should only be called if the underlying cache is KV.
+// Otherwise it will panic.
+func (tx *Tx[K, V]) ListByPrefix(prefix string) ([]V, error) {
+	if atomic.LoadInt32(&tx.unlocked) == 1 {
+		panic("cannot use unlocked transaction")
+	}
+	kv, ok := any(tx.cache).(*KV[V])
+	if !ok {
+		panic("cache does not support ListByPrefix")
+	}
+
+	return kv.ListByPrefix(prefix)
+}
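A short usage sketch (not part of the commit) combining `Locker` with `KV`, which exercises the type assertion in `ListByPrefix` above; keys and values are illustrative:

```go
locker := NewLocker[string, string](NewKV[string](NewMapCache[string, string]()))

tx := locker.Lock()
tx.Set("user:1", "alice")
tx.Set("user:2", "bob")

// Succeeds because the wrapped cache is a *KV; with any other
// underlying cache this call would panic.
users, _ := tx.ListByPrefix("user:")
tx.Unlock()

fmt.Println(users) // Output: [alice bob]
```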
