Skip to content

Commit

Permalink
Add iteration support to sqlite backend (#52530)
Browse files Browse the repository at this point in the history
This builds on top of #52199 by updating the sqlite backend to
implement backend.BackendWithItems. Only GetRange, and not DeletRange,
was refactored to use Items to retrieve a range.
  • Loading branch information
rosstimothy authored Mar 4, 2025
1 parent 231420b commit 8ecea4a
Showing 1 changed file with 107 additions and 31 deletions.
138 changes: 107 additions & 31 deletions lib/backend/lite/lite.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"errors"
"fmt"
"io/fs"
"iter"
"log/slog"
"net/url"
"os"
Expand Down Expand Up @@ -629,6 +630,98 @@ func (l *Backend) getInTransaction(ctx context.Context, key backend.Key, tx *sql
return nil
}

func (l *Backend) Items(ctx context.Context, params backend.IterateParams) iter.Seq2[backend.Item, error] {
if params.StartKey.IsZero() {
err := trace.BadParameter("missing parameter startKey")
return func(yield func(backend.Item, error) bool) { yield(backend.Item{}, err) }
}
if params.EndKey.IsZero() {
err := trace.BadParameter("missing parameter endKey")
return func(yield func(backend.Item, error) bool) { yield(backend.Item{}, err) }
}

const (
queryAsc = "SELECT key, value, expires, revision FROM kv WHERE (key BETWEEN ? AND ?) AND (? == '' OR key > ?) AND (expires IS NULL OR expires > ?) ORDER BY key ASC LIMIT ?"
queryDesc = "SELECT key, value, expires, revision FROM kv WHERE (key BETWEEN ? AND ?) AND (? == '' OR key < ?) AND (expires IS NULL OR expires > ?) ORDER BY key DESC LIMIT ?"
defaultPageSize = 1000
)
return func(yield func(backend.Item, error) bool) {
limit := params.Limit
if limit <= 0 {
limit = backend.DefaultRangeLimit
}

var exclusiveStartKey string
startKey := params.StartKey.String()
endKey := params.EndKey.String()

query := queryAsc
if params.Descending {
query = queryDesc
}

var pageLimit, totalCount int
items := make([]backend.Item, 0, min(limit, defaultPageSize))
for {
items = items[:0]
pageLimit = min(limit-totalCount, defaultPageSize)
if err := l.inTransaction(ctx, func(tx *sql.Tx) error {
q, err := tx.PrepareContext(ctx, query)
if err != nil {
return trace.Wrap(err)
}
defer q.Close()

rows, err := q.QueryContext(ctx, startKey, endKey, exclusiveStartKey, exclusiveStartKey, l.clock.Now().UTC(), pageLimit)
if err != nil {
return trace.Wrap(err)
}
defer rows.Close()

for rows.Next() {
var item backend.Item
var expires sql.NullTime
if err := rows.Scan(&item.Key, &item.Value, &expires, &item.Revision); err != nil {
return trace.Wrap(err)
}
item.Expires = expires.Time
if item.Revision == "" {
item.Revision = backend.BlankRevision
}

items = append(items, item)
}

// Explicitly call rows.Close() to return the error instead of
// it being ignored in the defer above.
return trace.Wrap(rows.Close())
}); err != nil {
yield(backend.Item{}, trace.Wrap(err))
return
}

if len(items) >= pageLimit {
exclusiveStartKey = items[len(items)-1].Key.String()
}

for _, item := range items {
if !yield(item, nil) {
return
}

totalCount++
if limit != backend.NoLimit && totalCount >= limit {
return
}
}

if len(items) < pageLimit {
return
}
}
}
}

// GetRange returns query range
func (l *Backend) GetRange(ctx context.Context, startKey, endKey backend.Key, limit int) (*backend.GetResult, error) {
if startKey.IsZero() {
Expand All @@ -642,37 +735,13 @@ func (l *Backend) GetRange(ctx context.Context, startKey, endKey backend.Key, li
}

var result backend.GetResult
err := l.inTransaction(ctx, func(tx *sql.Tx) error {
q, err := tx.PrepareContext(ctx,
"SELECT key, value, expires, revision FROM kv WHERE (key >= ? and key <= ?) AND (expires is NULL or expires > ?) ORDER BY key LIMIT ?")
if err != nil {
return trace.Wrap(err)
}
defer q.Close()

rows, err := q.QueryContext(ctx, startKey.String(), endKey.String(), l.clock.Now().UTC(), limit)
for item, err := range l.Items(ctx, backend.IterateParams{StartKey: startKey, EndKey: endKey, Limit: limit}) {
if err != nil {
return trace.Wrap(err)
}
defer rows.Close()

for rows.Next() {
var i backend.Item
var expires sql.NullTime
if err := rows.Scan(&i.Key, &i.Value, &expires, &i.Revision); err != nil {
return trace.Wrap(err)
}
i.Expires = expires.Time
if i.Revision == "" {
i.Revision = backend.BlankRevision
}
result.Items = append(result.Items, i)
return nil, trace.Wrap(err)
}
return nil
})
if err != nil {
return nil, trace.Wrap(err)
result.Items = append(result.Items, item)
}

if len(result.Items) == backend.DefaultRangeLimit {
l.logger.WarnContext(ctx, "Range query hit backend limit. (this is a bug!)", "start_key", startKey, "limit", backend.DefaultRangeLimit)
}
Expand Down Expand Up @@ -790,18 +859,25 @@ func (l *Backend) DeleteRange(ctx context.Context, startKey, endKey backend.Key)
if err != nil {
return trace.Wrap(err)
}
defer rows.Close()

var keys []backend.Key
defer rows.Close()
for rows.Next() {
var key backend.Key
if err := rows.Scan(&key); err != nil {
return trace.Wrap(err)
}

keys = append(keys, key)
}

for i := range keys {
if err := l.deleteInTransaction(l.ctx, keys[i], tx); err != nil {
// Close rows early before any deletions occur.
if err := rows.Close(); err != nil {
return trace.Wrap(err)
}

for _, key := range keys {
if err := l.deleteInTransaction(l.ctx, key, tx); err != nil {
return trace.Wrap(err)
}
}
Expand Down

0 comments on commit 8ecea4a

Please sign in to comment.