aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWenchen Fan <wenchen@databricks.com>2016-07-25 22:02:00 +0800
committerCheng Lian <lian@databricks.com>2016-07-25 22:02:00 +0800
commitd27d362ebae0c4a5cc6c99f13ef20049214dd4f9 (patch)
tree0f214921fc8ac1aff74f0c8e4f171adb724c43fc
parent7ffd99ec5f267730734431097cbb700ad074bebe (diff)
downloadspark-d27d362ebae0c4a5cc6c99f13ef20049214dd4f9.tar.gz
spark-d27d362ebae0c4a5cc6c99f13ef20049214dd4f9.tar.bz2
spark-d27d362ebae0c4a5cc6c99f13ef20049214dd4f9.zip
[SPARK-16660][SQL] CreateViewCommand should not take CatalogTable
## What changes were proposed in this pull request? `CreateViewCommand` only needs some information of a `CatalogTable`, but not all of them. We have some tricks(e.g. we need to check the table type is `VIEW`, we need to make `CatalogColumn.dataType` nullable) to allow it to take a `CatalogTable`. This PR cleans it up and only pass in necessary information to `CreateViewCommand`. ## How was this patch tested? existing tests. Author: Wenchen Fan <wenchen@databricks.com> Closes #14297 from cloud-fan/minor2.
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala6
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala27
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala51
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala111
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala2
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala46
6 files changed, 116 insertions, 127 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index b7f35b3af4..2a20651459 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -81,9 +81,9 @@ object CatalogStorageFormat {
*/
case class CatalogColumn(
name: String,
- // This may be null when used to create views. TODO: make this type-safe; this is left
- // as a string due to issues in converting Hive varchars to and from SparkSQL strings.
- @Nullable dataType: String,
+ // TODO: make this type-safe; this is left as a string due to issues in converting Hive
+ // varchars to and from SparkSQL strings.
+ dataType: String,
nullable: Boolean = true,
comment: Option[String] = None) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index b28ecb753f..8b6443c8b9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -2421,13 +2421,7 @@ class Dataset[T] private[sql](
*/
@throws[AnalysisException]
def createTempView(viewName: String): Unit = withPlan {
- val tableDesc = CatalogTable(
- identifier = sparkSession.sessionState.sqlParser.parseTableIdentifier(viewName),
- tableType = CatalogTableType.VIEW,
- schema = Seq.empty[CatalogColumn],
- storage = CatalogStorageFormat.empty)
- CreateViewCommand(tableDesc, logicalPlan, allowExisting = false, replace = false,
- isTemporary = true)
+ createViewCommand(viewName, replace = false)
}
/**
@@ -2438,12 +2432,19 @@ class Dataset[T] private[sql](
* @since 2.0.0
*/
def createOrReplaceTempView(viewName: String): Unit = withPlan {
- val tableDesc = CatalogTable(
- identifier = sparkSession.sessionState.sqlParser.parseTableIdentifier(viewName),
- tableType = CatalogTableType.VIEW,
- schema = Seq.empty[CatalogColumn],
- storage = CatalogStorageFormat.empty)
- CreateViewCommand(tableDesc, logicalPlan, allowExisting = false, replace = true,
+ createViewCommand(viewName, replace = true)
+ }
+
+ private def createViewCommand(viewName: String, replace: Boolean): CreateViewCommand = {
+ CreateViewCommand(
+ name = sparkSession.sessionState.sqlParser.parseTableIdentifier(viewName),
+ userSpecifiedColumns = Nil,
+ comment = None,
+ properties = Map.empty,
+ originalText = None,
+ child = logicalPlan,
+ allowExisting = false,
+ replace = replace,
isTemporary = true)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 9b09801896..5e1ad9b885 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -1235,20 +1235,21 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
if (ctx.identifierList != null) {
operationNotAllowed("CREATE VIEW ... PARTITIONED ON", ctx)
} else {
- val identifiers = Option(ctx.identifierCommentList).toSeq.flatMap(_.identifierComment.asScala)
- val schema = identifiers.map { ic =>
- CatalogColumn(ic.identifier.getText, null, nullable = true, Option(ic.STRING).map(string))
+ val userSpecifiedColumns = Option(ctx.identifierCommentList).toSeq.flatMap { icl =>
+ icl.identifierComment.asScala.map { ic =>
+ ic.identifier.getText -> Option(ic.STRING).map(string)
+ }
}
createView(
ctx,
ctx.tableIdentifier,
comment = Option(ctx.STRING).map(string),
- schema,
+ userSpecifiedColumns,
ctx.query,
Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty),
- ctx.EXISTS != null,
- ctx.REPLACE != null,
- ctx.TEMPORARY != null
+ allowExisting = ctx.EXISTS != null,
+ replace = ctx.REPLACE != null,
+ isTemporary = ctx.TEMPORARY != null
)
}
}
@@ -1259,12 +1260,12 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
override def visitAlterViewQuery(ctx: AlterViewQueryContext): LogicalPlan = withOrigin(ctx) {
createView(
ctx,
- ctx.tableIdentifier,
+ name = ctx.tableIdentifier,
comment = None,
- Seq.empty,
- ctx.query,
- Map.empty,
- allowExist = false,
+ userSpecifiedColumns = Seq.empty,
+ query = ctx.query,
+ properties = Map.empty,
+ allowExisting = false,
replace = true,
isTemporary = false)
}
@@ -1276,23 +1277,23 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
ctx: ParserRuleContext,
name: TableIdentifierContext,
comment: Option[String],
- schema: Seq[CatalogColumn],
+ userSpecifiedColumns: Seq[(String, Option[String])],
query: QueryContext,
properties: Map[String, String],
- allowExist: Boolean,
+ allowExisting: Boolean,
replace: Boolean,
isTemporary: Boolean): LogicalPlan = {
- val sql = Option(source(query))
- val tableDesc = CatalogTable(
- identifier = visitTableIdentifier(name),
- tableType = CatalogTableType.VIEW,
- schema = schema,
- storage = CatalogStorageFormat.empty,
- properties = properties,
- viewOriginalText = sql,
- viewText = sql,
- comment = comment)
- CreateViewCommand(tableDesc, plan(query), allowExist, replace, isTemporary)
+ val originalText = source(query)
+ CreateViewCommand(
+ visitTableIdentifier(name),
+ userSpecifiedColumns,
+ comment,
+ properties,
+ Some(originalText),
+ plan(query),
+ allowExisting = allowExisting,
+ replace = replace,
+ isTemporary = isTemporary)
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index 16b333a402..312a1f691b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -21,7 +21,7 @@ import scala.util.control.NonFatal
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.{SQLBuilder, TableIdentifier}
-import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogStorageFormat, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
@@ -31,7 +31,13 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
* Create Hive view on non-hive-compatible tables by specifying schema ourselves instead of
* depending on Hive meta-store.
*
- * @param tableDesc the catalog table
+ * @param name the name of this view.
+ * @param userSpecifiedColumns the output column names and optional comments specified by users,
+ * can be Nil if not specified.
+ * @param comment the comment of this view.
+ * @param properties the properties of this view.
+ * @param originalText the original SQL text of this view, can be None if this view is created via
+ * Dataset API.
* @param child the logical plan that represents the view; this is used to generate a canonicalized
* version of the SQL that can be saved in the catalog.
* @param allowExisting if true, and if the view already exists, noop; if false, and if the view
@@ -44,7 +50,11 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
* unless they are specified with full qualified table name with database prefix.
*/
case class CreateViewCommand(
- tableDesc: CatalogTable,
+ name: TableIdentifier,
+ userSpecifiedColumns: Seq[(String, Option[String])],
+ comment: Option[String],
+ properties: Map[String, String],
+ originalText: Option[String],
child: LogicalPlan,
allowExisting: Boolean,
replace: Boolean,
@@ -58,11 +68,9 @@ case class CreateViewCommand(
override def output: Seq[Attribute] = Seq.empty[Attribute]
- require(tableDesc.tableType == CatalogTableType.VIEW,
- "The type of the table to created with CREATE VIEW must be 'CatalogTableType.VIEW'.")
if (!isTemporary) {
- require(tableDesc.viewText.isDefined,
- "The table to created with CREATE VIEW must have 'viewText'.")
+ require(originalText.isDefined,
+ "The table to created with CREATE VIEW must have 'originalText'.")
}
if (allowExisting && replace) {
@@ -76,8 +84,8 @@ case class CreateViewCommand(
}
// Temporary view names should NOT contain database prefix like "database.table"
- if (isTemporary && tableDesc.identifier.database.isDefined) {
- val database = tableDesc.identifier.database.get
+ if (isTemporary && name.database.isDefined) {
+ val database = name.database.get
throw new AnalysisException(
s"It is not allowed to add database prefix `$database` for the TEMPORARY view name.")
}
@@ -88,23 +96,23 @@ case class CreateViewCommand(
qe.assertAnalyzed()
val analyzedPlan = qe.analyzed
- if (tableDesc.schema != Nil && tableDesc.schema.length != analyzedPlan.output.length) {
+ if (userSpecifiedColumns.nonEmpty &&
+ userSpecifiedColumns.length != analyzedPlan.output.length) {
throw new AnalysisException(s"The number of columns produced by the SELECT clause " +
s"(num: `${analyzedPlan.output.length}`) does not match the number of column names " +
- s"specified by CREATE VIEW (num: `${tableDesc.schema.length}`).")
+ s"specified by CREATE VIEW (num: `${userSpecifiedColumns.length}`).")
}
val sessionState = sparkSession.sessionState
if (isTemporary) {
- createTemporaryView(tableDesc.identifier, sparkSession, analyzedPlan)
+ createTemporaryView(sparkSession, analyzedPlan)
} else {
// Adds default database for permanent table if it doesn't exist, so that tableExists()
// only check permanent tables.
- val database = tableDesc.identifier.database.getOrElse(
- sessionState.catalog.getCurrentDatabase)
- val tableIdentifier = tableDesc.identifier.copy(database = Option(database))
+ val database = name.database.getOrElse(sessionState.catalog.getCurrentDatabase)
+ val qualifiedName = name.copy(database = Option(database))
- if (sessionState.catalog.tableExists(tableIdentifier)) {
+ if (sessionState.catalog.tableExists(qualifiedName)) {
if (allowExisting) {
// Handles `CREATE VIEW IF NOT EXISTS v0 AS SELECT ...`. Does nothing when the target view
// already exists.
@@ -115,7 +123,7 @@ case class CreateViewCommand(
// Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already
// exists.
throw new AnalysisException(
- s"View $tableIdentifier already exists. If you want to update the view definition, " +
+ s"View $qualifiedName already exists. If you want to update the view definition, " +
"please use ALTER VIEW AS or CREATE OR REPLACE VIEW AS")
}
} else {
@@ -127,25 +135,20 @@ case class CreateViewCommand(
Seq.empty[Row]
}
- private def createTemporaryView(
- table: TableIdentifier, sparkSession: SparkSession, analyzedPlan: LogicalPlan): Unit = {
-
- val sessionState = sparkSession.sessionState
- val catalog = sessionState.catalog
+ private def createTemporaryView(sparkSession: SparkSession, analyzedPlan: LogicalPlan): Unit = {
+ val catalog = sparkSession.sessionState.catalog
// Projects column names to alias names
- val logicalPlan = {
- if (tableDesc.schema.isEmpty) {
- analyzedPlan
- } else {
- val projectList = analyzedPlan.output.zip(tableDesc.schema).map {
- case (attr, col) => Alias(attr, col.name)()
- }
- sparkSession.sessionState.executePlan(Project(projectList, analyzedPlan)).analyzed
+ val logicalPlan = if (userSpecifiedColumns.isEmpty) {
+ analyzedPlan
+ } else {
+ val projectList = analyzedPlan.output.zip(userSpecifiedColumns).map {
+ case (attr, (colName, _)) => Alias(attr, colName)()
}
+ sparkSession.sessionState.executePlan(Project(projectList, analyzedPlan)).analyzed
}
- catalog.createTempView(table.table, logicalPlan, replace)
+ catalog.createTempView(name.table, logicalPlan, replace)
}
/**
@@ -154,15 +157,14 @@ case class CreateViewCommand(
*/
private def prepareTable(sparkSession: SparkSession, analyzedPlan: LogicalPlan): CatalogTable = {
val viewSQL: String = {
- val logicalPlan =
- if (tableDesc.schema.isEmpty) {
- analyzedPlan
- } else {
- val projectList = analyzedPlan.output.zip(tableDesc.schema).map {
- case (attr, col) => Alias(attr, col.name)()
- }
- sparkSession.sessionState.executePlan(Project(projectList, analyzedPlan)).analyzed
+ val logicalPlan = if (userSpecifiedColumns.isEmpty) {
+ analyzedPlan
+ } else {
+ val projectList = analyzedPlan.output.zip(userSpecifiedColumns).map {
+ case (attr, (colName, _)) => Alias(attr, colName)()
}
+ sparkSession.sessionState.executePlan(Project(projectList, analyzedPlan)).analyzed
+ }
new SQLBuilder(logicalPlan).toSQL
}
@@ -176,21 +178,26 @@ case class CreateViewCommand(
"Failed to analyze the canonicalized SQL. It is possible there is a bug in Spark.", e)
}
- val viewSchema: Seq[CatalogColumn] = {
- if (tableDesc.schema.isEmpty) {
- analyzedPlan.output.map { a =>
- CatalogColumn(a.name, a.dataType.catalogString)
- }
- } else {
- analyzedPlan.output.zip(tableDesc.schema).map { case (a, col) =>
- CatalogColumn(col.name, a.dataType.catalogString, nullable = true, col.comment)
- }
+ val viewSchema = if (userSpecifiedColumns.isEmpty) {
+ analyzedPlan.output.map { a =>
+ CatalogColumn(a.name, a.dataType.catalogString)
+ }
+ } else {
+ analyzedPlan.output.zip(userSpecifiedColumns).map {
+ case (a, (name, comment)) =>
+ CatalogColumn(name, a.dataType.catalogString, comment = comment)
}
}
- tableDesc.copy(schema = viewSchema, viewText = Some(viewSQL))
+ CatalogTable(
+ identifier = name,
+ tableType = CatalogTableType.VIEW,
+ storage = CatalogStorageFormat.empty,
+ schema = viewSchema,
+ properties = properties,
+ viewOriginalText = originalText,
+ viewText = Some(viewSQL),
+ comment = comment
+ )
}
-
- /** Escape backtick with double-backtick in column name and wrap it with backtick. */
- private def quote(name: String) = s"`${name.replaceAll("`", "``")}`"
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index d308a31061..db970785a7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -171,8 +171,6 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
} else if (table.tableType == CatalogTableType.VIEW) {
val viewText = table.viewText.getOrElse(sys.error("Invalid view without text."))
alias match {
- // because hive use things like `_c0` to build the expanded text
- // currently we cannot support view from "create view v1(c1) as ..."
case None =>
SubqueryAlias(table.identifier.table,
sparkSession.sessionState.sqlParser.parsePlan(viewText))
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
index 9d99d960ac..a708434f5e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
@@ -37,7 +37,6 @@ class HiveDDLCommandSuite extends PlanTest {
parser.parsePlan(sql).collect {
case c: CreateTableCommand => (c.table, c.ifNotExists)
case c: CreateHiveTableAsSelectLogicalPlan => (c.tableDesc, c.allowExisting)
- case c: CreateViewCommand => (c.tableDesc, c.allowExisting)
}.head
}
@@ -470,47 +469,30 @@ class HiveDDLCommandSuite extends PlanTest {
test("create view -- basic") {
val v1 = "CREATE VIEW view1 AS SELECT * FROM tab1"
- val (desc, exists) = extractTableDesc(v1)
- assert(!exists)
- assert(desc.identifier.database.isEmpty)
- assert(desc.identifier.table == "view1")
- assert(desc.tableType == CatalogTableType.VIEW)
- assert(desc.storage.locationUri.isEmpty)
- assert(desc.schema == Seq.empty[CatalogColumn])
- assert(desc.viewText == Option("SELECT * FROM tab1"))
- assert(desc.viewOriginalText == Option("SELECT * FROM tab1"))
- assert(desc.storage.properties == Map())
- assert(desc.storage.inputFormat.isEmpty)
- assert(desc.storage.outputFormat.isEmpty)
- assert(desc.storage.serde.isEmpty)
- assert(desc.properties == Map())
+ val command = parser.parsePlan(v1).asInstanceOf[CreateViewCommand]
+ assert(!command.allowExisting)
+ assert(command.name.database.isEmpty)
+ assert(command.name.table == "view1")
+ assert(command.originalText == Some("SELECT * FROM tab1"))
+ assert(command.userSpecifiedColumns.isEmpty)
}
test("create view - full") {
val v1 =
"""
|CREATE OR REPLACE VIEW view1
- |(col1, col3)
+ |(col1, col3 COMMENT 'hello')
|COMMENT 'BLABLA'
|TBLPROPERTIES('prop1Key'="prop1Val")
|AS SELECT * FROM tab1
""".stripMargin
- val (desc, exists) = extractTableDesc(v1)
- assert(desc.identifier.database.isEmpty)
- assert(desc.identifier.table == "view1")
- assert(desc.tableType == CatalogTableType.VIEW)
- assert(desc.storage.locationUri.isEmpty)
- assert(desc.schema ==
- CatalogColumn("col1", null, nullable = true, None) ::
- CatalogColumn("col3", null, nullable = true, None) :: Nil)
- assert(desc.viewText == Option("SELECT * FROM tab1"))
- assert(desc.viewOriginalText == Option("SELECT * FROM tab1"))
- assert(desc.storage.properties == Map())
- assert(desc.storage.inputFormat.isEmpty)
- assert(desc.storage.outputFormat.isEmpty)
- assert(desc.storage.serde.isEmpty)
- assert(desc.properties == Map("prop1Key" -> "prop1Val"))
- assert(desc.comment == Option("BLABLA"))
+ val command = parser.parsePlan(v1).asInstanceOf[CreateViewCommand]
+ assert(command.name.database.isEmpty)
+ assert(command.name.table == "view1")
+ assert(command.userSpecifiedColumns == Seq("col1" -> None, "col3" -> Some("hello")))
+ assert(command.originalText == Some("SELECT * FROM tab1"))
+ assert(command.properties == Map("prop1Key" -> "prop1Val"))
+ assert(command.comment == Some("BLABLA"))
}
test("create view -- partitioned view") {