
Commit b1937bb

craig[bot] and yuzefovich committed

Merge #155670

155670: sql: minor cleanup around portal pausability r=yuzefovich a=yuzefovich

This PR contains several commits that clean up and harden code around pausable portals. See each commit for details.

Informs: https://github.com/cockroachlabs/support/issues/3463.

Epic: None

Release note: None

Co-authored-by: Yahor Yuzefovich <[email protected]>
2 parents eddf7cb + 2195a92 commit b1937bb

3 files changed: +105 lines added, −93 lines removed

pkg/sql/conn_executor.go

Lines changed: 3 additions & 3 deletions
```diff
@@ -2455,7 +2455,7 @@ func (ex *connExecutor) execCmd() (retErr error) {
 		// If this is the first-time execution of a portal without a limit set,
 		// it means all rows will be exhausted, so no need to pause this portal.
 		if tcmd.Limit == 0 && portal.pauseInfo != nil && portal.pauseInfo.curRes == nil {
-			portal.pauseInfo = nil
+			ex.disablePortalPausability(&portal)
 		}
 
 		stmtRes := ex.clientComm.CreateStatementResult(
@@ -2467,7 +2467,7 @@ func (ex *connExecutor) execCmd() (retErr error) {
 			ex.sessionData().DataConversionConfig,
 			ex.sessionData().GetLocation(),
 			tcmd.Limit,
-			portalName,
+			portal.Name,
 			ex.implicitTxn(),
 			portal.portalPausablity,
 		)
@@ -2483,7 +2483,7 @@ func (ex *connExecutor) execCmd() (retErr error) {
 		// followed by Sync (which is the common case), then we still can auto-commit,
 		// which allows the 1PC txn fast path to be used.
 		canAutoCommit := ex.implicitTxn() && tcmd.FollowedBySync
-		ev, payload, err = ex.execPortal(ctx, portal, portalName, stmtRes, pinfo, canAutoCommit)
+		ev, payload, err = ex.execPortal(ctx, portal, stmtRes, pinfo, canAutoCommit)
 		return err
 	}()
 	// Note: we write to ex.statsCollector.phaseTimes, instead of ex.phaseTimes,
```
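For context on the first change above: a PreparedPortal is stored by value in the executor's portals map (see the comment added to disablePortalPausability in pkg/sql/prepared_stmt.go below), so clearing pauseInfo on a local copy alone would not update the stored entry. A minimal, self-contained sketch of that Go behavior, using illustrative stand-in types rather than the real CockroachDB ones:

```go
package main

import "fmt"

// Hypothetical stand-ins for PreparedPortal and its pause metadata; the names
// are illustrative only.
type pauseInfo struct{ paused bool }

type portal struct {
	Name      string
	pauseInfo *pauseInfo
}

func main() {
	portals := map[string]portal{
		"p1": {Name: "p1", pauseInfo: &pauseInfo{paused: true}},
	}

	// Mutating a copy fetched from the map does NOT update the map entry:
	// the struct is stored by value, so the stored pauseInfo pointer stays set.
	p := portals["p1"]
	p.pauseInfo = nil
	fmt.Println(portals["p1"].pauseInfo == nil) // false

	// Writing the modified copy back is what actually disables pausability,
	// which is the kind of write-back a helper such as disablePortalPausability
	// centralizes.
	portals["p1"] = p
	fmt.Println(portals["p1"].pauseInfo == nil) // true
}
```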

pkg/sql/conn_executor_exec.go

Lines changed: 65 additions & 72 deletions
```diff
@@ -281,7 +281,6 @@ func (ex *connExecutor) recordFailure(p eventNonRetryableErrPayload) {
 func (ex *connExecutor) execPortal(
 	ctx context.Context,
 	portal PreparedPortal,
-	portalName string,
 	stmtRes CommandResult,
 	pinfo *tree.PlaceholderInfo,
 	canAutoCommit bool,
@@ -290,7 +289,7 @@ func (ex *connExecutor) execPortal(
 	if portal.isPausable() {
 		if !portal.pauseInfo.exhaustPortal.cleanup.isComplete {
 			portal.pauseInfo.exhaustPortal.cleanup.appendFunc(func(_ context.Context) {
-				ex.exhaustPortal(portalName)
+				ex.exhaustPortal(portal.Name)
 			})
 			portal.pauseInfo.exhaustPortal.cleanup.isComplete = true
 		}
@@ -330,8 +329,8 @@
 		// to re-execute the portal from scratch.
 		// The current statement may have just closed and deleted the portal,
 		// so only exhaust it if it still exists.
-		if _, ok := ex.extraTxnState.prepStmtsNamespace.portals[portalName]; ok && !portal.isPausable() {
-			defer ex.exhaustPortal(portalName)
+		if _, ok := ex.extraTxnState.prepStmtsNamespace.portals[portal.Name]; ok && !portal.isPausable() {
+			defer ex.exhaustPortal(portal.Name)
 		}
 		return ev, payload, retErr
 
@@ -1695,6 +1694,9 @@ func (ex *connExecutor) execStmtInOpenStateWithPausablePortal(
 		}
 	}
 
+	// Note that here we process the cleanup function (which will append it
+	// to the cleanup queue) without a defer since there is no more code
+	// relevant to pausable portals model below.
 	processCleanupFunc(func() {
 		cancelQueryCtx := ctx
 		if portal.isPausable() {
@@ -2526,6 +2528,13 @@ func (ex *connExecutor) rollbackSQLTransaction(
 	return eventTxnFinishAborted{}, nil
 }
 
+func getPausablePortalInfo(p *planner) *portalPauseInfo {
+	if p != nil && p.pausablePortal != nil {
+		return p.pausablePortal.pauseInfo
+	}
+	return nil
+}
+
 // Each statement in an explicit READ COMMITTED transaction has a SAVEPOINT.
 // This allows for TransactionRetry errors to be retried automatically. We don't
 // do this for implicit transactions because the conn_executor state machine
@@ -2543,13 +2552,7 @@ func (ex *connExecutor) dispatchReadCommittedStmtToExecutionEngine(
 		)
 	}
 
-	getPausablePortalInfo := func() *portalPauseInfo {
-		if p != nil && p.pausablePortal != nil {
-			return p.pausablePortal.pauseInfo
-		}
-		return nil
-	}
-	if ppInfo := getPausablePortalInfo(); ppInfo != nil {
+	if ppInfo := getPausablePortalInfo(p); ppInfo != nil {
 		p.autoRetryStmtReason = ppInfo.dispatchReadCommittedStmtToExecutionEngine.autoRetryStmtReason
 		p.autoRetryStmtCounter = ppInfo.dispatchReadCommittedStmtToExecutionEngine.autoRetryStmtCounter
 	}
@@ -2645,7 +2648,7 @@
 			}
 			p.autoRetryStmtCounter++
 			p.autoRetryStmtReason = maybeRetryableErr
-			if ppInfo := getPausablePortalInfo(); ppInfo != nil {
+			if ppInfo := getPausablePortalInfo(p); ppInfo != nil {
 				ppInfo.dispatchReadCommittedStmtToExecutionEngine.autoRetryStmtReason = p.autoRetryStmtReason
 				ppInfo.dispatchReadCommittedStmtToExecutionEngine.autoRetryStmtCounter = p.autoRetryStmtCounter
 			}
@@ -2672,14 +2675,8 @@
 func (ex *connExecutor) dispatchToExecutionEngine(
 	ctx context.Context, planner *planner, res RestrictedCommandResult,
 ) (retErr error) {
-	getPausablePortalInfo := func() *portalPauseInfo {
-		if planner != nil && planner.pausablePortal != nil {
-			return planner.pausablePortal.pauseInfo
-		}
-		return nil
-	}
 	defer func() {
-		if ppInfo := getPausablePortalInfo(); ppInfo != nil {
+		if ppInfo := getPausablePortalInfo(planner); ppInfo != nil {
 			if !ppInfo.dispatchToExecutionEngine.cleanup.isComplete {
 				ppInfo.dispatchToExecutionEngine.cleanup.isComplete = true
 			}
@@ -2743,34 +2740,48 @@
 	}
 
 	var err error
-	if ppInfo := getPausablePortalInfo(); ppInfo != nil {
+	if ppInfo := getPausablePortalInfo(planner); ppInfo != nil {
 		if !ppInfo.dispatchToExecutionEngine.cleanup.isComplete {
 			ctx, err = ex.makeExecPlan(ctx, planner)
-			// TODO(janexing): This is a temporary solution to disallow procedure
-			// call statements that contain mutations for pausable portals. Since
-			// relational.CanMutate is not yet propagated from the function body
-			// via builder.BuildCall(), we must temporarily disallow all
-			// TCL statements, which includes the CALL statements.
-			// This should be removed once CanMutate is fully propagated.
-			// (pending https://github.com/cockroachdb/cockroach/issues/147568)
-			isTCL := planner.curPlan.stmt.AST.StatementType() == tree.TypeTCL
-			if flags := planner.curPlan.flags; err == nil && (isTCL || flags.IsSet(planFlagContainsMutation) || flags.IsSet(planFlagIsDDL)) {
-				telemetry.Inc(sqltelemetry.NotReadOnlyStmtsTriedWithPausablePortals)
-				// We don't allow mutations in a pausable portal. Set it back to
-				// an un-pausable (normal) portal.
-				planner.pausablePortal.pauseInfo = nil
-				err = res.RevokePortalPausability()
-				// If this plan is a transaction control statement, we don't
-				// even execute it but just early exit.
-				if isTCL {
-					err = errors.CombineErrors(err, ErrStmtNotSupportedForPausablePortal)
+			if err == nil {
+				// TODO(janexing): This is a temporary solution to disallow procedure
+				// call statements that contain mutations for pausable portals. Since
+				// relational.CanMutate is not yet propagated from the function body
+				// via builder.BuildCall(), we must temporarily disallow all
+				// TCL statements, which includes the CALL statements.
+				// This should be removed once CanMutate is fully propagated.
+				// (pending https://github.com/cockroachdb/cockroach/issues/147568)
+				isTCL := planner.curPlan.stmt.AST.StatementType() == tree.TypeTCL
+				// We don't allow mutations in a pausable portal.
+				notReadOnly := isTCL || planner.curPlan.flags.IsSet(planFlagContainsMutation) || planner.curPlan.flags.IsSet(planFlagIsDDL)
+				// We don't allow sub / post queries for pausable portal.
+				hasSubOrPostQuery := len(planner.curPlan.subqueryPlans) != 0 || len(planner.curPlan.cascades) != 0 ||
+					len(planner.curPlan.checkPlans) != 0 || len(planner.curPlan.triggers) != 0
+				if notReadOnly || hasSubOrPostQuery {
+					if notReadOnly {
+						telemetry.Inc(sqltelemetry.NotReadOnlyStmtsTriedWithPausablePortals)
+					} else {
+						telemetry.Inc(sqltelemetry.SubOrPostQueryStmtsTriedWithPausablePortals)
+					}
+					// This stmt is not supported via the pausable portals model
+					// - set it back to an un-pausable (normal) portal.
+					ex.disablePortalPausability(planner.pausablePortal)
+					planner.pausablePortal = nil
+					err = res.RevokePortalPausability()
+					// If this plan is a transaction control statement, we don't
+					// even execute it but just early exit.
+					if isTCL {
+						err = errors.CombineErrors(err, ErrStmtNotSupportedForPausablePortal)
+					}
+					defer planner.curPlan.close(ctx)
+				} else {
+					ppInfo.dispatchToExecutionEngine.planTop = planner.curPlan
+					defer func() {
+						ppInfo.dispatchToExecutionEngine.cleanup.appendFunc(func(ctx context.Context) {
+							ppInfo.dispatchToExecutionEngine.planTop.close(ctx)
+						})
+					}()
 				}
-				defer planner.curPlan.close(ctx)
-			} else {
-				ppInfo.dispatchToExecutionEngine.planTop = planner.curPlan
-				ppInfo.dispatchToExecutionEngine.cleanup.appendFunc(func(ctx context.Context) {
-					ppInfo.dispatchToExecutionEngine.planTop.close(ctx)
-				})
 			}
 		} else {
 			planner.curPlan = ppInfo.dispatchToExecutionEngine.planTop
@@ -2825,32 +2836,12 @@
 	ex.sessionTracing.TracePlanCheckStart(ctx)
 
 	var afterGetPlanDistribution func()
-	if planner.pausablePortal != nil {
-		if len(planner.curPlan.subqueryPlans) == 0 &&
-			len(planner.curPlan.cascades) == 0 &&
-			len(planner.curPlan.checkPlans) == 0 &&
-			len(planner.curPlan.triggers) == 0 {
-			// We don't allow a distributed plan for pausable portals.
-			origDistSQLMode := ex.sessionData().DistSQLMode
-			ex.sessionData().DistSQLMode = sessiondatapb.DistSQLOff
-			afterGetPlanDistribution = func() {
-				ex.sessionData().DistSQLMode = origDistSQLMode
-			}
-		} else {
-			telemetry.Inc(sqltelemetry.SubOrPostQueryStmtsTriedWithPausablePortals)
-			// We don't allow sub / post queries for pausable portal. Set it back to an
-			// un-pausable (normal) portal.
-			// With pauseInfo is nil, no cleanup function will be added to the stack
-			// and all clean-up steps will be performed as for normal portals.
-			// TODO(#115887): We may need to move resetting pauseInfo before we add
-			// the pausable portal cleanup step above.
-			planner.pausablePortal.pauseInfo = nil
-			// We need this so that the result consumption for this portal cannot be
-			// paused either.
-			if err := res.RevokePortalPausability(); err != nil {
-				res.SetError(err)
-				return nil
-			}
+	if getPausablePortalInfo(planner) != nil {
+		// We don't allow a distributed plan for pausable portals.
+		origDistSQLMode := ex.sessionData().DistSQLMode
+		ex.sessionData().DistSQLMode = sessiondatapb.DistSQLOff
+		afterGetPlanDistribution = func() {
+			ex.sessionData().DistSQLMode = origDistSQLMode
 		}
 	}
 	distributePlan, distSQLProhibitedErr := planner.getPlanDistribution(ctx, planner.curPlan.main)
@@ -2903,7 +2894,7 @@
 	stats, err := ex.execWithDistSQLEngine(
 		ctx, planner, stmt.AST.StatementReturnType(), res, distribute, progAtomic, distSQLProhibitedErr,
 	)
-	if ppInfo := getPausablePortalInfo(); ppInfo != nil {
+	if ppInfo := getPausablePortalInfo(planner); ppInfo != nil {
 		// For pausable portals, we log the stats when closing the portal, so we need
 		// to aggregate the stats for all executions.
 		ppInfo.dispatchToExecutionEngine.queryStats.add(&stats)
@@ -2930,10 +2921,12 @@
 	ex.extraTxnState.bytesRead += stats.bytesRead
 	ex.extraTxnState.rowsWritten += stats.rowsWritten
 
-	if ppInfo := getPausablePortalInfo(); ppInfo != nil && !ppInfo.dispatchToExecutionEngine.cleanup.isComplete {
+	if ppInfo := getPausablePortalInfo(planner); ppInfo != nil && !ppInfo.dispatchToExecutionEngine.cleanup.isComplete {
 		// We need to ensure that we're using the planner bound to the first-time
 		// execution of a portal.
 		curPlanner := *planner
+		// Note that here we append the cleanup function without a defer since
+		// there is no more code relevant to pausable portals model below.
 		ppInfo.dispatchToExecutionEngine.cleanup.appendFunc(func(ctx context.Context) {
 			populateQueryLevelStats(ctx, &curPlanner, ex.server.cfg, ppInfo.dispatchToExecutionEngine.queryStats, &ex.cpuStatsCollector)
 			ppInfo.dispatchToExecutionEngine.stmtFingerprintID = ex.recordStatementSummary(
```
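The hunk at -2825 now keeps only the save/override/restore of the DistSQL mode around getPlanDistribution, since the sub/post-query check moved into the earlier block. A minimal sketch of that override-and-restore-via-callback pattern, with toy types standing in for the real session data (names and values here are illustrative, not the real sessiondatapb ones):

```go
package main

import "fmt"

// Toy session settings; only the shape of the pattern matters.
type sessionData struct {
	DistSQLMode string
}

// disableDistSQLTemporarily overrides the mode and returns a callback that
// restores the original value, mirroring how afterGetPlanDistribution is used
// around getPlanDistribution for pausable portals.
func disableDistSQLTemporarily(sd *sessionData) (restore func()) {
	orig := sd.DistSQLMode
	sd.DistSQLMode = "off"
	return func() { sd.DistSQLMode = orig }
}

func main() {
	sd := &sessionData{DistSQLMode: "auto"}
	restore := disableDistSQLTemporarily(sd)
	fmt.Println(sd.DistSQLMode) // off — plan distribution is decided with DistSQL disabled
	restore()
	fmt.Println(sd.DistSQLMode) // auto — the original mode is restored afterwards
}
```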

pkg/sql/prepared_stmt.go

Lines changed: 37 additions & 18 deletions
```diff
@@ -82,13 +82,17 @@ const (
 // PreparedPortal is a PreparedStatement that has been bound with query
 // arguments.
 type PreparedPortal struct {
+	// Fields below are initialized when creating the PreparedPortal and aren't
+	// modified later.
 	Name  string
 	Stmt  *prep.Statement
 	Qargs tree.QueryArguments
 
 	// OutFormats contains the requested formats for the output columns.
 	OutFormats []pgwirebase.FormatCode
 
+	// Fields below might be updated throughout the PreparedPortal's lifecycle.
+
 	// exhausted tracks whether this portal has already been fully exhausted,
 	// meaning that any additional attempts to execute it should return no
 	// rows.
@@ -141,6 +145,16 @@ func (ex *connExecutor) makePreparedPortal(
 	return portal, portal.accountForCopy(ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc, name)
 }
 
+func (ex *connExecutor) disablePortalPausability(portal *PreparedPortal) {
+	portal.portalPausablity = PortalPausabilityDisabled
+	portal.pauseInfo = nil
+	// Since the PreparedPortal is stored by value in the map, we need to
+	// explicitly update it. (Note that PreparedPortal.pauseInfo is stored by
+	// pointer, so modifications to portalPauseInfo will be reflected in the map
+	// automatically.)
+	ex.extraTxnState.prepStmtsNamespace.portals[portal.Name] = *portal
+}
+
 // accountForCopy updates the state to account for the copy of the
 // PreparedPortal (p is the copy).
 func (p *PreparedPortal) accountForCopy(
@@ -175,23 +189,28 @@ func (p *PreparedPortal) isPausable() bool {
 	return p != nil && p.pauseInfo != nil
 }
 
-// cleanupFuncStack stores cleanup functions for a portal. The clean-up
+// cleanupFuncQueue stores cleanup functions for a portal. The clean-up
 // functions are added during the first-time execution of a portal. When the
 // first-time execution is finished, we mark isComplete to true.
-type cleanupFuncStack struct {
-	stack      []func(context.Context)
+//
+// Generally, cleanup functions should be added in a defer (assuming that
+// originally they were deferred as well). The functions will be appended to the
+// end of the queue, which preserves the order of their execution if pausable
+// portals model wasn't used.
+type cleanupFuncQueue struct {
+	queue      []func(context.Context)
 	isComplete bool
 }
 
-func (n *cleanupFuncStack) appendFunc(f func(context.Context)) {
-	n.stack = append(n.stack, f)
+func (n *cleanupFuncQueue) appendFunc(f func(context.Context)) {
+	n.queue = append(n.queue, f)
 }
 
-func (n *cleanupFuncStack) run(ctx context.Context) {
-	for i := 0; i < len(n.stack); i++ {
-		n.stack[i](ctx)
+func (n *cleanupFuncQueue) run(ctx context.Context) {
+	for i := 0; i < len(n.queue); i++ {
+		n.queue[i](ctx)
 	}
-	*n = cleanupFuncStack{}
+	*n = cleanupFuncQueue{}
 }
 
 // instrumentationHelperWrapper wraps the instrumentation helper.
@@ -226,23 +245,23 @@ type portalPauseInfo struct {
 	// When closing a portal, we need to follow the reverse order of its execution,
 	// which means running the cleanup functions of the four structs in the
 	// following order:
-	// - exhaustPortal.cleanup
-	// - execStmtInOpenState.cleanup
-	// - dispatchToExecutionEngine.cleanup
 	// - resumableFlow.cleanup
+	// - dispatchToExecutionEngine.cleanup
+	// - execStmtInOpenState.cleanup
+	// - exhaustPortal.cleanup
 	//
 	// If an error occurs in any of these functions, we run the cleanup functions of
 	// this layer and its children layers, and propagate the error to the parent
 	// layer. For example, if an error occurs in execStmtInOpenStateCleanup(), we
 	// run the cleanup functions in the following order:
-	// - execStmtInOpenState.cleanup
-	// - dispatchToExecutionEngine.cleanup
 	// - resumableFlow.cleanup
+	// - dispatchToExecutionEngine.cleanup
+	// - execStmtInOpenState.cleanup
 	//
 	// When exiting connExecutor.execStmtInOpenState(), we finally run the
 	// exhaustPortal.cleanup function in connExecutor.execPortal().
 	exhaustPortal struct {
-		cleanup cleanupFuncStack
+		cleanup cleanupFuncQueue
 	}
 
 	// TODO(sql-session): replace certain fields here with planner.
@@ -266,7 +285,7 @@ type portalPauseInfo struct {
 		// retErr is needed for the cleanup steps as we will have to check the latest
 		// encountered error, so this field should be updated for each execution.
 		retErr  error
-		cleanup cleanupFuncStack
+		cleanup cleanupFuncQueue
 	}
 
 	dispatchReadCommittedStmtToExecutionEngine struct {
@@ -295,7 +314,7 @@ type portalPauseInfo struct {
 		// queryStats stores statistics on query execution. It is incremented for
 		// each execution of the portal.
 		queryStats *topLevelQueryStats
-		cleanup    cleanupFuncStack
+		cleanup    cleanupFuncQueue
 	}
 
 	resumableFlow struct {
@@ -307,7 +326,7 @@ type portalPauseInfo struct {
 		// We need this as when re-executing the portal, we are reusing the flow
 		// with the new receiver, but not re-generating the physical plan.
 		outputTypes []*types.T
-		cleanup     cleanupFuncStack
+		cleanup     cleanupFuncQueue
 	}
 }
 
```
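To make the renamed type's behavior concrete, here is a minimal, runnable sketch of the cleanupFuncQueue mechanics from this diff: cleanup functions are appended during the first execution of a portal, later run in the order they were added, and the queue then resets itself. The type and method names follow the diff; everything around them is illustrative:

```go
package main

import (
	"context"
	"fmt"
)

// cleanupFuncQueue collects cleanup closures to be run when the portal is
// eventually closed, rather than immediately via defer.
type cleanupFuncQueue struct {
	queue      []func(context.Context)
	isComplete bool
}

// appendFunc adds a cleanup function to the end of the queue.
func (n *cleanupFuncQueue) appendFunc(f func(context.Context)) {
	n.queue = append(n.queue, f)
}

// run executes the queued functions in insertion order and resets the queue
// so it cannot be run twice.
func (n *cleanupFuncQueue) run(ctx context.Context) {
	for i := 0; i < len(n.queue); i++ {
		n.queue[i](ctx)
	}
	*n = cleanupFuncQueue{}
}

func main() {
	var q cleanupFuncQueue
	q.appendFunc(func(context.Context) { fmt.Println("close plan") })
	q.appendFunc(func(context.Context) { fmt.Println("record stats") })
	q.isComplete = true // first-time execution finished; no more appends expected
	q.run(context.Background())
	// Output:
	// close plan
	// record stats
}
```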