aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src
diff options
context:
space:
mode:
authorWenchen Fan <wenchen@databricks.com>2016-08-31 17:08:08 +0800
committerWenchen Fan <wenchen@databricks.com>2016-08-31 17:08:08 +0800
commit12fd0cd615683cd4c3e9094ce71a1e6fc33b8d6a (patch)
treef94ec14cb9c9b9c483de9e4b60c72e6e3920e7ba /sql/core/src
parentfa6347938fc1c72ddc03a5f3cd2e929b5694f0a6 (diff)
downloadspark-12fd0cd615683cd4c3e9094ce71a1e6fc33b8d6a.tar.gz
spark-12fd0cd615683cd4c3e9094ce71a1e6fc33b8d6a.tar.bz2
spark-12fd0cd615683cd4c3e9094ce71a1e6fc33b8d6a.zip
[SPARK-17180][SPARK-17309][SPARK-17323][SQL] create AlterViewAsCommand to handle ALTER VIEW AS
## What changes were proposed in this pull request? Currently we use `CreateViewCommand` to implement ALTER VIEW AS, which has 3 bugs: 1. SPARK-17180: ALTER VIEW AS should alter temp view if view name has no database part and temp view exists 2. SPARK-17309: ALTER VIEW AS should issue exception if view does not exist. 3. SPARK-17323: ALTER VIEW AS should keep the previous table properties, comment, create_time, etc. The root cause is, ALTER VIEW AS is quite different from CREATE VIEW, we need different code path to handle them. However, in `CreateViewCommand`, there is no way to distinguish ALTER VIEW AS and CREATE VIEW, we have to introduce extra flag. But instead of doing this, I think a more natural way is to separate the ALTER VIEW AS logic into a new command. ## How was this patch tested? new tests in SQLViewSuite Author: Wenchen Fan <wenchen@databricks.com> Closes #14874 from cloud-fan/minor4.
Diffstat (limited to 'sql/core/src')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala63
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala77
2 files changed, 83 insertions, 57 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index e32d30178e..656494d97d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -1254,60 +1254,33 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
ic.identifier.getText -> Option(ic.STRING).map(string)
}
}
- createView(
- ctx,
- ctx.tableIdentifier,
+
+ CreateViewCommand(
+ name = visitTableIdentifier(ctx.tableIdentifier),
+ userSpecifiedColumns = userSpecifiedColumns,
comment = Option(ctx.STRING).map(string),
- userSpecifiedColumns,
- ctx.query,
- Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty),
+ properties = Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty),
+ originalText = Option(source(ctx.query)),
+ child = plan(ctx.query),
allowExisting = ctx.EXISTS != null,
replace = ctx.REPLACE != null,
- isTemporary = ctx.TEMPORARY != null
- )
+ isTemporary = ctx.TEMPORARY != null)
}
}
/**
- * Alter the query of a view. This creates a [[CreateViewCommand]] command.
+ * Alter the query of a view. This creates an [[AlterViewAsCommand]] command.
+ *
+ * For example:
+ * {{{
+ * ALTER VIEW [db_name.]view_name AS SELECT ...;
+ * }}}
*/
override def visitAlterViewQuery(ctx: AlterViewQueryContext): LogicalPlan = withOrigin(ctx) {
- createView(
- ctx,
- name = ctx.tableIdentifier,
- comment = None,
- userSpecifiedColumns = Seq.empty,
- query = ctx.query,
- properties = Map.empty,
- allowExisting = false,
- replace = true,
- isTemporary = false)
- }
-
- /**
- * Create a [[CreateViewCommand]] command.
- */
- private def createView(
- ctx: ParserRuleContext,
- name: TableIdentifierContext,
- comment: Option[String],
- userSpecifiedColumns: Seq[(String, Option[String])],
- query: QueryContext,
- properties: Map[String, String],
- allowExisting: Boolean,
- replace: Boolean,
- isTemporary: Boolean): LogicalPlan = {
- val originalText = source(query)
- CreateViewCommand(
- visitTableIdentifier(name),
- userSpecifiedColumns,
- comment,
- properties,
- Some(originalText),
- plan(query),
- allowExisting = allowExisting,
- replace = replace,
- isTemporary = isTemporary)
+ AlterViewAsCommand(
+ name = visitTableIdentifier(ctx.tableIdentifier),
+ originalText = source(ctx.query),
+ query = plan(ctx.query))
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index f0d7b64c3c..15340ee921 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -22,15 +22,16 @@ import scala.util.control.NonFatal
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.{SQLBuilder, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute}
+import org.apache.spark.sql.catalyst.expressions.Alias
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.types.StructType
/**
- * Create Hive view on non-hive-compatible tables by specifying schema ourselves instead of
- * depending on Hive meta-store.
+ * Create or replace a view with given query plan. This command will convert the query plan to
+ * canonicalized SQL string, and store it as view text in metastore, if we need to create a
+ * permanent view.
*
* @param name the name of this view.
* @param userSpecifiedColumns the output column names and optional comments specified by users,
@@ -64,11 +65,6 @@ case class CreateViewCommand(
override protected def innerChildren: Seq[QueryPlan[_]] = Seq(child)
- // TODO: Note that this class can NOT canonicalize the view SQL string entirely, which is
- // different from Hive and may not work for some cases like create view on self join.
-
- override def output: Seq[Attribute] = Seq.empty[Attribute]
-
if (!isTemporary) {
require(originalText.isDefined,
"The table to be created with CREATE VIEW must have 'originalText'.")
@@ -119,9 +115,7 @@ case class CreateViewCommand(
// Handles `CREATE VIEW IF NOT EXISTS v0 AS SELECT ...`. Does nothing when the target view
// already exists.
} else if (tableMetadata.tableType != CatalogTableType.VIEW) {
- throw new AnalysisException(
- "Existing table is not a view. The following is an existing table, " +
- s"not a view: $qualifiedName")
+ throw new AnalysisException(s"$qualifiedName is not a view")
} else if (replace) {
// Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...`
sessionState.catalog.alterTable(prepareTable(sparkSession, analyzedPlan))
@@ -179,7 +173,7 @@ case class CreateViewCommand(
sparkSession.sql(viewSQL).queryExecution.assertAnalyzed()
} catch {
case NonFatal(e) =>
- throw new RuntimeException(s"Failed to analyze the canonicalized SQL: ${viewSQL}", e)
+ throw new RuntimeException(s"Failed to analyze the canonicalized SQL: $viewSQL", e)
}
val viewSchema = if (userSpecifiedColumns.isEmpty) {
@@ -202,3 +196,62 @@ case class CreateViewCommand(
)
}
}
+
+/**
+ * Alter a view with the given query plan. If the view name contains a database prefix, this
+ * command will alter a permanent view matching the given name, or throw an exception if the view
+ * does not exist. Otherwise, this command will try to alter a temporary view first; if that view
+ * does not exist, it will try a permanent view next, and if that still does not exist, it will
+ * throw an exception.
+ *
+ * @param name the name of this view.
+ * @param originalText the original SQL text of this view. Note that we can only alter a view by
+ * SQL API, which means we always have originalText.
+ * @param query the logical plan that represents the view; this is used to generate a canonicalized
+ * version of the SQL that can be saved in the catalog.
+ */
+case class AlterViewAsCommand(
+ name: TableIdentifier,
+ originalText: String,
+ query: LogicalPlan) extends RunnableCommand {
+
+ override protected def innerChildren: Seq[QueryPlan[_]] = Seq(query)
+
+ override def run(session: SparkSession): Seq[Row] = {
+ // If the plan cannot be analyzed, throw an exception and don't proceed.
+ val qe = session.sessionState.executePlan(query)
+ qe.assertAnalyzed()
+ val analyzedPlan = qe.analyzed
+
+ if (session.sessionState.catalog.isTemporaryTable(name)) {
+ session.sessionState.catalog.createTempView(name.table, analyzedPlan, overrideIfExists = true)
+ } else {
+ alterPermanentView(session, analyzedPlan)
+ }
+
+ Seq.empty[Row]
+ }
+
+ private def alterPermanentView(session: SparkSession, analyzedPlan: LogicalPlan): Unit = {
+ val viewMeta = session.sessionState.catalog.getTableMetadata(name)
+ if (viewMeta.tableType != CatalogTableType.VIEW) {
+ throw new AnalysisException(s"${viewMeta.identifier} is not a view.")
+ }
+
+ val viewSQL: String = new SQLBuilder(analyzedPlan).toSQL
+ // Validate the view SQL - make sure we can parse it and analyze it.
+ // If we cannot analyze the generated query, there is probably a bug in SQL generation.
+ try {
+ session.sql(viewSQL).queryExecution.assertAnalyzed()
+ } catch {
+ case NonFatal(e) =>
+ throw new RuntimeException(s"Failed to analyze the canonicalized SQL: $viewSQL", e)
+ }
+
+ val updatedViewMeta = viewMeta.copy(
+ schema = analyzedPlan.schema,
+ viewOriginalText = Some(originalText),
+ viewText = Some(viewSQL))
+
+ session.sessionState.catalog.alterTable(updatedViewMeta)
+ }
+}