Skip to content

Commit

Permalink
Merge branch 'master' into redis#3139-add-encoding-BinaryUnmarshaler-…
Browse files Browse the repository at this point in the history
…scan
  • Loading branch information
zeze1004 authored Feb 7, 2025
2 parents bba1e2f + d0f9213 commit 1de7ef7
Show file tree
Hide file tree
Showing 7 changed files with 188 additions and 28 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/spellcheck.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ jobs:
- name: Checkout
uses: actions/checkout@v4
- name: Check Spelling
uses: rojopolis/spellcheck-github-actions@0.45.0
uses: rojopolis/spellcheck-github-actions@0.46.0
with:
config_path: .github/spellcheck-settings.yml
task_name: Markdown
2 changes: 1 addition & 1 deletion options.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ type Options struct {
// Add suffix to client name. Default is empty.
IdentitySuffix string

// Enable Unstable mode for Redis Search module with RESP3.
// UnstableResp3 enables Unstable mode for Redis Search module with RESP3.
UnstableResp3 bool
}

Expand Down
6 changes: 5 additions & 1 deletion osscluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ type ClusterOptions struct {
DisableIndentity bool // Disable set-lib on connect. Default is false.

IdentitySuffix string // Add suffix to client name. Default is empty.

// UnstableResp3 enables Unstable mode for Redis Search module with RESP3.
UnstableResp3 bool
}

func (opt *ClusterOptions) init() {
Expand Down Expand Up @@ -308,7 +311,8 @@ func (opt *ClusterOptions) clientOptions() *Options {
// much use for ClusterSlots config). This means we cannot execute the
// READONLY command against that node -- setting readOnly to false in such
// situations in the options below will prevent that from happening.
readOnly: opt.ReadOnly && opt.ClusterSlots == nil,
readOnly: opt.ReadOnly && opt.ClusterSlots == nil,
UnstableResp3: opt.UnstableResp3,
}
}

Expand Down
81 changes: 58 additions & 23 deletions search_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,13 +240,20 @@ type FTAggregateWithCursor struct {
}

type FTAggregateOptions struct {
Verbatim bool
LoadAll bool
Load []FTAggregateLoad
Timeout int
GroupBy []FTAggregateGroupBy
SortBy []FTAggregateSortBy
SortByMax int
Verbatim bool
LoadAll bool
Load []FTAggregateLoad
Timeout int
GroupBy []FTAggregateGroupBy
SortBy []FTAggregateSortBy
SortByMax int
// Scorer is used to set scoring function, if not set passed, a default will be used.
// The default scorer depends on the Redis version:
// - `BM25` for Redis >= 8
// - `TFIDF` for Redis < 8
Scorer string
// AddScores is available in Redis CE 8
AddScores bool
Apply []FTAggregateApply
LimitOffset int
Limit int
Expand Down Expand Up @@ -490,6 +497,15 @@ func FTAggregateQuery(query string, options *FTAggregateOptions) AggregateQuery
if options.Verbatim {
queryArgs = append(queryArgs, "VERBATIM")
}

if options.Scorer != "" {
queryArgs = append(queryArgs, "SCORER", options.Scorer)
}

if options.AddScores {
queryArgs = append(queryArgs, "ADDSCORES")
}

if options.LoadAll && options.Load != nil {
panic("FT.AGGREGATE: LOADALL and LOAD are mutually exclusive")
}
Expand All @@ -505,9 +521,18 @@ func FTAggregateQuery(query string, options *FTAggregateOptions) AggregateQuery
}
}
}

if options.Timeout > 0 {
queryArgs = append(queryArgs, "TIMEOUT", options.Timeout)
}

for _, apply := range options.Apply {
queryArgs = append(queryArgs, "APPLY", apply.Field)
if apply.As != "" {
queryArgs = append(queryArgs, "AS", apply.As)
}
}

if options.GroupBy != nil {
for _, groupBy := range options.GroupBy {
queryArgs = append(queryArgs, "GROUPBY", len(groupBy.Fields))
Expand Down Expand Up @@ -549,12 +574,6 @@ func FTAggregateQuery(query string, options *FTAggregateOptions) AggregateQuery
if options.SortByMax > 0 {
queryArgs = append(queryArgs, "MAX", options.SortByMax)
}
for _, apply := range options.Apply {
queryArgs = append(queryArgs, "APPLY", apply.Field)
if apply.As != "" {
queryArgs = append(queryArgs, "AS", apply.As)
}
}
if options.LimitOffset > 0 {
queryArgs = append(queryArgs, "LIMIT", options.LimitOffset)
}
Expand All @@ -581,6 +600,7 @@ func FTAggregateQuery(query string, options *FTAggregateOptions) AggregateQuery
queryArgs = append(queryArgs, key, value)
}
}

if options.DialectVersion > 0 {
queryArgs = append(queryArgs, "DIALECT", options.DialectVersion)
}
Expand Down Expand Up @@ -661,11 +681,12 @@ func (cmd *AggregateCmd) readReply(rd *proto.Reader) (err error) {
data, err := rd.ReadSlice()
if err != nil {
cmd.err = err
return nil
return err
}
cmd.val, err = ProcessAggregateResult(data)
if err != nil {
cmd.err = err
return err
}
return nil
}
Expand All @@ -681,6 +702,12 @@ func (c cmdable) FTAggregateWithArgs(ctx context.Context, index string, query st
if options.Verbatim {
args = append(args, "VERBATIM")
}
if options.Scorer != "" {
args = append(args, "SCORER", options.Scorer)
}
if options.AddScores {
args = append(args, "ADDSCORES")
}
if options.LoadAll && options.Load != nil {
panic("FT.AGGREGATE: LOADALL and LOAD are mutually exclusive")
}
Expand All @@ -699,6 +726,12 @@ func (c cmdable) FTAggregateWithArgs(ctx context.Context, index string, query st
if options.Timeout > 0 {
args = append(args, "TIMEOUT", options.Timeout)
}
for _, apply := range options.Apply {
args = append(args, "APPLY", apply.Field)
if apply.As != "" {
args = append(args, "AS", apply.As)
}
}
if options.GroupBy != nil {
for _, groupBy := range options.GroupBy {
args = append(args, "GROUPBY", len(groupBy.Fields))
Expand Down Expand Up @@ -740,12 +773,6 @@ func (c cmdable) FTAggregateWithArgs(ctx context.Context, index string, query st
if options.SortByMax > 0 {
args = append(args, "MAX", options.SortByMax)
}
for _, apply := range options.Apply {
args = append(args, "APPLY", apply.Field)
if apply.As != "" {
args = append(args, "AS", apply.As)
}
}
if options.LimitOffset > 0 {
args = append(args, "LIMIT", options.LimitOffset)
}
Expand Down Expand Up @@ -1693,7 +1720,8 @@ func (cmd *FTSearchCmd) readReply(rd *proto.Reader) (err error) {

// FTSearch - Executes a search query on an index.
// The 'index' parameter specifies the index to search, and the 'query' parameter specifies the search query.
// For more information, please refer to the Redis documentation:
// For more information, please refer to the Redis documentation about [FT.SEARCH].
//
// [FT.SEARCH]: (https://redis.io/commands/ft.search/)
func (c cmdable) FTSearch(ctx context.Context, index string, query string) *FTSearchCmd {
args := []interface{}{"FT.SEARCH", index, query}
Expand All @@ -1704,6 +1732,12 @@ func (c cmdable) FTSearch(ctx context.Context, index string, query string) *FTSe

type SearchQuery []interface{}

// FTSearchQuery - Executes a search query on an index with additional options.
// The 'index' parameter specifies the index to search, the 'query' parameter specifies the search query,
// and the 'options' parameter specifies additional options for the search.
// For more information, please refer to the Redis documentation about [FT.SEARCH].
//
// [FT.SEARCH]: (https://redis.io/commands/ft.search/)
func FTSearchQuery(query string, options *FTSearchOptions) SearchQuery {
queryArgs := []interface{}{query}
if options != nil {
Expand Down Expand Up @@ -1816,7 +1850,8 @@ func FTSearchQuery(query string, options *FTSearchOptions) SearchQuery {
// FTSearchWithArgs - Executes a search query on an index with additional options.
// The 'index' parameter specifies the index to search, the 'query' parameter specifies the search query,
// and the 'options' parameter specifies additional options for the search.
// For more information, please refer to the Redis documentation:
// For more information, please refer to the Redis documentation about [FT.SEARCH].
//
// [FT.SEARCH]: (https://redis.io/commands/ft.search/)
func (c cmdable) FTSearchWithArgs(ctx context.Context, index string, query string, options *FTSearchOptions) *FTSearchCmd {
args := []interface{}{"FT.SEARCH", index, query}
Expand Down Expand Up @@ -1908,7 +1943,7 @@ func (c cmdable) FTSearchWithArgs(ctx context.Context, index string, query strin
}
}
if options.SortByWithCount {
args = append(args, "WITHCOUT")
args = append(args, "WITHCOUNT")
}
}
if options.LimitOffset >= 0 && options.Limit > 0 {
Expand Down
102 changes: 100 additions & 2 deletions search_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package redis_test

import (
"context"
"fmt"
"strconv"
"time"

. "github.com/bsm/ginkgo/v2"
Expand Down Expand Up @@ -127,8 +129,11 @@ var _ = Describe("RediSearch commands Resp 2", Label("search"), func() {

res3, err := client.FTSearchWithArgs(ctx, "num", "foo", &redis.FTSearchOptions{NoContent: true, SortBy: []redis.FTSearchSortBy{sortBy2}, SortByWithCount: true}).Result()
Expect(err).NotTo(HaveOccurred())
Expect(res3.Total).To(BeEquivalentTo(int64(0)))
Expect(res3.Total).To(BeEquivalentTo(int64(3)))

res4, err := client.FTSearchWithArgs(ctx, "num", "notpresentf00", &redis.FTSearchOptions{NoContent: true, SortBy: []redis.FTSearchSortBy{sortBy2}, SortByWithCount: true}).Result()
Expect(err).NotTo(HaveOccurred())
Expect(res4.Total).To(BeEquivalentTo(int64(0)))
})

It("should FTCreate and FTSearch example", Label("search", "ftcreate", "ftsearch"), func() {
Expand Down Expand Up @@ -640,6 +645,100 @@ var _ = Describe("RediSearch commands Resp 2", Label("search"), func() {
Expect(res.Rows[0].Fields["t2"]).To(BeEquivalentTo("world"))
})

It("should FTAggregate with scorer and addscores", Label("search", "ftaggregate", "NonRedisEnterprise"), func() {
SkipBeforeRedisMajor(8, "ADDSCORES is available in Redis CE 8")
title := &redis.FieldSchema{FieldName: "title", FieldType: redis.SearchFieldTypeText, Sortable: false}
description := &redis.FieldSchema{FieldName: "description", FieldType: redis.SearchFieldTypeText, Sortable: false}
val, err := client.FTCreate(ctx, "idx1", &redis.FTCreateOptions{OnHash: true, Prefix: []interface{}{"product:"}}, title, description).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(BeEquivalentTo("OK"))
WaitForIndexing(client, "idx1")

client.HSet(ctx, "product:1", "title", "New Gaming Laptop", "description", "this is not a desktop")
client.HSet(ctx, "product:2", "title", "Super Old Not Gaming Laptop", "description", "this laptop is not a new laptop but it is a laptop")
client.HSet(ctx, "product:3", "title", "Office PC", "description", "office desktop pc")

options := &redis.FTAggregateOptions{
AddScores: true,
Scorer: "BM25",
SortBy: []redis.FTAggregateSortBy{{
FieldName: "@__score",
Desc: true,
}},
}

res, err := client.FTAggregateWithArgs(ctx, "idx1", "laptop", options).Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).ToNot(BeNil())
Expect(len(res.Rows)).To(BeEquivalentTo(2))
score1, err := strconv.ParseFloat(fmt.Sprintf("%s", res.Rows[0].Fields["__score"]), 64)
Expect(err).NotTo(HaveOccurred())
score2, err := strconv.ParseFloat(fmt.Sprintf("%s", res.Rows[1].Fields["__score"]), 64)
Expect(err).NotTo(HaveOccurred())
Expect(score1).To(BeNumerically(">", score2))

optionsDM := &redis.FTAggregateOptions{
AddScores: true,
Scorer: "DISMAX",
SortBy: []redis.FTAggregateSortBy{{
FieldName: "@__score",
Desc: true,
}},
}

resDM, err := client.FTAggregateWithArgs(ctx, "idx1", "laptop", optionsDM).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resDM).ToNot(BeNil())
Expect(len(resDM.Rows)).To(BeEquivalentTo(2))
score1DM, err := strconv.ParseFloat(fmt.Sprintf("%s", resDM.Rows[0].Fields["__score"]), 64)
Expect(err).NotTo(HaveOccurred())
score2DM, err := strconv.ParseFloat(fmt.Sprintf("%s", resDM.Rows[1].Fields["__score"]), 64)
Expect(err).NotTo(HaveOccurred())
Expect(score1DM).To(BeNumerically(">", score2DM))

Expect(score1DM).To(BeEquivalentTo(float64(4)))
Expect(score2DM).To(BeEquivalentTo(float64(1)))
Expect(score1).NotTo(BeEquivalentTo(score1DM))
Expect(score2).NotTo(BeEquivalentTo(score2DM))
})

It("should FTAggregate apply and groupby", Label("search", "ftaggregate"), func() {
text1 := &redis.FieldSchema{FieldName: "PrimaryKey", FieldType: redis.SearchFieldTypeText, Sortable: true}
num1 := &redis.FieldSchema{FieldName: "CreatedDateTimeUTC", FieldType: redis.SearchFieldTypeNumeric, Sortable: true}
val, err := client.FTCreate(ctx, "idx1", &redis.FTCreateOptions{}, text1, num1).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(BeEquivalentTo("OK"))
WaitForIndexing(client, "idx1")

// 6 feb
client.HSet(ctx, "doc1", "PrimaryKey", "9::362330", "CreatedDateTimeUTC", "1738823999")

// 12 feb
client.HSet(ctx, "doc2", "PrimaryKey", "9::362329", "CreatedDateTimeUTC", "1739342399")
client.HSet(ctx, "doc3", "PrimaryKey", "9::362329", "CreatedDateTimeUTC", "1739353199")

reducer := redis.FTAggregateReducer{Reducer: redis.SearchCount, As: "perDay"}

options := &redis.FTAggregateOptions{
Apply: []redis.FTAggregateApply{{Field: "floor(@CreatedDateTimeUTC /(60*60*24))", As: "TimestampAsDay"}},
GroupBy: []redis.FTAggregateGroupBy{{
Fields: []interface{}{"@TimestampAsDay"},
Reduce: []redis.FTAggregateReducer{reducer},
}},
SortBy: []redis.FTAggregateSortBy{{
FieldName: "@perDay",
Desc: true,
}},
}

res, err := client.FTAggregateWithArgs(ctx, "idx1", "*", options).Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).ToNot(BeNil())
Expect(len(res.Rows)).To(BeEquivalentTo(2))
Expect(res.Rows[0].Fields["perDay"]).To(BeEquivalentTo("2"))
Expect(res.Rows[1].Fields["perDay"]).To(BeEquivalentTo("1"))
})

It("should FTAggregate apply", Label("search", "ftaggregate"), func() {
text1 := &redis.FieldSchema{FieldName: "PrimaryKey", FieldType: redis.SearchFieldTypeText, Sortable: true}
num1 := &redis.FieldSchema{FieldName: "CreatedDateTimeUTC", FieldType: redis.SearchFieldTypeNumeric, Sortable: true}
Expand Down Expand Up @@ -684,7 +783,6 @@ var _ = Describe("RediSearch commands Resp 2", Label("search"), func() {
Expect(res.Rows[0].Fields["age"]).To(BeEquivalentTo("19"))
Expect(res.Rows[1].Fields["age"]).To(BeEquivalentTo("25"))
}

})

It("should FTSearch SkipInitialScan", Label("search", "ftsearch"), func() {
Expand Down
1 change: 1 addition & 0 deletions universal.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ func (o *UniversalOptions) Cluster() *ClusterOptions {

DisableIndentity: o.DisableIndentity,
IdentitySuffix: o.IdentitySuffix,
UnstableResp3: o.UnstableResp3,
}
}

Expand Down
22 changes: 22 additions & 0 deletions universal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,26 @@ var _ = Describe("UniversalClient", func() {
})
Expect(client.Ping(ctx).Err()).NotTo(HaveOccurred())
})

It("connect to clusters with UniversalClient and UnstableResp3", Label("NonRedisEnterprise"), func() {
client = redis.NewUniversalClient(&redis.UniversalOptions{
Addrs: cluster.addrs(),
Protocol: 3,
UnstableResp3: true,
})
Expect(client.Ping(ctx).Err()).NotTo(HaveOccurred())
a := func() { client.FTInfo(ctx, "all").Result() }
Expect(a).ToNot(Panic())
})

It("connect to clusters with ClusterClient and UnstableResp3", Label("NonRedisEnterprise"), func() {
client = redis.NewClusterClient(&redis.ClusterOptions{
Addrs: cluster.addrs(),
Protocol: 3,
UnstableResp3: true,
})
Expect(client.Ping(ctx).Err()).NotTo(HaveOccurred())
a := func() { client.FTInfo(ctx, "all").Result() }
Expect(a).ToNot(Panic())
})
})

0 comments on commit 1de7ef7

Please sign in to comment.