Merge pull request #77 from getlantern/ox/issue76
Attempt at fixing unit test
oxtoacart authored May 9, 2019
2 parents 800115b + 806281c commit 6ee5882
Showing 2 changed files with 44 additions and 28 deletions.
19 changes: 13 additions & 6 deletions zenodb.go
@@ -12,7 +12,7 @@ import (
     "sync/atomic"
     "time"
 
-    "github.com/cloudfoundry/gosigar"
+    sigar "github.com/cloudfoundry/gosigar"
     "github.com/dustin/go-humanize"
     "github.com/getlantern/goexpr/geo"
     "github.com/getlantern/goexpr/isp"
@@ -266,6 +266,17 @@ func NewDB(opts *DBOpts) (*DB, error) {
     return db, err
 }
 
+// FlushAll flushes all tables
+func (db *DB) FlushAll() {
+    db.tablesMutex.Lock()
+    for name, table := range db.tables {
+        log.Debugf("Force flushing table: %v", name)
+        table.forceFlush()
+    }
+    db.tablesMutex.Unlock()
+    log.Debug("Done force flushing tables")
+}
+
 func (db *DB) Close() {
     log.Debug("Closing")
     db.tablesMutex.Lock()
@@ -274,12 +285,8 @@ func (db *DB) Close() {
         stream.Close()
         delete(db.streams, name)
     }
-    for name, table := range db.tables {
-        log.Debugf("Force flushing table: %v", name)
-        table.forceFlush()
-    }
-    log.Debug("Done force flushing tables")
     db.tablesMutex.Unlock()
+    db.FlushAll()
 }
 
 func registerAliases(aliasesFile string) {
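The zenodb.go change is a straight extraction: the flush loop that used to live in Close becomes the exported FlushAll, and Close now releases tablesMutex before delegating, since FlushAll acquires that lock itself. A minimal sketch of how a caller outside the package might use the new method, assuming the github.com/getlantern/zenodb import path and made-up Dir/SchemaFile values (only DBOpts fields that appear in this diff are used):

package main

import (
	"log"

	"github.com/getlantern/zenodb"
)

func main() {
	// Dir and SchemaFile values here are illustrative, not from the commit.
	db, err := zenodb.NewDB(&zenodb.DBOpts{
		Dir:        "/tmp/zenodb-example",
		SchemaFile: "schema.yaml",
	})
	if err != nil {
		log.Fatalf("unable to create DB: %v", err)
	}

	// ... insert data ...

	// Force every table's memstore to disk. FlushAll takes tablesMutex
	// itself, which is why Close unlocks before calling it.
	db.FlushAll()

	// Close also flushes (it now ends by calling FlushAll), so an explicit
	// FlushAll is only needed while the DB stays open.
	db.Close()
}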
53 changes: 31 additions & 22 deletions zenodb_test.go
@@ -23,8 +23,9 @@ import (
     . "github.com/getlantern/zenodb/expr"
     "github.com/getlantern/zenodb/planner"
 
-    "github.com/stretchr/testify/assert"
     "testing"
+
+    "github.com/stretchr/testify/assert"
 )
 
 const (
@@ -39,19 +40,20 @@ func TestRoundTimeUp(t *testing.T) {
 }
 
 func TestSingleDB(t *testing.T) {
-    doTest(t, false, nil, func(tmpDir string, tmpFile string) (*DB, func(time.Time), func(string, func(*table, bool))) {
+    doTest(t, false, nil, func(tmpDir string, tmpFile string) (*DB, func(time.Time), func(), func(string, func(*table, bool))) {
         db, err := NewDB(&DBOpts{
             Dir:                     filepath.Join(tmpDir, "leader"),
             SchemaFile:              tmpFile,
             VirtualTime:             true,
-            MaxMemoryRatio:          0.00001,
             ClusterQueryConcurrency: clusterQueryConcurrency,
         })
         if !assert.NoError(t, err, "Unable to create leader DB") {
             t.Fatal()
         }
         return db, func(t time.Time) {
             db.clock.Advance(t)
+        }, func() {
+            db.FlushAll()
         }, func(tableName string, cb func(tbl *table, isFollower bool)) {
             cb(db.getTable(tableName), false)
         }
@@ -75,14 +77,13 @@ func TestClusterNoPushdownMultiPartition(t *testing.T) {
 }
 
 func doTestCluster(t *testing.T, numPartitions int, partitionBy []string) {
-    doTest(t, true, partitionBy, func(tmpDir string, tmpFile string) (*DB, func(time.Time), func(string, func(*table, bool))) {
+    doTest(t, true, partitionBy, func(tmpDir string, tmpFile string) (*DB, func(time.Time), func(), func(string, func(*table, bool))) {
         leader, err := NewDB(&DBOpts{
-            Dir:            filepath.Join(tmpDir, "leader"),
-            SchemaFile:     tmpFile,
-            VirtualTime:    true,
-            Passthrough:    true,
-            NumPartitions:  numPartitions,
-            MaxMemoryRatio: 0.00001,
+            Dir:           filepath.Join(tmpDir, "leader"),
+            SchemaFile:    tmpFile,
+            VirtualTime:   true,
+            Passthrough:   true,
+            NumPartitions: numPartitions,
         })
         if !assert.NoError(t, err, "Unable to create leader DB") {
             t.Fatal()
@@ -92,12 +93,11 @@ func doTestCluster(t *testing.T, numPartitions int, partitionBy []string) {
     for i := 0; i < numPartitions; i++ {
         part := i
         follower, followerErr := NewDB(&DBOpts{
-            Dir:            filepath.Join(tmpDir, fmt.Sprintf("follower%d", i)),
-            SchemaFile:     tmpFile,
-            VirtualTime:    true,
-            NumPartitions:  numPartitions,
-            Partition:      part,
-            MaxMemoryRatio: 0.00001,
+            Dir:           filepath.Join(tmpDir, fmt.Sprintf("follower%d", i)),
+            SchemaFile:    tmpFile,
+            VirtualTime:   true,
+            NumPartitions: numPartitions,
+            Partition:     part,
             Follow: func(f func() *common.Follow, cb func(data []byte, newOffset wal.Offset) error) {
                 leader.Follow(f(), cb)
             },
@@ -141,6 +141,10 @@ func doTestCluster(t *testing.T, numPartitions int, partitionBy []string) {
             for _, follower := range followers {
                 follower.clock.Advance(t)
             }
+        }, func() {
+            for _, follower := range followers {
+                follower.FlushAll()
+            }
         }, func(tableName string, cb func(tbl *table, isFollower bool)) {
             cb(leader.getTable(tableName), false)
             for _, follower := range followers {
@@ -150,7 +154,7 @@ func doTestCluster(t *testing.T, numPartitions int, partitionBy []string) {
     })
 }
 
-func doTest(t *testing.T, isClustered bool, partitionKeys []string, buildDB func(tmpDir string, tmpFile string) (*DB, func(time.Time), func(string, func(*table, bool)))) {
+func doTest(t *testing.T, isClustered bool, partitionKeys []string, buildDB func(tmpDir string, tmpFile string) (*DB, func(time.Time), func(), func(string, func(*table, bool)))) {
     rand.Seed(0)
     epoch := time.Date(2015, time.January, 1, 2, 3, 4, 5, time.UTC)
 
@@ -196,7 +200,7 @@ Test_a:
         return
     }
 
-    db, advanceClock, modifyTable := buildDB(tmpDir, tmpFile.Name())
+    db, advanceClock, flushTables, modifyTable := buildDB(tmpDir, tmpFile.Name())
 
     // TODO: verify that we can actually select successfully from the view.
     schemaB := schemaA + `
@@ -404,9 +408,8 @@ view_a:
         })
     }
 
-    var wg sync.WaitGroup
-
-    for _, includeMemStore := range []bool{true, false} {
+    runTests := func(includeMemStore bool) {
+        var wg sync.WaitGroup
         wg.Add(1)
         go testSimpleQuery(&wg, t, db, includeMemStore, epoch, resolution)
         wg.Add(1)
@@ -421,9 +424,15 @@ view_a:
             wg.Add(1)
             go testAggregateQuery(&wg, t, db, includeMemStore, now, epoch, resolution, asOf, until, scalingFactor)
         }
+        wg.Wait()
     }
 
-    wg.Wait()
+    log.Debug("Running tests with mem stores")
+    runTests(true)
+    log.Debug("Flushing tables")
+    flushTables()
+    log.Debug("Running tests with disk only")
+    runTests(false)
 }
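Taken together, the test changes make the memstore/filestore split explicit: previously a tiny MaxMemoryRatio (0.00001) pushed data to disk as a side effect while one loop ran each query with includeMemStore true and false; now each phase runs against a known state, with an explicit flush in between via the new flushTables closure returned by buildDB. A distilled sketch of that two-phase pattern, with a hypothetical Store interface standing in for *DB (names here are illustrative, not part of zenodb):

package example

import (
	"sync"
	"testing"
)

// Store is a stand-in for zenodb's *DB in this sketch; the real test gets a
// flush function back from buildDB rather than using an interface.
type Store interface {
	Query(includeMemStore bool) (int, error)
	FlushAll()
}

// runTwoPhase mirrors the shape of doTest after this commit: run the queries
// with the memstore included, flush, then rerun them disk-only.
func runTwoPhase(t *testing.T, s Store) {
	runTests := func(includeMemStore bool) {
		// A fresh WaitGroup per phase, matching the commit's move of
		// `var wg sync.WaitGroup` inside runTests.
		var wg sync.WaitGroup
		wg.Add(1)
		go func() {
			defer wg.Done()
			if _, err := s.Query(includeMemStore); err != nil {
				t.Errorf("query failed (includeMemStore=%v): %v", includeMemStore, err)
			}
		}()
		wg.Wait()
	}

	runTests(true)  // phase 1: data may still live in the memstore
	s.FlushAll()    // force all tables to disk
	runTests(false) // phase 2: results must come from the filestore alone
}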
