From 12fd0cd615683cd4c3e9094ce71a1e6fc33b8d6a Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Wed, 31 Aug 2016 17:08:08 +0800 Subject: [SPARK-17180][SPARK-17309][SPARK-17323][SQL] create AlterViewAsCommand to handle ALTER VIEW AS ## What changes were proposed in this pull request? Currently we use `CreateViewCommand` to implement ALTER VIEW AS, which has 3 bugs: 1. SPARK-17180: ALTER VIEW AS should alter temp view if view name has no database part and temp view exists 2. SPARK-17309: ALTER VIEW AS should issue exception if view does not exist. 3. SPARK-17323: ALTER VIEW AS should keep the previous table properties, comment, create_time, etc. The root cause is, ALTER VIEW AS is quite different from CREATE VIEW, we need different code path to handle them. However, in `CreateViewCommand`, there is no way to distinguish ALTER VIEW AS and CREATE VIEW, we have to introduce extra flag. But instead of doing this, I think a more natural way is to separate the ALTER VIEW AS logic into a new command. ## How was this patch tested? new tests in SQLViewSuite Author: Wenchen Fan Closes #14874 from cloud-fan/minor4. --- .../spark/sql/execution/SparkSqlParser.scala | 63 +++++------------- .../apache/spark/sql/execution/command/views.scala | 77 ++++++++++++++++++---- .../spark/sql/hive/execution/SQLViewSuite.scala | 77 +++++++++++++++++++++- 3 files changed, 157 insertions(+), 60 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index e32d30178e..656494d97d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -1254,60 +1254,33 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { ic.identifier.getText -> Option(ic.STRING).map(string) } } - createView( - ctx, - ctx.tableIdentifier, + + CreateViewCommand( + name = visitTableIdentifier(ctx.tableIdentifier), + userSpecifiedColumns = userSpecifiedColumns, comment = Option(ctx.STRING).map(string), - userSpecifiedColumns, - ctx.query, - Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty), + properties = Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty), + originalText = Option(source(ctx.query)), + child = plan(ctx.query), allowExisting = ctx.EXISTS != null, replace = ctx.REPLACE != null, - isTemporary = ctx.TEMPORARY != null - ) + isTemporary = ctx.TEMPORARY != null) } } /** - * Alter the query of a view. This creates a [[CreateViewCommand]] command. + * Alter the query of a view. This creates a [[AlterViewAsCommand]] command. + * + * For example: + * {{{ + * ALTER VIEW [db_name.]view_name AS SELECT ...; + * }}} */ override def visitAlterViewQuery(ctx: AlterViewQueryContext): LogicalPlan = withOrigin(ctx) { - createView( - ctx, - name = ctx.tableIdentifier, - comment = None, - userSpecifiedColumns = Seq.empty, - query = ctx.query, - properties = Map.empty, - allowExisting = false, - replace = true, - isTemporary = false) - } - - /** - * Create a [[CreateViewCommand]] command. - */ - private def createView( - ctx: ParserRuleContext, - name: TableIdentifierContext, - comment: Option[String], - userSpecifiedColumns: Seq[(String, Option[String])], - query: QueryContext, - properties: Map[String, String], - allowExisting: Boolean, - replace: Boolean, - isTemporary: Boolean): LogicalPlan = { - val originalText = source(query) - CreateViewCommand( - visitTableIdentifier(name), - userSpecifiedColumns, - comment, - properties, - Some(originalText), - plan(query), - allowExisting = allowExisting, - replace = replace, - isTemporary = isTemporary) + AlterViewAsCommand( + name = visitTableIdentifier(ctx.tableIdentifier), + originalText = source(ctx.query), + query = plan(ctx.query)) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala index f0d7b64c3c..15340ee921 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala @@ -22,15 +22,16 @@ import scala.util.control.NonFatal import org.apache.spark.sql.{AnalysisException, Row, SparkSession} import org.apache.spark.sql.catalyst.{SQLBuilder, TableIdentifier} import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType} -import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute} +import org.apache.spark.sql.catalyst.expressions.Alias import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project} import org.apache.spark.sql.types.StructType /** - * Create Hive view on non-hive-compatible tables by specifying schema ourselves instead of - * depending on Hive meta-store. + * Create or replace a view with given query plan. This command will convert the query plan to + * canonicalized SQL string, and store it as view text in metastore, if we need to create a + * permanent view. * * @param name the name of this view. * @param userSpecifiedColumns the output column names and optional comments specified by users, @@ -64,11 +65,6 @@ case class CreateViewCommand( override protected def innerChildren: Seq[QueryPlan[_]] = Seq(child) - // TODO: Note that this class can NOT canonicalize the view SQL string entirely, which is - // different from Hive and may not work for some cases like create view on self join. - - override def output: Seq[Attribute] = Seq.empty[Attribute] - if (!isTemporary) { require(originalText.isDefined, "The table to created with CREATE VIEW must have 'originalText'.") @@ -119,9 +115,7 @@ case class CreateViewCommand( // Handles `CREATE VIEW IF NOT EXISTS v0 AS SELECT ...`. Does nothing when the target view // already exists. } else if (tableMetadata.tableType != CatalogTableType.VIEW) { - throw new AnalysisException( - "Existing table is not a view. The following is an existing table, " + - s"not a view: $qualifiedName") + throw new AnalysisException(s"$qualifiedName is not a view") } else if (replace) { // Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...` sessionState.catalog.alterTable(prepareTable(sparkSession, analyzedPlan)) @@ -179,7 +173,7 @@ case class CreateViewCommand( sparkSession.sql(viewSQL).queryExecution.assertAnalyzed() } catch { case NonFatal(e) => - throw new RuntimeException(s"Failed to analyze the canonicalized SQL: ${viewSQL}", e) + throw new RuntimeException(s"Failed to analyze the canonicalized SQL: $viewSQL", e) } val viewSchema = if (userSpecifiedColumns.isEmpty) { @@ -202,3 +196,62 @@ case class CreateViewCommand( ) } } + +/** + * Alter a view with given query plan. If the view name contains database prefix, this command will + * alter a permanent view matching the given name, or throw an exception if view not exist. Else, + * this command will try to alter a temporary view first, if view not exist, try permanent view + * next, if still not exist, throw an exception. + * + * @param name the name of this view. + * @param originalText the original SQL text of this view. Note that we can only alter a view by + * SQL API, which means we always have originalText. + * @param query the logical plan that represents the view; this is used to generate a canonicalized + * version of the SQL that can be saved in the catalog. + */ +case class AlterViewAsCommand( + name: TableIdentifier, + originalText: String, + query: LogicalPlan) extends RunnableCommand { + + override protected def innerChildren: Seq[QueryPlan[_]] = Seq(query) + + override def run(session: SparkSession): Seq[Row] = { + // If the plan cannot be analyzed, throw an exception and don't proceed. + val qe = session.sessionState.executePlan(query) + qe.assertAnalyzed() + val analyzedPlan = qe.analyzed + + if (session.sessionState.catalog.isTemporaryTable(name)) { + session.sessionState.catalog.createTempView(name.table, analyzedPlan, overrideIfExists = true) + } else { + alterPermanentView(session, analyzedPlan) + } + + Seq.empty[Row] + } + + private def alterPermanentView(session: SparkSession, analyzedPlan: LogicalPlan): Unit = { + val viewMeta = session.sessionState.catalog.getTableMetadata(name) + if (viewMeta.tableType != CatalogTableType.VIEW) { + throw new AnalysisException(s"${viewMeta.identifier} is not a view.") + } + + val viewSQL: String = new SQLBuilder(analyzedPlan).toSQL + // Validate the view SQL - make sure we can parse it and analyze it. + // If we cannot analyze the generated query, there is probably a bug in SQL generation. + try { + session.sql(viewSQL).queryExecution.assertAnalyzed() + } catch { + case NonFatal(e) => + throw new RuntimeException(s"Failed to analyze the canonicalized SQL: $viewSQL", e) + } + + val updatedViewMeta = viewMeta.copy( + schema = analyzedPlan.schema, + viewOriginalText = Some(originalText), + viewText = Some(viewSQL)) + + session.sessionState.catalog.alterTable(updatedViewMeta) + } +} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala index 6a80664417..bc999d4724 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala @@ -18,6 +18,8 @@ package org.apache.spark.sql.hive.execution import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.test.SQLTestUtils @@ -60,15 +62,15 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { var e = intercept[AnalysisException] { sql("CREATE OR REPLACE VIEW tab1 AS SELECT * FROM jt") }.getMessage - assert(e.contains("The following is an existing table, not a view: `default`.`tab1`")) + assert(e.contains("`default`.`tab1` is not a view")) e = intercept[AnalysisException] { sql("CREATE VIEW tab1 AS SELECT * FROM jt") }.getMessage - assert(e.contains("The following is an existing table, not a view: `default`.`tab1`")) + assert(e.contains("`default`.`tab1` is not a view")) e = intercept[AnalysisException] { sql("ALTER VIEW tab1 AS SELECT * FROM jt") }.getMessage - assert(e.contains("The following is an existing table, not a view: `default`.`tab1`")) + assert(e.contains("`default`.`tab1` is not a view")) } } @@ -274,6 +276,75 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { } } + test("should not allow ALTER VIEW AS when the view does not exist") { + intercept[NoSuchTableException]( + sql("ALTER VIEW testView AS SELECT 1, 2") + ) + + intercept[NoSuchTableException]( + sql("ALTER VIEW default.testView AS SELECT 1, 2") + ) + } + + test("ALTER VIEW AS should try to alter temp view first if view name has no database part") { + withView("test_view") { + withTempView("test_view") { + sql("CREATE VIEW test_view AS SELECT 1 AS a, 2 AS b") + sql("CREATE TEMP VIEW test_view AS SELECT 1 AS a, 2 AS b") + + sql("ALTER VIEW test_view AS SELECT 3 AS i, 4 AS j") + + // The temporary view should be updated. + checkAnswer(spark.table("test_view"), Row(3, 4)) + + // The permanent view should stay same. + checkAnswer(spark.table("default.test_view"), Row(1, 2)) + } + } + } + + test("ALTER VIEW AS should alter permanent view if view name has database part") { + withView("test_view") { + withTempView("test_view") { + sql("CREATE VIEW test_view AS SELECT 1 AS a, 2 AS b") + sql("CREATE TEMP VIEW test_view AS SELECT 1 AS a, 2 AS b") + + sql("ALTER VIEW default.test_view AS SELECT 3 AS i, 4 AS j") + + // The temporary view should stay same. + checkAnswer(spark.table("test_view"), Row(1, 2)) + + // The permanent view should be updated. + checkAnswer(spark.table("default.test_view"), Row(3, 4)) + } + } + } + + test("ALTER VIEW AS should keep the previous table properties, comment, create_time, etc.") { + withView("test_view") { + sql( + """ + |CREATE VIEW test_view + |COMMENT 'test' + |TBLPROPERTIES ('key' = 'a') + |AS SELECT 1 AS a, 2 AS b + """.stripMargin) + + val catalog = spark.sessionState.catalog + val viewMeta = catalog.getTableMetadata(TableIdentifier("test_view")) + assert(viewMeta.comment == Some("test")) + assert(viewMeta.properties("key") == "a") + + sql("ALTER VIEW test_view AS SELECT 3 AS i, 4 AS j") + val updatedViewMeta = catalog.getTableMetadata(TableIdentifier("test_view")) + assert(updatedViewMeta.comment == Some("test")) + assert(updatedViewMeta.properties("key") == "a") + assert(updatedViewMeta.createTime == viewMeta.createTime) + // The view should be updated. + checkAnswer(spark.table("test_view"), Row(3, 4)) + } + } + test("create hive view for json table") { // json table is not hive-compatible, make sure the new flag fix it. withView("testView") { -- cgit v1.2.3