Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed optimized (wrapped) percentiles #84

Merged
merged 1 commit into from
Sep 16, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 19 additions & 26 deletions expr/percentile.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,33 +28,26 @@ import (
// expressions) so it is best to keep these relatively low cardinality.
func PERCENTILE(value interface{}, percentile interface{}, min float64, max float64, precision int) Expr {
valueExpr := exprFor(value)
switch t := valueExpr.(type) {
case *ptile:
return PERCENTILEOPT(t, exprFor(percentile))
case *ptileOptimized:
return PERCENTILEOPT(t, exprFor(percentile))
default:
// Remove aggregates
valueExpr = valueExpr.DeAggregate()
// Figure out what precision to use for HDR
hdrPrecision := precision
if hdrPrecision < 1 {
hdrPrecision = 1
} else if hdrPrecision > 5 {
hdrPrecision = 5
}
// Remove aggregates
valueExpr = valueExpr.DeAggregate()
// Figure out what precision to use for HDR
hdrPrecision := precision
if hdrPrecision < 1 {
hdrPrecision = 1
} else if hdrPrecision > 5 {
hdrPrecision = 5
}

sampleHisto := hdrhistogram.New(scaleToInt(min, precision), scaleToInt(max, precision), hdrPrecision)
numCounts := len(sampleHisto.Export().Counts)
return &ptile{
Value: BOUNDED(valueExpr, min, max),
Percentile: exprFor(percentile),
Min: scaleToInt(min, precision),
Max: scaleToInt(max, precision),
Precision: precision,
HDRPrecision: hdrPrecision,
Width: (1+numCounts)*width64bits + valueExpr.EncodedWidth(),
}
sampleHisto := hdrhistogram.New(scaleToInt(min, precision), scaleToInt(max, precision), hdrPrecision)
numCounts := len(sampleHisto.Export().Counts)
return &ptile{
Value: BOUNDED(valueExpr, min, max),
Percentile: exprFor(percentile),
Min: scaleToInt(min, precision),
Max: scaleToInt(max, precision),
Precision: precision,
HDRPrecision: hdrPrecision,
Width: (1+numCounts)*width64bits + valueExpr.EncodedWidth(),
}
}

Expand Down
50 changes: 8 additions & 42 deletions expr/percentile_optimized.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,74 +2,40 @@ package expr

import (
"fmt"
"time"

"github.com/getlantern/goexpr"
"github.com/getlantern/msgpack"
)

// PERCENTILEOPT returns an optimized PERCENTILE that wraps an existing
// PERCENTILE and reuses its storage.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We no longer attempt to reuse the existing percentile's storage, we assume separate storage for this percentile.

// PERCENTILE.
func PERCENTILEOPT(wrapped interface{}, percentile interface{}) Expr {
var expr Expr
switch t := wrapped.(type) {
case *ptileOptimized:
expr = t.wrapped
expr = &t.ptile
default:
expr = wrapped.(*ptile)
}
return &ptileOptimized{Wrapped: expr, wrapped: expr.(*ptile), Percentile: exprFor(percentile)}
return &ptileOptimized{Wrapped: expr, ptile: *expr.(*ptile), Percentile: exprFor(percentile)}
}

type ptileOptimized struct {
ptile
Wrapped Expr
wrapped *ptile
Percentile Expr
}

func (e *ptileOptimized) Validate() error {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most of this stuff is now inherited from ptile.

return e.wrapped.Validate()
}

func (e *ptileOptimized) EncodedWidth() int {
return 0
}

func (e *ptileOptimized) Shift() time.Duration {
return 0
}

func (e *ptileOptimized) Update(b []byte, params Params, metadata goexpr.Params) ([]byte, float64, bool) {
return b, 0, false
}

func (e *ptileOptimized) Merge(b []byte, x []byte, y []byte) ([]byte, []byte, []byte) {
return b, x, y
}

func (e *ptileOptimized) SubMergers(subs []Expr) []SubMerge {
return nil
}

func (e *ptileOptimized) Get(b []byte) (float64, bool, []byte) {
histo, wasSet, remain := e.wrapped.load(b)
histo, wasSet, remain := e.ptile.load(b)
percentile, _, remain := e.Percentile.Get(remain)
if !wasSet {
return 0, wasSet, remain
}
return e.wrapped.calc(histo, percentile), wasSet, remain
}

func (e *ptileOptimized) IsConstant() bool {
return false
}

func (e *ptileOptimized) DeAggregate() Expr {
return PERCENTILE(e.wrapped.DeAggregate(), e.Percentile.DeAggregate(), scaleFromInt(e.wrapped.Min, e.wrapped.Precision), scaleFromInt(e.wrapped.Max, e.wrapped.Precision), e.wrapped.Precision)
return e.ptile.calc(histo, percentile), wasSet, remain
}

func (e *ptileOptimized) String() string {
return fmt.Sprintf("PERCENTILE(%v, %v)", e.wrapped.String(), e.Percentile)
return fmt.Sprintf("PERCENTILE(%v, %v)", e.Wrapped.String(), e.Percentile)
}

func (e *ptileOptimized) DecodeMsgpack(dec *msgpack.Decoder) error {
Expand All @@ -81,7 +47,7 @@ func (e *ptileOptimized) DecodeMsgpack(dec *msgpack.Decoder) error {
wrapped := m["Wrapped"].(*ptile)
percentile := m["Percentile"].(Expr)
e.Wrapped = wrapped
e.wrapped = wrapped
e.ptile = *wrapped
e.Percentile = percentile
return nil
}
5 changes: 1 addition & 4 deletions expr/percentile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func TestPercentile(t *testing.T) {
e := msgpacked(t, PERCENTILE(SUM("a"), 99, 0, 100, 1))
expected := float64(99)

eo := msgpacked(t, PERCENTILE(e, 50, 0, 100, 1))
eo := msgpacked(t, PERCENTILEOPT(e, 50))
expectedO := float64(51)

eo2 := msgpacked(t, PERCENTILEOPT(eo, 1))
Expand Down Expand Up @@ -58,9 +58,6 @@ func TestPercentile(t *testing.T) {
// Do some direct updates
for k := float64(1); k <= 50; k++ {
e.Update(b, Map{"a": k}, md)
// Also update the wrapped expressions to make sure this is a noop
eo.Update(b, Map{"a": k}, md)
eo2.Update(b, Map{"a": k}, md)
}

// Do some point merges
Expand Down
1 change: 1 addition & 0 deletions sql/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -813,6 +813,7 @@ func (f *fielded) percentileExprFor(e *sqlparser.FuncExpr, fname string, default
} else {
if isOptimized {
// tried to wrap a non-percentile expression in a PERCENTILEOPT
log.Errorf("Tried to wrap %v of type %v with optimized PERCENTILE", valueField, reflect.TypeOf(valueField.Expr))
return nil, ErrPercentileOptWrap
}
// existing expression is not a percentile, need to get the field
Expand Down
27 changes: 25 additions & 2 deletions zenodb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,8 @@ view_a:
wg.Add(1)
go testSimpleQuery(&wg, t, db, includeMemStore, epoch, resolution)
wg.Add(1)
go testPercentileOptimizedQuery(&wg, t, db, includeMemStore, epoch, resolution)
wg.Add(1)
go testCrosstabWithHavingQuery(&wg, t, db, includeMemStore, epoch, resolution)
wg.Add(1)
go testStrideQuery(&wg, t, db, includeMemStore, epoch, resolution)
Expand Down Expand Up @@ -463,6 +465,27 @@ ORDER BY _time`
})
}

func testPercentileOptimizedQuery(wg *sync.WaitGroup, t *testing.T, db *DB, includeMemStore bool, epoch time.Time, resolution time.Duration) {
defer wg.Done()

sqlString := `
SELECT PERCENTILE(pp_5p, 90) AS pp_opt
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This shows an example of what's now possible.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool! Thanks also, the context is helpful.

FROM test_a
GROUP BY _
ORDER BY _time`

epoch = encoding.RoundTimeUp(epoch, resolution)
assertExpectedResult(t, db, sqlString, includeMemStore, testsupport.ExpectedResult{
testsupport.ExpectedRow{
epoch,
map[string]interface{}{},
map[string]float64{
"pp_opt": 90,
},
},
})
}

// testHavingQuery makes sure that a HAVING clause works even when the field in
// that clause does not appear in the SELECT clause
func testCrosstabWithHavingQuery(wg *sync.WaitGroup, t *testing.T, db *DB, includeMemStore bool, epoch time.Time, resolution time.Duration) {
Expand Down Expand Up @@ -714,7 +737,7 @@ func assertExpectedResult(t *testing.T, db *DB, sqlString string, includeMemStor
var rows []*core.FlatRow
source, err := db.Query(sqlString, false, nil, includeMemStore)
if err != nil {
return nil, errors.New("Unable to plan SQL query")
return nil, errors.New("Unable to plan SQL query: %v", err)
}

var fields core.Fields
Expand All @@ -728,7 +751,7 @@ func assertExpectedResult(t *testing.T, db *DB, sqlString string, includeMemStor
return true, nil
})
if err != nil {
return nil, errors.New("Unable to plan SQL query")
return nil, errors.New("Unable to plan SQL query: %v", err)
}

if false {
Expand Down