Skip to content

Commit

Permalink
Merge pull request #84 from getlantern/ox/percentilefix3
Browse files Browse the repository at this point in the history
Fixed optimized (wrapped) percentiles
  • Loading branch information
oxtoacart authored Sep 16, 2019
2 parents 4c383e9 + 323dd95 commit 7b3204c
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 74 deletions.
45 changes: 19 additions & 26 deletions expr/percentile.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,33 +28,26 @@ import (
// expressions) so it is best to keep these relatively low cardinality.
func PERCENTILE(value interface{}, percentile interface{}, min float64, max float64, precision int) Expr {
valueExpr := exprFor(value)
switch t := valueExpr.(type) {
case *ptile:
return PERCENTILEOPT(t, exprFor(percentile))
case *ptileOptimized:
return PERCENTILEOPT(t, exprFor(percentile))
default:
// Remove aggregates
valueExpr = valueExpr.DeAggregate()
// Figure out what precision to use for HDR
hdrPrecision := precision
if hdrPrecision < 1 {
hdrPrecision = 1
} else if hdrPrecision > 5 {
hdrPrecision = 5
}
// Remove aggregates
valueExpr = valueExpr.DeAggregate()
// Figure out what precision to use for HDR
hdrPrecision := precision
if hdrPrecision < 1 {
hdrPrecision = 1
} else if hdrPrecision > 5 {
hdrPrecision = 5
}

sampleHisto := hdrhistogram.New(scaleToInt(min, precision), scaleToInt(max, precision), hdrPrecision)
numCounts := len(sampleHisto.Export().Counts)
return &ptile{
Value: BOUNDED(valueExpr, min, max),
Percentile: exprFor(percentile),
Min: scaleToInt(min, precision),
Max: scaleToInt(max, precision),
Precision: precision,
HDRPrecision: hdrPrecision,
Width: (1+numCounts)*width64bits + valueExpr.EncodedWidth(),
}
sampleHisto := hdrhistogram.New(scaleToInt(min, precision), scaleToInt(max, precision), hdrPrecision)
numCounts := len(sampleHisto.Export().Counts)
return &ptile{
Value: BOUNDED(valueExpr, min, max),
Percentile: exprFor(percentile),
Min: scaleToInt(min, precision),
Max: scaleToInt(max, precision),
Precision: precision,
HDRPrecision: hdrPrecision,
Width: (1+numCounts)*width64bits + valueExpr.EncodedWidth(),
}
}

Expand Down
50 changes: 8 additions & 42 deletions expr/percentile_optimized.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,74 +2,40 @@ package expr

import (
"fmt"
"time"

"github.com/getlantern/goexpr"
"github.com/getlantern/msgpack"
)

// PERCENTILEOPT returns an optimized PERCENTILE that wraps an existing
// PERCENTILE and reuses its storage.
// PERCENTILE.
func PERCENTILEOPT(wrapped interface{}, percentile interface{}) Expr {
var expr Expr
switch t := wrapped.(type) {
case *ptileOptimized:
expr = t.wrapped
expr = &t.ptile
default:
expr = wrapped.(*ptile)
}
return &ptileOptimized{Wrapped: expr, wrapped: expr.(*ptile), Percentile: exprFor(percentile)}
return &ptileOptimized{Wrapped: expr, ptile: *expr.(*ptile), Percentile: exprFor(percentile)}
}

type ptileOptimized struct {
ptile
Wrapped Expr
wrapped *ptile
Percentile Expr
}

// Validate validates this expression by delegating to the Validate method of
// the wrapped percentile and returning whatever error it reports.
func (e *ptileOptimized) Validate() error {
return e.wrapped.Validate()
}

// EncodedWidth always returns 0.
// NOTE(review): presumably this is because the optimized percentile reuses the
// wrapped PERCENTILE's storage rather than reserving bytes of its own, as the
// PERCENTILEOPT doc comment describes — confirm.
func (e *ptileOptimized) EncodedWidth() int {
return 0
}

// Shift always returns a zero duration.
func (e *ptileOptimized) Shift() time.Duration {
return 0
}

// Update is a no-op: it ignores params and metadata and returns b exactly as
// it was passed in, together with a value of 0 and false.
// NOTE(review): presumably the wrapped percentile is the one responsible for
// recording values into the shared storage — confirm against ptile.Update.
func (e *ptileOptimized) Update(b []byte, params Params, metadata goexpr.Params) ([]byte, float64, bool) {
return b, 0, false
}

// Merge is a no-op: it returns b, x and y exactly as they were passed in,
// without reading or modifying any of them.
func (e *ptileOptimized) Merge(b []byte, x []byte, y []byte) ([]byte, []byte, []byte) {
return b, x, y
}

// SubMergers ignores subs and always returns nil.
func (e *ptileOptimized) SubMergers(subs []Expr) []SubMerge {
return nil
}

func (e *ptileOptimized) Get(b []byte) (float64, bool, []byte) {
histo, wasSet, remain := e.wrapped.load(b)
histo, wasSet, remain := e.ptile.load(b)
percentile, _, remain := e.Percentile.Get(remain)
if !wasSet {
return 0, wasSet, remain
}
return e.wrapped.calc(histo, percentile), wasSet, remain
}

// IsConstant always returns false.
func (e *ptileOptimized) IsConstant() bool {
return false
}

func (e *ptileOptimized) DeAggregate() Expr {
return PERCENTILE(e.wrapped.DeAggregate(), e.Percentile.DeAggregate(), scaleFromInt(e.wrapped.Min, e.wrapped.Precision), scaleFromInt(e.wrapped.Max, e.wrapped.Precision), e.wrapped.Precision)
return e.ptile.calc(histo, percentile), wasSet, remain
}

func (e *ptileOptimized) String() string {
return fmt.Sprintf("PERCENTILE(%v, %v)", e.wrapped.String(), e.Percentile)
return fmt.Sprintf("PERCENTILE(%v, %v)", e.Wrapped.String(), e.Percentile)
}

func (e *ptileOptimized) DecodeMsgpack(dec *msgpack.Decoder) error {
Expand All @@ -81,7 +47,7 @@ func (e *ptileOptimized) DecodeMsgpack(dec *msgpack.Decoder) error {
wrapped := m["Wrapped"].(*ptile)
percentile := m["Percentile"].(Expr)
e.Wrapped = wrapped
e.wrapped = wrapped
e.ptile = *wrapped
e.Percentile = percentile
return nil
}
5 changes: 1 addition & 4 deletions expr/percentile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func TestPercentile(t *testing.T) {
e := msgpacked(t, PERCENTILE(SUM("a"), 99, 0, 100, 1))
expected := float64(99)

eo := msgpacked(t, PERCENTILE(e, 50, 0, 100, 1))
eo := msgpacked(t, PERCENTILEOPT(e, 50))
expectedO := float64(51)

eo2 := msgpacked(t, PERCENTILEOPT(eo, 1))
Expand Down Expand Up @@ -58,9 +58,6 @@ func TestPercentile(t *testing.T) {
// Do some direct updates
for k := float64(1); k <= 50; k++ {
e.Update(b, Map{"a": k}, md)
// Also update the wrapped expressions to make sure this is a noop
eo.Update(b, Map{"a": k}, md)
eo2.Update(b, Map{"a": k}, md)
}

// Do some point merges
Expand Down
1 change: 1 addition & 0 deletions sql/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -813,6 +813,7 @@ func (f *fielded) percentileExprFor(e *sqlparser.FuncExpr, fname string, default
} else {
if isOptimized {
// tried to wrap a non-percentile expression in a PERCENTILEOPT
log.Errorf("Tried to wrap %v of type %v with optimized PERCENTILE", valueField, reflect.TypeOf(valueField.Expr))
return nil, ErrPercentileOptWrap
}
// existing expression is not a percentile, need to get the field
Expand Down
27 changes: 25 additions & 2 deletions zenodb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,8 @@ view_a:
wg.Add(1)
go testSimpleQuery(&wg, t, db, includeMemStore, epoch, resolution)
wg.Add(1)
go testPercentileOptimizedQuery(&wg, t, db, includeMemStore, epoch, resolution)
wg.Add(1)
go testCrosstabWithHavingQuery(&wg, t, db, includeMemStore, epoch, resolution)
wg.Add(1)
go testStrideQuery(&wg, t, db, includeMemStore, epoch, resolution)
Expand Down Expand Up @@ -463,6 +465,27 @@ ORDER BY _time`
})
}

// testPercentileOptimizedQuery queries PERCENTILE(pp_5p, 90) from test_a and
// hands the query to assertExpectedResult together with a single expected row
// whose pp_opt value is 90. It calls wg.Done when it returns.
// NOTE(review): pp_5p is presumably an existing percentile field on test_a, so
// that this query exercises the optimized (wrapping) PERCENTILE path — confirm
// against the table definition.
func testPercentileOptimizedQuery(wg *sync.WaitGroup, t *testing.T, db *DB, includeMemStore bool, epoch time.Time, resolution time.Duration) {
defer wg.Done()

sqlString := `
SELECT PERCENTILE(pp_5p, 90) AS pp_opt
FROM test_a
GROUP BY _
ORDER BY _time`

// The expected row uses the epoch rounded up to the given resolution.
epoch = encoding.RoundTimeUp(epoch, resolution)
assertExpectedResult(t, db, sqlString, includeMemStore, testsupport.ExpectedResult{
testsupport.ExpectedRow{
epoch,
// Empty map: no entries are expected here (presumably the row's
// dimensions — verify against testsupport.ExpectedRow).
map[string]interface{}{},
map[string]float64{
"pp_opt": 90,
},
},
})
}

// testHavingQuery makes sure that a HAVING clause works even when the field in
// that clause does not appear in the SELECT clause
func testCrosstabWithHavingQuery(wg *sync.WaitGroup, t *testing.T, db *DB, includeMemStore bool, epoch time.Time, resolution time.Duration) {
Expand Down Expand Up @@ -714,7 +737,7 @@ func assertExpectedResult(t *testing.T, db *DB, sqlString string, includeMemStor
var rows []*core.FlatRow
source, err := db.Query(sqlString, false, nil, includeMemStore)
if err != nil {
return nil, errors.New("Unable to plan SQL query")
return nil, errors.New("Unable to plan SQL query: %v", err)
}

var fields core.Fields
Expand All @@ -728,7 +751,7 @@ func assertExpectedResult(t *testing.T, db *DB, sqlString string, includeMemStor
return true, nil
})
if err != nil {
return nil, errors.New("Unable to plan SQL query")
return nil, errors.New("Unable to plan SQL query: %v", err)
}

if false {
Expand Down

0 comments on commit 7b3204c

Please sign in to comment.