aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Reynold Xin <rxin@databricks.com> 2016-04-22 20:30:51 -0700
committer Reynold Xin <rxin@databricks.com> 2016-04-22 20:30:51 -0700
commit c06110187b3e41405fc13aba367abdd4183ed9a6 (patch)
tree a00225fb89f7d58542092e7ec6752af628d02d94
parent 7dde1da949d430c20a128bc3c6e5fe5c0271da11 (diff)
download spark-c06110187b3e41405fc13aba367abdd4183ed9a6.tar.gz
spark-c06110187b3e41405fc13aba367abdd4183ed9a6.tar.bz2
spark-c06110187b3e41405fc13aba367abdd4183ed9a6.zip
[SPARK-14842][SQL] Implement view creation in sql/core
## What changes were proposed in this pull request? This patch re-implements the view creation command in sql/core, based on the pre-existing view creation command in the Hive module. This consolidates the view creation logical command and physical command into a single one, called CreateViewCommand. ## How was this patch tested? All the code should've been tested by existing tests. Author: Reynold Xin <rxin@databricks.com> Closes #12615 from rxin/SPARK-14842-2.
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala 3
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/types/UserDefinedType.scala 2
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala 8
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala 132
-rw-r--r-- sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala 19
-rw-r--r-- sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala 6
-rw-r--r-- sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala 8
-rw-r--r-- sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala 137
-rw-r--r-- sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala 7
9 files changed, 140 insertions, 182 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala
index 3d4a02b0ff..4fc65cbce1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala
@@ -62,6 +62,9 @@ abstract class DataType extends AbstractDataType {
/** Readable string representation for the type. */
def simpleString: String = typeName
+ /** String representation for the type saved in external catalogs. */
+ def catalogString: String = simpleString
+
/** Readable string representation for the type with truncation */
private[sql] def simpleString(maxNumberFields: Int): String = simpleString
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UserDefinedType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UserDefinedType.scala
index 71a9b9f808..aa36121bde 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UserDefinedType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UserDefinedType.scala
@@ -91,6 +91,8 @@ abstract class UserDefinedType[UserType >: Null] extends DataType with Serializa
case that: UserDefinedType[_] => this.acceptsType(that)
case _ => false
}
+
+ override def catalogString: String = sqlType.simpleString
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index e983a4cee6..7dc888cdde 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -1101,7 +1101,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
}
/**
- * Create or replace a view. This creates a [[CreateViewAsSelectLogicalCommand]] command.
+ * Create or replace a view. This creates a [[CreateViewCommand]] command.
*
* For example:
* {{{
@@ -1134,7 +1134,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
}
/**
- * Alter the query of a view. This creates a [[CreateViewAsSelectLogicalCommand]] command.
+ * Alter the query of a view. This creates a [[CreateViewCommand]] command.
*/
override def visitAlterViewQuery(ctx: AlterViewQueryContext): LogicalPlan = withOrigin(ctx) {
createView(
@@ -1149,7 +1149,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
}
/**
- * Create a [[CreateViewAsSelectLogicalCommand]] command.
+ * Create a [[CreateViewCommand]] command.
*/
private def createView(
ctx: ParserRuleContext,
@@ -1170,7 +1170,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
viewOriginalText = sql,
viewText = sql,
comment = comment)
- CreateViewAsSelectLogicalCommand(tableDesc, plan(query), allowExist, replace, command(ctx))
+ CreateViewCommand(tableDesc, plan(query), allowExist, replace, command(ctx))
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index aa6112c7f0..082f944f99 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -17,16 +17,136 @@
package org.apache.spark.sql.execution.command
-import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, UnaryNode}
+import scala.util.control.NonFatal
-case class CreateViewAsSelectLogicalCommand(
+import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
+import org.apache.spark.sql.catalyst.SQLBuilder
+import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
+
+
+/**
+ * Creates or replaces a view through the session catalog, storing a schema derived from the
+ * analyzed view query instead of depending on the metastore to infer it.
+ *
+ * @param tableDesc the catalog table; must be a VIRTUAL_VIEW with its viewText defined
+ * @param child the logical plan that represents the view; this is used to generate a canonicalized
+ * version of the SQL that can be saved in the catalog.
+ * @param allowExisting if true, and if the view already exists, noop; if false, and if the view
+ * already exists, throws analysis exception.
+ * @param replace if true, and if the view already exists, updates it; if false, and if the view
+ * already exists, throws analysis exception.
+ * @param sql the original sql
+ */
+case class CreateViewCommand(
 tableDesc: CatalogTable,
 child: LogicalPlan,
 allowExisting: Boolean,
 replace: Boolean,
- sql: String) extends UnaryNode with Command {
+ sql: String)
+ extends RunnableCommand {
+
+ // TODO: Note that this class can NOT canonicalize the view SQL string entirely, which is
+ // different from Hive and may not work for some cases like create view on self join.
+
 override def output: Seq[Attribute] = Seq.empty[Attribute]
- override lazy val resolved: Boolean = false
+
+ require(tableDesc.tableType == CatalogTableType.VIRTUAL_VIEW)
+ require(tableDesc.viewText.isDefined)
+
+ private val tableIdentifier = tableDesc.identifier
+
+ if (allowExisting && replace) {
+ throw new AnalysisException(
+ "It is not allowed to define a view with both IF NOT EXISTS and OR REPLACE.")
+ }
+
+ override def run(sqlContext: SQLContext): Seq[Row] = {
+ val analyzedPlan = sqlContext.executePlan(child).analyzed
+
+ require(tableDesc.schema == Nil || tableDesc.schema.length == analyzedPlan.output.length)
+ val sessionState = sqlContext.sessionState
+
+ if (sessionState.catalog.tableExists(tableIdentifier)) {
+ if (allowExisting) {
+ // Handles `CREATE VIEW IF NOT EXISTS v0 AS SELECT ...`. Does nothing when the target view
+ // already exists.
+ } else if (replace) {
+ // Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...`
+ sessionState.catalog.alterTable(prepareTable(sqlContext, analyzedPlan))
+ } else {
+ // Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already
+ // exists.
+ throw new AnalysisException(s"View $tableIdentifier already exists. " +
+ "If you want to update the view definition, please use ALTER VIEW AS or " +
+ "CREATE OR REPLACE VIEW AS")
+ }
+ } else {
+ // Create the view if it doesn't exist.
+ sessionState.catalog.createTable(
+ prepareTable(sqlContext, analyzedPlan), ignoreIfExists = false)
+ }
+
+ Seq.empty[Row]
+ }
+
+ private def prepareTable(sqlContext: SQLContext, analyzedPlan: LogicalPlan): CatalogTable = {
+ val expandedText = if (sqlContext.conf.canonicalView) {
+ try rebuildViewQueryString(sqlContext, analyzedPlan) catch {
+ case NonFatal(_) => wrapViewTextWithSelect(analyzedPlan)
+ }
+ } else {
+ wrapViewTextWithSelect(analyzedPlan)
+ }
+
+ val viewSchema = {
+ if (tableDesc.schema.isEmpty) {
+ analyzedPlan.output.map { a =>
+ CatalogColumn(a.name, a.dataType.catalogString)
+ }
+ } else {
+ analyzedPlan.output.zip(tableDesc.schema).map { case (a, col) =>
+ CatalogColumn(col.name, a.dataType.catalogString, nullable = true, col.comment)
+ }
+ }
+ }
+
+ tableDesc.copy(schema = viewSchema, viewText = Some(expandedText))
+ }
+
+ private def wrapViewTextWithSelect(analyzedPlan: LogicalPlan): String = {
+ // When user specified column names for view, we should create a project to do the renaming.
+ // When no column name specified, we still need to create a project to declare the columns
+ // we need, to make us more robust to top level `*`s.
+ val viewOutput = {
+ val columnNames = analyzedPlan.output.map(f => quote(f.name))
+ if (tableDesc.schema.isEmpty) {
+ columnNames.mkString(", ")
+ } else {
+ columnNames.zip(tableDesc.schema.map(f => quote(f.name))).map {
+ case (name, alias) => s"$name AS $alias"
+ }.mkString(", ")
+ }
+ }
+
+ val viewText = tableDesc.viewText.get
+ val viewName = quote(tableDesc.identifier.table)
+ s"SELECT $viewOutput FROM ($viewText) $viewName"
+ }
+
+ private def rebuildViewQueryString(sqlContext: SQLContext, analyzedPlan: LogicalPlan): String = {
+ val logicalPlan = if (tableDesc.schema.isEmpty) {
+ analyzedPlan
+ } else {
+ val projectList = analyzedPlan.output.zip(tableDesc.schema).map {
+ case (attr, col) => Alias(attr, col.name)()
+ }
+ sqlContext.executePlan(Project(projectList, analyzedPlan)).analyzed
+ }
+ new SQLBuilder(logicalPlan).toSQL
+ }
+
+ // Escapes each backtick as a double backtick and wraps the identifier in backticks.
+ private def quote(name: String) = s"`${name.replace("`", "``")}`"
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index df2b6beac6..6ccff454b1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.parser.DataTypeParser
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
-import org.apache.spark.sql.execution.command.{CreateTableAsSelectLogicalPlan, CreateViewAsSelectLogicalCommand, HiveNativeCommand}
+import org.apache.spark.sql.execution.command.{CreateTableAsSelectLogicalPlan, CreateViewCommand, HiveNativeCommand}
import org.apache.spark.sql.execution.datasources.{Partition => _, _}
import org.apache.spark.sql.execution.datasources.parquet.{DefaultSource => ParquetDefaultSource, ParquetRelation}
import org.apache.spark.sql.hive.orc.{DefaultSource => OrcDefaultSource}
@@ -629,22 +629,7 @@ private[hive] class HiveMetastoreCatalog(hive: SQLContext) extends Logging {
case p: LogicalPlan if !p.childrenResolved => p
case p: LogicalPlan if p.resolved => p
- case CreateViewAsSelectLogicalCommand(table, child, allowExisting, replace, sql)
- if conf.nativeView =>
- if (allowExisting && replace) {
- throw new AnalysisException(
- "It is not allowed to define a view with both IF NOT EXISTS and OR REPLACE.")
- }
-
- val QualifiedTableName(dbName, tblName) = getQualifiedTableName(table)
-
- execution.CreateViewAsSelect(
- table.copy(identifier = TableIdentifier(tblName, Some(dbName))),
- child,
- allowExisting,
- replace)
-
- case CreateViewAsSelectLogicalCommand(table, child, allowExisting, replace, sql) =>
+ case CreateViewCommand(table, child, allowExisting, replace, sql) if !conf.nativeView =>
HiveNativeCommand(sql)
case p @ CreateTableAsSelectLogicalPlan(table, child, allowExisting) =>
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
index 6f7e7bf451..ae719f86aa 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
@@ -75,12 +75,6 @@ private[hive] trait HiveClient {
/** Returns the metadata for the specified table or None if it doesn't exist. */
def getTableOption(dbName: String, tableName: String): Option[CatalogTable]
- /** Creates a view with the given metadata. */
- def createView(view: CatalogTable): Unit
-
- /** Updates the given view with new metadata. */
- def alertView(view: CatalogTable): Unit
-
/** Creates a table with the given metadata. */
def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 703d991829..6327431368 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -342,14 +342,6 @@ private[hive] class HiveClientImpl(
}
}
- override def createView(view: CatalogTable): Unit = withHiveState {
- client.createTable(toHiveViewTable(view))
- }
-
- override def alertView(view: CatalogTable): Unit = withHiveState {
- client.alterTable(view.qualifiedName, toHiveViewTable(view))
- }
-
override def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit = withHiveState {
client.createTable(toHiveTable(table), ignoreIfExists)
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala
deleted file mode 100644
index fa830a1a0e..0000000000
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive.execution
-
-import scala.util.control.NonFatal
-
-import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
-import org.apache.spark.sql.catalyst.SQLBuilder
-import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable}
-import org.apache.spark.sql.catalyst.expressions.Alias
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
-import org.apache.spark.sql.execution.command.RunnableCommand
-import org.apache.spark.sql.hive.{HiveMetastoreTypes, HiveSessionState}
-
-/**
- * Create Hive view on non-hive-compatible tables by specifying schema ourselves instead of
- * depending on Hive meta-store.
- */
-// TODO: Note that this class can NOT canonicalize the view SQL string entirely, which is different
-// from Hive and may not work for some cases like create view on self join.
-private[hive] case class CreateViewAsSelect(
- tableDesc: CatalogTable,
- child: LogicalPlan,
- allowExisting: Boolean,
- orReplace: Boolean) extends RunnableCommand {
-
- private val childSchema = child.output
-
- assert(tableDesc.schema == Nil || tableDesc.schema.length == childSchema.length)
- assert(tableDesc.viewText.isDefined)
-
- private val tableIdentifier = tableDesc.identifier
-
- override def run(sqlContext: SQLContext): Seq[Row] = {
- val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState]
-
- sessionState.catalog.tableExists(tableIdentifier) match {
- case true if allowExisting =>
- // Handles `CREATE VIEW IF NOT EXISTS v0 AS SELECT ...`. Does nothing when the target view
- // already exists.
-
- case true if orReplace =>
- // Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...`
- sessionState.metadataHive.alertView(prepareTable(sqlContext))
-
- case true =>
- // Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already
- // exists.
- throw new AnalysisException(s"View $tableIdentifier already exists. " +
- "If you want to update the view definition, please use ALTER VIEW AS or " +
- "CREATE OR REPLACE VIEW AS")
-
- case false =>
- sessionState.metadataHive.createView(prepareTable(sqlContext))
- }
-
- Seq.empty[Row]
- }
-
- private def prepareTable(sqlContext: SQLContext): CatalogTable = {
- val expandedText = if (sqlContext.conf.canonicalView) {
- try rebuildViewQueryString(sqlContext) catch {
- case NonFatal(e) => wrapViewTextWithSelect
- }
- } else {
- wrapViewTextWithSelect
- }
-
- val viewSchema = {
- if (tableDesc.schema.isEmpty) {
- childSchema.map { a =>
- CatalogColumn(a.name, HiveMetastoreTypes.toMetastoreType(a.dataType))
- }
- } else {
- childSchema.zip(tableDesc.schema).map { case (a, col) =>
- CatalogColumn(
- col.name,
- HiveMetastoreTypes.toMetastoreType(a.dataType),
- nullable = true,
- col.comment)
- }
- }
- }
-
- tableDesc.copy(schema = viewSchema, viewText = Some(expandedText))
- }
-
- private def wrapViewTextWithSelect: String = {
- // When user specified column names for view, we should create a project to do the renaming.
- // When no column name specified, we still need to create a project to declare the columns
- // we need, to make us more robust to top level `*`s.
- val viewOutput = {
- val columnNames = childSchema.map(f => quote(f.name))
- if (tableDesc.schema.isEmpty) {
- columnNames.mkString(", ")
- } else {
- columnNames.zip(tableDesc.schema.map(f => quote(f.name))).map {
- case (name, alias) => s"$name AS $alias"
- }.mkString(", ")
- }
- }
-
- val viewText = tableDesc.viewText.get
- val viewName = quote(tableDesc.identifier.table)
- s"SELECT $viewOutput FROM ($viewText) $viewName"
- }
-
- private def rebuildViewQueryString(sqlContext: SQLContext): String = {
- val logicalPlan = if (tableDesc.schema.isEmpty) {
- child
- } else {
- val projectList = childSchema.zip(tableDesc.schema).map {
- case (attr, col) => Alias(attr, col.name)()
- }
- sqlContext.executePlan(Project(projectList, child)).analyzed
- }
- new SQLBuilder(logicalPlan).toSQL
- }
-
- // escape backtick with double-backtick in column name and wrap it with backtick.
- private def quote(name: String) = s"`${name.replaceAll("`", "``")}`"
-}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
index f6e3a4bd2d..e3204ff793 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.JsonTuple
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical.{Generate, ScriptTransformation}
-import org.apache.spark.sql.execution.command.{CreateTable, CreateTableAsSelectLogicalPlan, CreateTableLike, CreateViewAsSelectLogicalCommand, HiveNativeCommand, LoadData}
+import org.apache.spark.sql.execution.command.{CreateTable, CreateTableAsSelectLogicalPlan, CreateTableLike, CreateViewCommand, HiveNativeCommand, LoadData}
import org.apache.spark.sql.hive.test.TestHive
class HiveDDLCommandSuite extends PlanTest {
@@ -39,7 +39,7 @@ class HiveDDLCommandSuite extends PlanTest {
parser.parsePlan(sql).collect {
case CreateTable(desc, allowExisting) => (desc, allowExisting)
case CreateTableAsSelectLogicalPlan(desc, _, allowExisting) => (desc, allowExisting)
- case CreateViewAsSelectLogicalCommand(desc, _, allowExisting, _, _) => (desc, allowExisting)
+ case CreateViewCommand(desc, _, allowExisting, _, _) => (desc, allowExisting)
}.head
}
@@ -521,14 +521,13 @@ class HiveDDLCommandSuite extends PlanTest {
test("create view - full") {
val v1 =
"""
- |CREATE OR REPLACE VIEW IF NOT EXISTS view1
+ |CREATE OR REPLACE VIEW view1
|(col1, col3)
|COMMENT 'BLABLA'
|TBLPROPERTIES('prop1Key'="prop1Val")
|AS SELECT * FROM tab1
""".stripMargin
val (desc, exists) = extractTableDesc(v1)
- assert(exists)
assert(desc.identifier.database.isEmpty)
assert(desc.identifier.table == "view1")
assert(desc.tableType == CatalogTableType.VIRTUAL_VIEW)