6 changes: 3 additions & 3 deletions pkg/sql/conn_executor.go
@@ -2455,7 +2455,7 @@ func (ex *connExecutor) execCmd() (retErr error) {
// If this is the first-time execution of a portal without a limit set,
// it means all rows will be exhausted, so no need to pause this portal.
if tcmd.Limit == 0 && portal.pauseInfo != nil && portal.pauseInfo.curRes == nil {
portal.pauseInfo = nil
ex.disablePortalPausability(&portal)
}

stmtRes := ex.clientComm.CreateStatementResult(
@@ -2467,7 +2467,7 @@ func (ex *connExecutor) execCmd() (retErr error) {
ex.sessionData().DataConversionConfig,
ex.sessionData().GetLocation(),
tcmd.Limit,
portalName,
portal.Name,
ex.implicitTxn(),
portal.portalPausablity,
)
@@ -2483,7 +2483,7 @@ func (ex *connExecutor) execCmd() (retErr error) {
// followed by Sync (which is the common case), then we still can auto-commit,
// which allows the 1PC txn fast path to be used.
canAutoCommit := ex.implicitTxn() && tcmd.FollowedBySync
ev, payload, err = ex.execPortal(ctx, portal, portalName, stmtRes, pinfo, canAutoCommit)
ev, payload, err = ex.execPortal(ctx, portal, stmtRes, pinfo, canAutoCommit)
return err
}()
// Note: we write to ex.statsCollector.phaseTimes, instead of ex.phaseTimes,
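
For context on the call-site change above (not part of the diff itself): a minimal standalone sketch, using simplified stand-in types rather than the real PreparedPortal and execPortal, of why carrying the name on the portal value lets the separate portalName argument be dropped. The point of the change is a single source of truth for the portal's name instead of two values that must stay in sync.

package main

import "fmt"

// preparedPortal is a simplified stand-in for sql.PreparedPortal: the name is
// carried on the value itself, so callers no longer pass it separately.
type preparedPortal struct {
	Name string
}

// execPortal mirrors the new signature shape: the portal value is the single
// source of truth for its own name.
func execPortal(portal preparedPortal) error {
	fmt.Printf("executing portal %q\n", portal.Name)
	return nil
}

func main() {
	p := preparedPortal{Name: "p1"}
	// Before this change the call site was roughly execPortal(p, "p1"), which
	// could drift out of sync with p.Name; now there is one value to keep correct.
	if err := execPortal(p); err != nil {
		fmt.Println(err)
	}
}
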
137 changes: 66 additions & 71 deletions pkg/sql/conn_executor_exec.go
@@ -281,7 +281,6 @@ func (ex *connExecutor) recordFailure(p eventNonRetryableErrPayload) {
func (ex *connExecutor) execPortal(
ctx context.Context,
portal PreparedPortal,
portalName string,
stmtRes CommandResult,
pinfo *tree.PlaceholderInfo,
canAutoCommit bool,
@@ -290,7 +289,7 @@ func (ex *connExecutor) execPortal(
if portal.isPausable() {
if !portal.pauseInfo.exhaustPortal.cleanup.isComplete {
portal.pauseInfo.exhaustPortal.cleanup.appendFunc(func(_ context.Context) {
ex.exhaustPortal(portalName)
ex.exhaustPortal(portal.Name)
})
portal.pauseInfo.exhaustPortal.cleanup.isComplete = true
}
@@ -330,8 +329,8 @@ func (ex *connExecutor) execPortal(
// to re-execute the portal from scratch.
// The current statement may have just closed and deleted the portal,
// so only exhaust it if it still exists.
if _, ok := ex.extraTxnState.prepStmtsNamespace.portals[portalName]; ok && !portal.isPausable() {
defer ex.exhaustPortal(portalName)
if _, ok := ex.extraTxnState.prepStmtsNamespace.portals[portal.Name]; ok && !portal.isPausable() {
defer ex.exhaustPortal(portal.Name)
}
return ev, payload, retErr

@@ -1695,6 +1694,9 @@ func (ex *connExecutor) execStmtInOpenStateWithPausablePortal(
}
}

// Note that here we process the cleanup function (which will append it
// to the cleanup queue) without a defer, since there is no more code
// relevant to the pausable portals model below.
processCleanupFunc(func() {
cancelQueryCtx := ctx
if portal.isPausable() {
@@ -2526,6 +2528,13 @@ func (ex *connExecutor) rollbackSQLTransaction(
return eventTxnFinishAborted{}, nil
}

func getPausablePortalInfo(p *planner) *portalPauseInfo {
if p != nil && p.pausablePortal != nil {
return p.pausablePortal.pauseInfo
}
return nil
}

// Each statement in an explicit READ COMMITTED transaction has a SAVEPOINT.
// This allows for TransactionRetry errors to be retried automatically. We don't
// do this for implicit transactions because the conn_executor state machine
@@ -2543,13 +2552,7 @@ func (ex *connExecutor) dispatchReadCommittedStmtToExecutionEngine(
)
}

getPausablePortalInfo := func() *portalPauseInfo {
if p != nil && p.pausablePortal != nil {
return p.pausablePortal.pauseInfo
}
return nil
}
if ppInfo := getPausablePortalInfo(); ppInfo != nil {
if ppInfo := getPausablePortalInfo(p); ppInfo != nil {
p.autoRetryStmtReason = ppInfo.dispatchReadCommittedStmtToExecutionEngine.autoRetryStmtReason
p.autoRetryStmtCounter = ppInfo.dispatchReadCommittedStmtToExecutionEngine.autoRetryStmtCounter
}
Expand Down Expand Up @@ -2645,7 +2648,7 @@ func (ex *connExecutor) dispatchReadCommittedStmtToExecutionEngine(
}
p.autoRetryStmtCounter++
p.autoRetryStmtReason = maybeRetryableErr
if ppInfo := getPausablePortalInfo(); ppInfo != nil {
if ppInfo := getPausablePortalInfo(p); ppInfo != nil {
ppInfo.dispatchReadCommittedStmtToExecutionEngine.autoRetryStmtReason = p.autoRetryStmtReason
ppInfo.dispatchReadCommittedStmtToExecutionEngine.autoRetryStmtCounter = p.autoRetryStmtCounter
}
@@ -2672,14 +2675,8 @@ func (ex *connExecutor) dispatchReadCommittedStmtToExecutionEngine(
func (ex *connExecutor) dispatchToExecutionEngine(
ctx context.Context, planner *planner, res RestrictedCommandResult,
) (retErr error) {
getPausablePortalInfo := func() *portalPauseInfo {
if planner != nil && planner.pausablePortal != nil {
return planner.pausablePortal.pauseInfo
}
return nil
}
defer func() {
if ppInfo := getPausablePortalInfo(); ppInfo != nil {
if ppInfo := getPausablePortalInfo(planner); ppInfo != nil {
if !ppInfo.dispatchToExecutionEngine.cleanup.isComplete {
ppInfo.dispatchToExecutionEngine.cleanup.isComplete = true
}
@@ -2743,34 +2740,50 @@ func (ex *connExecutor) dispatchToExecutionEngine(
}

var err error
if ppInfo := getPausablePortalInfo(); ppInfo != nil {
if ppInfo := getPausablePortalInfo(planner); ppInfo != nil {
if !ppInfo.dispatchToExecutionEngine.cleanup.isComplete {
ctx, err = ex.makeExecPlan(ctx, planner)
// TODO(janexing): This is a temporary solution to disallow procedure
// call statements that contain mutations for pausable portals. Since
// relational.CanMutate is not yet propagated from the function body
// via builder.BuildCall(), we must temporarily disallow all
// TCL statements, which includes the CALL statements.
// This should be removed once CanMutate is fully propagated.
// (pending https://github.com/cockroachdb/cockroach/issues/147568)
isTCL := planner.curPlan.stmt.AST.StatementType() == tree.TypeTCL
if flags := planner.curPlan.flags; err == nil && (isTCL || flags.IsSet(planFlagContainsMutation) || flags.IsSet(planFlagIsDDL)) {
telemetry.Inc(sqltelemetry.NotReadOnlyStmtsTriedWithPausablePortals)
// We don't allow mutations in a pausable portal. Set it back to
// an un-pausable (normal) portal.
planner.pausablePortal.pauseInfo = nil
err = res.RevokePortalPausability()
// If this plan is a transaction control statement, we don't
// even execute it but just early exit.
if isTCL {
err = errors.CombineErrors(err, ErrStmtNotSupportedForPausablePortal)
if err == nil {
// TODO(janexing): This is a temporary solution to disallow procedure
// call statements that contain mutations for pausable portals. Since
// relational.CanMutate is not yet propagated from the function body
// via builder.BuildCall(), we must temporarily disallow all
// TCL statements, which includes the CALL statements.
// This should be removed once CanMutate is fully propagated.
// (pending https://github.com/cockroachdb/cockroach/issues/147568)
isTCL := planner.curPlan.stmt.AST.StatementType() == tree.TypeTCL
// We don't allow mutations in a pausable portal.
notReadOnly := isTCL || planner.curPlan.flags.IsSet(planFlagContainsMutation) || planner.curPlan.flags.IsSet(planFlagIsDDL)
// We don't allow sub / post queries for pausable portals.
hasSubOrPostQuery := len(planner.curPlan.subqueryPlans) != 0 || len(planner.curPlan.cascades) != 0 ||
len(planner.curPlan.checkPlans) != 0 || len(planner.curPlan.triggers) != 0
if notReadOnly || hasSubOrPostQuery {
if notReadOnly {
telemetry.Inc(sqltelemetry.NotReadOnlyStmtsTriedWithPausablePortals)
} else {
telemetry.Inc(sqltelemetry.SubOrPostQueryStmtsTriedWithPausablePortals)
}
// This stmt is not supported via the pausable portals model
// - set it back to an un-pausable (normal) portal.
ex.disablePortalPausability(planner.pausablePortal)
planner.pausablePortal = nil
err = res.RevokePortalPausability()
// If this plan is a transaction control statement, we don't
// even execute it but just early exit.
if isTCL {
err = errors.CombineErrors(err, ErrStmtNotSupportedForPausablePortal)
}
defer planner.curPlan.close(ctx)
} else {
ppInfo.dispatchToExecutionEngine.planTop = planner.curPlan
defer func() {
ppInfo.dispatchToExecutionEngine.cleanup.appendFunc(func(ctx context.Context) {
ppInfo.dispatchToExecutionEngine.planTop.close(ctx)
})
}()
}
defer planner.curPlan.close(ctx)
} else {
ppInfo.dispatchToExecutionEngine.planTop = planner.curPlan
ppInfo.dispatchToExecutionEngine.cleanup.appendFunc(func(ctx context.Context) {
ppInfo.dispatchToExecutionEngine.planTop.close(ctx)
})
defer planner.curPlan.close(ctx)
}
} else {
planner.curPlan = ppInfo.dispatchToExecutionEngine.planTop
@@ -2825,32 +2838,12 @@ func (ex *connExecutor) dispatchToExecutionEngine(
ex.sessionTracing.TracePlanCheckStart(ctx)

var afterGetPlanDistribution func()
if planner.pausablePortal != nil {
if len(planner.curPlan.subqueryPlans) == 0 &&
len(planner.curPlan.cascades) == 0 &&
len(planner.curPlan.checkPlans) == 0 &&
len(planner.curPlan.triggers) == 0 {
// We don't allow a distributed plan for pausable portals.
origDistSQLMode := ex.sessionData().DistSQLMode
ex.sessionData().DistSQLMode = sessiondatapb.DistSQLOff
afterGetPlanDistribution = func() {
ex.sessionData().DistSQLMode = origDistSQLMode
}
} else {
telemetry.Inc(sqltelemetry.SubOrPostQueryStmtsTriedWithPausablePortals)
// We don't allow sub / post queries for pausable portal. Set it back to an
// un-pausable (normal) portal.
// With pauseInfo is nil, no cleanup function will be added to the stack
// and all clean-up steps will be performed as for normal portals.
// TODO(#115887): We may need to move resetting pauseInfo before we add
// the pausable portal cleanup step above.
planner.pausablePortal.pauseInfo = nil
// We need this so that the result consumption for this portal cannot be
// paused either.
if err := res.RevokePortalPausability(); err != nil {
res.SetError(err)
return nil
}
if getPausablePortalInfo(planner) != nil {
// We don't allow a distributed plan for pausable portals.
origDistSQLMode := ex.sessionData().DistSQLMode
ex.sessionData().DistSQLMode = sessiondatapb.DistSQLOff
afterGetPlanDistribution = func() {
ex.sessionData().DistSQLMode = origDistSQLMode
}
}
distributePlan, distSQLProhibitedErr := planner.getPlanDistribution(ctx, planner.curPlan.main)
@@ -2903,7 +2896,7 @@ func (ex *connExecutor) dispatchToExecutionEngine(
stats, err := ex.execWithDistSQLEngine(
ctx, planner, stmt.AST.StatementReturnType(), res, distribute, progAtomic, distSQLProhibitedErr,
)
if ppInfo := getPausablePortalInfo(); ppInfo != nil {
if ppInfo := getPausablePortalInfo(planner); ppInfo != nil {
// For pausable portals, we log the stats when closing the portal, so we need
// to aggregate the stats for all executions.
ppInfo.dispatchToExecutionEngine.queryStats.add(&stats)
@@ -2930,10 +2923,12 @@ func (ex *connExecutor) dispatchToExecutionEngine(
ex.extraTxnState.bytesRead += stats.bytesRead
ex.extraTxnState.rowsWritten += stats.rowsWritten

if ppInfo := getPausablePortalInfo(); ppInfo != nil && !ppInfo.dispatchToExecutionEngine.cleanup.isComplete {
if ppInfo := getPausablePortalInfo(planner); ppInfo != nil && !ppInfo.dispatchToExecutionEngine.cleanup.isComplete {
// We need to ensure that we're using the planner bound to the first-time
// execution of a portal.
curPlanner := *planner
// Note that here we append the cleanup function without a defer, since
// there is no more code relevant to the pausable portals model below.
ppInfo.dispatchToExecutionEngine.cleanup.appendFunc(func(ctx context.Context) {
populateQueryLevelStats(ctx, &curPlanner, ex.server.cfg, ppInfo.dispatchToExecutionEngine.queryStats, &ex.cpuStatsCollector)
ppInfo.dispatchToExecutionEngine.stmtFingerprintID = ex.recordStatementSummary(
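
For context on the refactor in this file: the duplicated per-function closures are replaced by one package-level helper. Below is a standalone sketch of the nil-safe accessor pattern that getPausablePortalInfo provides; the types are simplified stand-ins, not the real planner or portalPauseInfo.

package main

import "fmt"

// Simplified stand-ins for planner / PreparedPortal / portalPauseInfo.
type portalPauseInfo struct{ executions int }

type preparedPortal struct{ pauseInfo *portalPauseInfo }

type planner struct{ pausablePortal *preparedPortal }

// getPausablePortalInfo is nil-safe at every level, so call sites can write
// `if ppInfo := getPausablePortalInfo(p); ppInfo != nil { ... }` without
// repeating the nil checks at each of them.
func getPausablePortalInfo(p *planner) *portalPauseInfo {
	if p != nil && p.pausablePortal != nil {
		return p.pausablePortal.pauseInfo
	}
	return nil
}

func main() {
	var nilPlanner *planner
	fmt.Println(getPausablePortalInfo(nilPlanner) == nil) // true

	p := &planner{pausablePortal: &preparedPortal{pauseInfo: &portalPauseInfo{}}}
	if ppInfo := getPausablePortalInfo(p); ppInfo != nil {
		ppInfo.executions++
	}
	fmt.Println(p.pausablePortal.pauseInfo.executions) // 1
}
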
55 changes: 37 additions & 18 deletions pkg/sql/prepared_stmt.go
@@ -82,13 +82,17 @@ const (
// PreparedPortal is a PreparedStatement that has been bound with query
// arguments.
type PreparedPortal struct {
// Fields below are initialized when creating the PreparedPortal and aren't
// modified later.
Name string
Stmt *prep.Statement
Qargs tree.QueryArguments

// OutFormats contains the requested formats for the output columns.
OutFormats []pgwirebase.FormatCode

// Fields below might be updated throughout the PreparedPortal's lifecycle.

// exhausted tracks whether this portal has already been fully exhausted,
// meaning that any additional attempts to execute it should return no
// rows.
@@ -141,6 +145,16 @@ func (ex *connExecutor) makePreparedPortal(
return portal, portal.accountForCopy(ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc, name)
}

func (ex *connExecutor) disablePortalPausability(portal *PreparedPortal) {
portal.portalPausablity = PortalPausabilityDisabled
portal.pauseInfo = nil
// Since the PreparedPortal is stored by value in the map, we need to
// explicitly update it. (Note that PreparedPortal.pauseInfo is stored by
// pointer, so modifications to portalPauseInfo will be reflected in the map
// automatically.)
ex.extraTxnState.prepStmtsNamespace.portals[portal.Name] = *portal
}
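
The write-back in disablePortalPausability above hinges on Go's map value semantics. A minimal standalone illustration (toy types, not the real prepStmtsNamespace) of why the by-value field needs an explicit write-back while the by-pointer field does not:

package main

import "fmt"

type pauseInfo struct{ curRes string }

type portal struct {
	pausability int        // stored by value: needs an explicit write-back
	pause       *pauseInfo // stored by pointer: shared with the map's copy
}

func main() {
	portals := map[string]portal{"p1": {pausability: 1, pause: &pauseInfo{}}}

	p := portals["p1"]   // p is a copy of the map's value
	p.pausability = 0    // invisible to the map until written back
	p.pause.curRes = "x" // visible immediately: both copies share the pointer

	fmt.Println(portals["p1"].pausability)  // still 1
	fmt.Println(portals["p1"].pause.curRes) // "x"

	portals["p1"] = p // explicit write-back, as disablePortalPausability does
	fmt.Println(portals["p1"].pausability) // now 0
}
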

// accountForCopy updates the state to account for the copy of the
// PreparedPortal (p is the copy).
func (p *PreparedPortal) accountForCopy(
@@ -175,23 +189,28 @@ func (p *PreparedPortal) isPausable() bool {
return p != nil && p.pauseInfo != nil
}

// cleanupFuncStack stores cleanup functions for a portal. The clean-up
// cleanupFuncQueue stores cleanup functions for a portal. The clean-up
// functions are added during the first-time execution of a portal. When the
// first-time execution is finished, we set isComplete to true.
type cleanupFuncStack struct {
stack []func(context.Context)
//
// Generally, cleanup functions should be added in a defer (assuming that they
// were originally deferred as well). The functions are appended to the end of
// the queue, which preserves the order in which they would have executed had
// the pausable portals model not been used.
type cleanupFuncQueue struct {
queue []func(context.Context)
isComplete bool
}

func (n *cleanupFuncStack) appendFunc(f func(context.Context)) {
n.stack = append(n.stack, f)
func (n *cleanupFuncQueue) appendFunc(f func(context.Context)) {
n.queue = append(n.queue, f)
}

func (n *cleanupFuncStack) run(ctx context.Context) {
for i := 0; i < len(n.stack); i++ {
n.stack[i](ctx)
func (n *cleanupFuncQueue) run(ctx context.Context) {
for i := 0; i < len(n.queue); i++ {
n.queue[i](ctx)
}
*n = cleanupFuncStack{}
*n = cleanupFuncQueue{}
}
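
As a standalone illustration of the queue semantics (a toy reimplementation, not the production type): cleanup functions appended from defers land in the queue in the order the defers fire, so running the queue front-to-back replays exactly the order the plain defers would have executed in.

package main

import (
	"context"
	"fmt"
)

// Toy version of cleanupFuncQueue: appended functions run in FIFO order.
type cleanupFuncQueue struct {
	queue []func(context.Context)
}

func (n *cleanupFuncQueue) appendFunc(f func(context.Context)) {
	n.queue = append(n.queue, f)
}

func (n *cleanupFuncQueue) run(ctx context.Context) {
	for _, f := range n.queue {
		f(ctx)
	}
	*n = cleanupFuncQueue{}
}

func main() {
	var q cleanupFuncQueue
	func() {
		// Deferred appends fire in LIFO order ("second", then "first"), so the
		// queue records the same order the plain defers would have executed in.
		defer q.appendFunc(func(context.Context) { fmt.Println("cleanup: first") })
		defer q.appendFunc(func(context.Context) { fmt.Println("cleanup: second") })
	}()
	// Running the queue front-to-back replays that order: "second", then "first".
	q.run(context.Background())
}
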

// instrumentationHelperWrapper wraps the instrumentation helper.
@@ -226,23 +245,23 @@ type portalPauseInfo struct {
// When closing a portal, we need to follow the reverse order of its execution,
// which means running the cleanup functions of the four structs in the
// following order:
// - exhaustPortal.cleanup
// - execStmtInOpenState.cleanup
// - dispatchToExecutionEngine.cleanup
// - resumableFlow.cleanup
// - dispatchToExecutionEngine.cleanup
// - execStmtInOpenState.cleanup
// - exhaustPortal.cleanup
//
// If an error occurs in any of these functions, we run the cleanup functions of
// this layer and its children layers, and propagate the error to the parent
// layer. For example, if an error occurs in execStmtInOpenStateCleanup(), we
// run the cleanup functions in the following order:
// - execStmtInOpenState.cleanup
// - dispatchToExecutionEngine.cleanup
// - resumableFlow.cleanup
// - dispatchToExecutionEngine.cleanup
// - execStmtInOpenState.cleanup
//
// When exiting connExecutor.execStmtInOpenState(), we finally run the
// exhaustPortal.cleanup function in connExecutor.execPortal().
exhaustPortal struct {
cleanup cleanupFuncStack
cleanup cleanupFuncQueue
}

// TODO(sql-session): replace certain fields here with planner.
Expand All @@ -266,7 +285,7 @@ type portalPauseInfo struct {
// retErr is needed for the cleanup steps as we will have to check the latest
// encountered error, so this field should be updated for each execution.
retErr error
cleanup cleanupFuncStack
cleanup cleanupFuncQueue
}

dispatchReadCommittedStmtToExecutionEngine struct {
@@ -295,7 +314,7 @@ type portalPauseInfo struct {
// queryStats stores statistics on query execution. It is incremented for
// each execution of the portal.
queryStats *topLevelQueryStats
cleanup cleanupFuncStack
cleanup cleanupFuncQueue
}

resumableFlow struct {
@@ -307,7 +326,7 @@ type portalPauseInfo struct {
// We need this as when re-executing the portal, we are reusing the flow
// with the new receiver, but not re-generating the physical plan.
outputTypes []*types.T
cleanup cleanupFuncStack
cleanup cleanupFuncQueue
}
}

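
To make the cleanup ordering documented in portalPauseInfo concrete, a toy sketch of the reverse-of-setup walk: the layers register their cleanup queues outermost-first, so the queues must run innermost-first. Only the layer names come from portalPauseInfo; everything else here is invented for illustration.

package main

import "fmt"

func main() {
	// Layers in the order their cleanup queues are set up during execution.
	setupOrder := []string{
		"exhaustPortal",
		"execStmtInOpenState",
		"dispatchToExecutionEngine",
		"resumableFlow",
	}
	// Cleanup runs in reverse: resumableFlow, dispatchToExecutionEngine,
	// execStmtInOpenState, and finally exhaustPortal (in execPortal()).
	for i := len(setupOrder) - 1; i >= 0; i-- {
		fmt.Printf("running %s.cleanup\n", setupOrder[i])
	}
}
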