Skip to content

Commit

Permalink
SelectCursor becomes SelectRange
Browse files Browse the repository at this point in the history
`cursor` parameter becomes `start`, and `stopcursor` becomes `stop`.
Order is strictly newest-to-oldest.

Also, cursors for each element are given in every response, regardless
if it was a SelectOffset- or SelectRange-style query.
  • Loading branch information
peterbourgon committed Aug 9, 2014
1 parent d49baaf commit 43eac87
Show file tree
Hide file tree
Showing 8 changed files with 82 additions and 77 deletions.
8 changes: 4 additions & 4 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type Inserter interface {
// Selecter defines the methods to retrieve elements from a sorted set.
type Selecter interface {
SelectOffset(keys []string, offset, limit int) <-chan Element
SelectCursor(keys []string, cursor, stopcursor common.Cursor, limit int) <-chan Element
SelectRange(keys []string, start, stop common.Cursor, limit int) <-chan Element
}

// Deleter defines the method to delete elements from a sorted set. A key-
Expand Down Expand Up @@ -188,11 +188,11 @@ func (c *cluster) SelectOffset(keys []string, offset, limit int) <-chan Element
})
}

// SelectCursor uses ZREVRANGEBYSCORE to do a cursor-based select, similar to
// SelectRange uses ZREVRANGEBYSCORE to do a cursor-based select, similar to
// SelectOffset.
func (c *cluster) SelectCursor(keys []string, cursor, stopcursor common.Cursor, limit int) <-chan Element {
func (c *cluster) SelectRange(keys []string, start, stop common.Cursor, limit int) <-chan Element {
return c.selectCommon(keys, func(conn redis.Conn, myKeys []string) (map[string][]common.KeyScoreMember, error) {
return pipelineRangeByScore(conn, myKeys, cursor, stopcursor, limit)
return pipelineRangeByScore(conn, myKeys, start, stop, limit)
})
}

Expand Down
24 changes: 12 additions & 12 deletions cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ func TestJSONMarshalling(t *testing.T) {
}
}

func TestSelectCursor(t *testing.T) {
func TestSelectRange(t *testing.T) {
addresses := os.Getenv("TEST_REDIS_ADDRESSES")
if addresses == "" {
t.Logf("To run this test, set the TEST_REDIS_ADDRESSES environment variable")
Expand Down Expand Up @@ -373,7 +373,7 @@ func TestSelectCursor(t *testing.T) {
}

// Middle of the list, a real element cursor.
ch := c.SelectCursor([]string{"foo"}, common.Cursor{Score: 45.4, Member: "gamma"}, common.Cursor{}, 100)
ch := c.SelectRange([]string{"foo"}, common.Cursor{Score: 45.4, Member: "gamma"}, common.Cursor{}, 100)
expected := []common.KeyScoreMember{
{"foo", 35.9, "nu"},
{"foo", 34.8, "omicron"},
Expand All @@ -393,7 +393,7 @@ func TestSelectCursor(t *testing.T) {
}

// Top of the list.
ch = c.SelectCursor([]string{"foo"}, common.Cursor{Score: math.MaxFloat64}, common.Cursor{}, 100)
ch = c.SelectRange([]string{"foo"}, common.Cursor{Score: math.MaxFloat64}, common.Cursor{}, 100)
expected = []common.KeyScoreMember{
{"foo", 99.2, "beta"},
{"foo", 76.6, "iota"},
Expand All @@ -417,7 +417,7 @@ func TestSelectCursor(t *testing.T) {
}

// Restricted limit.
ch = c.SelectCursor([]string{"foo"}, common.Cursor{Score: 50.1, Member: "alpha"}, common.Cursor{}, 3)
ch = c.SelectRange([]string{"foo"}, common.Cursor{Score: 50.1, Member: "alpha"}, common.Cursor{}, 3)
expected = []common.KeyScoreMember{
{"foo", 45.4, "gamma"},
{"foo", 35.9, "nu"},
Expand All @@ -435,7 +435,7 @@ func TestSelectCursor(t *testing.T) {
}

// Multiple keys, top of the list, all elements.
ch = c.SelectCursor([]string{"bar", "foo"}, common.Cursor{Score: math.MaxFloat64, Member: ""}, common.Cursor{}, 100)
ch = c.SelectRange([]string{"bar", "foo"}, common.Cursor{Score: math.MaxFloat64, Member: ""}, common.Cursor{}, 100)
m := map[string][]common.KeyScoreMember{}
for e := range ch {
if e.Error != nil {
Expand Down Expand Up @@ -468,7 +468,7 @@ func TestSelectCursor(t *testing.T) {
}

// Multiple keys, middle of the list, all elements.
ch = c.SelectCursor([]string{"bar", "foo"}, common.Cursor{Score: 66.6, Member: "rho"}, common.Cursor{}, 100)
ch = c.SelectRange([]string{"bar", "foo"}, common.Cursor{Score: 66.6, Member: "rho"}, common.Cursor{}, 100)
m = map[string][]common.KeyScoreMember{}
for e := range ch {
if e.Error != nil {
Expand Down Expand Up @@ -497,7 +497,7 @@ func TestSelectCursor(t *testing.T) {
}

// Multiple keys, middle of the list, limited elements.
ch = c.SelectCursor([]string{"bar", "foo"}, common.Cursor{Score: 66.6, Member: "rho"}, common.Cursor{}, 1)
ch = c.SelectRange([]string{"bar", "foo"}, common.Cursor{Score: 66.6, Member: "rho"}, common.Cursor{}, 1)
m = map[string][]common.KeyScoreMember{}
for e := range ch {
if e.Error != nil {
Expand All @@ -520,7 +520,7 @@ func TestSelectCursor(t *testing.T) {
}

// Top of the list, using the stopcursor.
ch = c.SelectCursor([]string{"foo"}, common.Cursor{Score: math.MaxFloat64}, common.Cursor{Score: 45.4, Member: "gamma"}, 100)
ch = c.SelectRange([]string{"foo"}, common.Cursor{Score: math.MaxFloat64}, common.Cursor{Score: 45.4, Member: "gamma"}, 100)
expected = []common.KeyScoreMember{
{"foo", 99.2, "beta"},
{"foo", 76.6, "iota"},
Expand All @@ -538,7 +538,7 @@ func TestSelectCursor(t *testing.T) {
}

// Middle of the list, using the stopcursor.
ch = c.SelectCursor([]string{"foo"}, common.Cursor{Score: 35.9, Member: "nu"}, common.Cursor{Score: 21.5, Member: "kappa"}, 100)
ch = c.SelectRange([]string{"foo"}, common.Cursor{Score: 35.9, Member: "nu"}, common.Cursor{Score: 21.5, Member: "kappa"}, 100)
expected = []common.KeyScoreMember{
{"foo", 34.8, "omicron"},
{"foo", 33.7, "sigma"},
Expand Down Expand Up @@ -590,23 +590,23 @@ func TestCursorRetries(t *testing.T) {
// a low limit. A hard-coded, low maxRetries means this will fail. Note
// that this is testing the specific implementation: not a great unit
// test.
element := <-c.SelectCursor([]string{"foo"}, common.Cursor{Score: 1.23, Member: "bbb"}, common.Cursor{}, 2)
element := <-c.SelectRange([]string{"foo"}, common.Cursor{Score: 1.23, Member: "bbb"}, common.Cursor{}, 2)
if element.Error == nil {
t.Error("expected error, got none")
} else {
t.Logf("got expected error (%s)", element.Error)
}

// If we choose a higher limit, it should work.
element = <-c.SelectCursor([]string{"foo"}, common.Cursor{Score: 1.23, Member: "bbb"}, common.Cursor{}, 5)
element = <-c.SelectRange([]string{"foo"}, common.Cursor{Score: 1.23, Member: "bbb"}, common.Cursor{}, 5)
if element.Error != nil {
t.Errorf("got unexpected error: %s", element.Error)
} else {
t.Logf("OK: %v", element.KeyScoreMembers)
}

// If we choose a slightly earlier cursor, it should also work.
element = <-c.SelectCursor([]string{"foo"}, common.Cursor{Score: 1.23, Member: "hhh"}, common.Cursor{}, 2)
element = <-c.SelectRange([]string{"foo"}, common.Cursor{Score: 1.23, Member: "hhh"}, common.Cursor{}, 2)
if element.Error != nil {
t.Errorf("got unexpected error: %s", element.Error)
} else {
Expand Down
2 changes: 1 addition & 1 deletion common/cursor.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"math"
)

// Cursor is used as part of SelectCursor.
// Cursor is used as part of SelectRange.
type Cursor struct {
Score float64
Member string
Expand Down
8 changes: 4 additions & 4 deletions farm/farm.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (f *Farm) Insert(tuples []common.KeyScoreMember) error {
// Selecter defines a synchronous Select API, implemented by Farm.
type Selecter interface {
SelectOffset(keys []string, offset, limit int) (map[string][]common.KeyScoreMember, error)
SelectCursor(keys []string, cursor, stopcursor common.Cursor, limit int) (map[string][]common.KeyScoreMember, error)
SelectRange(keys []string, start, stop common.Cursor, limit int) (map[string][]common.KeyScoreMember, error)
}

// SelectOffset satisfies Selecter and invokes the ReadStrategy of the farm.
Expand All @@ -82,13 +82,13 @@ func (f *Farm) SelectOffset(keys []string, offset, limit int) (map[string][]comm
return f.selecter.SelectOffset(keys, offset, limit)
}

// SelectCursor satisfies Selecter and invokes the ReadStrategy of the farm.
func (f *Farm) SelectCursor(keys []string, cursor, stopcursor common.Cursor, limit int) (map[string][]common.KeyScoreMember, error) {
// SelectRange satisfies Selecter and invokes the ReadStrategy of the farm.
func (f *Farm) SelectRange(keys []string, start, stop common.Cursor, limit int) (map[string][]common.KeyScoreMember, error) {
// High performance optimization.
if len(keys) <= 0 {
return map[string][]common.KeyScoreMember{}, nil
}
return f.selecter.SelectCursor(keys, cursor, stopcursor, limit)
return f.selecter.SelectRange(keys, start, stop, limit)
}

// Delete removes each tuple from the underlying clusters, if the score is
Expand Down
2 changes: 1 addition & 1 deletion farm/mock_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func (c *mockCluster) SelectOffset(keys []string, offset, limit int) <-chan clus
return ch
}

func (c *mockCluster) SelectCursor(keys []string, cursor, stopcursor common.Cursor, limit int) <-chan cluster.Element {
func (c *mockCluster) SelectRange(keys []string, start, stop common.Cursor, limit int) <-chan cluster.Element {
ch := make(chan cluster.Element)
go func() { close(ch) }()
return ch
Expand Down
18 changes: 9 additions & 9 deletions farm/read_strategies.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ func (s sendOneReadOne) SelectOffset(keys []string, offset, limit int) (map[stri
})
}

// SelectCursor implements farm.Selecter.
func (s sendOneReadOne) SelectCursor(keys []string, cursor, stopcursor common.Cursor, limit int) (map[string][]common.KeyScoreMember, error) {
// SelectRange implements farm.Selecter.
func (s sendOneReadOne) SelectRange(keys []string, start, stop common.Cursor, limit int) (map[string][]common.KeyScoreMember, error) {
return s.read(len(keys), func(c cluster.Cluster) <-chan cluster.Element {
return c.SelectCursor(keys, cursor, stopcursor, limit)
return c.SelectRange(keys, start, stop, limit)
})
}

Expand Down Expand Up @@ -98,10 +98,10 @@ func (s sendAllReadAll) SelectOffset(keys []string, offset, limit int) (map[stri
}, limit)
}

// SelectCursor implements farm.Selecter.
func (s sendAllReadAll) SelectCursor(keys []string, cursor, stopcursor common.Cursor, limit int) (map[string][]common.KeyScoreMember, error) {
// SelectRange implements farm.Selecter.
func (s sendAllReadAll) SelectRange(keys []string, start, stop common.Cursor, limit int) (map[string][]common.KeyScoreMember, error) {
return s.read(len(keys), func(c cluster.Cluster) <-chan cluster.Element {
return c.SelectCursor(keys, cursor, stopcursor, limit)
return c.SelectRange(keys, start, stop, limit)
}, limit)
}

Expand Down Expand Up @@ -232,10 +232,10 @@ func (s sendVarReadFirstLinger) SelectOffset(keys []string, offset, limit int) (
}, limit)
}

// SelectCursor implements farm.Selecter.
func (s sendVarReadFirstLinger) SelectCursor(keys []string, cursor, stopcursor common.Cursor, limit int) (map[string][]common.KeyScoreMember, error) {
// SelectRange implements farm.Selecter.
func (s sendVarReadFirstLinger) SelectRange(keys []string, start, stop common.Cursor, limit int) (map[string][]common.KeyScoreMember, error) {
return s.read(keys, func(c cluster.Cluster, keys []string) <-chan cluster.Element {
return c.SelectCursor(keys, cursor, stopcursor, limit)
return c.SelectRange(keys, start, stop, limit)
}, limit)
}

Expand Down
95 changes: 50 additions & 45 deletions roshi-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,56 +217,47 @@ func handleSelect(selecter farm.Selecter) http.HandlerFunc {
}

var (
offset, offsetGiven = parseInt(r.Form, "offset", 0)
cursorStr, cursorGiven = parseStr(r.Form, "cursor", "")
stopcursorStr, stopcursorGiven = parseStr(r.Form, "stopcursor", "")
limit, _ = parseInt(r.Form, "limit", 10)
coalesce, _ = parseBool(r.Form, "coalesce", false)
offset, offsetGiven = parseInt(r.Form, "offset", 0)
startStr, startGiven = parseStr(r.Form, "start", "")
stopStr, stopGiven = parseStr(r.Form, "stop", "")
limit, _ = parseInt(r.Form, "limit", 10)
coalesce, _ = parseBool(r.Form, "coalesce", false)
)

switch {
case !offsetGiven && cursorGiven:
// SelectCursor. The presence of `coalesce` has no impact.
var cursor, stopcursor common.Cursor
if err := cursor.Parse(cursorStr); err != nil {
case !offsetGiven && startGiven:
// SelectRange. `coalesce` has no impact on the request, only the
// handling of the response.
var start, stop common.Cursor
if err := start.Parse(startStr); err != nil {
respondError(w, r.Method, r.URL.String(), http.StatusBadRequest, err)
return
}

if stopcursorGiven {
if err := stopcursor.Parse(stopcursorStr); err != nil {
if stopGiven {
if err := stop.Parse(stopStr); err != nil {
respondError(w, r.Method, r.URL.String(), http.StatusBadRequest, err)
return
}
}

results, err := selecter.SelectCursor(keyStrings, cursor, stopcursor, limit)
results, err := selecter.SelectRange(keyStrings, start, stop, limit)
if err != nil {
respondError(w, r.Method, r.URL.String(), http.StatusInternalServerError, err)
return
}

cursorResults := map[string][]keyScoreMemberCursor{}
for key, keyScoreMembers := range results {
keyScoreMemberCursors := make([]keyScoreMemberCursor, len(keyScoreMembers))
for i, keyScoreMember := range keyScoreMembers {
keyScoreMemberCursors[i] = keyScoreMemberCursor{
KeyScoreMember: keyScoreMember,
Cursor: keyScoreMember.Cursor().String(),
}
}
cursorResults[key] = keyScoreMemberCursors
}
cursorResults := addCursor(results)

if coalesce {
respondSelected(w, flattenCursor(cursorResults, limit), time.Since(began))
respondSelected(w, flatten(cursorResults, 0, limit), time.Since(began))
return
}

respondSelected(w, cursorResults, time.Since(began))
respondSelected(w, results, time.Since(began))
return

case offsetGiven && !cursorGiven, !offsetGiven && !cursorGiven:
case offsetGiven && !startGiven, !offsetGiven && !startGiven:
// SelectOffset. The offset/limit may be altered by `coalesce`.
var (
selectOffset = offset
Expand All @@ -284,15 +275,17 @@ func handleSelect(selecter farm.Selecter) http.HandlerFunc {
return
}

cursorResults := addCursor(results)

if coalesce {
respondSelected(w, flattenOffset(results, offset, limit), time.Since(began))
respondSelected(w, flatten(cursorResults, offset, limit), time.Since(began))
return
}

respondSelected(w, results, time.Since(began))
respondSelected(w, cursorResults, time.Since(began))
return

case offsetGiven && cursorGiven:
case offsetGiven && startGiven:
respondError(w, r.Method, r.URL.String(), http.StatusBadRequest, fmt.Errorf("cannot specify both offset and cursor"))
return

Expand Down Expand Up @@ -340,31 +333,43 @@ func handleDelete(deleter cluster.Deleter) http.HandlerFunc {
}
}

func flattenOffset(m map[string][]common.KeyScoreMember, offset, limit int) []common.KeyScoreMember {
a := []common.KeyScoreMember{}
for _, tuples := range m {
a = append(a, tuples...)
}
sort.Sort(keyScoreMembers(a))
if len(a) < offset {
return []common.KeyScoreMember{}
}
a = a[offset:]
if len(a) > limit {
a = a[:limit]
func addCursor(in map[string][]common.KeyScoreMember) map[string][]keyScoreMemberCursor {
out := map[string][]keyScoreMemberCursor{}

for key, keyScoreMembers := range in {
keyScoreMemberCursors := make([]keyScoreMemberCursor, len(keyScoreMembers))

for i, keyScoreMember := range keyScoreMembers {
keyScoreMemberCursors[i] = keyScoreMemberCursor{
KeyScoreMember: keyScoreMember,
Cursor: keyScoreMember.Cursor().String(),
}
}

out[key] = keyScoreMemberCursors
}
return a

return out
}

func flattenCursor(m map[string][]keyScoreMemberCursor, limit int) []keyScoreMemberCursor {
func flatten(m map[string][]keyScoreMemberCursor, offset, limit int) []keyScoreMemberCursor {
a := []keyScoreMemberCursor{}
for _, tuples := range m {
a = append(a, tuples...)
for _, slice := range m {
a = append(a, slice...)
}

sort.Sort(keyScoreMemberCursors(a))

if len(a) < offset {
return []keyScoreMemberCursor{}
}

a = a[offset:]

if len(a) > limit {
a = a[:limit]
}

return a
}

Expand Down
2 changes: 1 addition & 1 deletion roshi-server/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ func (f *mockFarm) SelectOffset(keys []string, offset, limit int) (map[string][]
return m, nil
}

func (f *mockFarm) SelectCursor(keys []string, cursor, stopcursor common.Cursor, limit int) (map[string][]common.KeyScoreMember, error) {
func (f *mockFarm) SelectRange(keys []string, start, stop common.Cursor, limit int) (map[string][]common.KeyScoreMember, error) {
return map[string][]common.KeyScoreMember{}, fmt.Errorf("not yet implemented")
}

Expand Down

0 comments on commit 43eac87

Please sign in to comment.