author     jiangxingbo <jiangxb1987@gmail.com>     2017-01-18 19:13:01 +0800
committer  Wenchen Fan <wenchen@databricks.com>    2017-01-18 19:13:01 +0800
commit     f85f29608de801d7cacc779a77c8edaed8124acf (patch)
tree       6bb10b7ee796baacbd085dbd202d2372ec107bda /sql
parent     17ce0b5b3f6a825fc77458bc8608cece1a6019c7 (diff)
[SPARK-19024][SQL] Implement new approach to write a permanent view
## What changes were proposed in this pull request?

On CREATE/ALTER a view, it's no longer needed to generate a SQL text string from the LogicalPlan; instead we store the SQL query text, the output column names of the query plan, and the current database in the CatalogTable. Permanent views created by this approach can be resolved by the current view resolution approach. The main advantages include:
1. If you update an underlying view, the current view also gets updated;
2. It gives us a chance to get rid of SQL generation for operators.

Major changes of this PR:
1. Generate the view-specific properties (e.g. view default database, view query output column names) during permanent view creation and store them as properties in the CatalogTable;
2. Update the commands `CreateViewCommand` and `AlterViewAsCommand` to get rid of SQL generation in them.

## How was this patch tested?

Existing tests.

Author: jiangxingbo <jiangxb1987@gmail.com>

Closes #16613 from jiangxb1987/view-write-path.
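To make advantage 1 concrete, a minimal sketch assuming a running `SparkSession` named `spark` (table and view names are hypothetical):

```scala
// Because the stored viewText is the original SQL rather than canonicalized SQL
// over a frozen plan, a dependent view picks up changes to an underlying view
// on the next resolution:
spark.sql("CREATE TABLE t USING parquet AS SELECT 1 AS a")
spark.sql("CREATE VIEW inner_v AS SELECT a FROM t")
spark.sql("CREATE VIEW outer_v AS SELECT * FROM inner_v")
spark.sql("ALTER VIEW inner_v AS SELECT a * 10 AS a FROM t")
spark.table("outer_v").show() // resolves against the updated definition of inner_v
```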
Diffstat (limited to 'sql')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala    19
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala          164
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala                     10
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala       29
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala       16
5 files changed, 146 insertions(+), 92 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 2adccdd7bf..80d32822f5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -223,25 +223,6 @@ case class CatalogTable(
)
}
- /**
- * Insert/Update the view query output column names in `properties`.
- */
- def withQueryColumnNames(columns: Seq[String]): CatalogTable = {
- val props = new mutable.HashMap[String, String]
- if (columns.nonEmpty) {
- props.put(VIEW_QUERY_OUTPUT_NUM_COLUMNS, columns.length.toString)
- columns.zipWithIndex.foreach { case (colName, index) =>
- props.put(s"$VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX$index", colName)
- }
- }
-
- // We can't use `filterKeys` here, as the map returned by `filterKeys` is not serializable,
- // while `CatalogTable` should be serializable.
- copy(properties = properties.filterNot { case (key, _) =>
- key.startsWith(VIEW_QUERY_OUTPUT_PREFIX)
- } ++ props)
- }
-
/** Syntactic sugar to update a field in `storage`. */
def withNewStorage(
locationUri: Option[String] = storage.locationUri,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index 154141bf83..3da4bcfe93 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -17,10 +17,10 @@
package org.apache.spark.sql.execution.command
-import scala.util.control.NonFatal
+import scala.collection.mutable
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
-import org.apache.spark.sql.catalyst.{SQLBuilder, TableIdentifier}
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{UnresolvedFunction, UnresolvedRelation}
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.expressions.Alias
@@ -64,9 +64,9 @@ object PersistedView extends ViewType
/**
- * Create or replace a view with given query plan. This command will convert the query plan to
- * canonicalized SQL string, and store it as view text in metastore, if we need to create a
- * permanent view.
+ * Create or replace a view with the given query plan. This command will generate some
+ * view-specific properties (e.g. view default database, view query output column names)
+ * and store them as properties in the metastore, if we need to create a permanent view.
*
* @param name the name of this view.
* @param userSpecifiedColumns the output column names and optional comments specified by users,
@@ -75,8 +75,8 @@ object PersistedView extends ViewType
* @param properties the properties of this view.
* @param originalText the original SQL text of this view, can be None if this view is created via
* Dataset API.
- * @param child the logical plan that represents the view; this is used to generate a canonicalized
- * version of the SQL that can be saved in the catalog.
+ * @param child the logical plan that represents the view; this is used to generate the logical
+ * plan for a temporary view and the view schema.
* @param allowExisting if true, and if the view already exists, noop; if false, and if the view
* already exists, throws analysis exception.
* @param replace if true, and if the view already exists, updates it; if false, and if the view
@@ -95,6 +95,8 @@ case class CreateViewCommand(
viewType: ViewType)
extends RunnableCommand {
+ import ViewHelper._
+
override protected def innerChildren: Seq[QueryPlan[_]] = Seq(child)
if (viewType == PersistedView) {
@@ -137,22 +139,12 @@ case class CreateViewCommand(
// This should be called after `qe.assertAnalyzed()` (i.e., `child` can be resolved)
verifyTemporaryObjectsNotExists(sparkSession)
- val aliasedPlan = if (userSpecifiedColumns.isEmpty) {
- analyzedPlan
- } else {
- val projectList = analyzedPlan.output.zip(userSpecifiedColumns).map {
- case (attr, (colName, None)) => Alias(attr, colName)()
- case (attr, (colName, Some(colComment))) =>
- val meta = new MetadataBuilder().putString("comment", colComment).build()
- Alias(attr, colName)(explicitMetadata = Some(meta))
- }
- sparkSession.sessionState.executePlan(Project(projectList, analyzedPlan)).analyzed
- }
-
val catalog = sparkSession.sessionState.catalog
if (viewType == LocalTempView) {
+ val aliasedPlan = aliasPlan(sparkSession, analyzedPlan)
catalog.createTempView(name.table, aliasedPlan, overrideIfExists = replace)
} else if (viewType == GlobalTempView) {
+ val aliasedPlan = aliasPlan(sparkSession, analyzedPlan)
catalog.createGlobalTempView(name.table, aliasedPlan, overrideIfExists = replace)
} else if (catalog.tableExists(name)) {
val tableMetadata = catalog.getTableMetadata(name)
@@ -163,7 +155,7 @@ case class CreateViewCommand(
throw new AnalysisException(s"$name is not a view")
} else if (replace) {
// Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...`
- catalog.alterTable(prepareTable(sparkSession, aliasedPlan))
+ catalog.alterTable(prepareTable(sparkSession, analyzedPlan))
} else {
// Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already
// exists.
@@ -173,7 +165,7 @@ case class CreateViewCommand(
}
} else {
// Create the view if it doesn't exist.
- catalog.createTable(prepareTable(sparkSession, aliasedPlan), ignoreIfExists = false)
+ catalog.createTable(prepareTable(sparkSession, analyzedPlan), ignoreIfExists = false)
}
Seq.empty[Row]
}
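To make the branching in `run` easier to follow, here is a self-contained sketch of the persisted-view path (names and return strings are illustrative, and the exact branch order is an assumption where the hunks elide context; the real code goes through `SessionCatalog`):

```scala
// Sketch of the persisted-view branch of CreateViewCommand.run (illustrative only):
def onPersistedView(
    tableExists: Boolean,   // catalog.tableExists(name)
    isView: Boolean,        // tableMetadata.tableType == CatalogTableType.VIEW
    allowExisting: Boolean, // CREATE VIEW IF NOT EXISTS ...
    replace: Boolean        // CREATE OR REPLACE VIEW ...
  ): String = {
  if (!tableExists) {
    "createTable(prepareTable(session, analyzedPlan)): create the view"
  } else if (allowExisting) {
    "no-op: CREATE VIEW IF NOT EXISTS on an existing view"
  } else if (!isView) {
    "throw AnalysisException: target is not a view"
  } else if (replace) {
    "alterTable(prepareTable(session, analyzedPlan)): CREATE OR REPLACE VIEW"
  } else {
    "throw AnalysisException: view already exists"
  }
}
```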
@@ -207,29 +199,44 @@ case class CreateViewCommand(
}
/**
- * Returns a [[CatalogTable]] that can be used to save in the catalog. This comment canonicalize
- * SQL based on the analyzed plan, and also creates the proper schema for the view.
+ * If `userSpecifiedColumns` is defined, alias the analyzed plan to the user-specified columns,
+ * else return the analyzed plan directly.
*/
- private def prepareTable(sparkSession: SparkSession, aliasedPlan: LogicalPlan): CatalogTable = {
- val viewSQL: String = new SQLBuilder(aliasedPlan).toSQL
-
- // Validate the view SQL - make sure we can parse it and analyze it.
- // If we cannot analyze the generated query, there is probably a bug in SQL generation.
- try {
- sparkSession.sql(viewSQL).queryExecution.assertAnalyzed()
- } catch {
- case NonFatal(e) =>
- throw new RuntimeException(s"Failed to analyze the canonicalized SQL: $viewSQL", e)
+ private def aliasPlan(session: SparkSession, analyzedPlan: LogicalPlan): LogicalPlan = {
+ if (userSpecifiedColumns.isEmpty) {
+ analyzedPlan
+ } else {
+ val projectList = analyzedPlan.output.zip(userSpecifiedColumns).map {
+ case (attr, (colName, None)) => Alias(attr, colName)()
+ case (attr, (colName, Some(colComment))) =>
+ val meta = new MetadataBuilder().putString("comment", colComment).build()
+ Alias(attr, colName)(explicitMetadata = Some(meta))
+ }
+ session.sessionState.executePlan(Project(projectList, analyzedPlan)).analyzed
}
+ }
+
+ /**
+ * Returns a [[CatalogTable]] that can be saved in the catalog. It generates the view-specific
+ * properties (e.g. view default database, view query output column names), stores them as
+ * properties in the CatalogTable, and also creates the proper schema for the view.
+ */
+ private def prepareTable(session: SparkSession, analyzedPlan: LogicalPlan): CatalogTable = {
+ if (originalText.isEmpty) {
+ throw new AnalysisException(
+ "It is not allowed to create a persisted view from the Dataset API")
+ }
+
+ val newProperties = generateViewProperties(properties, session, analyzedPlan)
CatalogTable(
identifier = name,
tableType = CatalogTableType.VIEW,
storage = CatalogStorageFormat.empty,
- schema = aliasedPlan.schema,
- properties = properties,
+ schema = aliasPlan(session, analyzedPlan).schema,
+ properties = newProperties,
viewOriginalText = originalText,
- viewText = Some(viewSQL),
+ viewText = originalText,
comment = comment
)
}
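Putting `aliasPlan` and `prepareTable` together, a sketch of the catalog entry written for `CREATE VIEW v (x, y) AS SELECT a, b FROM t` (column types and property key names are assumptions for illustration; the real key constants are defined on `CatalogTable`):

```scala
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// Sketch of the CatalogTable produced by prepareTable (values illustrative):
val viewEntry = CatalogTable(
  identifier = TableIdentifier("v"),
  tableType = CatalogTableType.VIEW,
  storage = CatalogStorageFormat.empty,
  // the schema uses the user-specified names, via aliasPlan
  schema = StructType(Seq(StructField("x", IntegerType), StructField("y", IntegerType))),
  // the properties record the query's own output names (before aliasing) and
  // the current database; key names here are assumed for illustration
  properties = Map(
    "view.default.database" -> "default",
    "view.query.out.numCols" -> "2",
    "view.query.out.col.0" -> "a",
    "view.query.out.col.1" -> "b"),
  viewOriginalText = Some("SELECT a, b FROM t"),
  // viewText now stores the original SQL, not canonicalized SQL
  viewText = Some("SELECT a, b FROM t")
)
```

Note that the schema carries the user-specified column names while the stored properties keep the query's output names; view resolution uses the latter to map the child plan's output onto the view's output.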
@@ -244,14 +251,16 @@ case class CreateViewCommand(
* @param name the name of this view.
* @param originalText the original SQL text of this view. Note that we can only alter a view by
* SQL API, which means we always have originalText.
- * @param query the logical plan that represents the view; this is used to generate a canonicalized
- * version of the SQL that can be saved in the catalog.
+ * @param query the logical plan that represents the view; this is used to generate the new view
+ * schema.
*/
case class AlterViewAsCommand(
name: TableIdentifier,
originalText: String,
query: LogicalPlan) extends RunnableCommand {
+ import ViewHelper._
+
override protected def innerChildren: Seq[QueryPlan[_]] = Seq(query)
override def run(session: SparkSession): Seq[Row] = {
@@ -275,21 +284,80 @@ case class AlterViewAsCommand(
throw new AnalysisException(s"${viewMeta.identifier} is not a view.")
}
- val viewSQL: String = new SQLBuilder(analyzedPlan).toSQL
- // Validate the view SQL - make sure we can parse it and analyze it.
- // If we cannot analyze the generated query, there is probably a bug in SQL generation.
- try {
- session.sql(viewSQL).queryExecution.assertAnalyzed()
- } catch {
- case NonFatal(e) =>
- throw new RuntimeException(s"Failed to analyze the canonicalized SQL: $viewSQL", e)
- }
+ val newProperties = generateViewProperties(viewMeta.properties, session, analyzedPlan)
val updatedViewMeta = viewMeta.copy(
schema = analyzedPlan.schema,
+ properties = newProperties,
viewOriginalText = Some(originalText),
- viewText = Some(viewSQL))
+ viewText = Some(originalText))
session.sessionState.catalog.alterTable(updatedViewMeta)
}
}
+
+object ViewHelper {
+
+ import CatalogTable._
+
+ /**
+ * Generate the view default database in `properties`.
+ */
+ private def generateViewDefaultDatabase(databaseName: String): Map[String, String] = {
+ Map(VIEW_DEFAULT_DATABASE -> databaseName)
+ }
+
+ /**
+ * Generate the view query output column names in `properties`.
+ */
+ private def generateQueryColumnNames(columns: Seq[String]): Map[String, String] = {
+ val props = new mutable.HashMap[String, String]
+ if (columns.nonEmpty) {
+ props.put(VIEW_QUERY_OUTPUT_NUM_COLUMNS, columns.length.toString)
+ columns.zipWithIndex.foreach { case (colName, index) =>
+ props.put(s"$VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX$index", colName)
+ }
+ }
+ props.toMap
+ }
+
+ /**
+ * Remove the view query output column names in `properties`.
+ */
+ private def removeQueryColumnNames(properties: Map[String, String]): Map[String, String] = {
+ // We can't use `filterKeys` here, as the map returned by `filterKeys` is not serializable,
+ // while `CatalogTable` should be serializable.
+ properties.filterNot { case (key, _) =>
+ key.startsWith(VIEW_QUERY_OUTPUT_PREFIX)
+ }
+ }
+
+ /**
+ * Generate the view properties in CatalogTable, including:
+ * 1. view default database that is used to provide the default database name on view resolution.
+ * 2. the output column names of the query that creates a view; this is used to map the output of
+ * the view child to the view output during view resolution.
+ *
+ * @param properties the `properties` in CatalogTable.
+ * @param session the spark session.
+ * @param analyzedPlan the analyzed logical plan that represents the child of a view.
+ * @return new view properties including view default database and query column names properties.
+ */
+ def generateViewProperties(
+ properties: Map[String, String],
+ session: SparkSession,
+ analyzedPlan: LogicalPlan): Map[String, String] = {
+ // Generate the query column names; throw an error if there exist duplicate column
+ // names.
+ val queryOutput = analyzedPlan.schema.fieldNames
+ assert(queryOutput.distinct.size == queryOutput.size,
+ s"The view output ${queryOutput.mkString("(", ",", ")")} contains duplicate column names.")
+
+ // Generate the view default database name.
+ val viewDefaultDatabase = session.sessionState.catalog.getCurrentDatabase
+
+ removeQueryColumnNames(properties) ++
+ generateViewDefaultDatabase(viewDefaultDatabase) ++
+ generateQueryColumnNames(queryOutput)
+ }
+}
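For readers who want to poke at the bookkeeping without a Spark build, a self-contained pure-Scala sketch of what `generateViewProperties` computes (key names are assumed for illustration; the real constants live on `CatalogTable`):

```scala
// Pure-Scala sketch of ViewHelper.generateViewProperties (illustrative only):
object ViewPropsSketch {
  val ViewDefaultDatabase = "view.default.database" // assumed key name
  val QueryOutputPrefix   = "view.query.out."       // assumed key prefix
  val NumColsKey          = QueryOutputPrefix + "numCols"
  val ColNamePrefix       = QueryOutputPrefix + "col."

  def generate(
      existing: Map[String, String],
      currentDb: String,
      queryOutput: Seq[String]): Map[String, String] = {
    require(queryOutput.distinct.size == queryOutput.size,
      s"The view output ${queryOutput.mkString("(", ",", ")")} contains duplicate column names.")
    // Only record column properties when the query actually has output columns.
    val colProps =
      if (queryOutput.isEmpty) Map.empty[String, String]
      else Map(NumColsKey -> queryOutput.length.toString) ++
        queryOutput.zipWithIndex.map { case (name, i) => s"$ColNamePrefix$i" -> name }
    // Drop any stale query-output keys, then layer on the fresh properties.
    existing.filterNot { case (k, _) => k.startsWith(QueryOutputPrefix) } ++
      Map(ViewDefaultDatabase -> currentDb) ++ colProps
  }
}

// Example:
//   ViewPropsSketch.generate(Map("p" -> "an"), "default", Seq("a", "b"))
// returns:
//   Map("p" -> "an", "view.default.database" -> "default",
//       "view.query.out.numCols" -> "2",
//       "view.query.out.col.0" -> "a", "view.query.out.col.1" -> "b")
```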
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 605dec4a1e..10607b8dc2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2501,11 +2501,15 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}
test("should be able to resolve a persistent view") {
- withTable("t1") {
+ withTable("t1", "t2") {
withView("v1") {
sql("CREATE TABLE `t1` USING parquet AS SELECT * FROM VALUES(1, 1) AS t1(a, b)")
- sql("CREATE VIEW `v1` AS SELECT * FROM t1")
- checkAnswer(spark.table("v1"), Row(1, 1))
+ sql("CREATE TABLE `t2` USING parquet AS SELECT * FROM VALUES('a', 2, 1.0) AS t2(d, e, f)")
+ sql("CREATE VIEW `v1`(x, y) AS SELECT * FROM t1")
+ checkAnswer(spark.table("v1").orderBy("x"), Row(1, 1))
+
+ sql("ALTER VIEW `v1` AS SELECT * FROM t2")
+ checkAnswer(spark.table("v1").orderBy("f"), Row("a", 2, 1.0))
}
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 882a184124..edef30823b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -381,28 +381,30 @@ class HiveDDLSuite
spark.range(10).write.saveAsTable(tabName)
val viewName = "view1"
withView(viewName) {
- val catalog = spark.sessionState.catalog
+ def checkProperties(expected: Map[String, String]): Boolean = {
+ val properties = spark.sessionState.catalog.getTableMetadata(TableIdentifier(viewName))
+ .properties
+ properties.filterNot { case (key, value) =>
+ Seq("transient_lastDdlTime", CatalogTable.VIEW_DEFAULT_DATABASE).contains(key) ||
+ key.startsWith(CatalogTable.VIEW_QUERY_OUTPUT_PREFIX)
+ } == expected
+ }
sql(s"CREATE VIEW $viewName AS SELECT * FROM $tabName")
- assert(catalog.getTableMetadata(TableIdentifier(viewName))
- .properties.filter(_._1 != "transient_lastDdlTime") == Map())
+ checkProperties(Map())
sql(s"ALTER VIEW $viewName SET TBLPROPERTIES ('p' = 'an')")
- assert(catalog.getTableMetadata(TableIdentifier(viewName))
- .properties.filter(_._1 != "transient_lastDdlTime") == Map("p" -> "an"))
+ checkProperties(Map("p" -> "an"))
// no exception or message will be issued if we set it again
sql(s"ALTER VIEW $viewName SET TBLPROPERTIES ('p' = 'an')")
- assert(catalog.getTableMetadata(TableIdentifier(viewName))
- .properties.filter(_._1 != "transient_lastDdlTime") == Map("p" -> "an"))
+ checkProperties(Map("p" -> "an"))
// the value will be updated if we set the same key to a different value
sql(s"ALTER VIEW $viewName SET TBLPROPERTIES ('p' = 'b')")
- assert(catalog.getTableMetadata(TableIdentifier(viewName))
- .properties.filter(_._1 != "transient_lastDdlTime") == Map("p" -> "b"))
+ checkProperties(Map("p" -> "b"))
sql(s"ALTER VIEW $viewName UNSET TBLPROPERTIES ('p')")
- assert(catalog.getTableMetadata(TableIdentifier(viewName))
- .properties.filter(_._1 != "transient_lastDdlTime") == Map())
+ checkProperties(Map())
val message = intercept[AnalysisException] {
sql(s"ALTER VIEW $viewName UNSET TBLPROPERTIES ('p')")
@@ -655,10 +657,7 @@ class HiveDDLSuite
Seq(
Row("# View Information", "", ""),
Row("View Original Text:", "SELECT * FROM tbl", ""),
- Row("View Expanded Text:",
- "SELECT `gen_attr_0` AS `a` FROM (SELECT `gen_attr_0` FROM " +
- "(SELECT `a` AS `gen_attr_0` FROM `default`.`tbl`) AS gen_subquery_0) AS tbl",
- "")
+ Row("View Expanded Text:", "SELECT * FROM tbl", "")
)
))
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
index 9bc078dbb0..2658e2c91f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
@@ -222,13 +222,15 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}
test("correctly parse CREATE VIEW statement") {
- sql(
- """CREATE VIEW IF NOT EXISTS
- |default.testView (c1 COMMENT 'blabla', c2 COMMENT 'blabla')
- |TBLPROPERTIES ('a' = 'b')
- |AS SELECT * FROM jt""".stripMargin)
- checkAnswer(sql("SELECT c1, c2 FROM testView ORDER BY c1"), (1 to 9).map(i => Row(i, i)))
- sql("DROP VIEW testView")
+ withView("testView") {
+ sql(
+ """CREATE VIEW IF NOT EXISTS
+ |default.testView (c1 COMMENT 'blabla', c2 COMMENT 'blabla')
+ |TBLPROPERTIES ('a' = 'b')
+ |AS SELECT * FROM jt
+ |""".stripMargin)
+ checkAnswer(sql("SELECT c1, c2 FROM testView ORDER BY c1"), (1 to 9).map(i => Row(i, i)))
+ }
}
test("correctly parse CREATE TEMPORARY VIEW statement") {