Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
* Added `WithConcurrentResultSets` option for `db.Query().Query()`

## v3.117.1
* Fixed scan a column of type `Decimal(precision,scale)` into a struct field of type `types.Decimal{}` using `ScanStruct()`
* Fixed race in integration test `TestTopicWriterLogMessagesWithoutData`
Expand Down
3 changes: 2 additions & 1 deletion internal/query/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,11 +401,12 @@ func clientQuery(ctx context.Context, pool sessionPool, q string, opts ...option
if err != nil {
return xerrors.WithStackTrace(err)
}

defer func() {
_ = streamResult.Close(ctx)
}()

r, err = resultToMaterializedResult(ctx, streamResult)
r, err = concurrentResultToMaterializedResult(ctx, streamResult)
if err != nil {
return xerrors.WithStackTrace(err)
}
Expand Down
18 changes: 18 additions & 0 deletions internal/query/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -922,6 +922,24 @@ func TestClient(t *testing.T) {
Status: Ydb.StatusIds_SUCCESS,
ResultSetIndex: 0,
ResultSet: &Ydb.ResultSet{
Columns: []*Ydb.Column{
{
Name: "a",
Type: &Ydb.Type{
Type: &Ydb.Type_TypeId{
TypeId: Ydb.Type_UINT64,
},
},
},
{
Name: "b",
Type: &Ydb.Type{
Type: &Ydb.Type_TypeId{
TypeId: Ydb.Type_UTF8,
},
},
},
},
Rows: []*Ydb.Value{
{
Items: []*Ydb.Value{{
Expand Down
3 changes: 2 additions & 1 deletion internal/query/execute_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type executeSettings interface {
ResourcePool() string
ResponsePartLimitSizeBytes() int64
Label() string
ConcurrentResultSets() bool
}

type executeScriptConfig interface {
Expand Down Expand Up @@ -92,7 +93,7 @@ func executeQueryRequest(sessionID, q string, cfg executeSettings) (
},
Parameters: params,
StatsMode: Ydb_Query.StatsMode(cfg.StatsMode()),
ConcurrentResultSets: false,
ConcurrentResultSets: cfg.ConcurrentResultSets(),
PoolId: cfg.ResourcePool(),
ResponsePartLimitBytes: cfg.ResponsePartLimitSizeBytes(),
}
Expand Down
14 changes: 14 additions & 0 deletions internal/query/options/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type (
retryOptions []retry.Option
responsePartLimitBytes int64
label string
concurrentResultSets bool
}

// Execute is an interface for execute method options
Expand All @@ -70,6 +71,7 @@ type (
}
execModeOption = ExecMode
responsePartLimitBytes int64
concurrentResultSets bool
)

func (poolID resourcePool) applyExecuteOption(s *executeSettings) {
Expand Down Expand Up @@ -119,6 +121,10 @@ func (mode ExecMode) applyExecuteOption(s *executeSettings) {
s.execMode = mode
}

func (opt concurrentResultSets) applyExecuteOption(s *executeSettings) {
s.concurrentResultSets = bool(opt)
}

const (
ExecModeParse = ExecMode(Ydb_Query.ExecMode_EXEC_MODE_PARSE)
ExecModeValidate = ExecMode(Ydb_Query.ExecMode_EXEC_MODE_VALIDATE)
Expand Down Expand Up @@ -192,6 +198,10 @@ func (s *executeSettings) Label() string {
return s.label
}

func (s *executeSettings) ConcurrentResultSets() bool {
return s.concurrentResultSets
}

func WithParameters(params params.Parameters) parametersOption {
return parametersOption{
params: params,
Expand Down Expand Up @@ -224,6 +234,10 @@ func WithResponsePartLimitSizeBytes(size int64) responsePartLimitBytes {
return responsePartLimitBytes(size)
}

func WithConcurrentResultSets(isEnabled bool) concurrentResultSets {
return concurrentResultSets(isEnabled)
}

func (size responsePartLimitBytes) applyExecuteOption(s *executeSettings) {
s.responsePartLimitBytes = int64(size)
}
Expand Down
62 changes: 55 additions & 7 deletions internal/query/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/ydb-platform/ydb-go-sdk/v3/internal/query/result"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/stack"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/stats"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/types"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xiter"
"github.com/ydb-platform/ydb-go-sdk/v3/query"
Expand Down Expand Up @@ -341,6 +342,33 @@ func (r *streamResult) nextPartFunc(
}
}

func (r *streamResult) NextPart(ctx context.Context) (_ result.Part, err error) {
if r.lastPart == nil {
return nil, xerrors.WithStackTrace(io.EOF)
}

select {
case <-r.closer.Done():
return nil, xerrors.WithStackTrace(r.closer.Err())
case <-ctx.Done():
return nil, xerrors.WithStackTrace(ctx.Err())
default:
part, err := r.nextPart(ctx)
if err != nil && !xerrors.Is(err, io.EOF) {
return nil, xerrors.WithStackTrace(err)
}
if part.GetExecStats() != nil && r.statsCallback != nil {
r.statsCallback(stats.FromQueryStats(part.GetExecStats()))
}
defer func() {
r.lastPart = part
r.resultSetIndex = part.GetResultSetIndex()
}()

return newResultPart(r.lastPart), nil
}
}

func (r *streamResult) NextResultSet(ctx context.Context) (_ result.Set, err error) {
if r.trace != nil {
onDone := trace.QueryOnResultNextResultSet(r.trace, &ctx,
Expand Down Expand Up @@ -425,11 +453,20 @@ func exactlyOneResultSetFromResult(ctx context.Context, r result.Result) (rs res
return MaterializedResultSet(rs.Index(), rs.Columns(), rs.ColumnTypes(), rows), nil
}

func resultToMaterializedResult(ctx context.Context, r result.Result) (result.Result, error) {
var resultSets []result.Set
func concurrentResultToMaterializedResult(ctx context.Context, r result.ConcurrentResult) (result.Result, error) {
type resultSet struct {
rows []query.Row
columnNames []string
columnTypes []types.Type
}
resultSetByIndex := make(map[int64]resultSet)

for {
rs, err := r.NextResultSet(ctx)
if ctx.Err() != nil {
return nil, xerrors.WithStackTrace(ctx.Err())
}

part, err := r.NextPart(ctx)
if err != nil {
if xerrors.Is(err, io.EOF) {
break
Expand All @@ -438,21 +475,32 @@ func resultToMaterializedResult(ctx context.Context, r result.Result) (result.Re
return nil, xerrors.WithStackTrace(err)
}

var rows []query.Row
rs := resultSetByIndex[part.ResultSetIndex()]
if len(rs.columnNames) == 0 {
rs.columnTypes = part.ColumnTypes()
rs.columnNames = part.ColumnNames()
}

rows := make([]query.Row, 0)
for {
row, err := rs.NextRow(ctx)
row, err := part.NextRow(ctx)
if err != nil {
if xerrors.Is(err, io.EOF) {
break
}

return nil, xerrors.WithStackTrace(err)
}

rows = append(rows, row)
}
rs.rows = append(rs.rows, rows...)

resultSetByIndex[part.ResultSetIndex()] = rs
}

resultSets = append(resultSets, MaterializedResultSet(rs.Index(), rs.Columns(), rs.ColumnTypes(), rows))
resultSets := make([]result.Set, len(resultSetByIndex))
for rsIndex, rs := range resultSetByIndex {
resultSets[rsIndex] = MaterializedResultSet(int(rsIndex), rs.columnNames, rs.columnTypes, rs.rows)
}

return &materializedResult{
Expand Down
12 changes: 12 additions & 0 deletions internal/query/result/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ type (
// with Go version 1.23+
ResultSets(ctx context.Context) xiter.Seq2[Set, error]
}
ConcurrentResult interface {
closer.Closer

NextPart(ctx context.Context) (Part, error)
}
Set interface {
Index() int
Columns() []string
Expand All @@ -34,6 +39,13 @@ type (
Set
closer.Closer
}
Part interface {
ResultSetIndex() int64
ColumnNames() []string
ColumnTypes() []types.Type

NextRow(ctx context.Context) (Row, error)
}
Row interface {
Values() []value.Value

Expand Down
77 changes: 77 additions & 0 deletions internal/query/result_part.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package query

import (
"context"
"io"

"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Query"

"github.com/ydb-platform/ydb-go-sdk/v3/internal/types"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
"github.com/ydb-platform/ydb-go-sdk/v3/query"
)

var _ query.Part = (*resultPart)(nil)

type (
resultPart struct {
resultSetIndex int64
columns []*Ydb.Column
rows []*Ydb.Value
columnNames []string
columnTypes []types.Type
rowIndex int
}
)

func (p *resultPart) ResultSetIndex() int64 {
return p.resultSetIndex
}

func (p *resultPart) ColumnNames() []string {
if len(p.columnNames) != 0 {
return p.columnNames
}
names := make([]string, len(p.columns))
for i, col := range p.columns {
names[i] = col.GetName()
}
p.columnNames = names

return names
}

func (p *resultPart) ColumnTypes() []types.Type {
if len(p.columnTypes) != 0 {
return p.columnTypes
}
colTypes := make([]types.Type, len(p.columns))
for i, col := range p.columns {
colTypes[i] = types.TypeFromYDB(col.GetType())
}
p.columnTypes = colTypes

return colTypes
}

func (p *resultPart) NextRow(ctx context.Context) (query.Row, error) {
if p.rowIndex == len(p.rows) {
return nil, xerrors.WithStackTrace(io.EOF)
}

defer func() {
p.rowIndex++
}()

return NewRow(p.columns, p.rows[p.rowIndex]), nil
}

func newResultPart(part *Ydb_Query.ExecuteQueryResponsePart) *resultPart {
return &resultPart{
resultSetIndex: part.GetResultSetIndex(),
columns: part.GetResultSet().GetColumns(),
rows: part.GetResultSet().GetRows(),
rowIndex: 0,
}
}
4 changes: 4 additions & 0 deletions query/execute_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ func WithResponsePartLimitSizeBytes(size int64) ExecuteOption {
return options.WithResponsePartLimitSizeBytes(size)
}

func WithConcurrentResultSets(isEnabled bool) ExecuteOption {
return options.WithConcurrentResultSets(isEnabled)
}

func WithCallOptions(opts ...grpc.CallOption) ExecuteOption {
return options.WithCallOptions(opts...)
}
Expand Down
2 changes: 2 additions & 0 deletions query/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ import (

type (
Result = result.Result
ConcurrentResult = result.ConcurrentResult
ResultSet = result.Set
ClosableResultSet = result.ClosableResultSet
Part = result.Part
Row = result.Row
Type = types.Type
NamedDestination = scanner.NamedDestination
Expand Down
Loading