Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Small refactor: remove QueryProcessor #1162

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion quesma/model/metrics_aggregations/top_hits.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (query *TopHits) TranslateSqlResponseToJson(rows []model.QueryResultRow) mo

for _, col := range valuesForHits {

value := col.ExtractValue(query.ctx)
value := col.ExtractValue()

sourceMap[col.ColName] = value

Expand Down
2 changes: 1 addition & 1 deletion quesma/model/metrics_aggregations/top_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (query *TopMetrics) TranslateSqlResponseToJson(rows []model.QueryResultRow)
withoutQuotes = col.ColName
}
colName, _ := strings.CutPrefix(withoutQuotes, `windowed_`)
metrics[colName] = col.ExtractValue(query.ctx)
metrics[colName] = col.ExtractValue()
}
elem := model.JsonMap{
"sort": sortVal,
Expand Down
3 changes: 1 addition & 2 deletions quesma/model/pipeline_aggregations/average_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,13 @@ func (query AverageBucket) CalculateResultWhenMissing(parentRows []model.QueryRe
if len(parentRows) == 0 {
return resultRows // maybe null?
}
qp := model.NewQueryProcessor(query.ctx)
parentFieldsCnt := len(parentRows[0].Cols) - 2 // -2, because row is [parent_cols..., current_key, current_value]
// in calculateSingleAvgBucket we calculate avg all current_keys with the same parent_cols
// so we need to split into buckets based on parent_cols
if parentFieldsCnt < 0 {
logger.WarnWithCtx(query.ctx).Msgf("parentFieldsCnt is less than 0: %d", parentFieldsCnt)
}
for _, parentRowsOneBucket := range qp.SplitResultSetIntoBuckets(parentRows, parentFieldsCnt) {
for _, parentRowsOneBucket := range model.SplitResultSetIntoBuckets(parentRows, parentFieldsCnt) {
resultRows = append(resultRows, query.calculateSingleAvgBucket(parentRowsOneBucket))
}
return resultRows
Expand Down
3 changes: 1 addition & 2 deletions quesma/model/pipeline_aggregations/max_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,10 @@ func (query MaxBucket) CalculateResultWhenMissing(parentRows []model.QueryResult
if len(parentRows) == 0 {
return resultRows // maybe null?
}
qp := model.NewQueryProcessor(query.ctx)
parentFieldsCnt := len(parentRows[0].Cols) - 2 // -2, because row is [parent_cols..., current_key, current_value]
// in calculateSingleAvgBucket we calculate avg all current_keys with the same parent_cols
// so we need to split into buckets based on parent_cols
for _, parentRowsOneBucket := range qp.SplitResultSetIntoBuckets(parentRows, parentFieldsCnt) {
for _, parentRowsOneBucket := range model.SplitResultSetIntoBuckets(parentRows, parentFieldsCnt) {
resultRows = append(resultRows, query.calculateSingleMaxBucket(parentRowsOneBucket))
}
return resultRows
Expand Down
3 changes: 1 addition & 2 deletions quesma/model/pipeline_aggregations/min_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,13 @@ func (query MinBucket) CalculateResultWhenMissing(parentRows []model.QueryResult
if len(parentRows) == 0 {
return resultRows // maybe null?
}
qp := model.NewQueryProcessor(query.ctx)
parentFieldsCnt := len(parentRows[0].Cols) - 2 // -2, because row is [parent_cols..., current_key, current_value]
// in calculateSingleAvgBucket we calculate avg all current_keys with the same parent_cols
// so we need to split into buckets based on parent_cols
if parentFieldsCnt < 0 {
logger.WarnWithCtx(query.ctx).Msgf("parentFieldsCnt is less than 0: %d", parentFieldsCnt)
}
for _, parentRowsOneBucket := range qp.SplitResultSetIntoBuckets(parentRows, parentFieldsCnt) {
for _, parentRowsOneBucket := range model.SplitResultSetIntoBuckets(parentRows, parentFieldsCnt) {
resultRows = append(resultRows, query.calculateSingleMinBucket(parentRowsOneBucket))
}
return resultRows
Expand Down
3 changes: 1 addition & 2 deletions quesma/model/pipeline_aggregations/sum_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,13 @@ func (query SumBucket) CalculateResultWhenMissing(parentRows []model.QueryResult
if len(parentRows) == 0 {
return resultRows // maybe null?
}
qp := model.NewQueryProcessor(query.ctx)
parentFieldsCnt := len(parentRows[0].Cols) - 2 // -2, because row is [parent_cols..., current_key, current_value]
// in calculateSingleAvgBucket we calculate avg all current_keys with the same parent_cols
// so we need to split into buckets based on parent_cols
if parentFieldsCnt < 0 {
logger.WarnWithCtx(query.ctx).Msgf("parentFieldsCnt is less than 0: %d", parentFieldsCnt)
}
for _, parentRowsOneBucket := range qp.SplitResultSetIntoBuckets(parentRows, parentFieldsCnt) {
for _, parentRowsOneBucket := range model.SplitResultSetIntoBuckets(parentRows, parentFieldsCnt) {
resultRows = append(resultRows, query.calculateSingleSumBucket(parentRowsOneBucket))
}
return resultRows
Expand Down
67 changes: 0 additions & 67 deletions quesma/model/query_processor.go

This file was deleted.

104 changes: 89 additions & 15 deletions quesma/model/query_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,30 +11,30 @@ import (
"quesma/schema"
"quesma/util"
"reflect"
"slices"
"strings"
"time"
)

type FieldAtIndex = int // for facets/histogram what Cols[i] means
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unused


type QueryResultCol struct {
ColName string // quoted, e.g. `"message"`
Value interface{}
ColType schema.QuesmaType
}
type (
QueryResultCol struct {
ColName string // quoted, e.g. `"message"`
Value interface{}
ColType schema.QuesmaType
}
QueryResultRow struct {
Index string
Cols []QueryResultCol
}
)

func NewQueryResultCol(colName string, value interface{}) QueryResultCol {
return QueryResultCol{ColName: colName, Value: value}
}

type QueryResultRow struct {
Index string
Cols []QueryResultCol
}

// String returns the string representation of the column in format `"<colName>": <value>`, properly quoted.
func (c QueryResultCol) String(ctx context.Context) string {
valueExtracted := c.ExtractValue(ctx)
func (c *QueryResultCol) String(ctx context.Context) string {
valueExtracted := c.ExtractValue()
if valueExtracted == nil {
return ""
}
Expand Down Expand Up @@ -62,7 +62,7 @@ func (c QueryResultCol) String(ctx context.Context) string {

// ExtractValue returns the value of the column. If it is a pointer, it returns the value of the pointer.
// Care: it's untested how it works with '[]type' or '[]*type'.
func (c QueryResultCol) ExtractValue(ctx context.Context) any {
func (c *QueryResultCol) ExtractValue() any {
if c.Value == nil {
return nil
}
Expand All @@ -78,6 +78,14 @@ func (c QueryResultCol) ExtractValue(ctx context.Context) any {
return c.Value
}

func (c *QueryResultCol) isArray() bool {
if c.Value == nil {
return false
}
v := reflect.ValueOf(c.Value)
return v.Kind() == reflect.Slice || v.Kind() == reflect.Array
}

func (r *QueryResultRow) String(ctx context.Context) string {
str := strings.Builder{}
str.WriteString(util.Indent(1) + "{\n")
Expand Down Expand Up @@ -112,6 +120,72 @@ func (r *QueryResultRow) LastColValue() any {
return r.Cols[len(r.Cols)-1].Value
}

// SameSubsetOfColumns returns if r and other have the same values for columns with names in colNames
// They are results of the same query, so we can assume that the columns are in the same order.
func (r *QueryResultRow) SameSubsetOfColumns(other *QueryResultRow, colNames []string) bool {
for i := range min(len(r.Cols), len(other.Cols)) {
if slices.Contains(colNames, r.Cols[i].ColName) {
isArray1 := r.Cols[i].isArray()
isArray2 := other.Cols[i].isArray()

if !isArray1 && !isArray2 {
if r.Cols[i].ExtractValue() != other.Cols[i].ExtractValue() {
return false
}
} else if isArray1 && isArray2 {
if !reflect.DeepEqual(r.Cols[i].Value, other.Cols[i].Value) {
return false
}
} else {
return false
}
}
}
return true
}

// firstNColumnsHaveSameValues returns if 2 rows have the same values for the first N columns
func (r *QueryResultRow) firstNColumnsHaveSameValues(other *QueryResultRow, N int) bool {
for i := 0; i < N; i++ {
isArray1 := r.Cols[i].isArray()
isArray2 := other.Cols[i].isArray()

if !isArray1 && !isArray2 {
if r.Cols[i].ExtractValue() != other.Cols[i].ExtractValue() {
return false
}
} else if isArray1 && isArray2 {
if !reflect.DeepEqual(r.Cols[i].Value, other.Cols[i].Value) {
return false
}
} else {
return false
}
}
return true
}

// SplitResultSetIntoBuckets splits ResultSet into buckets, based on the first N + 1 columns
// E.g. if N == 0, we split into buckets based on the first field,
// e.g. [row(1, ...), row(1, ...), row(2, ...), row(2, ...), row(3, ...)] -> [[row(1, ...), row(1, ...)], [row(2, ...), row(2, ...)], [row(3, ...)]]
func SplitResultSetIntoBuckets(ResultSet []QueryResultRow, N int) [][]QueryResultRow {
if len(ResultSet) == 0 {
return [][]QueryResultRow{{}}
}

lastRow := ResultSet[0]
buckets := [][]QueryResultRow{{lastRow}}
for _, row := range ResultSet[1:] {
if row.firstNColumnsHaveSameValues(&lastRow, N) {
buckets[len(buckets)-1] = append(buckets[len(buckets)-1], row)
} else {
buckets = append(buckets, []QueryResultRow{row})
}
lastRow = row
}
return buckets
}

func FirstNonNilIndex(rows []QueryResultRow) int {
for i, row := range rows {
if row.LastColValue() != nil {
Expand Down
4 changes: 2 additions & 2 deletions quesma/model/query_result_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,8 @@ func TestQueryResultCol_ExtractValue(t *testing.T) {
for i, tt := range testcases {
t.Run(strconv.Itoa(i), func(t *testing.T) {
col := QueryResultCol{ColName: "name", Value: tt.value}
if col.ExtractValue(context.Background()) != tt.expected {
t.Errorf("Expected %v, got %v", tt.expected, col.ExtractValue(context.Background()))
if col.ExtractValue() != tt.expected {
t.Errorf("Expected %v, got %v", tt.expected, col.ExtractValue())
}
})
}
Expand Down
Loading