Skip to content

Commit

Permalink
Removed reference to gopkg.in/getlantern/yaml.v1
Browse files Browse the repository at this point in the history
  • Loading branch information
oxtoacart committed Aug 1, 2019
2 parents e47afda + e12ecc5 commit 3b12eb0
Show file tree
Hide file tree
Showing 14 changed files with 439 additions and 167 deletions.
17 changes: 13 additions & 4 deletions cluster_follow.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,11 +257,17 @@ func (db *DB) processFollowers(stop <-chan interface{}) {

for stream := range newlyJoinedStreams {
var earliestOffset wal.Offset
hasOffset := false
for _, partition := range streams[stream] {
for _, table := range partition.tables {
for _, specs := range table.followersByPartition {
for _, spec := range specs {
if earliestOffset == nil || earliestOffset.After(spec.offset) {
for followerID, spec := range specs {
db.log.Debugf("Offset on %v for %v is %v", stream, followerID, spec.offset)
if !hasOffset {
db.log.Debugf("Follower %v has first offset on %v, using it: %v", followerID, stream, spec.offset)
hasOffset = true
} else if earliestOffset.After(spec.offset) {
db.log.Debugf("Follower %v has earlier offset on %v than %v, using it: %v", followerID, stream, earliestOffset, spec.offset)
earliestOffset = spec.offset
}
}
Expand All @@ -271,10 +277,11 @@ func (db *DB) processFollowers(stop <-chan interface{}) {

stopWALReader := stopWALReaders[stream]
if stopWALReader != nil {
db.log.Debugf("Stopping WAL reader for %v", stream)
stopWALReader()
}

// Start following wal
db.log.Debugf("Start following WAL for %v", stream)
stopWALReader, err := db.followWAL(stream, earliestOffset, streams[stream], requests)
if err != nil {
db.log.Errorf("Unable to start following wal: %v", err)
Expand Down Expand Up @@ -553,12 +560,14 @@ func (db *DB) followWAL(stream string, offset wal.Offset, partitions map[string]
if err == io.EOF || err == io.ErrUnexpectedEOF {
return
}
db.log.Debugf("Unable to read from stream '%v': %v", stream, err)
db.log.Debugf("Unable to read from stream '%v', continuing: %v", stream, err)
continue
}
select {
case <-stop:
return
case <-stopDB:
return
default:
// keep going
}
Expand Down
7 changes: 4 additions & 3 deletions core/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@ import (
"bytes"
"context"
"fmt"
"sort"
"strings"
"time"

"github.com/getlantern/bytemap"
"github.com/getlantern/goexpr"
"github.com/getlantern/zenodb/bytetree"
"github.com/getlantern/zenodb/encoding"
"github.com/getlantern/zenodb/expr"
"sort"
"strings"
"time"
)

var (
Expand Down
6 changes: 3 additions & 3 deletions expr/floatequals.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,17 @@ const epsilon float64 = 0.00001

// AssertFloatEquals does a fuzzy comparison of floats.
func AssertFloatEquals(t *testing.T, a, b float64) bool {
return assert.True(t, fuzzyEquals(epsilon, a, b), fmt.Sprintf("Floats did not match. Expected: %f Actual: %f", a, b))
return assert.True(t, FuzzyEquals(epsilon, a, b), fmt.Sprintf("Floats did not match. Expected: %f Actual: %f", a, b))
}

// AssertFloatWithin checks whether a given float is within e error (decimal) of
// another float
func AssertFloatWithin(t *testing.T, e, expected float64, actual float64, msg string) bool {
return assert.True(t, fuzzyEquals(e, expected, actual), fmt.Sprintf("%v -- Floats not within %f of each other. Expected: %f Actual: %f", msg, e, expected, actual))
return assert.True(t, FuzzyEquals(e, expected, actual), fmt.Sprintf("%v -- Floats not within %f of each other. Expected: %f Actual: %f", msg, e, expected, actual))
}

// courtesy of https://gist.github.com/cevaris/bc331cbe970b03816c6b
func fuzzyEquals(e, a, b float64) bool {
func FuzzyEquals(e, a, b float64) bool {
if a == b {
return true
}
Expand Down
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ require (
github.com/getlantern/waitforserver v1.0.1
github.com/getlantern/wal v0.0.0-20190717163743-5ddbeeeef8a2
github.com/getlantern/withtimeout v0.0.0-20160829163843-511f017cd913
github.com/getlantern/yaml v0.0.0-20160317154340-79303eb9c0d9
github.com/getlantern/yaml v0.0.0-20190801163808-0c9bb1ebf426
github.com/glendc/gopher-json v0.0.0-20170414221815-dc4743023d0c // indirect
github.com/go-ole/go-ole v1.2.4 // indirect
github.com/go-redis/redis v6.15.2+incompatible // indirect
Expand Down Expand Up @@ -78,7 +78,6 @@ require (
google.golang.org/appengine v1.6.1 // indirect
google.golang.org/grpc v1.20.1
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
gopkg.in/getlantern/yaml.v1 v1.0.0-20140912054538-97d86b60f57e // indirect
gopkg.in/mgo.v2 v2.0.0-20180705113604-9856a29383ce // indirect
gopkg.in/redis.v5 v5.2.9
gopkg.in/vmihailenco/msgpack.v2 v2.9.1 // indirect
Expand Down
8 changes: 3 additions & 5 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,8 @@ github.com/getlantern/wal v0.0.0-20190717163743-5ddbeeeef8a2 h1:aSA+hmLCAVNgyZV9
github.com/getlantern/wal v0.0.0-20190717163743-5ddbeeeef8a2/go.mod h1:Hze0AAPSbJ1yhANLfxUSVTX1u1ULuAOkSmpjImoeHyk=
github.com/getlantern/withtimeout v0.0.0-20160829163843-511f017cd913 h1:YK3YNyTsa+1BPWOoN0F79rrjHyfNx4MAoFEvzwQU+dY=
github.com/getlantern/withtimeout v0.0.0-20160829163843-511f017cd913/go.mod h1:bwttrA0oacoHdL476F60prypY1oC++WLtVexumgZozY=
github.com/getlantern/yaml v0.0.0-20140912054538-97d86b60f57e h1:tMFqZ7fjEhZ/DwXdODpaMZldTdLT8KrdTkZxwsZivOs=
github.com/getlantern/yaml v0.0.0-20140912054538-97d86b60f57e/go.mod h1:SoTXbOvaDC1bH3QrlkU5kz/h12tU/hN54wSMUCdgEXs=
github.com/getlantern/yaml v0.0.0-20160317154340-79303eb9c0d9 h1:NZ55Ks6D+CNH4BQwsv/j/Vp6sXMaBgqqsauCq37TIAU=
github.com/getlantern/yaml v0.0.0-20160317154340-79303eb9c0d9/go.mod h1:SoTXbOvaDC1bH3QrlkU5kz/h12tU/hN54wSMUCdgEXs=
github.com/getlantern/yaml v0.0.0-20190801163808-0c9bb1ebf426 h1:lb2OElfhZzfgvNQym79ONvv4yvDh/gShHkLQI6qzriA=
github.com/getlantern/yaml v0.0.0-20190801163808-0c9bb1ebf426/go.mod h1:SoTXbOvaDC1bH3QrlkU5kz/h12tU/hN54wSMUCdgEXs=
github.com/glendc/gopher-json v0.0.0-20170414221815-dc4743023d0c h1:iRTj5SRYwbvsygdwVp+y9kZT145Y1s6xOPpeOEIeGc4=
github.com/glendc/gopher-json v0.0.0-20170414221815-dc4743023d0c/go.mod h1:Gja1A+xZ9BoviGJNA2E9vFkPjjsl+CoJxSXiQM1UXtw=
github.com/go-ole/go-ole v1.2.4 h1:nNBDSCOigTSiarFpYE9J/KtEA1IOW4CNeqT9TQDqCxI=
Expand Down Expand Up @@ -222,7 +220,6 @@ gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/getlantern/yaml.v1 v1.0.0-20140912054538-97d86b60f57e/go.mod h1:QLMVhmQbZYZI9Ctc9MVAXEPxAbYCPub4mBuhAf6kPWE=
gopkg.in/mgo.v2 v2.0.0-20180705113604-9856a29383ce h1:xcEWjVhvbDy+nHP67nPDDpbYrY+ILlfndk4bRioVHaU=
gopkg.in/mgo.v2 v2.0.0-20180705113604-9856a29383ce/go.mod h1:yeKp02qBN3iKW1OzL3MGk2IdtZzaj7SFntXj72NppTA=
gopkg.in/redis.v5 v5.2.9 h1:MNZYOLPomQzZMfpN3ZtD1uyJ2IDonTTlxYiV/pEApiw=
Expand All @@ -232,5 +229,6 @@ gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWD
gopkg.in/vmihailenco/msgpack.v2 v2.9.1 h1:kb0VV7NuIojvRfzwslQeP3yArBqJHW9tOl4t38VS1jM=
gopkg.in/vmihailenco/msgpack.v2 v2.9.1/go.mod h1:/3Dn1Npt9+MYyLpYYXjInO/5jvMLamn+AEGwNEOatn8=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
2 changes: 1 addition & 1 deletion insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (t *table) processWALInserts() {
for {
data, err := t.wal.Read()
if err != nil {
panic(fmt.Errorf("Unable to read from WAL: %v", err))
t.db.Panic(fmt.Errorf("Unable to read from WAL: %v", err))
}
in <- &walRead{data, t.wal.Offset(), 0}
}
Expand Down
42 changes: 19 additions & 23 deletions row_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func (t *table) readWALOffsets(filename string) (common.OffsetsBySource, bool, e
defer file.Close()
opened = true

fileVersion := versionFor(filename)
fileVersion := t.versionFor(filename)

r := snappy.NewReader(file)

Expand Down Expand Up @@ -341,9 +341,6 @@ func (rs *rowStore) iterate(ctx context.Context, outFields core.Fields, includeM
func (rs *rowStore) processFlush(ms *memstore, allowSort bool) (*memstore, time.Duration) {
shouldSort := allowSort && rs.t.shouldSort()
willSort := "not sorted"
if shouldSort {
}

if shouldSort {
defer rs.t.stopSorting()
willSort = "sorted"
Expand All @@ -365,27 +362,27 @@ func (rs *rowStore) processFlush(ms *memstore, allowSort bool) (*memstore, time.

out, err := ioutil.TempFile("", "nextrowstore")
if err != nil {
panic(err)
rs.t.db.Panic(err)
}
defer out.Close()

highWaterMark := fs.flush(out, rs.fields, nil, ms.offsetsBySource, ms, shouldSort, disallowRaw)
highWaterMark, rowCount := fs.flush(out, rs.fields, nil, ms.offsetsBySource, ms, shouldSort, disallowRaw)

fi, err := out.Stat()
if err != nil {
fs.t.log.Errorf("Unable to stat output file to get size: %v", err)
}

if closeErr := out.Close(); closeErr != nil {
panic(closeErr)
rs.t.db.Panic(closeErr)
}

// Note - we left-pad the unix nano value to the widest possible length to
// ensure lexicographical sort matches time-based sort (e.g. on directory
// listing).
newFileStoreName := filepath.Join(rs.opts.dir, fmt.Sprintf("filestore_%020d_%d.dat", time.Now().UnixNano(), CurrentFileVersion))
if renameErr := os.Rename(out.Name(), newFileStoreName); renameErr != nil {
panic(renameErr)
rs.t.db.Panic(renameErr)
}

fs = &fileStore{rs.t, rs, rs.fields, newFileStoreName}
Expand All @@ -397,41 +394,43 @@ func (rs *rowStore) processFlush(ms *memstore, allowSort bool) (*memstore, time.

flushDuration := time.Now().Sub(start)
if fi != nil {
rs.t.log.Debugf("Flushed to %v in %v, size %v. %v.", newFileStoreName, flushDuration, humanize.Bytes(uint64(fi.Size())), willSort)
rs.t.log.Debugf("Flushed %d rows to %v in %v, size %v. %v.", rowCount, newFileStoreName, flushDuration, humanize.Bytes(uint64(fi.Size())), willSort)
} else {
rs.t.log.Debugf("Flushed to %v in %v. %v.", newFileStoreName, flushDuration, willSort)
rs.t.log.Debugf("Flushed %d rows to %v in %v. %v.", rowCount, newFileStoreName, flushDuration, willSort)
}

rs.t.updateHighWaterMarkDisk(highWaterMark)
return ms, flushDuration
}

func (fs *fileStore) flush(out *os.File, fields core.Fields, filter goexpr.Expr, offsetsBySource common.OffsetsBySource, ms *memstore, shouldSort bool, disallowRaw bool) int64 {
func (fs *fileStore) flush(out *os.File, fields core.Fields, filter goexpr.Expr, offsetsBySource common.OffsetsBySource, ms *memstore, shouldSort bool, disallowRaw bool) (int64, int) {
cout, err := fs.createOutWriter(out, fields, offsetsBySource, shouldSort)
if err != nil {
panic(err)
fs.t.db.Panic(err)
}

highWaterMark := int64(0)
truncateBefore := fs.t.truncateBefore()
rowCount := 0
write := func(key bytemap.ByteMap, columns []encoding.Sequence, raw []byte) (bool, error) {
nextHighWaterMark, err := fs.doWrite(cout, fields, filter, truncateBefore, shouldSort, key, columns, raw)
if err != nil {
panic(err)
fs.t.db.Panic(err)
}
if nextHighWaterMark > highWaterMark {
highWaterMark = nextHighWaterMark
}
rowCount++
return true, nil
}

fs.iterate(fields, ms, !shouldSort, !disallowRaw, write)
err = cout.Close()
if err != nil {
panic(err)
fs.t.db.Panic(err)
}

return highWaterMark
return highWaterMark, rowCount
}

func (fs *fileStore) createOutWriter(out *os.File, fields core.Fields, offsetsBySource common.OffsetsBySource, shouldSort bool) (io.WriteCloser, error) {
Expand Down Expand Up @@ -479,7 +478,7 @@ func (fs *fileStore) createOutWriter(out *os.File, fields core.Fields, offsetsBy

cout, sortErr := emsort.New(sout, chunk, less, int(fs.t.db.maxMemoryBytes())/10)
if sortErr != nil {
panic(sortErr)
fs.t.db.Panic(sortErr)
}

return cout, nil
Expand Down Expand Up @@ -580,7 +579,7 @@ func (fs *fileStore) doWrite(cout io.WriteCloser, fields core.Fields, filter goe
func (rs *rowStore) writeOffsets(offsetsBySource common.OffsetsBySource) error {
out, err := ioutil.TempFile("", "nextoffset")
if err != nil {
panic(err)
rs.t.db.Panic(err)
}
defer out.Close()

Expand Down Expand Up @@ -611,9 +610,6 @@ func (rs *rowStore) removeOldFiles(stop <-chan interface{}) {
if err != nil {
rs.t.log.Errorf("Unable to list data files in %v: %v", rs.opts.dir, err)
}
for _, file := range files {
rs.t.log.Debug(file.Name())
}
// Note - the list of files is sorted by name, which in our case is the
// timestamp, so that means they're sorted chronologically. We don't want
// to delete the last file in the list because that's the current one.
Expand Down Expand Up @@ -840,7 +836,7 @@ func (fs *fileStore) iterate(outFields []core.Field, ms *memstore, okayToReuseBu

func (fs *fileStore) info(r io.Reader) (common.OffsetsBySource, string, core.Fields, error) {
var offsetsBySource common.OffsetsBySource
fileVersion := versionFor(fs.filename)
fileVersion := fs.t.versionFor(fs.filename)
// File contains header with field info, use it
headerLength := uint32(0)
lengthErr := binary.Read(r, encoding.Binary, &headerLength)
Expand Down Expand Up @@ -874,15 +870,15 @@ func (fs *fileStore) info(r io.Reader) (common.OffsetsBySource, string, core.Fie
return offsetsBySource, fieldsString, fileFields, nil
}

func versionFor(filename string) int {
func (t *table) versionFor(filename string) int {
fileVersion := 0
parts := strings.Split(filepath.Base(filename), "_")
if len(parts) == 3 {
versionString := strings.Split(parts[2], ".")[0]
var versionErr error
fileVersion, versionErr = strconv.Atoi(versionString)
if versionErr != nil {
panic(fmt.Errorf("Unable to determine file version for file %v: %v", filename, versionErr))
t.db.Panic(fmt.Errorf("Unable to determine file version for file %v: %v", filename, versionErr))
}
}
return fileVersion
Expand Down
8 changes: 7 additions & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ type Server struct {
RPCKeepaliveInterval time.Duration
RPCKeepAliveTimeout time.Duration
ListenTimeout time.Duration
MaxReconnectWaitTime time.Duration
Panic func(err interface{})

Schema string
AliasesFile string
Expand Down Expand Up @@ -158,6 +160,7 @@ func (s *Server) Serve() (func() error, error) {
ClusterQueryConcurrency: s.ClusterQueryConcurrency,
ClusterQueryTimeout: s.ClusterQueryTimeout,
MaxFollowAge: s.MaxFollowAge,
Panic: s.Panic,
}

s.log = dbOpts.BuildLogger()
Expand Down Expand Up @@ -187,7 +190,10 @@ func (s *Server) Serve() (func() error, error) {
s.log.Debugf("Handling queries for: %v", s.Feed)
dbOpts.RegisterRemoteQueryHandler = func(db *zenodb.DB, partition int, query planner.QueryClusterFN) {
minWaitTime := 50 * time.Millisecond
maxWaitTime := 5 * time.Second
maxWaitTime := s.MaxReconnectWaitTime
if maxWaitTime <= 0 {
maxWaitTime = 5 * time.Second
}

for _, _client := range clients {
client := _client
Expand Down
Loading

0 comments on commit 3b12eb0

Please sign in to comment.