Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
* Added method `query.WithIssuesHandler` to get query issues

## v3.117.1
* Fixed scan a column of type `Decimal(precision,scale)` into a struct field of type `types.Decimal{}` using `ScanStruct()`
* Fixed race in integration test `TestTopicWriterLogMessagesWithoutData`
Expand Down
7 changes: 5 additions & 2 deletions internal/query/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,8 @@ func (c *Client) QueryRow(ctx context.Context, q string, opts ...options.Execute
func clientExec(ctx context.Context, pool sessionPool, q string, opts ...options.Execute) (finalErr error) {
settings := options.ExecuteSettings(opts...)
err := do(ctx, pool, func(ctx context.Context, s *Session) (err error) {
streamResult, err := s.execute(ctx, q, settings, withStreamResultTrace(s.trace))
streamResult, err := s.execute(ctx, q, settings,
withStreamResultTrace(s.trace), withIssuesHandler(settings.IssuesOpts()))
if err != nil {
return xerrors.WithStackTrace(err)
}
Expand Down Expand Up @@ -397,7 +398,9 @@ func clientQuery(ctx context.Context, pool sessionPool, q string, opts ...option
) {
settings := options.ExecuteSettings(opts...)
err = do(ctx, pool, func(ctx context.Context, s *Session) (err error) {
streamResult, err := s.execute(ctx, q, options.ExecuteSettings(opts...), withStreamResultTrace(s.trace))
settingsLocal := options.ExecuteSettings(opts...)
streamResult, err := s.execute(ctx, q, settings,
withStreamResultTrace(s.trace), withIssuesHandler(settingsLocal.IssuesOpts()))
if err != nil {
return xerrors.WithStackTrace(err)
}
Expand Down
19 changes: 19 additions & 0 deletions internal/query/options/execute.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package options

import (
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Issue"
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Query"
"google.golang.org/grpc"

Expand Down Expand Up @@ -42,6 +43,7 @@ type (
callOptions []grpc.CallOption
txControl *tx.Control
retryOptions []retry.Option
issueCallback func(issues []*Ydb_Issue.IssueMessage)
responsePartLimitBytes int64
label string
}
Expand Down Expand Up @@ -70,6 +72,9 @@ type (
}
execModeOption = ExecMode
responsePartLimitBytes int64
issuesOption struct {
callback func([]*Ydb_Issue.IssueMessage)
}
)

func (poolID resourcePool) applyExecuteOption(s *executeSettings) {
Expand All @@ -80,6 +85,10 @@ func (s *executeSettings) RetryOpts() []retry.Option {
return s.retryOptions
}

func (s *executeSettings) IssuesOpts() func([]*Ydb_Issue.IssueMessage) {
return s.issueCallback
}

func (s *executeSettings) StatsCallback() func(stats.QueryStats) {
return s.statsCallback
}
Expand Down Expand Up @@ -119,6 +128,10 @@ func (mode ExecMode) applyExecuteOption(s *executeSettings) {
s.execMode = mode
}

func (opts issuesOption) applyExecuteOption(s *executeSettings) {
s.issueCallback = opts.callback
}

const (
ExecModeParse = ExecMode(Ydb_Query.ExecMode_EXEC_MODE_PARSE)
ExecModeValidate = ExecMode(Ydb_Query.ExecMode_EXEC_MODE_VALIDATE)
Expand Down Expand Up @@ -244,6 +257,12 @@ func WithStatsMode(mode StatsMode, callback func(stats.QueryStats)) statsModeOpt
}
}

func WithIssuesHandler(callback func(issues []*Ydb_Issue.IssueMessage)) issuesOption {
return issuesOption{
callback: callback,
}
}

func WithCallOptions(opts ...grpc.CallOption) callOptionsOption {
return opts
}
Expand Down
15 changes: 15 additions & 0 deletions internal/query/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/ydb-platform/ydb-go-genproto/Ydb_Query_V1"
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Issue"
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Query"

"github.com/ydb-platform/ydb-go-sdk/v3/internal/query/result"
Expand Down Expand Up @@ -38,6 +39,7 @@ type (
resultSetIndex int64
trace *trace.Query
statsCallback func(queryStats stats.QueryStats)
issuesCallback func(issues []*Ydb_Issue.IssueMessage)
onNextPartErr []func(err error)
onTxMeta []func(txMeta *Ydb_Query.TransactionMeta)
closeTimeout time.Duration
Expand Down Expand Up @@ -92,6 +94,12 @@ func withStreamResultTrace(t *trace.Query) resultOption {
}
}

func withIssuesHandler(callback func(issues []*Ydb_Issue.IssueMessage)) resultOption {
return func(s *streamResult) {
s.issuesCallback = callback
}
}

func withStreamResultStatsCallback(callback func(queryStats stats.QueryStats)) resultOption {
return func(s *streamResult) {
s.statsCallback = callback
Expand Down Expand Up @@ -162,6 +170,13 @@ func newResult(
if part.GetExecStats() != nil && r.statsCallback != nil {
r.statsCallback(stats.FromQueryStats(part.GetExecStats()))
}
if r.issuesCallback != nil {
if r.lastPart != nil {
r.issuesCallback(r.lastPart.GetIssues())
} else {
r.issuesCallback(make([]*Ydb_Issue.IssueMessage, 0))
}
}

return &r, nil
}
Expand Down
4 changes: 4 additions & 0 deletions internal/query/result/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,8 @@ type (
ScanNamed(dst ...scanner.NamedDestination) error
ScanStruct(dst interface{}, opts ...scanner.ScanStructOption) error
}
resultOption func(s *Result)
Option interface {
ApplyResultOption(opts *resultOption)
}
)
5 changes: 5 additions & 0 deletions query/execute_options.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package query

import (
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Issue"
"google.golang.org/grpc"

"github.com/ydb-platform/ydb-go-sdk/v3/internal/params"
Expand Down Expand Up @@ -57,6 +58,10 @@ func WithStatsMode(mode options.StatsMode, callback func(Stats)) ExecuteOption {
return options.WithStatsMode(mode, callback)
}

func WithIssuesHandler(callback func(issues []*Ydb_Issue.IssueMessage)) ExecuteOption {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need a description

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

return options.WithIssuesHandler(callback)
}

// WithResponsePartLimitSizeBytes limit size of each part (data portion) in stream for query service resoponse
// it isn't limit total size of answer
func WithResponsePartLimitSizeBytes(size int64) ExecuteOption {
Expand Down
80 changes: 80 additions & 0 deletions tests/integration/query_execute_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/google/uuid"
"github.com/stretchr/testify/require"
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Issue"

"github.com/ydb-platform/ydb-go-sdk/v3"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/decimal"
Expand Down Expand Up @@ -771,3 +772,82 @@ func TestIssue1785FillDecimalFields(t *testing.T) {
require.EqualValues(t, expectedVal, rd.DecimalVal)
})
}

// https://github.com/ydb-platform/ydb-go-sdk/issues/1872
func TestIssue1872QueryWarning(t *testing.T) {
ctx, cancel := context.WithCancel(xtest.Context(t))
defer cancel()
db, err := ydb.Open(ctx,
os.Getenv("YDB_CONNECTION_STRING"),
ydb.WithAccessTokenCredentials(os.Getenv("YDB_ACCESS_TOKEN_CREDENTIALS")),
ydb.WithTraceQuery(
log.Query(
log.Default(os.Stdout,
log.WithLogQuery(),
log.WithColoring(),
log.WithMinLevel(log.INFO),
),
trace.QueryEvents,
),
),
)
require.NoError(t, err)
_ = db.Query().Exec(ctx,
`drop table TestIssue1872QueryWarning;`,
)
err = db.Query().Exec(ctx,
`create table TestIssue1872QueryWarning
(Id uint64, Amount decimal(22,9) , primary key(Id));`,
query.WithParameters(
ydb.ParamsBuilder().
Param("$p1").Text("test1").
Build(),
),
)
require.NoError(t, err)

t.Run("Query", func(t *testing.T) {
var issueList []*Ydb_Issue.IssueMessage
q := db.Query()
result, err := q.Query(ctx, `
insert into TestIssue1872QueryWarning (Id, Amount) values (-3, Decimal("3.01",22,9));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Simpler SQL query without table creation, and with issues:

DECLARE $x as String; SELECT 42;

Copy link
Contributor Author

@xelavopelk xelavopelk Oct 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

insert to table is leaved as the second issue case

insert into TestIssue1872QueryWarning (Id, Amount) values (-5, Decimal("5.01",22,9));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No issues is returned when several ResultSets , for example:

DECLARE $x as String;
SELECT 42;
SELECT 43;

It is a bug in the code

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

`,
query.WithParameters(
ydb.ParamsBuilder().
Param("$p1").Text("test").
Build(),
),
query.WithSyntax(query.SyntaxYQL),
query.WithIdempotent(),
query.WithIssuesHandler(func(issues []*Ydb_Issue.IssueMessage) {
issueList = issues
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can add asserts inside callback:

Suggested change
issueList = issues
require.Len(t, issueList, 1)
require.Equal(t, "Failed to ...", issueList[0].Issues[0].Issues[0].Message)

and other asserts

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

fmt.Printf("len=%d", len(issues))
}),
)
require.NoError(t, err)
require.Equal(t, 1, len(issueList))
require.Equal(t, "Type annotation", issueList[0].Message)
require.Equal(t, 2, len(issueList[0].Issues))
require.Equal(t, "At function: KiWriteTable!", issueList[0].Issues[0].Message)
require.Equal(t,
"Failed to convert type: Struct<'Amount':Decimal(22,9),'Id':Int32> to Struct<'Amount':Decimal(22,9)?,'Id':Uint64?>",
issueList[0].Issues[0].Issues[0].Message)
fmt.Printf("%#v", result)
})

t.Run("Exec", func(t *testing.T) {
var issueList []*Ydb_Issue.IssueMessage
q := db.Query()
err := q.Exec(ctx, `
insert into TestIssue1872QueryWarning (Id, Amount) values (-7, Decimal("37.01",22,9));
`,
query.WithIssuesHandler(func(issues []*Ydb_Issue.IssueMessage) {
issueList = issues
fmt.Printf("len=%d", len(issues))
}),
)
require.NoError(t, err)
require.Equal(t, 1, len(issueList))
})
}
Loading