Skip to content

Commit

Permalink
Local cache
Browse files Browse the repository at this point in the history
  • Loading branch information
angelini committed Nov 4, 2022
1 parent bddb60b commit cd8b380
Show file tree
Hide file tree
Showing 20 changed files with 480 additions and 222 deletions.
10 changes: 8 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ setup-local: reset-db
# Run the server from source with DL_ENV=dev exported to the recipe.
# The two internal/pb/*.pb.go files are prerequisites; the database URI and
# listening port come from $(DB_URI) and $(GRPC_PORT).
server: export DL_ENV=dev
server: internal/pb/fs.pb.go internal/pb/fs_grpc.pb.go
go run cmd/server/main.go --dburi $(DB_URI) --port $(GRPC_PORT)

# Same invocation as the `server` target, with two extra flags:
# --profile cpu.prof and --log-level info.
# NOTE(review): presumably --profile writes a CPU profile to cpu.prof; confirm
# against cmd/server/main.go.
server-profile: export DL_ENV=dev
server-profile: internal/pb/fs.pb.go internal/pb/fs_grpc.pb.go
go run cmd/server/main.go --dburi $(DB_URI) --port $(GRPC_PORT) --profile cpu.prof --log-level info
Expand Down Expand Up @@ -236,5 +236,11 @@ load-test-new: reset-db
# Expand the load-test macro (defined elsewhere in this Makefile) for the
# Update method with the update.json request template. The meaning of the
# numeric arguments (10000, 20) is defined by that macro.
load-test-update:
$(call load-test,Update,update.json,10000,20)

# Same as load-test-update but driven by update-large.json, whose "content"
# field is built from a random string of 500 to 10,000 characters.
load-test-update-large:
$(call load-test,Update,update-large.json,10000,20)

load-test-get:
$(call load-test,Get,get_all.json,100000,40,5000)
$(call load-test,Get,get.json,100000,40,5000)

# Expand the load-test macro (defined elsewhere in this Makefile) for the
# GetCompress method with the get-compress.json request template. The meaning
# of the numeric arguments (100000, 40, 5000) is defined by that macro.
load-test-get-compress:
$(call load-test,GetCompress,get-compress.json,100000,40,5000)
2 changes: 1 addition & 1 deletion default.nix
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ buildGoModule rec {
version = "0.3.5";
src = ./.;
proxyVendor = true; # Fixes: cannot query module due to -mod=vendor running make install
vendorSha256 = "sha256-s0mChRHEwRkK1Gc8IioWbRMJoVAexNTA9y6WeTl41ZM=";
vendorSha256 = "sha256-h5FEdWFBr2IUP9A/XeZ7KPFNmSdUVa6+3YEZXfYTJyU=";

outputs = [ "out" "client" "server" "webui" "assets" "migrations" ];

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[
{
"project": "{{ randomInt 0 9 }}{{ randomInt 0 9 }}",
"project": "{{ randInt 0 99 }}",
"queries": [{ "path": "", "is_prefix": true }]
}
]
6 changes: 6 additions & 0 deletions development/scripts/load-tests/get.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
[
{
"project": "{{ randInt 0 99 }}",
"queries": [{ "path": "", "is_prefix": true }]
}
]
12 changes: 12 additions & 0 deletions development/scripts/load-tests/update-large.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
[
{
"project": "{{ randInt 0 99 }}",
"object": {
"path": "{{ randAlpha 16 }}",
"mode": 420,
"size": 10,
"deleted": false,
"content": "{{ b64enc (randAlpha (randInt 500 10_000)) }}"
}
}
]
6 changes: 3 additions & 3 deletions development/scripts/load-tests/update.json
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
[
{
"project": "{{ randomInt 0 9 }}{{ randomInt 0 9 }}",
"project": "{{ randInt 0 99 }}",
"object": {
"path": "{{ randomInt 0 9 }}{{ randomInt 0 9 }}",
"path": "{{ randAlpha 16 }}",
"mode": 420,
"size": 10,
"deleted": false,
"content": "{{ randomInt 0 9 }}{{ randomInt 0 9 }}{{ randomInt 0 9 }}{{ randomInt 0 9 }}"
"content": "{{ b64enc (randAlpha 4) }}"
}
}
]
6 changes: 5 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ module github.com/gadget-inc/dateilager
go 1.18

require (
github.com/dgraph-io/ristretto v0.1.1
github.com/gadget-inc/fsdiff v0.4.4
github.com/go-chi/chi/v5 v5.0.7
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/jackc/pgx/v5 v5.0.2
github.com/jackc/puddle/v2 v2.0.0
github.com/klauspost/compress v1.15.11
github.com/minio/sha256-simd v1.0.0
github.com/o1egl/paseto v1.0.0
Expand All @@ -32,16 +34,18 @@ require (
github.com/aead/poly1305 v0.0.0-20180717145839-3fee0db0b635 // indirect
github.com/benbjohnson/clock v1.3.0 // indirect
github.com/cenkalti/backoff/v4 v4.1.3 // indirect
github.com/cespare/xxhash/v2 v2.1.1 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/golang/glog v1.0.0 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.3 // indirect
github.com/inconshreveable/mousetrap v1.0.1 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b // indirect
github.com/jackc/puddle/v2 v2.0.0 // indirect
github.com/klauspost/cpuid/v2 v2.1.2 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
Expand Down
8 changes: 8 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ github.com/cenkalti/backoff/v4 v4.1.3 h1:cFAlzYUlVYDysBEH2T5hyJZMh3+5+WCBvSnK6Q8
github.com/cenkalti/backoff/v4 v4.1.3/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
Expand All @@ -68,6 +69,12 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgraph-io/ristretto v0.1.1 h1:6CWw5tJNgpegArSHpNHJKldNeq03FQCwYvfMVWajOK8=
github.com/dgraph-io/ristretto v0.1.1/go.mod h1:S1GPSBCYCIhmVNfcth17y2zZtQT6wzkzgwUve0VDWWA=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
Expand Down Expand Up @@ -376,6 +383,7 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20221010170243-090e33056c14/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20221013171732-95e765b1cc43 h1:OK7RB6t2WQX54srQQYSXMW8dF5C6/8+oA/s5QBmmto4=
golang.org/x/sys v0.0.0-20221013171732-95e765b1cc43/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
Expand Down
152 changes: 142 additions & 10 deletions internal/db/content.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,21 @@ package db
import (
"bytes"
"context"
"encoding/hex"
"fmt"
"io"

"github.com/dgraph-io/ristretto"
"github.com/jackc/pgx/v5"
"github.com/jackc/puddle/v2"
"github.com/klauspost/compress/s2"
"github.com/minio/sha256-simd"
)

const (
// MB is one decimal megabyte (1,000,000). NewContentLookup uses it to
// express the cache's MaxCost; cache entries are costed by their byte length.
MB = 1000 * 1000
// DecoderPoolSize is the MaxSize given to the ContentDecoder pool built in
// NewContentLookup.
DecoderPoolSize = 1000
)

type Hash struct {
H1 [16]byte
H2 [16]byte
Expand All @@ -32,8 +38,27 @@ func (h *Hash) Bytes() []byte {
return hash
}

// Adapted from Go's standard library (encoding/hex)
// and specialized for the Hash type
const hextable = "0123456789abcdef"

func (h *Hash) Hex() string {
return hex.EncodeToString(h.Bytes())
buffer := make([]byte, 64)
idx := 0

for _, v := range h.H1 {
buffer[idx] = hextable[v>>4]
buffer[idx+1] = hextable[v&0x0f]
idx += 2
}

for _, v := range h.H2 {
buffer[idx] = hextable[v>>4]
buffer[idx+1] = hextable[v&0x0f]
idx += 2
}

return string(buffer)
}

type ContentEncoder struct {
Expand All @@ -51,7 +76,7 @@ func NewContentEncoder() *ContentEncoder {
}
}

func (c *ContentEncoder) Encode(content []byte) ([]byte, error) {
func (c *ContentEncoder) Encode(content DecodedContent) (EncodedContent, error) {
_, err := c.writer.Write(content)
if err != nil {
return nil, err
Expand All @@ -62,40 +87,147 @@ func (c *ContentEncoder) Encode(content []byte) ([]byte, error) {
return nil, err
}

output := c.buffer.Bytes()
tmpOutput := c.buffer.Bytes()

c.buffer.Truncate(0)
c.buffer.Reset()
c.writer.Reset(c.buffer)

if output == nil {
output = []byte("")
if tmpOutput == nil {
return []byte(""), nil
}

output := make([]byte, len(tmpOutput))
copy(output, tmpOutput)
return output, nil
}

// Close closes the encoder's underlying writer and returns whatever error
// that close reports.
func (c *ContentEncoder) Close() error {
	closeErr := c.writer.Close()
	return closeErr
}

// ContentDecoder decodes s2-compressed content. It owns one s2 reader and one
// scratch buffer, both of which are reset and reused on every Decode call.
type ContentDecoder struct {
// buffer receives the decoded bytes; Decode resets it before each use.
buffer *bytes.Buffer
// reader is pointed at each new encoded input via Reset.
reader *s2.Reader
}

// NewContentDecoder returns a ContentDecoder with an empty scratch buffer and
// an s2 reader that has no input attached yet (it is created with a nil source).
func NewContentDecoder() *ContentDecoder {
	return &ContentDecoder{
		buffer: new(bytes.Buffer),
		reader: s2.NewReader(nil),
	}
}

func (c *ContentDecoder) Decoder(encoded []byte) ([]byte, error) {
c.reader.Reset(bytes.NewBuffer(encoded))
output, err := io.ReadAll(c.reader)
func (c *ContentDecoder) Decode(encoded EncodedContent) (DecodedContent, error) {
c.buffer.Reset()
c.reader.Reset(bytes.NewReader(encoded))

_, err := io.Copy(c.buffer, c.reader)
if err != nil {
return nil, err
}

output := make([]byte, c.buffer.Len())
copy(output, c.buffer.Bytes())
return output, nil
}

// ContentLookup resolves content hashes to their bytes. It keeps an in-memory
// cache in front of the dl.contents table and a pool of reusable decoders.
type ContentLookup struct {
// cache is keyed by a hash's hex string and holds the bytes read from
// dl.contents for that hash.
cache *ristretto.Cache
// decoders is the pool Lookup acquires a ContentDecoder from on each call.
decoders *puddle.Pool[*ContentDecoder]
}

// NewContentLookup builds a ContentLookup backed by a ristretto cache with a
// MaxCost of 1_000 * MB and a pool of at most DecoderPoolSize content decoders.
//
// It returns an error if either the cache or the decoder pool cannot be
// created. When the pool fails, the cache that was already created is closed
// so it is not leaked.
func NewContentLookup() (*ContentLookup, error) {
	cache, err := ristretto.NewCache(&ristretto.Config{
		NumCounters: 100_000,
		MaxCost:     1_000 * MB,
		BufferItems: 64,
	})
	if err != nil {
		return nil, err
	}

	constructor := func(context.Context) (*ContentDecoder, error) {
		return NewContentDecoder(), nil
	}

	decoders, err := puddle.NewPool(&puddle.Config[*ContentDecoder]{Constructor: constructor, MaxSize: DecoderPoolSize})
	if err != nil {
		// The cache was created successfully above; release it before bailing
		// out so a failed construction does not leave it running.
		cache.Close()
		return nil, err
	}

	return &ContentLookup{
		cache:    cache,
		decoders: decoders,
	}, nil
}

// Lookup resolves the content for the hashes in hashesToLookup.
//
// The boolean stored for each hash says whether its stored bytes are encoded:
// true means they are run through a pooled ContentDecoder before being
// returned, false means they are returned as stored. Hashes are first looked
// up in the in-memory cache; the ones that miss are fetched from dl.contents
// in a single query on tx and added to the cache under their hex string.
//
// The returned map only contains hashes that were found in the cache or in
// dl.contents. An error is returned if a decoder cannot be acquired, if the
// query, a row scan or the row iteration fails, or if a value cannot be
// decoded.
func (cl *ContentLookup) Lookup(ctx context.Context, tx pgx.Tx, hashesToLookup map[Hash]bool) (map[Hash]DecodedContent, error) {
	var notFound []Hash
	contents := make(map[Hash]DecodedContent, len(hashesToLookup))

	decoder, err := cl.decoders.Acquire(ctx)
	if err != nil {
		return nil, fmt.Errorf("cannot acquire content decoder: %w", err)
	}
	defer decoder.Release()

	for hash, isEncoded := range hashesToLookup {
		value, found := cl.cache.Get(hash.Hex())
		if !found {
			notFound = append(notFound, hash)
			continue
		}

		// NOTE(review): the cache is populated below with a plain []byte, so
		// these assertions only hold if EncodedContent and DecodedContent are
		// aliases of []byte -- confirm against their declarations.
		if isEncoded {
			decoded, err := decoder.Value().Decode(value.(EncodedContent))
			if err != nil {
				return nil, fmt.Errorf("cannot decode value from cache %v: %w", hash.Hex(), err)
			}
			contents[hash] = decoded
		} else {
			contents[hash] = value.(DecodedContent)
		}
	}

	if len(notFound) == 0 {
		return contents, nil
	}

	rows, err := tx.Query(ctx, `
		SELECT (hash).h1, (hash).h2, bytes
		FROM dl.contents
		WHERE hash = ANY($1::hash[])
	`, notFound)
	if err != nil {
		return nil, fmt.Errorf("lookup missing hash contents: %w", err)
	}
	// Always release the result set, including on the early returns inside the
	// loop, so the transaction's connection is not left with unread rows.
	defer rows.Close()

	for rows.Next() {
		var hash Hash
		var value []byte

		err = rows.Scan(&hash.H1, &hash.H2, &value)
		if err != nil {
			return nil, fmt.Errorf("content lookup scan: %w", err)
		}

		// This is a content addressable cache, any cached value will never be updated
		cl.cache.Set(hash.Hex(), value, int64(len(value)))

		if hashesToLookup[hash] {
			decoded, err := decoder.Value().Decode(value)
			if err != nil {
				return nil, fmt.Errorf("cannot decode value from content table %v: %w", hash.Hex(), err)
			}
			contents[hash] = decoded
		} else {
			contents[hash] = value
		}
	}

	// rows.Next returning false can mean "no more rows" or "iteration failed";
	// surface the latter instead of returning a silently truncated result.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("content lookup rows: %w", err)
	}

	return contents, nil
}

func RandomContents(ctx context.Context, tx pgx.Tx, sample float32) ([]Hash, error) {
rows, err := tx.Query(ctx, fmt.Sprintf(`
SELECT (hash).h1, (hash).h2
Expand Down
6 changes: 3 additions & 3 deletions internal/db/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,12 @@ func CloneToProject(ctx context.Context, tx pgx.Tx, source int64, target int64,

sourceBuilder := newQueryBuilder(source, VersionRange{
To: version,
}, objectQuery).withHashes(true)
}, objectQuery)
sourceSql, sourceArgs := sourceBuilder.build()

targetBuilder := newQueryBuilder(target, VersionRange{
To: newTargetVersion - 1,
}, objectQuery).withHashes(true).withArgsOffset(len(sourceArgs))
}, objectQuery).withArgsOffset(len(sourceArgs))
targetSql, targetArgs := targetBuilder.build()

sql := fmt.Sprintf(`
Expand All @@ -62,7 +62,7 @@ func CloneToProject(ctx context.Context, tx pgx.Tx, source int64, target int64,
), to_remove AS (
%s
EXCEPT
SELECT path, mode, size, is_cached, bytes, packed, deleted, h1, h2
SELECT path, mode, size, is_cached, packed, deleted, h1, h2
FROM live_source_objects
)
UPDATE dl.objects o
Expand Down
Loading

0 comments on commit cd8b380

Please sign in to comment.