From f7c9ff57c17a950cccdc26aadf8768c899a4d572 Mon Sep 17 00:00:00 2001 From: Herman van Hovell Date: Tue, 16 Aug 2016 23:09:53 -0700 Subject: [SPARK-17068][SQL] Make view-usage visible during analysis ## What changes were proposed in this pull request? This PR adds a field to subquery alias in order to make the usage of views in a resolved `LogicalPlan` more visible (and more understandable). For example, the following view and query: ```sql create view constants as select 1 as id union all select 1 union all select 42 select * from constants; ``` ...now yields the following analyzed plan: ``` Project [id#39] +- SubqueryAlias c, `default`.`constants` +- Project [gen_attr_0#36 AS id#39] +- SubqueryAlias gen_subquery_0 +- Union :- Union : :- Project [1 AS gen_attr_0#36] : : +- OneRowRelation$ : +- Project [1 AS gen_attr_1#37] : +- OneRowRelation$ +- Project [42 AS gen_attr_2#38] +- OneRowRelation$ ``` ## How was this patch tested? Added tests for the two code paths in `SessionCatalogSuite` (sql/core) and `HiveMetastoreCatalogSuite` (sql/hive) Author: Herman van Hovell Closes #14657 from hvanhovell/SPARK-17068. --- .../spark/sql/catalyst/analysis/Analyzer.scala | 4 +-- .../sql/catalyst/analysis/CheckAnalysis.scala | 4 +-- .../sql/catalyst/catalog/SessionCatalog.scala | 30 ++++++++++++---------- .../apache/spark/sql/catalyst/dsl/package.scala | 4 +-- .../spark/sql/catalyst/expressions/subquery.scala | 8 +++--- .../spark/sql/catalyst/optimizer/Optimizer.scala | 8 +++--- .../spark/sql/catalyst/parser/AstBuilder.scala | 4 +-- .../plans/logical/basicLogicalOperators.scala | 7 ++++- .../sql/catalyst/analysis/AnalysisSuite.scala | 4 +-- .../sql/catalyst/catalog/SessionCatalogSuite.scala | 19 +++++++++----- .../catalyst/optimizer/ColumnPruningSuite.scala | 8 +++--- .../optimizer/EliminateSubqueryAliasesSuite.scala | 6 ++--- .../catalyst/optimizer/JoinOptimizationSuite.scala | 8 +++--- .../sql/catalyst/parser/PlanParserSuite.scala | 2 +- .../main/scala/org/apache/spark/sql/Dataset.scala | 2 +- .../org/apache/spark/sql/catalyst/SQLBuilder.scala | 6 ++--- .../spark/sql/execution/datasources/rules.scala | 2 +- .../spark/sql/hive/HiveMetastoreCatalog.scala | 21 +++++++-------- .../apache/spark/sql/hive/HiveSessionCatalog.scala | 4 +-- .../spark/sql/hive/HiveMetastoreCatalogSuite.scala | 14 +++++++++- 20 files changed, 94 insertions(+), 71 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index a2a022c247..bd4c19181f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -138,7 +138,7 @@ class Analyzer( case u : UnresolvedRelation => val substituted = cteRelations.find(x => resolver(x._1, u.tableIdentifier.table)) .map(_._2).map { relation => - val withAlias = u.alias.map(SubqueryAlias(_, relation)) + val withAlias = u.alias.map(SubqueryAlias(_, relation, None)) withAlias.getOrElse(relation) } substituted.getOrElse(u) @@ -2057,7 +2057,7 @@ class Analyzer( */ object EliminateSubqueryAliases extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { - case SubqueryAlias(_, child) => child + case SubqueryAlias(_, child, _) => child } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 41b7e62d8c..e07e9194be 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -141,8 +141,8 @@ trait CheckAnalysis extends PredicateHelper { // Skip projects and subquery aliases added by the Analyzer and the SQLBuilder. def cleanQuery(p: LogicalPlan): LogicalPlan = p match { - case SubqueryAlias(_, child) => cleanQuery(child) - case Project(_, child) => cleanQuery(child) + case s: SubqueryAlias => cleanQuery(s.child) + case p: Project => cleanQuery(p.child) case child => child } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 00c3db0aac..62d0da076b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -411,27 +411,29 @@ class SessionCatalog( } /** - * Return a [[LogicalPlan]] that represents the given table. + * Return a [[LogicalPlan]] that represents the given table or view. * - * If a database is specified in `name`, this will return the table from that database. - * If no database is specified, this will first attempt to return a temporary table with - * the same name, then, if that does not exist, return the table from the current database. + * If a database is specified in `name`, this will return the table/view from that database. + * If no database is specified, this will first attempt to return a temporary table/view with + * the same name, then, if that does not exist, return the table/view from the current database. + * + * If the relation is a view, the relation will be wrapped in a [[SubqueryAlias]] which will + * track the name of the view. */ def lookupRelation(name: TableIdentifier, alias: Option[String] = None): LogicalPlan = { synchronized { val db = formatDatabaseName(name.database.getOrElse(currentDb)) val table = formatTableName(name.table) - val relation = - if (name.database.isDefined || !tempTables.contains(table)) { - val metadata = externalCatalog.getTable(db, table) - SimpleCatalogRelation(db, metadata) - } else { - tempTables(table) + val relationAlias = alias.getOrElse(table) + if (name.database.isDefined || !tempTables.contains(table)) { + val metadata = externalCatalog.getTable(db, table) + val view = Option(metadata.tableType).collect { + case CatalogTableType.VIEW => name } - val qualifiedTable = SubqueryAlias(table, relation) - // If an alias was specified by the lookup, wrap the plan in a subquery so that - // attributes are properly qualified with this alias. - alias.map(a => SubqueryAlias(a, qualifiedTable)).getOrElse(qualifiedTable) + SubqueryAlias(relationAlias, SimpleCatalogRelation(db, metadata), view) + } else { + SubqueryAlias(relationAlias, tempTables(table), Option(name)) + } } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala index 5181dcc786..9f54d709a0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala @@ -343,7 +343,7 @@ package object dsl { orderSpec: Seq[SortOrder]): LogicalPlan = Window(windowExpressions, partitionSpec, orderSpec, logicalPlan) - def subquery(alias: Symbol): LogicalPlan = SubqueryAlias(alias.name, logicalPlan) + def subquery(alias: Symbol): LogicalPlan = SubqueryAlias(alias.name, logicalPlan, None) def except(otherPlan: LogicalPlan): LogicalPlan = Except(logicalPlan, otherPlan) @@ -367,7 +367,7 @@ package object dsl { def as(alias: String): LogicalPlan = logicalPlan match { case UnresolvedRelation(tbl, _) => UnresolvedRelation(tbl, Option(alias)) - case plan => SubqueryAlias(alias, plan) + case plan => SubqueryAlias(alias, plan, None) } def repartition(num: Integer): LogicalPlan = diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala index ac44f08897..ddbe937cba 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala @@ -72,7 +72,7 @@ case class ScalarSubquery( override def dataType: DataType = query.schema.fields.head.dataType override def foldable: Boolean = false override def nullable: Boolean = true - override def plan: LogicalPlan = SubqueryAlias(toString, query) + override def plan: LogicalPlan = SubqueryAlias(toString, query, None) override def withNewPlan(plan: LogicalPlan): ScalarSubquery = copy(query = plan) override def toString: String = s"scalar-subquery#${exprId.id} $conditionString" } @@ -100,7 +100,7 @@ case class PredicateSubquery( override lazy val resolved = childrenResolved && query.resolved override lazy val references: AttributeSet = super.references -- query.outputSet override def nullable: Boolean = nullAware - override def plan: LogicalPlan = SubqueryAlias(toString, query) + override def plan: LogicalPlan = SubqueryAlias(toString, query, None) override def withNewPlan(plan: LogicalPlan): PredicateSubquery = copy(query = plan) override def semanticEquals(o: Expression): Boolean = o match { case p: PredicateSubquery => @@ -153,7 +153,7 @@ case class ListQuery(query: LogicalPlan, exprId: ExprId = NamedExpression.newExp override def dataType: DataType = ArrayType(NullType) override def nullable: Boolean = false override def withNewPlan(plan: LogicalPlan): ListQuery = copy(query = plan) - override def plan: LogicalPlan = SubqueryAlias(toString, query) + override def plan: LogicalPlan = SubqueryAlias(toString, query, None) override def toString: String = s"list#${exprId.id}" } @@ -174,6 +174,6 @@ case class Exists(query: LogicalPlan, exprId: ExprId = NamedExpression.newExprId override def children: Seq[Expression] = Seq.empty override def nullable: Boolean = false override def withNewPlan(plan: LogicalPlan): Exists = copy(query = plan) - override def plan: LogicalPlan = SubqueryAlias(toString, query) + override def plan: LogicalPlan = SubqueryAlias(toString, query, None) override def toString: String = s"exists#${exprId.id}" } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index e34a478818..f97a78b411 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -1862,7 +1862,7 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] { // and Project operators, followed by an optional Filter, followed by an // Aggregate. Traverse the operators recursively. def evalPlan(lp : LogicalPlan) : Map[ExprId, Option[Any]] = lp match { - case SubqueryAlias(_, child) => evalPlan(child) + case SubqueryAlias(_, child, _) => evalPlan(child) case Filter(condition, child) => val bindings = evalPlan(child) if (bindings.isEmpty) bindings @@ -1920,7 +1920,7 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] { topPart += p bottomPart = child - case s @ SubqueryAlias(_, child) => + case s @ SubqueryAlias(_, child, _) => topPart += s bottomPart = child @@ -1991,8 +1991,8 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] { topPart.reverse.foreach { case Project(projList, _) => subqueryRoot = Project(projList ++ havingInputs, subqueryRoot) - case s @ SubqueryAlias(alias, _) => - subqueryRoot = SubqueryAlias(alias, subqueryRoot) + case s @ SubqueryAlias(alias, _, None) => + subqueryRoot = SubqueryAlias(alias, subqueryRoot, None) case op => sys.error(s"Unexpected operator $op in corelated subquery") } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 09b650ce18..adf78396d7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -107,7 +107,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging { * This is only used for Common Table Expressions. */ override def visitNamedQuery(ctx: NamedQueryContext): SubqueryAlias = withOrigin(ctx) { - SubqueryAlias(ctx.name.getText, plan(ctx.queryNoWith)) + SubqueryAlias(ctx.name.getText, plan(ctx.queryNoWith), None) } /** @@ -723,7 +723,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging { * Create an alias (SubqueryAlias) for a LogicalPlan. */ private def aliasPlan(alias: ParserRuleContext, plan: LogicalPlan): LogicalPlan = { - SubqueryAlias(alias.getText, plan) + SubqueryAlias(alias.getText, plan, None) } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 2917d8d2a9..af1736e607 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.plans.logical import scala.collection.mutable.ArrayBuffer +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression @@ -693,7 +694,11 @@ case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryNo } } -case class SubqueryAlias(alias: String, child: LogicalPlan) extends UnaryNode { +case class SubqueryAlias( + alias: String, + child: LogicalPlan, + view: Option[TableIdentifier]) + extends UnaryNode { override def output: Seq[Attribute] = child.output.map(_.withQualifier(Some(alias))) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala index 22e1c9be05..8971edc7d3 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala @@ -339,8 +339,8 @@ class AnalysisSuite extends AnalysisTest { val query = Project(Seq($"x.key", $"y.key"), Join( - Project(Seq($"x.key"), SubqueryAlias("x", input)), - Project(Seq($"y.key"), SubqueryAlias("y", input)), + Project(Seq($"x.key"), SubqueryAlias("x", input, None)), + Project(Seq($"y.key"), SubqueryAlias("y", input, None)), Inner, None)) assertAnalysisSuccess(query) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala index b31b4406ae..c9d4fef805 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala @@ -395,31 +395,38 @@ class SessionCatalogSuite extends SparkFunSuite { sessionCatalog.setCurrentDatabase("db2") // If we explicitly specify the database, we'll look up the relation in that database assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1", Some("db2"))) - == SubqueryAlias("tbl1", SimpleCatalogRelation("db2", metastoreTable1))) + == SubqueryAlias("tbl1", SimpleCatalogRelation("db2", metastoreTable1), None)) // Otherwise, we'll first look up a temporary table with the same name assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1")) - == SubqueryAlias("tbl1", tempTable1)) + == SubqueryAlias("tbl1", tempTable1, Some(TableIdentifier("tbl1")))) // Then, if that does not exist, look up the relation in the current database sessionCatalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false, purge = false) assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1")) - == SubqueryAlias("tbl1", SimpleCatalogRelation("db2", metastoreTable1))) + == SubqueryAlias("tbl1", SimpleCatalogRelation("db2", metastoreTable1), None)) } test("lookup table relation with alias") { val catalog = new SessionCatalog(newBasicCatalog()) val alias = "monster" val tableMetadata = catalog.getTableMetadata(TableIdentifier("tbl1", Some("db2"))) - val relation = SubqueryAlias("tbl1", SimpleCatalogRelation("db2", tableMetadata)) + val relation = SubqueryAlias("tbl1", SimpleCatalogRelation("db2", tableMetadata), None) val relationWithAlias = SubqueryAlias(alias, - SubqueryAlias("tbl1", - SimpleCatalogRelation("db2", tableMetadata))) + SimpleCatalogRelation("db2", tableMetadata), None) assert(catalog.lookupRelation( TableIdentifier("tbl1", Some("db2")), alias = None) == relation) assert(catalog.lookupRelation( TableIdentifier("tbl1", Some("db2")), alias = Some(alias)) == relationWithAlias) } + test("lookup view with view name in alias") { + val catalog = new SessionCatalog(newBasicCatalog()) + val tmpView = Range(1, 10, 2, 10) + catalog.createTempView("vw1", tmpView, overrideIfExists = false) + val plan = catalog.lookupRelation(TableIdentifier("vw1"), Option("range")) + assert(plan == SubqueryAlias("range", tmpView, Option(TableIdentifier("vw1")))) + } + test("table exists") { val catalog = new SessionCatalog(newBasicCatalog()) assert(catalog.tableExists(TableIdentifier("tbl1", Some("db2")))) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala index 589607e3ad..5bd1bc80c3 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala @@ -320,16 +320,16 @@ class ColumnPruningSuite extends PlanTest { val query = Project(Seq($"x.key", $"y.key"), Join( - SubqueryAlias("x", input), - BroadcastHint(SubqueryAlias("y", input)), Inner, None)).analyze + SubqueryAlias("x", input, None), + BroadcastHint(SubqueryAlias("y", input, None)), Inner, None)).analyze val optimized = Optimize.execute(query) val expected = Join( - Project(Seq($"x.key"), SubqueryAlias("x", input)), + Project(Seq($"x.key"), SubqueryAlias("x", input, None)), BroadcastHint( - Project(Seq($"y.key"), SubqueryAlias("y", input))), + Project(Seq($"y.key"), SubqueryAlias("y", input, None))), Inner, None).analyze comparePlans(optimized, expected) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSubqueryAliasesSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSubqueryAliasesSuite.scala index 9b6d68aee8..a8aeedbd62 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSubqueryAliasesSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSubqueryAliasesSuite.scala @@ -46,13 +46,13 @@ class EliminateSubqueryAliasesSuite extends PlanTest with PredicateHelper { test("eliminate top level subquery") { val input = LocalRelation('a.int, 'b.int) - val query = SubqueryAlias("a", input) + val query = SubqueryAlias("a", input, None) comparePlans(afterOptimization(query), input) } test("eliminate mid-tree subquery") { val input = LocalRelation('a.int, 'b.int) - val query = Filter(TrueLiteral, SubqueryAlias("a", input)) + val query = Filter(TrueLiteral, SubqueryAlias("a", input, None)) comparePlans( afterOptimization(query), Filter(TrueLiteral, LocalRelation('a.int, 'b.int))) @@ -61,7 +61,7 @@ class EliminateSubqueryAliasesSuite extends PlanTest with PredicateHelper { test("eliminate multiple subqueries") { val input = LocalRelation('a.int, 'b.int) val query = Filter(TrueLiteral, - SubqueryAlias("c", SubqueryAlias("b", SubqueryAlias("a", input)))) + SubqueryAlias("c", SubqueryAlias("b", SubqueryAlias("a", input, None), None), None)) comparePlans( afterOptimization(query), Filter(TrueLiteral, LocalRelation('a.int, 'b.int))) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala index c1ebf8b09e..dbb3e6a527 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala @@ -97,15 +97,15 @@ class JoinOptimizationSuite extends PlanTest { val query = Project(Seq($"x.key", $"y.key"), Join( - SubqueryAlias("x", input), - BroadcastHint(SubqueryAlias("y", input)), Inner, None)).analyze + SubqueryAlias("x", input, None), + BroadcastHint(SubqueryAlias("y", input, None)), Inner, None)).analyze val optimized = Optimize.execute(query) val expected = Join( - Project(Seq($"x.key"), SubqueryAlias("x", input)), - BroadcastHint(Project(Seq($"y.key"), SubqueryAlias("y", input))), + Project(Seq($"x.key"), SubqueryAlias("x", input, None)), + BroadcastHint(Project(Seq($"y.key"), SubqueryAlias("y", input, None))), Inner, None).analyze comparePlans(optimized, expected) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala index 34d52c75e0..7af333b34f 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala @@ -80,7 +80,7 @@ class PlanParserSuite extends PlanTest { def cte(plan: LogicalPlan, namedPlans: (String, LogicalPlan)*): With = { val ctes = namedPlans.map { case (name, cte) => - name -> SubqueryAlias(name, cte) + name -> SubqueryAlias(name, cte, None) } With(plan, ctes) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index c119df83b3..6da99ce0dd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -967,7 +967,7 @@ class Dataset[T] private[sql]( * @since 1.6.0 */ def as(alias: String): Dataset[T] = withTypedPlan { - SubqueryAlias(alias, logicalPlan) + SubqueryAlias(alias, logicalPlan, None) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/SQLBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/SQLBuilder.scala index 5d93419f35..ff8e0f2642 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/SQLBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/SQLBuilder.scala @@ -75,7 +75,7 @@ class SQLBuilder private ( val aliasedOutput = canonicalizedPlan.output.zip(outputNames).map { case (attr, name) => Alias(attr.withQualifier(None), name)() } - val finalPlan = Project(aliasedOutput, SubqueryAlias(finalName, canonicalizedPlan)) + val finalPlan = Project(aliasedOutput, SubqueryAlias(finalName, canonicalizedPlan, None)) try { val replaced = finalPlan.transformAllExpressions { @@ -440,7 +440,7 @@ class SQLBuilder private ( object RemoveSubqueriesAboveSQLTable extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { - case SubqueryAlias(_, t @ ExtractSQLTable(_)) => t + case SubqueryAlias(_, t @ ExtractSQLTable(_), _) => t } } @@ -557,7 +557,7 @@ class SQLBuilder private ( } private def addSubquery(plan: LogicalPlan): SubqueryAlias = { - SubqueryAlias(newSubqueryName(), plan) + SubqueryAlias(newSubqueryName(), plan, None) } private def addSubqueryIfNeeded(plan: LogicalPlan): LogicalPlan = plan match { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index fc8d8c3667..5eb2f0a9ff 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -55,7 +55,7 @@ class ResolveDataSource(sparkSession: SparkSession) extends Rule[LogicalPlan] { s"${u.tableIdentifier.database.get}") } val plan = LogicalRelation(dataSource.resolveRelation()) - u.alias.map(a => SubqueryAlias(u.alias.get, plan)).getOrElse(plan) + u.alias.map(a => SubqueryAlias(u.alias.get, plan, None)).getOrElse(plan) } catch { case e: ClassNotFoundException => u case e: Exception => diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index c7c1acda25..7118edabb8 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -162,24 +162,21 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log if (table.properties.get(DATASOURCE_PROVIDER).isDefined) { val dataSourceTable = cachedDataSourceTables(qualifiedTableName) - val qualifiedTable = SubqueryAlias(qualifiedTableName.name, dataSourceTable) + val qualifiedTable = SubqueryAlias(qualifiedTableName.name, dataSourceTable, None) // Then, if alias is specified, wrap the table with a Subquery using the alias. // Otherwise, wrap the table with a Subquery using the table name. - alias.map(a => SubqueryAlias(a, qualifiedTable)).getOrElse(qualifiedTable) + alias.map(a => SubqueryAlias(a, qualifiedTable, None)).getOrElse(qualifiedTable) } else if (table.tableType == CatalogTableType.VIEW) { val viewText = table.viewText.getOrElse(sys.error("Invalid view without text.")) - alias match { - case None => - SubqueryAlias(table.identifier.table, - sparkSession.sessionState.sqlParser.parsePlan(viewText)) - case Some(aliasText) => - SubqueryAlias(aliasText, sessionState.sqlParser.parsePlan(viewText)) - } + SubqueryAlias( + alias.getOrElse(table.identifier.table), + sparkSession.sessionState.sqlParser.parsePlan(viewText), + Option(table.identifier)) } else { val qualifiedTable = MetastoreRelation( qualifiedTableName.database, qualifiedTableName.name)(table, client, sparkSession) - alias.map(a => SubqueryAlias(a, qualifiedTable)).getOrElse(qualifiedTable) + alias.map(a => SubqueryAlias(a, qualifiedTable, None)).getOrElse(qualifiedTable) } } @@ -383,7 +380,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log // Read path case relation: MetastoreRelation if shouldConvertMetastoreParquet(relation) => val parquetRelation = convertToParquetRelation(relation) - SubqueryAlias(relation.tableName, parquetRelation) + SubqueryAlias(relation.tableName, parquetRelation, None) } } } @@ -421,7 +418,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log // Read path case relation: MetastoreRelation if shouldConvertMetastoreOrc(relation) => val orcRelation = convertToOrcRelation(relation) - SubqueryAlias(relation.tableName, orcRelation) + SubqueryAlias(relation.tableName, orcRelation, None) } } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala index c59ac3dcaf..ebed9eb6e7 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala @@ -68,10 +68,10 @@ private[sql] class HiveSessionCatalog( metastoreCatalog.lookupRelation(newName, alias) } else { val relation = tempTables(table) - val tableWithQualifiers = SubqueryAlias(table, relation) + val tableWithQualifiers = SubqueryAlias(table, relation, None) // If an alias was specified by the lookup, wrap the plan in a subquery so that // attributes are properly qualified with this alias. - alias.map(a => SubqueryAlias(a, tableWithQualifiers)).getOrElse(tableWithQualifiers) + alias.map(a => SubqueryAlias(a, tableWithQualifiers, None)).getOrElse(tableWithQualifiers) } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala index 9d72367f43..0477ea4d4c 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala @@ -23,12 +23,13 @@ import org.apache.spark.sql.{QueryTest, Row, SaveMode} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.CatalogTableType import org.apache.spark.sql.catalyst.parser.CatalystSqlParser +import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.{ExamplePointUDT, SQLTestUtils} import org.apache.spark.sql.types.{DecimalType, IntegerType, StringType, StructField, StructType} -class HiveMetastoreCatalogSuite extends TestHiveSingleton { +class HiveMetastoreCatalogSuite extends TestHiveSingleton with SQLTestUtils { import spark.implicits._ test("struct field should accept underscore in sub-column name") { @@ -57,6 +58,17 @@ class HiveMetastoreCatalogSuite extends TestHiveSingleton { val dataType = StructType((1 to 100).map(field)) assert(CatalystSqlParser.parseDataType(dataType.catalogString) == dataType) } + + test("view relation") { + withView("vw1") { + spark.sql("create view vw1 as select 1 as id") + val plan = spark.sql("select id from vw1").queryExecution.analyzed + val aliases = plan.collect { + case x @ SubqueryAlias("vw1", _, Some(TableIdentifier("vw1", Some("default")))) => x + } + assert(aliases.size == 1) + } + } } class DataSourceWithHiveMetastoreCatalogSuite -- cgit v1.2.3