Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(aggregate, search): ft.aggregate bugfixes #3263

Merged
merged 9 commits into from
Feb 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 58 additions & 23 deletions search_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,13 +240,20 @@ type FTAggregateWithCursor struct {
}

type FTAggregateOptions struct {
Verbatim bool
LoadAll bool
Load []FTAggregateLoad
Timeout int
GroupBy []FTAggregateGroupBy
SortBy []FTAggregateSortBy
SortByMax int
Verbatim bool
LoadAll bool
Load []FTAggregateLoad
Timeout int
GroupBy []FTAggregateGroupBy
SortBy []FTAggregateSortBy
SortByMax int
// Scorer is used to set scoring function, if not set passed, a default will be used.
// The default scorer depends on the Redis version:
// - `BM25` for Redis >= 8
// - `TFIDF` for Redis < 8
Scorer string
// AddScores is available in Redis CE 8
AddScores bool
Apply []FTAggregateApply
LimitOffset int
Limit int
Expand Down Expand Up @@ -490,6 +497,15 @@ func FTAggregateQuery(query string, options *FTAggregateOptions) AggregateQuery
if options.Verbatim {
queryArgs = append(queryArgs, "VERBATIM")
}

if options.Scorer != "" {
queryArgs = append(queryArgs, "SCORER", options.Scorer)
}

if options.AddScores {
queryArgs = append(queryArgs, "ADDSCORES")
}

if options.LoadAll && options.Load != nil {
panic("FT.AGGREGATE: LOADALL and LOAD are mutually exclusive")
}
Expand All @@ -505,9 +521,18 @@ func FTAggregateQuery(query string, options *FTAggregateOptions) AggregateQuery
}
}
}

if options.Timeout > 0 {
queryArgs = append(queryArgs, "TIMEOUT", options.Timeout)
}

for _, apply := range options.Apply {
queryArgs = append(queryArgs, "APPLY", apply.Field)
if apply.As != "" {
queryArgs = append(queryArgs, "AS", apply.As)
}
}

if options.GroupBy != nil {
for _, groupBy := range options.GroupBy {
queryArgs = append(queryArgs, "GROUPBY", len(groupBy.Fields))
Expand Down Expand Up @@ -549,12 +574,6 @@ func FTAggregateQuery(query string, options *FTAggregateOptions) AggregateQuery
if options.SortByMax > 0 {
queryArgs = append(queryArgs, "MAX", options.SortByMax)
}
for _, apply := range options.Apply {
queryArgs = append(queryArgs, "APPLY", apply.Field)
if apply.As != "" {
queryArgs = append(queryArgs, "AS", apply.As)
}
}
if options.LimitOffset > 0 {
queryArgs = append(queryArgs, "LIMIT", options.LimitOffset)
}
Expand All @@ -581,6 +600,7 @@ func FTAggregateQuery(query string, options *FTAggregateOptions) AggregateQuery
queryArgs = append(queryArgs, key, value)
}
}

if options.DialectVersion > 0 {
queryArgs = append(queryArgs, "DIALECT", options.DialectVersion)
}
Expand Down Expand Up @@ -661,11 +681,12 @@ func (cmd *AggregateCmd) readReply(rd *proto.Reader) (err error) {
data, err := rd.ReadSlice()
if err != nil {
cmd.err = err
return nil
return err
}
cmd.val, err = ProcessAggregateResult(data)
if err != nil {
cmd.err = err
return err
}
return nil
}
Expand All @@ -681,6 +702,12 @@ func (c cmdable) FTAggregateWithArgs(ctx context.Context, index string, query st
if options.Verbatim {
args = append(args, "VERBATIM")
}
if options.Scorer != "" {
args = append(args, "SCORER", options.Scorer)
}
if options.AddScores {
args = append(args, "ADDSCORES")
}
if options.LoadAll && options.Load != nil {
panic("FT.AGGREGATE: LOADALL and LOAD are mutually exclusive")
}
Expand All @@ -699,6 +726,12 @@ func (c cmdable) FTAggregateWithArgs(ctx context.Context, index string, query st
if options.Timeout > 0 {
args = append(args, "TIMEOUT", options.Timeout)
}
for _, apply := range options.Apply {
args = append(args, "APPLY", apply.Field)
if apply.As != "" {
args = append(args, "AS", apply.As)
}
}
if options.GroupBy != nil {
for _, groupBy := range options.GroupBy {
args = append(args, "GROUPBY", len(groupBy.Fields))
Expand Down Expand Up @@ -740,12 +773,6 @@ func (c cmdable) FTAggregateWithArgs(ctx context.Context, index string, query st
if options.SortByMax > 0 {
args = append(args, "MAX", options.SortByMax)
}
for _, apply := range options.Apply {
args = append(args, "APPLY", apply.Field)
if apply.As != "" {
args = append(args, "AS", apply.As)
}
}
if options.LimitOffset > 0 {
args = append(args, "LIMIT", options.LimitOffset)
}
Expand Down Expand Up @@ -1693,7 +1720,8 @@ func (cmd *FTSearchCmd) readReply(rd *proto.Reader) (err error) {

// FTSearch - Executes a search query on an index.
// The 'index' parameter specifies the index to search, and the 'query' parameter specifies the search query.
// For more information, please refer to the Redis documentation:
// For more information, please refer to the Redis documentation about [FT.SEARCH].
//
// [FT.SEARCH]: (https://redis.io/commands/ft.search/)
func (c cmdable) FTSearch(ctx context.Context, index string, query string) *FTSearchCmd {
args := []interface{}{"FT.SEARCH", index, query}
Expand All @@ -1704,6 +1732,12 @@ func (c cmdable) FTSearch(ctx context.Context, index string, query string) *FTSe

type SearchQuery []interface{}

// FTSearchQuery - Executes a search query on an index with additional options.
// The 'index' parameter specifies the index to search, the 'query' parameter specifies the search query,
// and the 'options' parameter specifies additional options for the search.
// For more information, please refer to the Redis documentation about [FT.SEARCH].
//
// [FT.SEARCH]: (https://redis.io/commands/ft.search/)
func FTSearchQuery(query string, options *FTSearchOptions) SearchQuery {
queryArgs := []interface{}{query}
if options != nil {
Expand Down Expand Up @@ -1816,7 +1850,8 @@ func FTSearchQuery(query string, options *FTSearchOptions) SearchQuery {
// FTSearchWithArgs - Executes a search query on an index with additional options.
// The 'index' parameter specifies the index to search, the 'query' parameter specifies the search query,
// and the 'options' parameter specifies additional options for the search.
// For more information, please refer to the Redis documentation:
// For more information, please refer to the Redis documentation about [FT.SEARCH].
//
// [FT.SEARCH]: (https://redis.io/commands/ft.search/)
func (c cmdable) FTSearchWithArgs(ctx context.Context, index string, query string, options *FTSearchOptions) *FTSearchCmd {
args := []interface{}{"FT.SEARCH", index, query}
Expand Down Expand Up @@ -1908,7 +1943,7 @@ func (c cmdable) FTSearchWithArgs(ctx context.Context, index string, query strin
}
}
if options.SortByWithCount {
args = append(args, "WITHCOUT")
args = append(args, "WITHCOUNT")
}
}
if options.LimitOffset >= 0 && options.Limit > 0 {
Expand Down
102 changes: 100 additions & 2 deletions search_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package redis_test

import (
"context"
"fmt"
"strconv"
"time"

. "github.com/bsm/ginkgo/v2"
Expand Down Expand Up @@ -127,8 +129,11 @@ var _ = Describe("RediSearch commands Resp 2", Label("search"), func() {

res3, err := client.FTSearchWithArgs(ctx, "num", "foo", &redis.FTSearchOptions{NoContent: true, SortBy: []redis.FTSearchSortBy{sortBy2}, SortByWithCount: true}).Result()
Expect(err).NotTo(HaveOccurred())
Expect(res3.Total).To(BeEquivalentTo(int64(0)))
Expect(res3.Total).To(BeEquivalentTo(int64(3)))

res4, err := client.FTSearchWithArgs(ctx, "num", "notpresentf00", &redis.FTSearchOptions{NoContent: true, SortBy: []redis.FTSearchSortBy{sortBy2}, SortByWithCount: true}).Result()
Expect(err).NotTo(HaveOccurred())
Expect(res4.Total).To(BeEquivalentTo(int64(0)))
})

It("should FTCreate and FTSearch example", Label("search", "ftcreate", "ftsearch"), func() {
Expand Down Expand Up @@ -640,6 +645,100 @@ var _ = Describe("RediSearch commands Resp 2", Label("search"), func() {
Expect(res.Rows[0].Fields["t2"]).To(BeEquivalentTo("world"))
})

It("should FTAggregate with scorer and addscores", Label("search", "ftaggregate", "NonRedisEnterprise"), func() {
SkipBeforeRedisMajor(8, "ADDSCORES is available in Redis CE 8")
title := &redis.FieldSchema{FieldName: "title", FieldType: redis.SearchFieldTypeText, Sortable: false}
description := &redis.FieldSchema{FieldName: "description", FieldType: redis.SearchFieldTypeText, Sortable: false}
val, err := client.FTCreate(ctx, "idx1", &redis.FTCreateOptions{OnHash: true, Prefix: []interface{}{"product:"}}, title, description).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(BeEquivalentTo("OK"))
WaitForIndexing(client, "idx1")

client.HSet(ctx, "product:1", "title", "New Gaming Laptop", "description", "this is not a desktop")
client.HSet(ctx, "product:2", "title", "Super Old Not Gaming Laptop", "description", "this laptop is not a new laptop but it is a laptop")
client.HSet(ctx, "product:3", "title", "Office PC", "description", "office desktop pc")

options := &redis.FTAggregateOptions{
AddScores: true,
Scorer: "BM25",
SortBy: []redis.FTAggregateSortBy{{
FieldName: "@__score",
Desc: true,
}},
}

res, err := client.FTAggregateWithArgs(ctx, "idx1", "laptop", options).Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).ToNot(BeNil())
Expect(len(res.Rows)).To(BeEquivalentTo(2))
score1, err := strconv.ParseFloat(fmt.Sprintf("%s", res.Rows[0].Fields["__score"]), 64)
Expect(err).NotTo(HaveOccurred())
score2, err := strconv.ParseFloat(fmt.Sprintf("%s", res.Rows[1].Fields["__score"]), 64)
Expect(err).NotTo(HaveOccurred())
Expect(score1).To(BeNumerically(">", score2))

optionsDM := &redis.FTAggregateOptions{
AddScores: true,
Scorer: "DISMAX",
SortBy: []redis.FTAggregateSortBy{{
FieldName: "@__score",
Desc: true,
}},
}

resDM, err := client.FTAggregateWithArgs(ctx, "idx1", "laptop", optionsDM).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resDM).ToNot(BeNil())
Expect(len(resDM.Rows)).To(BeEquivalentTo(2))
score1DM, err := strconv.ParseFloat(fmt.Sprintf("%s", resDM.Rows[0].Fields["__score"]), 64)
Expect(err).NotTo(HaveOccurred())
score2DM, err := strconv.ParseFloat(fmt.Sprintf("%s", resDM.Rows[1].Fields["__score"]), 64)
Expect(err).NotTo(HaveOccurred())
Expect(score1DM).To(BeNumerically(">", score2DM))

Expect(score1DM).To(BeEquivalentTo(float64(4)))
Expect(score2DM).To(BeEquivalentTo(float64(1)))
Expect(score1).NotTo(BeEquivalentTo(score1DM))
Expect(score2).NotTo(BeEquivalentTo(score2DM))
})

It("should FTAggregate apply and groupby", Label("search", "ftaggregate"), func() {
text1 := &redis.FieldSchema{FieldName: "PrimaryKey", FieldType: redis.SearchFieldTypeText, Sortable: true}
num1 := &redis.FieldSchema{FieldName: "CreatedDateTimeUTC", FieldType: redis.SearchFieldTypeNumeric, Sortable: true}
val, err := client.FTCreate(ctx, "idx1", &redis.FTCreateOptions{}, text1, num1).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(BeEquivalentTo("OK"))
WaitForIndexing(client, "idx1")

// 6 feb
client.HSet(ctx, "doc1", "PrimaryKey", "9::362330", "CreatedDateTimeUTC", "1738823999")

// 12 feb
client.HSet(ctx, "doc2", "PrimaryKey", "9::362329", "CreatedDateTimeUTC", "1739342399")
client.HSet(ctx, "doc3", "PrimaryKey", "9::362329", "CreatedDateTimeUTC", "1739353199")

reducer := redis.FTAggregateReducer{Reducer: redis.SearchCount, As: "perDay"}

options := &redis.FTAggregateOptions{
Apply: []redis.FTAggregateApply{{Field: "floor(@CreatedDateTimeUTC /(60*60*24))", As: "TimestampAsDay"}},
GroupBy: []redis.FTAggregateGroupBy{{
Fields: []interface{}{"@TimestampAsDay"},
Reduce: []redis.FTAggregateReducer{reducer},
}},
SortBy: []redis.FTAggregateSortBy{{
FieldName: "@perDay",
Desc: true,
}},
}

res, err := client.FTAggregateWithArgs(ctx, "idx1", "*", options).Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).ToNot(BeNil())
Expect(len(res.Rows)).To(BeEquivalentTo(2))
Expect(res.Rows[0].Fields["perDay"]).To(BeEquivalentTo("2"))
Expect(res.Rows[1].Fields["perDay"]).To(BeEquivalentTo("1"))
})

It("should FTAggregate apply", Label("search", "ftaggregate"), func() {
text1 := &redis.FieldSchema{FieldName: "PrimaryKey", FieldType: redis.SearchFieldTypeText, Sortable: true}
num1 := &redis.FieldSchema{FieldName: "CreatedDateTimeUTC", FieldType: redis.SearchFieldTypeNumeric, Sortable: true}
Expand Down Expand Up @@ -684,7 +783,6 @@ var _ = Describe("RediSearch commands Resp 2", Label("search"), func() {
Expect(res.Rows[0].Fields["age"]).To(BeEquivalentTo("19"))
Expect(res.Rows[1].Fields["age"]).To(BeEquivalentTo("25"))
}

})

It("should FTSearch SkipInitialScan", Label("search", "ftsearch"), func() {
Expand Down
Loading