Skip to content

Commit

Permalink
[SPARK-51247][SQL] Move SubstituteExecuteImmediate to 'resolution' ba…
Browse files Browse the repository at this point in the history
…tch and prepare it for SQL Scripting local variables

### What changes were proposed in this pull request?
This PR changes `SubstituteExecuteImmediate` to analyze its entire subtree within a scoped context. This will allow us to disable SQL scripting local variables in the subtree once they are added, which is necessary in order to sandbox the generated plan.

This PR also moves `SubstituteExecuteImmediate` to the `resolution` batch in the analyzer. This is necessary in order to resolve the arguments of EXECUTE IMMEDIATE properly, notably if the EXECUTE IMMEDIATE is the child of a `ParameterizedQuery`. This ensures proper resolution ordering, i.e., all parameters of EXECUTE IMMEDIATE are resolved first, and only then is the generated query itself analyzed.

Local variables PR - #49445

### Why are the changes needed?
They are necessary for local variables support in SQL scripting.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing unit tests and golden files.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #49993 from dusantism-db/execute-immediate-resolution-batch.

Authored-by: Dušan Tišma <dusan.tisma@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 26febf7)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
  • Loading branch information
dusantism-db authored and cloud-fan committed Feb 19, 2025
1 parent db6c1d0 commit d0a7db4
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ object FakeV2SessionCatalog extends TableCatalog with FunctionCatalog with Suppo
* if `t` was a permanent table when the current view was created, it
* should still be a permanent table when resolving the current view,
* even if a temp view `t` has been created.
* @param isExecuteImmediate Whether the current plan is created by EXECUTE IMMEDIATE. Used when
* resolving variables, as SQL Scripting local variables should not be
* visible from EXECUTE IMMEDIATE.
* @param outerPlan The query plan from the outer query that can be used to resolve star
* expressions in a subquery.
*/
Expand All @@ -154,6 +157,7 @@ case class AnalysisContext(
referredTempFunctionNames: mutable.Set[String] = mutable.Set.empty,
referredTempVariableNames: Seq[Seq[String]] = Seq.empty,
outerPlan: Option[LogicalPlan] = None,
isExecuteImmediate: Boolean = false,

/**
* This is a bridge state between this fixed-point [[Analyzer]] and a single-pass [[Resolver]].
Expand Down Expand Up @@ -208,7 +212,16 @@ object AnalysisContext {
originContext.relationCache,
viewDesc.viewReferredTempViewNames,
mutable.Set(viewDesc.viewReferredTempFunctionNames: _*),
viewDesc.viewReferredTempVariableNames)
viewDesc.viewReferredTempVariableNames,
isExecuteImmediate = originContext.isExecuteImmediate)
set(context)
try f finally { set(originContext) }
}

/**
 * Evaluates `f` with the current [[AnalysisContext]] replaced by a copy whose
 * `isExecuteImmediate` flag is set to `true`.
 *
 * The context that was active on entry is restored once `f` finishes, whether it
 * completes normally or throws.
 *
 * @param f the computation to run inside the EXECUTE IMMEDIATE context
 * @return the result of evaluating `f`
 */
def withExecuteImmediateContext[A](f: => A): A = {
val previousContext = value.get()
set(previousContext.copy(isExecuteImmediate = true))
try {
f
} finally {
// Always reinstate the caller's context, even if `f` fails.
set(previousContext)
}
}
Expand Down Expand Up @@ -325,7 +338,6 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor

override def batches: Seq[Batch] = Seq(
Batch("Substitution", fixedPoint,
new SubstituteExecuteImmediate(catalogManager),
// This rule optimizes `UpdateFields` expression chains so looks more like optimization rule.
// However, when manipulating deeply nested schema, `UpdateFields` expression tree could be
// very complex and make analysis impossible. Thus we need to optimize `UpdateFields` early
Expand Down Expand Up @@ -401,6 +413,10 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
ResolveRowLevelCommandAssignments ::
MoveParameterizedQueriesDown ::
BindParameters ::
new SubstituteExecuteImmediate(
catalogManager,
resolveChild = executeSameContext,
checkAnalysis = checkAnalysis) ::
typeCoercionRules() ++
Seq(
ResolveWithCTE,
Expand Down Expand Up @@ -1670,6 +1686,9 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
case s: Sort if !s.resolved || s.missingInput.nonEmpty =>
resolveReferencesInSort(s)

// Pass for Execute Immediate as arguments will be resolved by [[SubstituteExecuteImmediate]].
case e : ExecuteImmediateQuery => e

case q: LogicalPlan =>
logTrace(s"Attempting to resolve ${q.simpleString(conf.maxToStringFields)}")
q.mapExpressions(resolveExpressionByPlanChildren(_, q, includeLastResort = true))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,14 @@ case class ExecuteImmediateQuery(
}

/**
* This rule substitutes execute immediate query node with plan that is passed as string literal
* or session parameter.
* This rule substitutes execute immediate query node with fully analyzed
* plan that is passed as string literal or session parameter.
*/
class SubstituteExecuteImmediate(val catalogManager: CatalogManager)
extends Rule[LogicalPlan]
with ColumnResolutionHelper {
class SubstituteExecuteImmediate(
val catalogManager: CatalogManager,
resolveChild: LogicalPlan => LogicalPlan,
checkAnalysis: LogicalPlan => Unit)
extends Rule[LogicalPlan] with ColumnResolutionHelper {

def resolveVariable(e: Expression): Expression = {

Expand Down Expand Up @@ -106,7 +108,12 @@ class SubstituteExecuteImmediate(val catalogManager: CatalogManager)

override def apply(plan: LogicalPlan): LogicalPlan =
plan.resolveOperatorsWithPruning(_.containsPattern(EXECUTE_IMMEDIATE), ruleId) {
case ExecuteImmediateQuery(expressions, query, targetVariables) =>
case e @ ExecuteImmediateQuery(expressions, _, _) if expressions.exists(!_.resolved) =>
e.copy(args = resolveArguments(expressions))

case ExecuteImmediateQuery(expressions, query, targetVariables)
if expressions.forall(_.resolved) =>

val queryString = extractQueryString(query)
val plan = parseStatement(queryString, targetVariables)

Expand All @@ -123,21 +130,16 @@ class SubstituteExecuteImmediate(val catalogManager: CatalogManager)
throw QueryCompilationErrors.invalidQueryMixedQueryParameters()
} else {
if (posNodes.nonEmpty) {
PosParameterizedQuery(
plan,
// We need to resolve arguments before Resolution batch to make sure
// that some rule does not accidentally resolve our parameters.
// We do not want this as they can resolve some unsupported parameters
resolveArguments(expressions))
PosParameterizedQuery(plan, expressions)
} else {
val aliases = expressions.collect {
case e: Alias => e
case u: UnresolvedAttribute => Alias(u, u.nameParts.last)()
case u: VariableReference => Alias(u, u.identifier.name())()
}

if (aliases.size != expressions.size) {
val nonAliases = expressions.filter(attr =>
!attr.isInstanceOf[Alias] && !attr.isInstanceOf[UnresolvedAttribute])
!attr.isInstanceOf[Alias] && !attr.isInstanceOf[VariableReference])

throw QueryCompilationErrors.invalidQueryAllParametersMustBeNamed(nonAliases)
}
Expand All @@ -148,13 +150,20 @@ class SubstituteExecuteImmediate(val catalogManager: CatalogManager)
// The arguments are already resolved at this point: the first case of this rule
// resolves them, and this case only fires once every argument is resolved, so the
// aliases are passed through as-is.
resolveArguments(aliases))
aliases)
}
}

// Fully analyze the generated plan. AnalysisContext.withExecuteImmediateContext makes sure
// that SQL scripting local variables will not be accessed from the plan.
val finalPlan = AnalysisContext.withExecuteImmediateContext {
resolveChild(queryPlan)
}
checkAnalysis(finalPlan)

if (targetVariables.nonEmpty) {
SetVariable(targetVariables, queryPlan)
} else { queryPlan }
SetVariable(targetVariables, finalPlan)
} else { finalPlan }
}

private def parseStatement(
Expand Down

0 comments on commit d0a7db4

Please sign in to comment.