@@ -8,11 +8,13 @@ import (
88 "time"
99
1010 "github.com/ydb-platform/ydb-go-genproto/Ydb_Query_V1"
11+ "github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
1112 "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Query"
12-
13+
1314 "github.com/ydb-platform/ydb-go-sdk/v3/internal/query/result"
1415 "github.com/ydb-platform/ydb-go-sdk/v3/internal/stack"
1516 "github.com/ydb-platform/ydb-go-sdk/v3/internal/stats"
17+ "github.com/ydb-platform/ydb-go-sdk/v3/internal/types"
1618 "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
1719 "github.com/ydb-platform/ydb-go-sdk/v3/internal/xiter"
1820 "github.com/ydb-platform/ydb-go-sdk/v3/query"
@@ -459,3 +461,60 @@ func resultToMaterializedResult(ctx context.Context, r result.Result) (result.Re
459461 resultSets : resultSets ,
460462 }, nil
461463}
464+
465+ func streamToMaterializedResult (ctx context.Context , stream Ydb_Query_V1.QueryService_ExecuteQueryClient ) (result.Result , error ) {
466+ type resultSet struct {
467+ rows []query.Row
468+ rawColumns []* Ydb.Column
469+ columnNames []string
470+ columnTypes []types.Type
471+ }
472+ resultSetByIndex := make (map [int64 ]resultSet )
473+
474+ for {
475+ if ctx .Err () != nil {
476+ return nil , xerrors .WithStackTrace (ctx .Err ())
477+ }
478+
479+ part , err := stream .Recv ()
480+ if err != nil {
481+ if xerrors .Is (err , io .EOF ) {
482+ break
483+ }
484+ return nil , xerrors .WithStackTrace (err )
485+ }
486+
487+ if part .GetResultSetIndex () < 0 {
488+ break
489+ }
490+
491+ rs := resultSetByIndex [part .GetResultSetIndex ()]
492+ if len (rs .columnNames ) == 0 {
493+ rs .rawColumns = part .GetResultSet ().GetColumns ()
494+ rs .columnNames = make ([]string , 0 , len (rs .rawColumns ))
495+ rs .columnTypes = make ([]types.Type , 0 , len (rs .rawColumns ))
496+
497+ for _ , column := range rs .rawColumns {
498+ rs .columnNames = append (rs .columnNames , column .GetName ())
499+ rs .columnTypes = append (rs .columnTypes , types .TypeFromYDB (column .GetType ()))
500+ }
501+ }
502+
503+ rows := make ([]query.Row , 0 , len (part .GetResultSet ().GetRows ()))
504+ for _ , row := range part .GetResultSet ().GetRows () {
505+ rows = append (rows , NewRow (rs .rawColumns , row ))
506+ }
507+ rs .rows = append (rs .rows , rows ... )
508+
509+ resultSetByIndex [part .GetResultSetIndex ()] = rs
510+ }
511+
512+ resultSets := make ([]result.Set , len (resultSetByIndex ))
513+ for rsIndex , rs := range resultSetByIndex {
514+ resultSets [rsIndex ] = MaterializedResultSet (int (rsIndex ), rs .columnNames , rs .columnTypes , rs .rows )
515+ }
516+
517+ return & materializedResult {
518+ resultSets : resultSets ,
519+ }, nil
520+ }
0 commit comments