From c06110187b3e41405fc13aba367abdd4183ed9a6 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Fri, 22 Apr 2016 20:30:51 -0700 Subject: [SPARK-14842][SQL] Implement view creation in sql/core ## What changes were proposed in this pull request? This patch re-implements view creation command in sql/core, based on the pre-existing view creation command in the Hive module. This consolidates the view creation logical command and physical command into a single one, called CreateViewCommand. ## How was this patch tested? All the code should've been tested by existing tests. Author: Reynold Xin Closes #12615 from rxin/SPARK-14842-2. --- .../org/apache/spark/sql/types/DataType.scala | 3 + .../apache/spark/sql/types/UserDefinedType.scala | 2 + .../spark/sql/execution/SparkSqlParser.scala | 8 +- .../apache/spark/sql/execution/command/views.scala | 132 +++++++++++++++++++- .../spark/sql/hive/HiveMetastoreCatalog.scala | 19 +-- .../apache/spark/sql/hive/client/HiveClient.scala | 6 - .../spark/sql/hive/client/HiveClientImpl.scala | 8 -- .../sql/hive/execution/CreateViewAsSelect.scala | 137 --------------------- .../spark/sql/hive/HiveDDLCommandSuite.scala | 7 +- 9 files changed, 140 insertions(+), 182 deletions(-) delete mode 100644 sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala (limited to 'sql') diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala index 3d4a02b0ff..4fc65cbce1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala @@ -62,6 +62,9 @@ abstract class DataType extends AbstractDataType { /** Readable string representation for the type. */ def simpleString: String = typeName + /** String representation for the type saved in external catalogs. 
*/ + def catalogString: String = simpleString + /** Readable string representation for the type with truncation */ private[sql] def simpleString(maxNumberFields: Int): String = simpleString diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UserDefinedType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UserDefinedType.scala index 71a9b9f808..aa36121bde 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UserDefinedType.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UserDefinedType.scala @@ -91,6 +91,8 @@ abstract class UserDefinedType[UserType >: Null] extends DataType with Serializa case that: UserDefinedType[_] => this.acceptsType(that) case _ => false } + + override def catalogString: String = sqlType.simpleString } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index e983a4cee6..7dc888cdde 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -1101,7 +1101,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { } /** - * Create or replace a view. This creates a [[CreateViewAsSelectLogicalCommand]] command. + * Create or replace a view. This creates a [[CreateViewCommand]] command. * * For example: * {{{ @@ -1134,7 +1134,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { } /** - * Alter the query of a view. This creates a [[CreateViewAsSelectLogicalCommand]] command. + * Alter the query of a view. This creates a [[CreateViewCommand]] command. */ override def visitAlterViewQuery(ctx: AlterViewQueryContext): LogicalPlan = withOrigin(ctx) { createView( @@ -1149,7 +1149,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { } /** - * Create a [[CreateViewAsSelectLogicalCommand]] command. 
+ * Create a [[CreateViewCommand]] command. */ private def createView( ctx: ParserRuleContext, @@ -1170,7 +1170,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { viewOriginalText = sql, viewText = sql, comment = comment) - CreateViewAsSelectLogicalCommand(tableDesc, plan(query), allowExist, replace, command(ctx)) + CreateViewCommand(tableDesc, plan(query), allowExist, replace, command(ctx)) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala index aa6112c7f0..082f944f99 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala @@ -17,16 +17,136 @@ package org.apache.spark.sql.execution.command -import org.apache.spark.sql.catalyst.catalog.CatalogTable -import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, UnaryNode} +import scala.util.control.NonFatal -case class CreateViewAsSelectLogicalCommand( +import org.apache.spark.sql.{AnalysisException, Row, SQLContext} +import org.apache.spark.sql.catalyst.SQLBuilder +import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTableType} +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project} + + +/** + * Create Hive view on non-hive-compatible tables by specifying schema ourselves instead of + * depending on Hive meta-store. + * + * @param tableDesc the catalog table + * @param child the logical plan that represents the view; this is used to generate a canonicalized + * version of the SQL that can be saved in the catalog. + * @param allowExisting if true, and if the view already exists, noop; if false, and if the view + * already exists, throws analysis exception. 
+ * @param replace if true, and if the view already exists, updates it; if false, and if the view + * already exists, throws analysis exception. + * @param sql the original sql + */ +case class CreateViewCommand( tableDesc: CatalogTable, child: LogicalPlan, allowExisting: Boolean, replace: Boolean, - sql: String) extends UnaryNode with Command { + sql: String) + extends RunnableCommand { + + // TODO: Note that this class can NOT canonicalize the view SQL string entirely, which is + // different from Hive and may not work for some cases like create view on self join. + override def output: Seq[Attribute] = Seq.empty[Attribute] - override lazy val resolved: Boolean = false + + require(tableDesc.tableType == CatalogTableType.VIRTUAL_VIEW) + require(tableDesc.viewText.isDefined) + + private val tableIdentifier = tableDesc.identifier + + if (allowExisting && replace) { + throw new AnalysisException( + "It is not allowed to define a view with both IF NOT EXISTS and OR REPLACE.") + } + + override def run(sqlContext: SQLContext): Seq[Row] = { + val analzyedPlan = sqlContext.executePlan(child).analyzed + + require(tableDesc.schema == Nil || tableDesc.schema.length == analzyedPlan.output.length) + val sessionState = sqlContext.sessionState + + if (sessionState.catalog.tableExists(tableIdentifier)) { + if (allowExisting) { + // Handles `CREATE VIEW IF NOT EXISTS v0 AS SELECT ...`. Does nothing when the target view + // already exists. + } else if (replace) { + // Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...` + sessionState.catalog.alterTable(prepareTable(sqlContext, analzyedPlan)) + } else { + // Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already + // exists. + throw new AnalysisException(s"View $tableIdentifier already exists. " + + "If you want to update the view definition, please use ALTER VIEW AS or " + + "CREATE OR REPLACE VIEW AS") + } + } else { + // Create the view if it doesn't exist. 
+ sessionState.catalog.createTable( + prepareTable(sqlContext, analzyedPlan), ignoreIfExists = false) + } + + Seq.empty[Row] + } + + private def prepareTable(sqlContext: SQLContext, analzyedPlan: LogicalPlan): CatalogTable = { + val expandedText = if (sqlContext.conf.canonicalView) { + try rebuildViewQueryString(sqlContext, analzyedPlan) catch { + case NonFatal(e) => wrapViewTextWithSelect(analzyedPlan) + } + } else { + wrapViewTextWithSelect(analzyedPlan) + } + + val viewSchema = { + if (tableDesc.schema.isEmpty) { + analzyedPlan.output.map { a => + CatalogColumn(a.name, a.dataType.catalogString) + } + } else { + analzyedPlan.output.zip(tableDesc.schema).map { case (a, col) => + CatalogColumn(col.name, a.dataType.catalogString, nullable = true, col.comment) + } + } + } + + tableDesc.copy(schema = viewSchema, viewText = Some(expandedText)) + } + + private def wrapViewTextWithSelect(analzyedPlan: LogicalPlan): String = { + // When user specified column names for view, we should create a project to do the renaming. + // When no column name specified, we still need to create a project to declare the columns + // we need, to make us more robust to top level `*`s. 
+ val viewOutput = { + val columnNames = analzyedPlan.output.map(f => quote(f.name)) + if (tableDesc.schema.isEmpty) { + columnNames.mkString(", ") + } else { + columnNames.zip(tableDesc.schema.map(f => quote(f.name))).map { + case (name, alias) => s"$name AS $alias" + }.mkString(", ") + } + } + + val viewText = tableDesc.viewText.get + val viewName = quote(tableDesc.identifier.table) + s"SELECT $viewOutput FROM ($viewText) $viewName" + } + + private def rebuildViewQueryString(sqlContext: SQLContext, analzyedPlan: LogicalPlan): String = { + val logicalPlan = if (tableDesc.schema.isEmpty) { + analzyedPlan + } else { + val projectList = analzyedPlan.output.zip(tableDesc.schema).map { + case (attr, col) => Alias(attr, col.name)() + } + sqlContext.executePlan(Project(projectList, analzyedPlan)).analyzed + } + new SQLBuilder(logicalPlan).toSQL + } + + // escape backtick with double-backtick in column name and wrap it with backtick. + private def quote(name: String) = s"`${name.replaceAll("`", "``")}`" } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index df2b6beac6..6ccff454b1 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.parser.DataTypeParser import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ -import org.apache.spark.sql.execution.command.{CreateTableAsSelectLogicalPlan, CreateViewAsSelectLogicalCommand, HiveNativeCommand} +import org.apache.spark.sql.execution.command.{CreateTableAsSelectLogicalPlan, CreateViewCommand, HiveNativeCommand} import org.apache.spark.sql.execution.datasources.{Partition => _, _} import 
org.apache.spark.sql.execution.datasources.parquet.{DefaultSource => ParquetDefaultSource, ParquetRelation} import org.apache.spark.sql.hive.orc.{DefaultSource => OrcDefaultSource} @@ -629,22 +629,7 @@ private[hive] class HiveMetastoreCatalog(hive: SQLContext) extends Logging { case p: LogicalPlan if !p.childrenResolved => p case p: LogicalPlan if p.resolved => p - case CreateViewAsSelectLogicalCommand(table, child, allowExisting, replace, sql) - if conf.nativeView => - if (allowExisting && replace) { - throw new AnalysisException( - "It is not allowed to define a view with both IF NOT EXISTS and OR REPLACE.") - } - - val QualifiedTableName(dbName, tblName) = getQualifiedTableName(table) - - execution.CreateViewAsSelect( - table.copy(identifier = TableIdentifier(tblName, Some(dbName))), - child, - allowExisting, - replace) - - case CreateViewAsSelectLogicalCommand(table, child, allowExisting, replace, sql) => + case CreateViewCommand(table, child, allowExisting, replace, sql) if !conf.nativeView => HiveNativeCommand(sql) case p @ CreateTableAsSelectLogicalPlan(table, child, allowExisting) => diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala index 6f7e7bf451..ae719f86aa 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala @@ -75,12 +75,6 @@ private[hive] trait HiveClient { /** Returns the metadata for the specified table or None if it doesn't exist. */ def getTableOption(dbName: String, tableName: String): Option[CatalogTable] - /** Creates a view with the given metadata. */ - def createView(view: CatalogTable): Unit - - /** Updates the given view with new metadata. */ - def alertView(view: CatalogTable): Unit - /** Creates a table with the given metadata. 
*/ def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 703d991829..6327431368 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -342,14 +342,6 @@ private[hive] class HiveClientImpl( } } - override def createView(view: CatalogTable): Unit = withHiveState { - client.createTable(toHiveViewTable(view)) - } - - override def alertView(view: CatalogTable): Unit = withHiveState { - client.alterTable(view.qualifiedName, toHiveViewTable(view)) - } - override def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit = withHiveState { client.createTable(toHiveTable(table), ignoreIfExists) } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala deleted file mode 100644 index fa830a1a0e..0000000000 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala +++ /dev/null @@ -1,137 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.hive.execution - -import scala.util.control.NonFatal - -import org.apache.spark.sql.{AnalysisException, Row, SQLContext} -import org.apache.spark.sql.catalyst.SQLBuilder -import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable} -import org.apache.spark.sql.catalyst.expressions.Alias -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project} -import org.apache.spark.sql.execution.command.RunnableCommand -import org.apache.spark.sql.hive.{HiveMetastoreTypes, HiveSessionState} - -/** - * Create Hive view on non-hive-compatible tables by specifying schema ourselves instead of - * depending on Hive meta-store. - */ -// TODO: Note that this class can NOT canonicalize the view SQL string entirely, which is different -// from Hive and may not work for some cases like create view on self join. -private[hive] case class CreateViewAsSelect( - tableDesc: CatalogTable, - child: LogicalPlan, - allowExisting: Boolean, - orReplace: Boolean) extends RunnableCommand { - - private val childSchema = child.output - - assert(tableDesc.schema == Nil || tableDesc.schema.length == childSchema.length) - assert(tableDesc.viewText.isDefined) - - private val tableIdentifier = tableDesc.identifier - - override def run(sqlContext: SQLContext): Seq[Row] = { - val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState] - - sessionState.catalog.tableExists(tableIdentifier) match { - case true if allowExisting => - // Handles `CREATE VIEW IF NOT EXISTS v0 AS SELECT ...`. Does nothing when the target view - // already exists. - - case true if orReplace => - // Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...` - sessionState.metadataHive.alertView(prepareTable(sqlContext)) - - case true => - // Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already - // exists. 
- throw new AnalysisException(s"View $tableIdentifier already exists. " + - "If you want to update the view definition, please use ALTER VIEW AS or " + - "CREATE OR REPLACE VIEW AS") - - case false => - sessionState.metadataHive.createView(prepareTable(sqlContext)) - } - - Seq.empty[Row] - } - - private def prepareTable(sqlContext: SQLContext): CatalogTable = { - val expandedText = if (sqlContext.conf.canonicalView) { - try rebuildViewQueryString(sqlContext) catch { - case NonFatal(e) => wrapViewTextWithSelect - } - } else { - wrapViewTextWithSelect - } - - val viewSchema = { - if (tableDesc.schema.isEmpty) { - childSchema.map { a => - CatalogColumn(a.name, HiveMetastoreTypes.toMetastoreType(a.dataType)) - } - } else { - childSchema.zip(tableDesc.schema).map { case (a, col) => - CatalogColumn( - col.name, - HiveMetastoreTypes.toMetastoreType(a.dataType), - nullable = true, - col.comment) - } - } - } - - tableDesc.copy(schema = viewSchema, viewText = Some(expandedText)) - } - - private def wrapViewTextWithSelect: String = { - // When user specified column names for view, we should create a project to do the renaming. - // When no column name specified, we still need to create a project to declare the columns - // we need, to make us more robust to top level `*`s. 
- val viewOutput = { - val columnNames = childSchema.map(f => quote(f.name)) - if (tableDesc.schema.isEmpty) { - columnNames.mkString(", ") - } else { - columnNames.zip(tableDesc.schema.map(f => quote(f.name))).map { - case (name, alias) => s"$name AS $alias" - }.mkString(", ") - } - } - - val viewText = tableDesc.viewText.get - val viewName = quote(tableDesc.identifier.table) - s"SELECT $viewOutput FROM ($viewText) $viewName" - } - - private def rebuildViewQueryString(sqlContext: SQLContext): String = { - val logicalPlan = if (tableDesc.schema.isEmpty) { - child - } else { - val projectList = childSchema.zip(tableDesc.schema).map { - case (attr, col) => Alias(attr, col.name)() - } - sqlContext.executePlan(Project(projectList, child)).analyzed - } - new SQLBuilder(logicalPlan).toSQL - } - - // escape backtick with double-backtick in column name and wrap it with backtick. - private def quote(name: String) = s"`${name.replaceAll("`", "``")}`" -} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala index f6e3a4bd2d..e3204ff793 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.JsonTuple import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.sql.catalyst.plans.logical.{Generate, ScriptTransformation} -import org.apache.spark.sql.execution.command.{CreateTable, CreateTableAsSelectLogicalPlan, CreateTableLike, CreateViewAsSelectLogicalCommand, HiveNativeCommand, LoadData} +import org.apache.spark.sql.execution.command.{CreateTable, CreateTableAsSelectLogicalPlan, CreateTableLike, CreateViewCommand, HiveNativeCommand, LoadData} import org.apache.spark.sql.hive.test.TestHive class 
HiveDDLCommandSuite extends PlanTest { @@ -39,7 +39,7 @@ class HiveDDLCommandSuite extends PlanTest { parser.parsePlan(sql).collect { case CreateTable(desc, allowExisting) => (desc, allowExisting) case CreateTableAsSelectLogicalPlan(desc, _, allowExisting) => (desc, allowExisting) - case CreateViewAsSelectLogicalCommand(desc, _, allowExisting, _, _) => (desc, allowExisting) + case CreateViewCommand(desc, _, allowExisting, _, _) => (desc, allowExisting) }.head } @@ -521,14 +521,13 @@ class HiveDDLCommandSuite extends PlanTest { test("create view - full") { val v1 = """ - |CREATE OR REPLACE VIEW IF NOT EXISTS view1 + |CREATE OR REPLACE VIEW view1 |(col1, col3) |COMMENT 'BLABLA' |TBLPROPERTIES('prop1Key'="prop1Val") |AS SELECT * FROM tab1 """.stripMargin val (desc, exists) = extractTableDesc(v1) - assert(exists) assert(desc.identifier.database.isEmpty) assert(desc.identifier.table == "view1") assert(desc.tableType == CatalogTableType.VIRTUAL_VIEW) -- cgit v1.2.3