diff options
author | Reynold Xin <rxin@databricks.com> | 2016-04-22 20:30:51 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-04-22 20:30:51 -0700 |
commit | c06110187b3e41405fc13aba367abdd4183ed9a6 (patch) | |
tree | a00225fb89f7d58542092e7ec6752af628d02d94 /sql/core/src/main/scala/org/apache | |
parent | 7dde1da949d430c20a128bc3c6e5fe5c0271da11 (diff) | |
download | spark-c06110187b3e41405fc13aba367abdd4183ed9a6.tar.gz spark-c06110187b3e41405fc13aba367abdd4183ed9a6.tar.bz2 spark-c06110187b3e41405fc13aba367abdd4183ed9a6.zip |
[SPARK-14842][SQL] Implement view creation in sql/core
## What changes were proposed in this pull request?
This patch re-implements view creation command in sql/core, based on the pre-existing view creation command in the Hive module. This consolidates the view creation logical command and physical command into a single one, called CreateViewCommand.
## How was this patch tested?
All of the code should already be covered by existing tests.
Author: Reynold Xin <rxin@databricks.com>
Closes #12615 from rxin/SPARK-14842-2.
Diffstat (limited to 'sql/core/src/main/scala/org/apache')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala | 8 | ||||
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala | 132 |
2 files changed, 130 insertions, 10 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index e983a4cee6..7dc888cdde 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -1101,7 +1101,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { } /** - * Create or replace a view. This creates a [[CreateViewAsSelectLogicalCommand]] command. + * Create or replace a view. This creates a [[CreateViewCommand]] command. * * For example: * {{{ @@ -1134,7 +1134,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { } /** - * Alter the query of a view. This creates a [[CreateViewAsSelectLogicalCommand]] command. + * Alter the query of a view. This creates a [[CreateViewCommand]] command. */ override def visitAlterViewQuery(ctx: AlterViewQueryContext): LogicalPlan = withOrigin(ctx) { createView( @@ -1149,7 +1149,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { } /** - * Create a [[CreateViewAsSelectLogicalCommand]] command. + * Create a [[CreateViewCommand]] command. 
*/ private def createView( ctx: ParserRuleContext, @@ -1170,7 +1170,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { viewOriginalText = sql, viewText = sql, comment = comment) - CreateViewAsSelectLogicalCommand(tableDesc, plan(query), allowExist, replace, command(ctx)) + CreateViewCommand(tableDesc, plan(query), allowExist, replace, command(ctx)) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala index aa6112c7f0..082f944f99 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala @@ -17,16 +17,136 @@ package org.apache.spark.sql.execution.command -import org.apache.spark.sql.catalyst.catalog.CatalogTable -import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, UnaryNode} +import scala.util.control.NonFatal -case class CreateViewAsSelectLogicalCommand( +import org.apache.spark.sql.{AnalysisException, Row, SQLContext} +import org.apache.spark.sql.catalyst.SQLBuilder +import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTableType} +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project} + + +/** + * Create Hive view on non-hive-compatible tables by specifying schema ourselves instead of + * depending on Hive meta-store. + * + * @param tableDesc the catalog table + * @param child the logical plan that represents the view; this is used to generate a canonicalized + * version of the SQL that can be saved in the catalog. + * @param allowExisting if true, and if the view already exists, noop; if false, and if the view + * already exists, throws analysis exception. 
+ * @param replace if true, and if the view already exists, updates it; if false, and if the view + * already exists, throws analysis exception. + * @param sql the original sql + */ +case class CreateViewCommand( tableDesc: CatalogTable, child: LogicalPlan, allowExisting: Boolean, replace: Boolean, - sql: String) extends UnaryNode with Command { + sql: String) + extends RunnableCommand { + + // TODO: Note that this class can NOT canonicalize the view SQL string entirely, which is + // different from Hive and may not work for some cases like create view on self join. + override def output: Seq[Attribute] = Seq.empty[Attribute] - override lazy val resolved: Boolean = false + + require(tableDesc.tableType == CatalogTableType.VIRTUAL_VIEW) + require(tableDesc.viewText.isDefined) + + private val tableIdentifier = tableDesc.identifier + + if (allowExisting && replace) { + throw new AnalysisException( + "It is not allowed to define a view with both IF NOT EXISTS and OR REPLACE.") + } + + override def run(sqlContext: SQLContext): Seq[Row] = { + val analzyedPlan = sqlContext.executePlan(child).analyzed + + require(tableDesc.schema == Nil || tableDesc.schema.length == analzyedPlan.output.length) + val sessionState = sqlContext.sessionState + + if (sessionState.catalog.tableExists(tableIdentifier)) { + if (allowExisting) { + // Handles `CREATE VIEW IF NOT EXISTS v0 AS SELECT ...`. Does nothing when the target view + // already exists. + } else if (replace) { + // Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...` + sessionState.catalog.alterTable(prepareTable(sqlContext, analzyedPlan)) + } else { + // Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already + // exists. + throw new AnalysisException(s"View $tableIdentifier already exists. " + + "If you want to update the view definition, please use ALTER VIEW AS or " + + "CREATE OR REPLACE VIEW AS") + } + } else { + // Create the view if it doesn't exist. 
+ sessionState.catalog.createTable( + prepareTable(sqlContext, analzyedPlan), ignoreIfExists = false) + } + + Seq.empty[Row] + } + + private def prepareTable(sqlContext: SQLContext, analzyedPlan: LogicalPlan): CatalogTable = { + val expandedText = if (sqlContext.conf.canonicalView) { + try rebuildViewQueryString(sqlContext, analzyedPlan) catch { + case NonFatal(e) => wrapViewTextWithSelect(analzyedPlan) + } + } else { + wrapViewTextWithSelect(analzyedPlan) + } + + val viewSchema = { + if (tableDesc.schema.isEmpty) { + analzyedPlan.output.map { a => + CatalogColumn(a.name, a.dataType.simpleString) + } + } else { + analzyedPlan.output.zip(tableDesc.schema).map { case (a, col) => + CatalogColumn(col.name, a.dataType.simpleString, nullable = true, col.comment) + } + } + } + + tableDesc.copy(schema = viewSchema, viewText = Some(expandedText)) + } + + private def wrapViewTextWithSelect(analzyedPlan: LogicalPlan): String = { + // When user specified column names for view, we should create a project to do the renaming. + // When no column name specified, we still need to create a project to declare the columns + // we need, to make us more robust to top level `*`s. 
+ val viewOutput = { + val columnNames = analzyedPlan.output.map(f => quote(f.name)) + if (tableDesc.schema.isEmpty) { + columnNames.mkString(", ") + } else { + columnNames.zip(tableDesc.schema.map(f => quote(f.name))).map { + case (name, alias) => s"$name AS $alias" + }.mkString(", ") + } + } + + val viewText = tableDesc.viewText.get + val viewName = quote(tableDesc.identifier.table) + s"SELECT $viewOutput FROM ($viewText) $viewName" + } + + private def rebuildViewQueryString(sqlContext: SQLContext, analzyedPlan: LogicalPlan): String = { + val logicalPlan = if (tableDesc.schema.isEmpty) { + analzyedPlan + } else { + val projectList = analzyedPlan.output.zip(tableDesc.schema).map { + case (attr, col) => Alias(attr, col.name)() + } + sqlContext.executePlan(Project(projectList, analzyedPlan)).analyzed + } + new SQLBuilder(logicalPlan).toSQL + } + + // escape backtick with double-backtick in column name and wrap it with backtick. + private def quote(name: String) = s"`${name.replaceAll("`", "``")}`" } |