diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 7f59e0f473739..e0321dbb19678 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -137,6 +137,9 @@ object FakeV2SessionCatalog extends TableCatalog with FunctionCatalog with Suppo * if `t` was a permanent table when the current view was created, it * should still be a permanent table when resolving the current view, * even if a temp view `t` has been created. + * @param isExecuteImmediate Whether the current plan is created by EXECUTE IMMEDIATE. Used when + * resolving variables, as SQL Scripting local variables should not be + * visible from EXECUTE IMMEDIATE. * @param outerPlan The query plan from the outer query that can be used to resolve star * expressions in a subquery. */ @@ -154,6 +157,7 @@ case class AnalysisContext( referredTempFunctionNames: mutable.Set[String] = mutable.Set.empty, referredTempVariableNames: Seq[Seq[String]] = Seq.empty, outerPlan: Option[LogicalPlan] = None, + isExecuteImmediate: Boolean = false, /** * This is a bridge state between this fixed-point [[Analyzer]] and a single-pass [[Resolver]]. @@ -208,7 +212,16 @@ object AnalysisContext { originContext.relationCache, viewDesc.viewReferredTempViewNames, mutable.Set(viewDesc.viewReferredTempFunctionNames: _*), - viewDesc.viewReferredTempVariableNames) + viewDesc.viewReferredTempVariableNames, + isExecuteImmediate = originContext.isExecuteImmediate) + set(context) + try f finally { set(originContext) } + } + + def withExecuteImmediateContext[A](f: => A): A = { + val originContext = value.get() + val context = originContext.copy(isExecuteImmediate = true) + set(context) try f finally { set(originContext) } } @@ -325,7 +338,6 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor override def batches: Seq[Batch] = Seq( Batch("Substitution", fixedPoint, - new SubstituteExecuteImmediate(catalogManager), // This rule optimizes `UpdateFields` expression chains so looks more like optimization rule. // However, when manipulating deeply nested schema, `UpdateFields` expression tree could be // very complex and make analysis impossible. Thus we need to optimize `UpdateFields` early @@ -401,6 +413,10 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor ResolveRowLevelCommandAssignments :: MoveParameterizedQueriesDown :: BindParameters :: + new SubstituteExecuteImmediate( + catalogManager, + resolveChild = executeSameContext, + checkAnalysis = checkAnalysis) :: typeCoercionRules() ++ Seq( ResolveWithCTE, @@ -1670,6 +1686,9 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor case s: Sort if !s.resolved || s.missingInput.nonEmpty => resolveReferencesInSort(s) + // Pass for Execute Immediate as arguments will be resolved by [[SubstituteExecuteImmediate]]. + case e : ExecuteImmediateQuery => e + case q: LogicalPlan => logTrace(s"Attempting to resolve ${q.simpleString(conf.maxToStringFields)}") q.mapExpressions(resolveExpressionByPlanChildren(_, q, includeLastResort = true)) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/executeImmediate.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/executeImmediate.scala index b9e20f59bb0ee..2b7eed0dc7950 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/executeImmediate.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/executeImmediate.scala @@ -44,12 +44,14 @@ case class ExecuteImmediateQuery( } /** - * This rule substitutes execute immediate query node with plan that is passed as string literal - * or session parameter. + * This rule substitutes execute immediate query node with fully analyzed + * plan that is passed as string literal or session parameter. */ -class SubstituteExecuteImmediate(val catalogManager: CatalogManager) - extends Rule[LogicalPlan] - with ColumnResolutionHelper { +class SubstituteExecuteImmediate( + val catalogManager: CatalogManager, + resolveChild: LogicalPlan => LogicalPlan, + checkAnalysis: LogicalPlan => Unit) + extends Rule[LogicalPlan] with ColumnResolutionHelper { def resolveVariable(e: Expression): Expression = { @@ -106,7 +108,12 @@ class SubstituteExecuteImmediate(val catalogManager: CatalogManager) override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(_.containsPattern(EXECUTE_IMMEDIATE), ruleId) { - case ExecuteImmediateQuery(expressions, query, targetVariables) => + case e @ ExecuteImmediateQuery(expressions, _, _) if expressions.exists(!_.resolved) => + e.copy(args = resolveArguments(expressions)) + + case ExecuteImmediateQuery(expressions, query, targetVariables) + if expressions.forall(_.resolved) => + val queryString = extractQueryString(query) val plan = parseStatement(queryString, targetVariables) @@ -123,21 +130,16 @@ class SubstituteExecuteImmediate(val catalogManager: CatalogManager) throw QueryCompilationErrors.invalidQueryMixedQueryParameters() } else { if (posNodes.nonEmpty) { - PosParameterizedQuery( - plan, - // We need to resolve arguments before Resolution batch to make sure - // that some rule does not accidentally resolve our parameters. - // We do not want this as they can resolve some unsupported parameters - resolveArguments(expressions)) + PosParameterizedQuery(plan, expressions) } else { val aliases = expressions.collect { case e: Alias => e - case u: UnresolvedAttribute => Alias(u, u.nameParts.last)() + case u: VariableReference => Alias(u, u.identifier.name())() } if (aliases.size != expressions.size) { val nonAliases = expressions.filter(attr => - !attr.isInstanceOf[Alias] && !attr.isInstanceOf[UnresolvedAttribute]) + !attr.isInstanceOf[Alias] && !attr.isInstanceOf[VariableReference]) throw QueryCompilationErrors.invalidQueryAllParametersMustBeNamed(nonAliases) } @@ -148,13 +150,20 @@ class SubstituteExecuteImmediate(val catalogManager: CatalogManager) // We need to resolve arguments before Resolution batch to make sure // that some rule does not accidentally resolve our parameters. // We do not want this as they can resolve some unsupported parameters. - resolveArguments(aliases)) + aliases) } } + // Fully analyze the generated plan. AnalysisContext.withExecuteImmediateContext makes sure + // that SQL scripting local variables will not be accessed from the plan. + val finalPlan = AnalysisContext.withExecuteImmediateContext { + resolveChild(queryPlan) + } + checkAnalysis(finalPlan) + if (targetVariables.nonEmpty) { - SetVariable(targetVariables, queryPlan) - } else { queryPlan } + SetVariable(targetVariables, finalPlan) + } else { finalPlan } } private def parseStatement(