about summary refs log tree commit diff
path: root/sql/core/src/main/scala/org/apache
diff options
context:
space:
mode:
author    Reynold Xin <rxin@databricks.com>    2016-04-22 20:30:51 -0700
committer Reynold Xin <rxin@databricks.com>    2016-04-22 20:30:51 -0700
commitc06110187b3e41405fc13aba367abdd4183ed9a6 (patch)
treea00225fb89f7d58542092e7ec6752af628d02d94 /sql/core/src/main/scala/org/apache
parent7dde1da949d430c20a128bc3c6e5fe5c0271da11 (diff)
downloadspark-c06110187b3e41405fc13aba367abdd4183ed9a6.tar.gz
spark-c06110187b3e41405fc13aba367abdd4183ed9a6.tar.bz2
spark-c06110187b3e41405fc13aba367abdd4183ed9a6.zip
[SPARK-14842][SQL] Implement view creation in sql/core
## What changes were proposed in this pull request?

This patch re-implements the view creation command in sql/core, based on the pre-existing view creation command in the Hive module. This consolidates the view creation logical command and physical command into a single one, called CreateViewCommand.

## How was this patch tested?

All the code should have been covered by existing tests.

Author: Reynold Xin <rxin@databricks.com>

Closes #12615 from rxin/SPARK-14842-2.
Diffstat (limited to 'sql/core/src/main/scala/org/apache')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala    8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala   132
2 files changed, 130 insertions(+), 10 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index e983a4cee6..7dc888cdde 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -1101,7 +1101,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
}
/**
- * Create or replace a view. This creates a [[CreateViewAsSelectLogicalCommand]] command.
+ * Create or replace a view. This creates a [[CreateViewCommand]] command.
*
* For example:
* {{{
@@ -1134,7 +1134,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
}
/**
- * Alter the query of a view. This creates a [[CreateViewAsSelectLogicalCommand]] command.
+ * Alter the query of a view. This creates a [[CreateViewCommand]] command.
*/
override def visitAlterViewQuery(ctx: AlterViewQueryContext): LogicalPlan = withOrigin(ctx) {
createView(
@@ -1149,7 +1149,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
}
/**
- * Create a [[CreateViewAsSelectLogicalCommand]] command.
+ * Create a [[CreateViewCommand]] command.
*/
private def createView(
ctx: ParserRuleContext,
@@ -1170,7 +1170,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
viewOriginalText = sql,
viewText = sql,
comment = comment)
- CreateViewAsSelectLogicalCommand(tableDesc, plan(query), allowExist, replace, command(ctx))
+ CreateViewCommand(tableDesc, plan(query), allowExist, replace, command(ctx))
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index aa6112c7f0..082f944f99 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -17,16 +17,136 @@
package org.apache.spark.sql.execution.command
-import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, UnaryNode}
+import scala.util.control.NonFatal
-case class CreateViewAsSelectLogicalCommand(
+import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
+import org.apache.spark.sql.catalyst.SQLBuilder
+import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
+
+
+/**
+ * Create Hive view on non-hive-compatible tables by specifying schema ourselves instead of
+ * depending on Hive meta-store.
+ *
+ * @param tableDesc the catalog table
+ * @param child the logical plan that represents the view; this is used to generate a canonicalized
+ * version of the SQL that can be saved in the catalog.
+ * @param allowExisting if true, and if the view already exists, noop; if false, and if the view
+ * already exists, throws analysis exception.
+ * @param replace if true, and if the view already exists, updates it; if false, and if the view
+ * already exists, throws analysis exception.
+ * @param sql the original sql
+ */
+case class CreateViewCommand(
tableDesc: CatalogTable,
child: LogicalPlan,
allowExisting: Boolean,
replace: Boolean,
- sql: String) extends UnaryNode with Command {
+ sql: String)
+ extends RunnableCommand {
+
+ // TODO: Note that this class can NOT canonicalize the view SQL string entirely, which is
+ // different from Hive and may not work for some cases like create view on self join.
+
override def output: Seq[Attribute] = Seq.empty[Attribute]
- override lazy val resolved: Boolean = false
+
+ require(tableDesc.tableType == CatalogTableType.VIRTUAL_VIEW)
+ require(tableDesc.viewText.isDefined)
+
+ private val tableIdentifier = tableDesc.identifier
+
+ if (allowExisting && replace) {
+ throw new AnalysisException(
+ "It is not allowed to define a view with both IF NOT EXISTS and OR REPLACE.")
+ }
+
+ override def run(sqlContext: SQLContext): Seq[Row] = {
+ val analzyedPlan = sqlContext.executePlan(child).analyzed
+
+ require(tableDesc.schema == Nil || tableDesc.schema.length == analzyedPlan.output.length)
+ val sessionState = sqlContext.sessionState
+
+ if (sessionState.catalog.tableExists(tableIdentifier)) {
+ if (allowExisting) {
+ // Handles `CREATE VIEW IF NOT EXISTS v0 AS SELECT ...`. Does nothing when the target view
+ // already exists.
+ } else if (replace) {
+ // Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...`
+ sessionState.catalog.alterTable(prepareTable(sqlContext, analzyedPlan))
+ } else {
+ // Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already
+ // exists.
+ throw new AnalysisException(s"View $tableIdentifier already exists. " +
+ "If you want to update the view definition, please use ALTER VIEW AS or " +
+ "CREATE OR REPLACE VIEW AS")
+ }
+ } else {
+ // Create the view if it doesn't exist.
+ sessionState.catalog.createTable(
+ prepareTable(sqlContext, analzyedPlan), ignoreIfExists = false)
+ }
+
+ Seq.empty[Row]
+ }
+
+ private def prepareTable(sqlContext: SQLContext, analzyedPlan: LogicalPlan): CatalogTable = {
+ val expandedText = if (sqlContext.conf.canonicalView) {
+ try rebuildViewQueryString(sqlContext, analzyedPlan) catch {
+ case NonFatal(e) => wrapViewTextWithSelect(analzyedPlan)
+ }
+ } else {
+ wrapViewTextWithSelect(analzyedPlan)
+ }
+
+ val viewSchema = {
+ if (tableDesc.schema.isEmpty) {
+ analzyedPlan.output.map { a =>
+ CatalogColumn(a.name, a.dataType.simpleString)
+ }
+ } else {
+ analzyedPlan.output.zip(tableDesc.schema).map { case (a, col) =>
+ CatalogColumn(col.name, a.dataType.simpleString, nullable = true, col.comment)
+ }
+ }
+ }
+
+ tableDesc.copy(schema = viewSchema, viewText = Some(expandedText))
+ }
+
+ private def wrapViewTextWithSelect(analzyedPlan: LogicalPlan): String = {
+ // When user specified column names for view, we should create a project to do the renaming.
+ // When no column name specified, we still need to create a project to declare the columns
+ // we need, to make us more robust to top level `*`s.
+ val viewOutput = {
+ val columnNames = analzyedPlan.output.map(f => quote(f.name))
+ if (tableDesc.schema.isEmpty) {
+ columnNames.mkString(", ")
+ } else {
+ columnNames.zip(tableDesc.schema.map(f => quote(f.name))).map {
+ case (name, alias) => s"$name AS $alias"
+ }.mkString(", ")
+ }
+ }
+
+ val viewText = tableDesc.viewText.get
+ val viewName = quote(tableDesc.identifier.table)
+ s"SELECT $viewOutput FROM ($viewText) $viewName"
+ }
+
+ private def rebuildViewQueryString(sqlContext: SQLContext, analzyedPlan: LogicalPlan): String = {
+ val logicalPlan = if (tableDesc.schema.isEmpty) {
+ analzyedPlan
+ } else {
+ val projectList = analzyedPlan.output.zip(tableDesc.schema).map {
+ case (attr, col) => Alias(attr, col.name)()
+ }
+ sqlContext.executePlan(Project(projectList, analzyedPlan)).analyzed
+ }
+ new SQLBuilder(logicalPlan).toSQL
+ }
+
+ // escape backtick with double-backtick in column name and wrap it with backtick.
+ private def quote(name: String) = s"`${name.replaceAll("`", "``")}`"
}