diff --git a/CHANGELOG.md b/CHANGELOG.md index 14ff201dd..b68575f4f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,5 @@ +* Added `WithConcurrentResultSets` option for `db.Query().Query()` + ## v3.117.1 * Fixed scan a column of type `Decimal(precision,scale)` into a struct field of type `types.Decimal{}` using `ScanStruct()` * Fixed race in integration test `TestTopicWriterLogMessagesWithoutData` diff --git a/internal/query/client.go b/internal/query/client.go index 40a50e243..10ac77f26 100644 --- a/internal/query/client.go +++ b/internal/query/client.go @@ -401,11 +401,12 @@ func clientQuery(ctx context.Context, pool sessionPool, q string, opts ...option if err != nil { return xerrors.WithStackTrace(err) } + defer func() { _ = streamResult.Close(ctx) }() - r, err = resultToMaterializedResult(ctx, streamResult) + r, err = concurrentResultToMaterializedResult(ctx, streamResult) if err != nil { return xerrors.WithStackTrace(err) } diff --git a/internal/query/client_test.go b/internal/query/client_test.go index 2421f6259..a87911d79 100644 --- a/internal/query/client_test.go +++ b/internal/query/client_test.go @@ -922,6 +922,24 @@ func TestClient(t *testing.T) { Status: Ydb.StatusIds_SUCCESS, ResultSetIndex: 0, ResultSet: &Ydb.ResultSet{ + Columns: []*Ydb.Column{ + { + Name: "a", + Type: &Ydb.Type{ + Type: &Ydb.Type_TypeId{ + TypeId: Ydb.Type_UINT64, + }, + }, + }, + { + Name: "b", + Type: &Ydb.Type{ + Type: &Ydb.Type_TypeId{ + TypeId: Ydb.Type_UTF8, + }, + }, + }, + }, Rows: []*Ydb.Value{ { Items: []*Ydb.Value{{ diff --git a/internal/query/execute_query.go b/internal/query/execute_query.go index 6cdcb622c..243d8ad1a 100644 --- a/internal/query/execute_query.go +++ b/internal/query/execute_query.go @@ -32,6 +32,7 @@ type executeSettings interface { ResourcePool() string ResponsePartLimitSizeBytes() int64 Label() string + ConcurrentResultSets() bool } type executeScriptConfig interface { @@ -92,7 +93,7 @@ func executeQueryRequest(sessionID, q string, cfg executeSettings) ( }, Parameters: params, StatsMode: Ydb_Query.StatsMode(cfg.StatsMode()), - ConcurrentResultSets: false, + ConcurrentResultSets: cfg.ConcurrentResultSets(), PoolId: cfg.ResourcePool(), ResponsePartLimitBytes: cfg.ResponsePartLimitSizeBytes(), } diff --git a/internal/query/options/execute.go b/internal/query/options/execute.go index cb60336ae..c823c0046 100644 --- a/internal/query/options/execute.go +++ b/internal/query/options/execute.go @@ -44,6 +44,7 @@ type ( retryOptions []retry.Option responsePartLimitBytes int64 label string + concurrentResultSets bool } // Execute is an interface for execute method options @@ -70,6 +71,7 @@ type ( } execModeOption = ExecMode responsePartLimitBytes int64 + concurrentResultSets bool ) func (poolID resourcePool) applyExecuteOption(s *executeSettings) { @@ -119,6 +121,10 @@ func (mode ExecMode) applyExecuteOption(s *executeSettings) { s.execMode = mode } +func (opt concurrentResultSets) applyExecuteOption(s *executeSettings) { + s.concurrentResultSets = bool(opt) +} + const ( ExecModeParse = ExecMode(Ydb_Query.ExecMode_EXEC_MODE_PARSE) ExecModeValidate = ExecMode(Ydb_Query.ExecMode_EXEC_MODE_VALIDATE) @@ -192,6 +198,10 @@ func (s *executeSettings) Label() string { return s.label } +func (s *executeSettings) ConcurrentResultSets() bool { + return s.concurrentResultSets +} + func WithParameters(params params.Parameters) parametersOption { return parametersOption{ params: params, @@ -224,6 +234,10 @@ func WithResponsePartLimitSizeBytes(size int64) responsePartLimitBytes { return responsePartLimitBytes(size) } +func WithConcurrentResultSets(isEnabled bool) concurrentResultSets { + return concurrentResultSets(isEnabled) +} + func (size responsePartLimitBytes) applyExecuteOption(s *executeSettings) { s.responsePartLimitBytes = int64(size) } diff --git a/internal/query/result.go b/internal/query/result.go index c21f8fc14..8c7216d00 100644 --- a/internal/query/result.go +++ b/internal/query/result.go @@ -13,6 +13,7 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/internal/query/result" "github.com/ydb-platform/ydb-go-sdk/v3/internal/stack" "github.com/ydb-platform/ydb-go-sdk/v3/internal/stats" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/types" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xiter" "github.com/ydb-platform/ydb-go-sdk/v3/query" @@ -341,6 +342,33 @@ func (r *streamResult) nextPartFunc( } } +func (r *streamResult) NextPart(ctx context.Context) (_ result.Part, err error) { + if r.lastPart == nil { + return nil, xerrors.WithStackTrace(io.EOF) + } + + select { + case <-r.closer.Done(): + return nil, xerrors.WithStackTrace(r.closer.Err()) + case <-ctx.Done(): + return nil, xerrors.WithStackTrace(ctx.Err()) + default: + part, err := r.nextPart(ctx) + if err != nil && !xerrors.Is(err, io.EOF) { + return nil, xerrors.WithStackTrace(err) + } + if part.GetExecStats() != nil && r.statsCallback != nil { + r.statsCallback(stats.FromQueryStats(part.GetExecStats())) + } + defer func() { + r.lastPart = part + r.resultSetIndex = part.GetResultSetIndex() + }() + + return newResultPart(r.lastPart), nil + } +} + func (r *streamResult) NextResultSet(ctx context.Context) (_ result.Set, err error) { if r.trace != nil { onDone := trace.QueryOnResultNextResultSet(r.trace, &ctx, @@ -425,11 +453,20 @@ func exactlyOneResultSetFromResult(ctx context.Context, r result.Result) (rs res return MaterializedResultSet(rs.Index(), rs.Columns(), rs.ColumnTypes(), rows), nil } -func resultToMaterializedResult(ctx context.Context, r result.Result) (result.Result, error) { - var resultSets []result.Set +func concurrentResultToMaterializedResult(ctx context.Context, r result.ConcurrentResult) (result.Result, error) { + type resultSet struct { + rows []query.Row + columnNames []string + columnTypes []types.Type + } + resultSetByIndex := make(map[int64]resultSet) for { - rs, err := r.NextResultSet(ctx) + if ctx.Err() != nil { + return nil, xerrors.WithStackTrace(ctx.Err()) + } + + part, err := r.NextPart(ctx) if err != nil { if xerrors.Is(err, io.EOF) { break @@ -438,9 +475,15 @@ func resultToMaterializedResult(ctx context.Context, r result.Result) (result.Re return nil, xerrors.WithStackTrace(err) } - var rows []query.Row + rs := resultSetByIndex[part.ResultSetIndex()] + if len(rs.columnNames) == 0 { + rs.columnTypes = part.ColumnTypes() + rs.columnNames = part.ColumnNames() + } + + rows := make([]query.Row, 0) for { - row, err := rs.NextRow(ctx) + row, err := part.NextRow(ctx) if err != nil { if xerrors.Is(err, io.EOF) { break @@ -448,11 +491,16 @@ func resultToMaterializedResult(ctx context.Context, r result.Result) (result.Re return nil, xerrors.WithStackTrace(err) } - rows = append(rows, row) } + rs.rows = append(rs.rows, rows...) + + resultSetByIndex[part.ResultSetIndex()] = rs + } - resultSets = append(resultSets, MaterializedResultSet(rs.Index(), rs.Columns(), rs.ColumnTypes(), rows)) + resultSets := make([]result.Set, len(resultSetByIndex)) + for rsIndex, rs := range resultSetByIndex { + resultSets[rsIndex] = MaterializedResultSet(int(rsIndex), rs.columnNames, rs.columnTypes, rs.rows) } return &materializedResult{ diff --git a/internal/query/result/result.go b/internal/query/result/result.go index 6205eea37..b94940c0d 100644 --- a/internal/query/result/result.go +++ b/internal/query/result/result.go @@ -21,6 +21,11 @@ type ( // with Go version 1.23+ ResultSets(ctx context.Context) xiter.Seq2[Set, error] } + ConcurrentResult interface { + closer.Closer + + NextPart(ctx context.Context) (Part, error) + } Set interface { Index() int Columns() []string @@ -34,6 +39,13 @@ type ( Set closer.Closer } + Part interface { + ResultSetIndex() int64 + ColumnNames() []string + ColumnTypes() []types.Type + + NextRow(ctx context.Context) (Row, error) + } Row interface { Values() []value.Value diff --git a/internal/query/result_part.go b/internal/query/result_part.go new file mode 100644 index 000000000..0d7fa22e0 --- /dev/null +++ b/internal/query/result_part.go @@ -0,0 +1,77 @@ +package query + +import ( + "context" + "io" + + "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" + "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Query" + + "github.com/ydb-platform/ydb-go-sdk/v3/internal/types" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" + "github.com/ydb-platform/ydb-go-sdk/v3/query" +) + +var _ query.Part = (*resultPart)(nil) + +type ( + resultPart struct { + resultSetIndex int64 + columns []*Ydb.Column + rows []*Ydb.Value + columnNames []string + columnTypes []types.Type + rowIndex int + } +) + +func (p *resultPart) ResultSetIndex() int64 { + return p.resultSetIndex +} + +func (p *resultPart) ColumnNames() []string { + if len(p.columnNames) != 0 { + return p.columnNames + } + names := make([]string, len(p.columns)) + for i, col := range p.columns { + names[i] = col.GetName() + } + p.columnNames = names + + return names +} + +func (p *resultPart) ColumnTypes() []types.Type { + if len(p.columnTypes) != 0 { + return p.columnTypes + } + colTypes := make([]types.Type, len(p.columns)) + for i, col := range p.columns { + colTypes[i] = types.TypeFromYDB(col.GetType()) + } + p.columnTypes = colTypes + + return colTypes +} + +func (p *resultPart) NextRow(ctx context.Context) (query.Row, error) { + if p.rowIndex == len(p.rows) { + return nil, xerrors.WithStackTrace(io.EOF) + } + + defer func() { + p.rowIndex++ + }() + + return NewRow(p.columns, p.rows[p.rowIndex]), nil +} + +func newResultPart(part *Ydb_Query.ExecuteQueryResponsePart) *resultPart { + return &resultPart{ + resultSetIndex: part.GetResultSetIndex(), + columns: part.GetResultSet().GetColumns(), + rows: part.GetResultSet().GetRows(), + rowIndex: 0, + } +} diff --git a/query/execute_options.go b/query/execute_options.go index 784189e9c..a9935ee92 100644 --- a/query/execute_options.go +++ b/query/execute_options.go @@ -63,6 +63,10 @@ func WithResponsePartLimitSizeBytes(size int64) ExecuteOption { return options.WithResponsePartLimitSizeBytes(size) } +func WithConcurrentResultSets(isEnabled bool) ExecuteOption { + return options.WithConcurrentResultSets(isEnabled) +} + func WithCallOptions(opts ...grpc.CallOption) ExecuteOption { return options.WithCallOptions(opts...) } diff --git a/query/result.go b/query/result.go index 9990fa8f0..30e02d927 100644 --- a/query/result.go +++ b/query/result.go @@ -8,8 +8,10 @@ import ( type ( Result = result.Result + ConcurrentResult = result.ConcurrentResult ResultSet = result.Set ClosableResultSet = result.ClosableResultSet + Part = result.Part Row = result.Row Type = types.Type NamedDestination = scanner.NamedDestination