Skip to content
This repository was archived by the owner on Mar 5, 2024. It is now read-only.

[WIP] Port sequins storage engine to boltdb. #74

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
52 changes: 5 additions & 47 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,55 +6,17 @@ SOURCES = $(shell find . -name '*.go' -not -name '*_test.go')
TEST_SOURCES = $(shell find . -name '*_test.go')
BUILD = $(shell pwd)/build

VENDORED_LIBS = $(BUILD)/lib/libsparkey.a $(BUILD)/lib/libsnappy.a $(BUILD)/lib/libzookeeper_mt.a

UNAME := $(shell uname)
ifeq ($(UNAME), Darwin)
CGO_PREAMBLE_LDFLAGS = -lstdc++
else
CGO_PREAMBLE_LDFLAGS = -lrt -lm -lstdc++
endif

CGO_PREAMBLE = CGO_CFLAGS="-I$(BUILD)/include -I$(BUILD)/include/zookeeper" CGO_LDFLAGS="$(VENDORED_LIBS) $(CGO_PREAMBLE_LDFLAGS)"


all: sequins

vendor/snappy/configure:
cd vendor/snappy && ./autogen.sh

vendor/snappy/Makefile: vendor/snappy/configure
cd vendor/snappy && ./configure --prefix=$(BUILD)

$(BUILD)/lib/libsnappy.a: vendor/snappy/Makefile
cd vendor/snappy && make install

vendor/sparkey/configure:
cd vendor/sparkey && autoreconf --install

vendor/sparkey/Makefile: vendor/sparkey/configure $(BUILD)/lib/libsnappy.a
cd vendor/sparkey && ./configure --prefix=$(BUILD) LDFLAGS="-L$(BUILD)/lib" CPPFLAGS="-I$(BUILD)/include"

$(BUILD)/lib/libsparkey.a: vendor/sparkey/Makefile
cd vendor/sparkey && make install

vendor/zookeeper/configure:
cd vendor/zookeeper && autoreconf --install -v -I $(CURDIR)/vendor/zookeeper/

vendor/zookeeper/Makefile: vendor/zookeeper/configure
cd vendor/zookeeper && ./configure --prefix=$(BUILD)

$(BUILD)/lib/libzookeeper_mt.a: vendor/zookeeper/Makefile
cd vendor/zookeeper && make install

$(BUILD)/bin/go-bindata:
go build -o $(BUILD)/bin/go-bindata ./vendor/github.com/jteeuwen/go-bindata/go-bindata/

status.tmpl.go: status.tmpl $(BUILD)/bin/go-bindata
$(BUILD)/bin/go-bindata -o status.tmpl.go status.tmpl

sequins: $(SOURCES) status.tmpl.go $(BUILD)/lib/libsparkey.a $(BUILD)/lib/libsnappy.a $(BUILD)/lib/libzookeeper_mt.a
$(CGO_PREAMBLE) go build -ldflags "-X main.sequinsVersion=$(TRAVIS_TAG)"
sequins: $(SOURCES) status.tmpl.go
go build -ldflags "-X main.sequinsVersion=$(TRAVIS_TAG)"

release: sequins
./sequins --version
Expand All @@ -63,20 +25,16 @@ release: sequins
tar -cvzf $(RELEASE_NAME).tar.gz $(RELEASE_NAME)

test: $(TEST_SOURCES)
$(CGO_PREAMBLE) go test -short -race -timeout 2m $(shell go list ./... | grep -v vendor)
go test -short -race -timeout 2m $(shell go list ./... | grep -v vendor)
# This test exercises some sync.Pool code, so it should be run without -race
# as well (sync.Pool doesn't ever share objects under -race).
$(CGO_PREAMBLE) go test -timeout 30s ./blocks -run TestBlockParallelReads
go test -timeout 30s ./blocks -run TestBlockParallelReads

test_functional: sequins $(TEST_SOURCES)
$(CGO_PREAMBLE) go test -timeout 10m -run "^TestCluster"
go test -timeout 10m -run "^TestCluster"

clean:
rm -rf $(BUILD)
rm -f vendor/snappy/configure
cd vendor/snappy && make distclean; true
rm -f vendor/sparkey/configure
cd vendor/sparkey && make distclean; true
rm -f vendor/zookeeper/configure
cd vendor/zookeeper && make distclean; true
rm -f sequins sequins-*.tar.gz
Expand Down
19 changes: 8 additions & 11 deletions blocks/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"path/filepath"
"sync"

"github.com/bsm/go-sparkey"
"github.com/boltdb/bolt"
)

// A block represents a chunk of data, all of the keys of which match a
Expand All @@ -22,11 +22,10 @@ type Block struct {
Partition int
Count int

minKey []byte
maxKey []byte
sparkeyReader *sparkey.HashReader
iterPool iterPool
lock sync.RWMutex
minKey []byte
maxKey []byte
db *bolt.DB
lock sync.RWMutex
}

func loadBlock(storePath string, manifest BlockManifest) (*Block, error) {
Expand All @@ -40,13 +39,12 @@ func loadBlock(storePath string, manifest BlockManifest) (*Block, error) {
maxKey: manifest.MaxKey,
}

reader, err := sparkey.Open(filepath.Join(storePath, b.Name))
db, err := bolt.Open(filepath.Join(storePath, b.Name), 0600, nil)
if err != nil {
return nil, fmt.Errorf("opening block: %s", err)
}

b.sparkeyReader = reader
b.iterPool = newIterPool(reader)
b.db = db
return b, nil
}

Expand All @@ -66,8 +64,7 @@ func (b *Block) Get(key []byte) (*Record, error) {
// Close shuts down the block's backing store. It holds the block's write
// lock for the duration of the call.
//
// NOTE(review): this span is a rendered diff, so it shows both the sparkey
// reader call and the bolt call that appears to replace it. Confirm which
// side is live before compiling.
func (b *Block) Close() {
b.lock.Lock()
defer b.lock.Unlock()

b.sparkeyReader.Close()
// NOTE(review): whatever this Close call returns is discarded here.
b.db.Close()
}

func (b *Block) manifest() BlockManifest {
Expand Down
2 changes: 2 additions & 0 deletions blocks/block_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package blocks

import (
"errors"
"log"
"os"
"path/filepath"
"sync"
Expand Down Expand Up @@ -77,6 +78,7 @@ func (store *BlockStore) Add(key, value []byte) error {
block, ok := store.newBlocks[partition]
var err error
if !ok {
log.Println("New Block")
block, err = newBlock(store.path, partition, store.compression, store.blockSize)
if err != nil {
return err
Expand Down
63 changes: 33 additions & 30 deletions blocks/block_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"os"
"path/filepath"

"github.com/bsm/go-sparkey"
"github.com/boltdb/bolt"
"github.com/pborman/uuid"
)

Expand All @@ -17,9 +17,9 @@ type blockWriter struct {
count int
partition int

path string
id string
sparkeyWriter *sparkey.LogWriter
path string
id string
db *bolt.DB
}

func newBlock(storePath string, partition int, compression Compression, blockSize int) (*blockWriter, error) {
Expand All @@ -29,21 +29,25 @@ func newBlock(storePath string, partition int, compression Compression, blockSiz
path := filepath.Join(storePath, name)
log.Println("Initializing block at", path)

c := sparkey.COMPRESSION_NONE
if compression == SnappyCompression {
c = sparkey.COMPRESSION_SNAPPY
}
options := &sparkey.Options{Compression: c, CompressionBlockSize: blockSize}
sparkeyWriter, err := sparkey.CreateLogWriter(path, options)
// TODO: Compression
/*
c := sparkey.COMPRESSION_NONE
if compression == SnappyCompression {
c = sparkey.COMPRESSION_SNAPPY
}
options := &sparkey.Options{Compression: c, CompressionBlockSize: blockSize}
sparkeyWriter, err := sparkey.CreateLogWriter(path, options)
*/
db, err := bolt.Open(path, 0600, nil)
if err != nil {
return nil, fmt.Errorf("initializing block %s: %s", path, err)
}

bw := &blockWriter{
partition: partition,
path: path,
id: id,
sparkeyWriter: sparkeyWriter,
partition: partition,
path: path,
id: id,
db: db,
}

return bw, nil
Expand All @@ -64,21 +68,21 @@ func (bw *blockWriter) add(key, value []byte) error {
copy(bw.minKey, key)
}

return bw.sparkeyWriter.Put(key, value)
return bw.db.Update(func(tx *bolt.Tx) error {
bucket, err := tx.CreateBucketIfNotExists([]byte(filepath.Base(bw.path)))
if err != nil {
return err
}
err = bucket.Put(key, value)
return err

})
}

func (bw *blockWriter) save() (*Block, error) {
err := bw.sparkeyWriter.WriteHashFile(0)
if err != nil {
return nil, err
}

err = bw.sparkeyWriter.Close()
if err != nil {
return nil, err
}
err := bw.db.Close()

reader, err := sparkey.Open(bw.path)
db, err := bolt.Open(bw.path, 0600, nil)
if err != nil {
return nil, fmt.Errorf("opening block: %s", err)
}
Expand All @@ -89,17 +93,16 @@ func (bw *blockWriter) save() (*Block, error) {
Partition: bw.partition,
Count: bw.count,

minKey: bw.minKey,
maxKey: bw.maxKey,
sparkeyReader: reader,
iterPool: newIterPool(reader),
minKey: bw.minKey,
maxKey: bw.maxKey,
db: db,
}

return b, nil
}

// close shuts down the writer's backing store without producing a Block.
//
// NOTE(review): this span is a rendered diff, so it shows both the sparkey
// writer call and the bolt call that appears to replace it. Confirm which
// side is live before compiling.
func (bw *blockWriter) close() {
bw.sparkeyWriter.Close()
// NOTE(review): whatever this Close call returns is discarded here.
bw.db.Close()
}

func (bw *blockWriter) delete() {
Expand Down
46 changes: 0 additions & 46 deletions blocks/iterpool.go

This file was deleted.

48 changes: 19 additions & 29 deletions blocks/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,42 +3,36 @@ package blocks
import (
"io"

"github.com/bsm/go-sparkey"
"bytes"

"github.com/boltdb/bolt"
)

// A Record is one key/value pair loaded from a block.
//
// NOTE(review): this span is a rendered diff, so the field list below
// interleaves the sparkey-backed fields (iterPool, iter) with the bolt-backed
// ones (value); reader and closed are listed once per side.
type Record struct {
// ValueLen is the length of the value; the bolt-side get sets it to
// uint64(len(value)).
ValueLen uint64

// Sparkey side: the pool the iterator is handed back to, and the iterator
// itself.
iterPool iterPool
iter *sparkey.HashIter
reader io.Reader
closed bool
// Bolt side: value holds the bytes returned by the bucket lookup, and
// reader is a bytes.Reader over those same bytes.
value []byte
reader io.Reader
// NOTE(review): no read or write of closed is visible in this diff.
closed bool
}

func (b *Block) get(key []byte) (*Record, error) {
iter, err := b.iterPool.getIter()
if err != nil {
// In the case of an error, the iter is no longer considered valid.
return nil, err
}

if err := iter.Seek(key); err != nil {
return nil, err
}

if iter.State() != sparkey.ITERATOR_ACTIVE {
// The key doesn't exist, so put the iterator back in the pool.
b.iterPool.Put(iter)
r := &Record{}

b.db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(b.Name))
value := bucket.Get(key)
r.value = value
r.ValueLen = uint64(len(value))
r.reader = bytes.NewReader(value)
return nil
})

if r.value == nil {
return nil, nil
}

return &Record{
ValueLen: iter.ValueLen(),
iterPool: b.iterPool,
iter: iter,
reader: iter.ValueReader(),
}, nil
return r, nil
}

func (r *Record) Read(b []byte) (int, error) {
Expand All @@ -50,9 +44,5 @@ func (r *Record) WriteTo(w io.Writer) (n int64, err error) {
}

// Close releases anything the record is holding and always returns nil.
//
// NOTE(review): this span is a rendered diff. The iter/iterPool branch looks
// like the removed sparkey side, where the iterator is handed back to its
// pool. Confirm which side is live before compiling.
func (r *Record) Close() error {
if r.iter != nil {
r.iterPool.Put(r.iter)
}

return nil
}
Loading