data cache + batch timeout #104

Open

wants to merge 18 commits into base: v7
4 changes: 2 additions & 2 deletions .github/workflows/go.yml
@@ -19,10 +19,10 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v3
with:
-go-version: 1.19
+go-version: "1.20"

- name: Build
run: go build -v ./...

- name: Test
-run: go test -v ./...
+run: make test
2 changes: 1 addition & 1 deletion .travis.yml
@@ -1,7 +1,7 @@
language: go

go:
-- 1.18
+- 1.20

env:
- GO111MODULE=on
5 changes: 5 additions & 0 deletions Makefile
@@ -0,0 +1,5 @@
.PHONY: test
test:
$(info * testing ...)
go test ./...
cd example && go test ./...
2 changes: 1 addition & 1 deletion README.md
@@ -1,5 +1,5 @@
# DataLoader
-[![GoDoc](https://godoc.org/gopkg.in/graph-gophers/dataloader.v3?status.svg)](https://godoc.org/github.com/graph-gophers/dataloader)
+[![GoDoc](https://godoc.org/gopkg.in/graph-gophers/dataloader.v7?status.svg)](https://pkg.go.dev/github.com/graph-gophers/dataloader/v7)
[![Build Status](https://travis-ci.org/graph-gophers/dataloader.svg?branch=master)](https://travis-ci.org/graph-gophers/dataloader)

This is an implementation of [Facebook's DataLoader](https://github.com/facebook/dataloader) in Golang.
19 changes: 19 additions & 0 deletions cache.go
@@ -26,3 +26,22 @@ func (c *NoCache[K, V]) Delete(context.Context, K) bool { return false }

// Clear is a NOOP
func (c *NoCache[K, V]) Clear() { return }

// DataCache is an interface for caching resolved data at the batchFunc level
type DataCache[K comparable, V any] interface {
Get(context.Context, K) (V, bool)
Set(context.Context, K, V)
Delete(context.Context, K) bool
Clear()
}

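// DataCacheMany is an optional interface a DataCache may additionally
// implement to fetch many cached values in a single call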
type DataCacheMany[K comparable, V any] interface {
GetMany(context.Context, []K) (map[K]V, error)
}

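// nocache is the no-op DataCache installed when WithDataCache is not used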
type nocache[K comparable, V any] struct{}

func (nocache[K, V]) Get(context.Context, K) (V, bool) { var v V; return v, false }
func (nocache[K, V]) Set(context.Context, K, V) {}
func (nocache[K, V]) Delete(context.Context, K) bool { return false }
func (nocache[K, V]) Clear() {}
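For reviewers, a minimal in-memory implementation of the new interfaces might look like the following. This is a hypothetical sketch, not part of this diff; memoryCache and newMemoryCache are invented names. Because it also implements DataCacheMany, batchWithCache (below in dataloader.go) would take the bulk-read path.

package dataloader

import (
    "context"
    "sync"
)

// memoryCache is a mutex-guarded map satisfying DataCache.
type memoryCache[K comparable, V any] struct {
    mu    sync.Mutex
    items map[K]V
}

func newMemoryCache[K comparable, V any]() *memoryCache[K, V] {
    return &memoryCache[K, V]{items: map[K]V{}}
}

func (c *memoryCache[K, V]) Get(_ context.Context, k K) (V, bool) {
    c.mu.Lock()
    defer c.mu.Unlock()
    v, ok := c.items[k]
    return v, ok
}

func (c *memoryCache[K, V]) Set(_ context.Context, k K, v V) {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.items[k] = v
}

func (c *memoryCache[K, V]) Delete(_ context.Context, k K) bool {
    c.mu.Lock()
    defer c.mu.Unlock()
    if _, ok := c.items[k]; !ok {
        return false
    }
    delete(c.items, k)
    return true
}

func (c *memoryCache[K, V]) Clear() {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.items = map[K]V{}
}

// GetMany satisfies the optional DataCacheMany interface: it returns
// only the keys that are present, and never errors.
func (c *memoryCache[K, V]) GetMany(_ context.Context, keys []K) (map[K]V, error) {
    c.mu.Lock()
    defer c.mu.Unlock()
    out := make(map[K]V, len(keys))
    for _, k := range keys {
        if v, ok := c.items[k]; ok {
            out[k] = v
        }
    }
    return out, nil
}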
23 changes: 23 additions & 0 deletions context.go
@@ -0,0 +1,23 @@
package dataloader

import "context"

// DetachedContext returns a new context detached from the lifetime
// of ctx, but which still returns the values of ctx.
//
// DetachedContext can be used to maintain the trace context required
// to correlate events, but where the operation is "fire-and-forget",
// and should not be affected by the deadline or cancellation of ctx.
func DetachedContext(ctx context.Context) context.Context {
return &detachedContext{Context: context.Background(), orig: ctx}
}

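// detachedContext wraps a background Context for lifetime purposes while
// delegating Value lookups to the original context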
type detachedContext struct {
context.Context
orig context.Context
}

// Value returns c.orig.Value(key).
func (c *detachedContext) Value(key interface{}) interface{} {
return c.orig.Value(key)
}
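To make the detachment semantics concrete, here is a small sketch (the traceIDKey key is invented; the import assumes the v7 module path shown in the README badge): after the request context is canceled, the detached context stays live yet still exposes the request's values.

package main

import (
    "context"
    "fmt"

    dataloader "github.com/graph-gophers/dataloader/v7"
)

type ctxKey string

const traceIDKey ctxKey = "traceID" // hypothetical request-scoped key

func main() {
    // A request-scoped context carrying a trace ID.
    reqCtx := context.WithValue(context.Background(), traceIDKey, "abc-123")
    reqCtx, cancel := context.WithCancel(reqCtx)

    detached := dataloader.DetachedContext(reqCtx)
    cancel() // the request ends

    fmt.Println(reqCtx.Err())               // context canceled
    fmt.Println(detached.Err())             // <nil>: detached from cancellation
    fmt.Println(detached.Value(traceIDKey)) // abc-123: values still flow through
}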
123 changes: 119 additions & 4 deletions dataloader.go
@@ -72,6 +72,8 @@ type Loader[K comparable, V any] struct {
// implementation could be used as long as it implements the `Cache` interface.
cacheLock sync.Mutex
cache Cache[K, V]

dataCache DataCache[K, V]
// should we clear the cache on each batch?
// this would allow batching but no long term caching
clearCacheOnBatch bool
@@ -99,6 +101,9 @@ type Loader[K comparable, V any] struct {

// can be set to trace calls to dataloader
tracer Tracer[K, V]

// timeout for batchFunc
timeout time.Duration
}

// Thunk is a function that will block until the value (*Result) it contains is resolved.
@@ -112,6 +117,7 @@ type ThunkMany[V any] func() ([]V, []error)

// type used on the input channel
type batchRequest[K comparable, V any] struct {
ctx context.Context
key K
channel chan *Result[V]
}
@@ -126,6 +132,13 @@ func WithCache[K comparable, V any](c Cache[K, V]) Option[K, V] {
}
}

// WithDataCache sets the loader's cache for resolved data (values, not thunks)
func WithDataCache[K comparable, V any](c DataCache[K, V]) Option[K, V] {
return func(l *Loader[K, V]) {
l.dataCache = c
}
}

// WithBatchCapacity sets the batch capacity. Default is 0 (unbounded).
func WithBatchCapacity[K comparable, V any](c int) Option[K, V] {
return func(l *Loader[K, V]) {
@@ -172,6 +185,15 @@ func WithTracer[K comparable, V any](tracer Tracer[K, V]) Option[K, V] {
}
}

// WithTimeout sets a timeout for the batchFunc; non-positive durations are ignored
func WithTimeout[K comparable, V any](t time.Duration) Option[K, V] {
return func(l *Loader[K, V]) {
if t > 0 {
l.timeout = t
}
}
}

// NewBatchedLoader constructs a new Loader with given options.
func NewBatchedLoader[K comparable, V any](batchFn BatchFunc[K, V], opts ...Option[K, V]) *Loader[K, V] {
loader := &Loader[K, V]{
@@ -190,6 +212,10 @@ func NewBatchedLoader[K comparable, V any](batchFn BatchFunc[K, V], opts ...Opti
loader.cache = NewCache[K, V]()
}

if loader.dataCache == nil {
loader.dataCache = &nocache[K, V]{}
}

if loader.tracer == nil {
loader.tracer = NoopTracer[K, V]{}
}
@@ -244,12 +270,12 @@ func (l *Loader[K, V]) Load(originalContext context.Context, key K) Thunk[V] {

// this is sent to batch fn. It contains the key and the channel to return
// the result on
-req := &batchRequest[K, V]{key, c}
+req := &batchRequest[K, V]{ctx, key, c}

l.batchLock.Lock()
// start the batch window if it hasn't already started.
if l.curBatcher == nil {
-l.curBatcher = l.newBatcher(l.silent, l.tracer)
+l.curBatcher = l.newBatcher(l.silent, l.tracer, l.dataCache, l.timeout)
// start the current batcher batch function
go l.curBatcher.batch(originalContext)
// start a sleeper for the current batcher
@@ -351,6 +377,7 @@ func (l *Loader[K, V]) LoadMany(originalContext context.Context, keys []K) Thunk
func (l *Loader[K, V]) Clear(ctx context.Context, key K) Interface[K, V] {
l.cacheLock.Lock()
l.cache.Delete(ctx, key)
l.dataCache.Delete(ctx, key)
l.cacheLock.Unlock()
return l
}
@@ -360,6 +387,7 @@ func (l *Loader[K, V]) Clear(ctx context.Context, key K) Interface[K, V] {
func (l *Loader[K, V]) ClearAll() Interface[K, V] {
l.cacheLock.Lock()
l.cache.Clear()
l.dataCache.Clear()
l.cacheLock.Unlock()
return l
}
@@ -382,6 +410,7 @@ func (l *Loader[K, V]) reset() {

if l.clearCacheOnBatch {
l.cache.Clear()
l.dataCache.Clear()
}
}

@@ -391,16 +420,20 @@ type batcher[K comparable, V any] struct {
finished bool
silent bool
tracer Tracer[K, V]
cache DataCache[K, V]
timeout time.Duration
}

// newBatcher returns a batcher for the current requests
// all the batcher methods must be protected by a global batchLock
func (l *Loader[K, V]) newBatcher(silent bool, tracer Tracer[K, V]) *batcher[K, V] {
func (l *Loader[K, V]) newBatcher(silent bool, tracer Tracer[K, V], cache DataCache[K, V], timeout time.Duration) *batcher[K, V] {
return &batcher[K, V]{
input: make(chan *batchRequest[K, V], l.inputCap),
batchFn: l.batchFn,
silent: silent,
tracer: tracer,
cache: cache,
timeout: timeout,
}
}

@@ -412,6 +445,75 @@ func (b *batcher[K, V]) end() {
}
}

// batchWithCache wraps the user batchFunc so that resolved values are served from, and written back to, the data cache
func batchWithCache[K comparable, V any](originalContext context.Context, timeout time.Duration, batchfn BatchFunc[K, V], keys []K, cache DataCache[K, V]) []*Result[V] {
resultMap := make(map[K]*Result[V], len(keys))
uniqK := make(map[K]struct{}, len(keys))
reqK := make([]K, 0, len(keys))

// guard the zero value: context.WithTimeout with a 0 duration would yield an already-expired context
ctx := DetachedContext(originalContext)
if timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, timeout)
defer cancel()
}

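// prefer a single bulk read when the cache implements DataCacheMany;
// otherwise fall back to per-key Get calls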
cacheMany, cacheManyOk := cache.(DataCacheMany[K, V])
if cacheManyOk {
items, err := cacheMany.GetMany(ctx, keys)
if err != nil {
log.Printf("Dataloader: Error in datacache.GetMany function: %s", err.Error())
} else {
for k, v := range items {
resultMap[k] = &Result[V]{Data: v}
uniqK[k] = struct{}{}
}

for _, k := range keys {
if _, ok := uniqK[k]; !ok {
reqK = append(reqK, k)
}
}
}
} else {
var cacheOk, keyOk bool
var cacheRes V
for _, k := range keys {
_, keyOk = uniqK[k]
if keyOk {
continue
}

cacheRes, cacheOk = cache.Get(originalContext, k)
if cacheOk {
resultMap[k] = &Result[V]{Data: cacheRes}
uniqK[k] = struct{}{}
continue
}

reqK = append(reqK, k)
uniqK[k] = struct{}{}
}
}

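// fetch only the keys the cache could not serve, then write successful results back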
if len(reqK) > 0 {
items := batchfn(ctx, reqK)
for i, item := range items {
k := reqK[i]
resultMap[k] = item

if item.Error == nil {
cache.Set(originalContext, k, item.Data)
}
}
}

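// reassemble results in the original key order (keys missing from resultMap are skipped)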
result := make([]*Result[V], 0, len(keys))
for _, k := range keys {
if val, ok := resultMap[k]; ok {
result = append(result, val)
}
}

return result
}

// execute the batch of all items in queue
func (b *batcher[K, V]) batch(originalContext context.Context) {
var (
@@ -429,6 +531,14 @@ func (b *batcher[K, V]) batch(originalContext context.Context) {
ctx, finish := b.tracer.TraceBatch(originalContext, keys)
defer finish(items)

// if WithTimeout was used, detach from the original context and apply a fresh timeout for the batch function
if b.timeout > 0 {
ctx = &detachedContext{Context: context.Background(), orig: ctx}
var cancel func()
ctx, cancel = context.WithTimeout(ctx, b.timeout)
defer cancel()
}

func() {
defer func() {
if r := recover(); r != nil {
@@ -442,7 +552,12 @@ func (b *batcher[K, V]) batch(originalContext context.Context) {
log.Printf("Dataloader: Panic received in batch function: %v\n%s", panicErr, buf)
}
}()
-items = b.batchFn(ctx, keys)
+// avoid a panic if the loader was created without b.cache
+if b.cache != nil {
+items = batchWithCache(ctx, b.timeout, b.batchFn, keys, b.cache)
+} else {
+items = b.batchFn(ctx, keys)
+}
}()

if panicErr != nil {
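Finally, to show how the two new options compose, a usage sketch written as an example test inside the package. The batch function is invented for illustration, and memoryCache refers to the hypothetical sketch shown alongside cache.go above.

package dataloader

import (
    "context"
    "fmt"
    "time"
)

func ExampleNewBatchedLoader_dataCacheAndTimeout() {
    // Invented batch function: resolve one value per key, in key order.
    batchFn := func(ctx context.Context, keys []string) []*Result[string] {
        out := make([]*Result[string], 0, len(keys))
        for _, k := range keys {
            out = append(out, &Result[string]{Data: "value-for-" + k})
        }
        return out
    }

    loader := NewBatchedLoader[string, string](
        batchFn,
        // Cache resolved values (not thunks) across batches.
        WithDataCache[string, string](newMemoryCache[string, string]()),
        // Detach from the caller's context and bound each batch call to 2s.
        WithTimeout[string, string](2*time.Second),
    )

    v, err := loader.Load(context.Background(), "42")()
    fmt.Println(v, err)
    // Output: value-for-42 <nil>
}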