aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorSean Zhong <clockfly@gmail.com>2016-05-04 18:27:25 -0700
committerYin Huai <yhuai@databricks.com>2016-05-04 18:27:25 -0700
commit8fb1463d6a832f187f323d97635e5bec1e93c6f3 (patch)
treed3d0e3645f9ae5d0cad418c702660ca25952868e /sql
parentfa79d346e1a79ceda6ccd20e74eb850e769556ea (diff)
downloadspark-8fb1463d6a832f187f323d97635e5bec1e93c6f3.tar.gz
spark-8fb1463d6a832f187f323d97635e5bec1e93c6f3.tar.bz2
spark-8fb1463d6a832f187f323d97635e5bec1e93c6f3.zip
[SPARK-6339][SQL] Supports CREATE TEMPORARY VIEW tableIdentifier AS query
## What changes were proposed in this pull request? This PR support new SQL syntax CREATE TEMPORARY VIEW. Like: ``` CREATE TEMPORARY VIEW viewName AS SELECT * from xx CREATE OR REPLACE TEMPORARY VIEW viewName AS SELECT * from xx CREATE TEMPORARY VIEW viewName (c1 COMMENT 'blabla', c2 COMMENT 'blabla') AS SELECT * FROM xx ``` ## How was this patch tested? Unit tests. Author: Sean Zhong <clockfly@gmail.com> Closes #12872 from clockfly/spark-6399.
Diffstat (limited to 'sql')
-rw-r--r--sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g42
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala13
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala86
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala2
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala110
5 files changed, 175 insertions, 38 deletions
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 273ad92891..ee27d69ab3 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -84,7 +84,7 @@ statement
| ALTER TABLE tableIdentifier partitionSpec? SET locationSpec #setTableLocation
| DROP TABLE (IF EXISTS)? tableIdentifier PURGE? #dropTable
| DROP VIEW (IF EXISTS)? tableIdentifier #dropTable
- | CREATE (OR REPLACE)? VIEW (IF NOT EXISTS)? tableIdentifier
+ | CREATE (OR REPLACE)? TEMPORARY? VIEW (IF NOT EXISTS)? tableIdentifier
identifierCommentList? (COMMENT STRING)?
(PARTITIONED ON identifierList)?
(TBLPROPERTIES tablePropertyList)? AS query #createView
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 60388df596..146e036bb4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -935,7 +935,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
*
* For example:
* {{{
- * CREATE VIEW [IF NOT EXISTS] [db_name.]view_name
+ * CREATE [TEMPORARY] VIEW [IF NOT EXISTS] [db_name.]view_name
* [(column_name [COMMENT column_comment], ...) ]
* [COMMENT view_comment]
* [TBLPROPERTIES (property_name = property_value, ...)]
@@ -958,7 +958,8 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
ctx.query,
Option(ctx.tablePropertyList).map(visitTablePropertyList).getOrElse(Map.empty),
ctx.EXISTS != null,
- ctx.REPLACE != null
+ ctx.REPLACE != null,
+ ctx.TEMPORARY != null
)
}
}
@@ -975,7 +976,8 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
ctx.query,
Map.empty,
allowExist = false,
- replace = true)
+ replace = true,
+ isTemporary = false)
}
/**
@@ -989,7 +991,8 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
query: QueryContext,
properties: Map[String, String],
allowExist: Boolean,
- replace: Boolean): LogicalPlan = {
+ replace: Boolean,
+ isTemporary: Boolean): LogicalPlan = {
val sql = Option(source(query))
val tableDesc = CatalogTable(
identifier = visitTableIdentifier(name),
@@ -1000,7 +1003,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
viewOriginalText = sql,
viewText = sql,
comment = comment)
- CreateViewCommand(tableDesc, plan(query), allowExist, replace, command(ctx))
+ CreateViewCommand(tableDesc, plan(query), allowExist, replace, isTemporary, command(ctx))
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index 0f656ef53e..70ce5c8429 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.command
import scala.util.control.NonFatal
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
-import org.apache.spark.sql.catalyst.SQLBuilder
+import org.apache.spark.sql.catalyst.{SQLBuilder, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
@@ -37,6 +37,10 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
* already exists, throws analysis exception.
* @param replace if true, and if the view already exists, updates it; if false, and if the view
* already exists, throws analysis exception.
+ * @param isTemporary if true, the view is created as a temporary view. Temporary views are dropped
+ * at the end of current Spark session. Existing permanent relations with the same
+ * name are not visible to the current session while the temporary view exists,
+ * unless they are specified with full qualified table name with database prefix.
* @param sql the original sql
*/
case class CreateViewCommand(
@@ -44,6 +48,7 @@ case class CreateViewCommand(
child: LogicalPlan,
allowExisting: Boolean,
replace: Boolean,
+ isTemporary: Boolean,
sql: String)
extends RunnableCommand {
@@ -55,12 +60,23 @@ case class CreateViewCommand(
require(tableDesc.tableType == CatalogTableType.VIEW)
require(tableDesc.viewText.isDefined)
- private val tableIdentifier = tableDesc.identifier
-
if (allowExisting && replace) {
throw new AnalysisException("CREATE VIEW with both IF NOT EXISTS and REPLACE is not allowed.")
}
+ // Disallows 'CREATE TEMPORARY VIEW IF NOT EXISTS' to be consistent with 'CREATE TEMPORARY TABLE'
+ if (allowExisting && isTemporary) {
+ throw new AnalysisException(
+ "It is not allowed to define a TEMPORARY view with IF NOT EXISTS.")
+ }
+
+ // Temporary view names should NOT contain database prefix like "database.table"
+ if (isTemporary && tableDesc.identifier.database.isDefined) {
+ val database = tableDesc.identifier.database.get
+ throw new AnalysisException(
+ s"It is not allowed to add database prefix ${database} for the TEMPORARY view name.")
+ }
+
override def run(sparkSession: SparkSession): Seq[Row] = {
// If the plan cannot be analyzed, throw an exception and don't proceed.
val qe = sparkSession.executePlan(child)
@@ -70,29 +86,59 @@ case class CreateViewCommand(
require(tableDesc.schema == Nil || tableDesc.schema.length == analyzedPlan.output.length)
val sessionState = sparkSession.sessionState
- if (sessionState.catalog.tableExists(tableIdentifier)) {
- if (allowExisting) {
- // Handles `CREATE VIEW IF NOT EXISTS v0 AS SELECT ...`. Does nothing when the target view
- // already exists.
- } else if (replace) {
- // Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...`
- sessionState.catalog.alterTable(prepareTable(sparkSession, analyzedPlan))
+ if (isTemporary) {
+ createTemporaryView(tableDesc.identifier, sparkSession, analyzedPlan)
+ } else {
+ // Adds default database for permanent table if it doesn't exist, so that tableExists()
+ // only check permanent tables.
+ val database = tableDesc.identifier.database.getOrElse(
+ sessionState.catalog.getCurrentDatabase)
+ val tableIdentifier = tableDesc.identifier.copy(database = Option(database))
+
+ if (sessionState.catalog.tableExists(tableIdentifier)) {
+ if (allowExisting) {
+ // Handles `CREATE VIEW IF NOT EXISTS v0 AS SELECT ...`. Does nothing when the target view
+ // already exists.
+ } else if (replace) {
+ // Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...`
+ sessionState.catalog.alterTable(prepareTable(sparkSession, analyzedPlan))
+ } else {
+ // Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already
+ // exists.
+ throw new AnalysisException(
+ s"View $tableIdentifier already exists. If you want to update the view definition, " +
+ "please use ALTER VIEW AS or CREATE OR REPLACE VIEW AS")
+ }
} else {
- // Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already
- // exists.
- throw new AnalysisException(s"View $tableIdentifier already exists. " +
- "If you want to update the view definition, please use ALTER VIEW AS or " +
- "CREATE OR REPLACE VIEW AS")
+ // Create the view if it doesn't exist.
+ sessionState.catalog.createTable(
+ prepareTable(sparkSession, analyzedPlan), ignoreIfExists = false)
}
- } else {
- // Create the view if it doesn't exist.
- sessionState.catalog.createTable(
- prepareTable(sparkSession, analyzedPlan), ignoreIfExists = false)
}
-
Seq.empty[Row]
}
+ private def createTemporaryView(
+ table: TableIdentifier, sparkSession: SparkSession, analyzedPlan: LogicalPlan): Unit = {
+
+ val sessionState = sparkSession.sessionState
+ val catalog = sessionState.catalog
+
+ // Projects column names to alias names
+ val logicalPlan = {
+ if (tableDesc.schema.isEmpty) {
+ analyzedPlan
+ } else {
+ val projectList = analyzedPlan.output.zip(tableDesc.schema).map {
+ case (attr, col) => Alias(attr, col.name)()
+ }
+ sparkSession.executePlan(Project(projectList, analyzedPlan)).analyzed
+ }
+ }
+
+ catalog.createTempTable(table.table, logicalPlan, replace)
+ }
+
/**
* Returns a [[CatalogTable]] that can be used to save in the catalog. This comment canonicalize
* SQL based on the analyzed plan, and also creates the proper schema for the view.
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
index c4ebc604dc..3d74235dc5 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
@@ -39,7 +39,7 @@ class HiveDDLCommandSuite extends PlanTest {
parser.parsePlan(sql).collect {
case CreateTable(desc, allowExisting) => (desc, allowExisting)
case CreateTableAsSelectLogicalPlan(desc, _, allowExisting) => (desc, allowExisting)
- case CreateViewCommand(desc, _, allowExisting, _, _) => (desc, allowExisting)
+ case CreateViewCommand(desc, _, allowExisting, _, _, _) => (desc, allowExisting)
}.head
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
index 5184847050..72f9fba13d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
@@ -37,11 +37,21 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
sqlContext.sql(s"DROP TABLE IF EXISTS jt")
}
- test("nested views") {
- withView("jtv1", "jtv2") {
- sql("CREATE VIEW jtv1 AS SELECT * FROM jt WHERE id > 3").collect()
- sql("CREATE VIEW jtv2 AS SELECT * FROM jtv1 WHERE id < 6").collect()
+ test("nested views (interleaved with temporary views)") {
+ withView("jtv1", "jtv2", "jtv3", "temp_jtv1", "temp_jtv2", "temp_jtv3") {
+ sql("CREATE VIEW jtv1 AS SELECT * FROM jt WHERE id > 3")
+ sql("CREATE VIEW jtv2 AS SELECT * FROM jtv1 WHERE id < 6")
checkAnswer(sql("select count(*) FROM jtv2"), Row(2))
+
+ // Checks temporary views
+ sql("CREATE TEMPORARY VIEW temp_jtv1 AS SELECT * FROM jt WHERE id > 3")
+ sql("CREATE TEMPORARY VIEW temp_jtv2 AS SELECT * FROM temp_jtv1 WHERE id < 6")
+ checkAnswer(sql("select count(*) FROM temp_jtv2"), Row(2))
+
+ // Checks interleaved temporary view and normal view
+ sql("CREATE TEMPORARY VIEW temp_jtv3 AS SELECT * FROM jt WHERE id > 3")
+ sql("CREATE VIEW jtv3 AS SELECT * FROM temp_jtv3 WHERE id < 6")
+ checkAnswer(sql("select count(*) FROM jtv3"), Row(2))
}
}
@@ -57,6 +67,33 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}
}
+ test("error handling: fail if the temp view name contains the database prefix") {
+ // Fully qualified table name like "database.table" is not allowed for temporary view
+ val e = intercept[AnalysisException] {
+ sql("CREATE OR REPLACE TEMPORARY VIEW default.myabcdview AS SELECT * FROM jt")
+ }
+ assert(e.message.contains("It is not allowed to add database prefix"))
+ }
+
+ test("error handling: disallow IF NOT EXISTS for CREATE TEMPORARY VIEW") {
+ val e = intercept[AnalysisException] {
+ sql("CREATE TEMPORARY VIEW IF NOT EXISTS myabcdview AS SELECT * FROM jt")
+ }
+ assert(e.message.contains("It is not allowed to define a TEMPORARY view with IF NOT EXISTS"))
+ }
+
+ test("error handling: fail if the temp view sql itself is invalid") {
+ // A table that does not exist for temporary view
+ intercept[AnalysisException] {
+ sql("CREATE OR REPLACE TEMPORARY VIEW myabcdview AS SELECT * FROM table_not_exist1345")
+ }
+
+ // A column that does not exist, for temporary view
+ intercept[AnalysisException] {
+ sql("CREATE OR REPLACE TEMPORARY VIEW myabcdview AS SELECT random1234 FROM jt")
+ }
+ }
+
test("correctly parse CREATE VIEW statement") {
withSQLConf(SQLConf.NATIVE_VIEW.key -> "true") {
sql(
@@ -69,18 +106,70 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}
}
+ test("correctly parse CREATE TEMPORARY VIEW statement") {
+ withView("testView") {
+ sql(
+ """CREATE TEMPORARY VIEW
+ |testView (c1 COMMENT 'blabla', c2 COMMENT 'blabla')
+ |TBLPROPERTIES ('a' = 'b')
+ |AS SELECT * FROM jt
+ |""".stripMargin)
+ checkAnswer(sql("SELECT c1, c2 FROM testView ORDER BY c1"), (1 to 9).map(i => Row(i, i)))
+ }
+ }
+
+ test("should NOT allow CREATE TEMPORARY VIEW when TEMPORARY VIEW with same name exists") {
+ withView("testView") {
+ sql("CREATE TEMPORARY VIEW testView AS SELECT id FROM jt")
+
+ val e = intercept[AnalysisException] {
+ sql("CREATE TEMPORARY VIEW testView AS SELECT id FROM jt")
+ }
+
+ assert(e.message.contains("Temporary table") && e.message.contains("already exists"))
+ }
+ }
+
+ test("should allow CREATE TEMPORARY VIEW when a permanent VIEW with same name exists") {
+ withView("testView", "default.testView") {
+ sql("CREATE VIEW testView AS SELECT id FROM jt")
+ sql("CREATE TEMPORARY VIEW testView AS SELECT id FROM jt")
+ }
+ }
+
+ test("should allow CREATE permanent VIEW when a TEMPORARY VIEW with same name exists") {
+ withView("testView", "default.testView") {
+ sql("CREATE TEMPORARY VIEW testView AS SELECT id FROM jt")
+ sql("CREATE VIEW testView AS SELECT id FROM jt")
+ }
+ }
+
test("correctly handle CREATE VIEW IF NOT EXISTS") {
withSQLConf(SQLConf.NATIVE_VIEW.key -> "true") {
withTable("jt2") {
- sql("CREATE VIEW testView AS SELECT id FROM jt")
+ withView("testView") {
+ sql("CREATE VIEW testView AS SELECT id FROM jt")
- val df = (1 until 10).map(i => i -> i).toDF("i", "j")
- df.write.format("json").saveAsTable("jt2")
- sql("CREATE VIEW IF NOT EXISTS testView AS SELECT * FROM jt2")
+ val df = (1 until 10).map(i => i -> i).toDF("i", "j")
+ df.write.format("json").saveAsTable("jt2")
+ sql("CREATE VIEW IF NOT EXISTS testView AS SELECT * FROM jt2")
- // make sure our view doesn't change.
+ // make sure our view doesn't change.
+ checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i)))
+ }
+ }
+ }
+ }
+
+ test(s"correctly handle CREATE OR REPLACE TEMPORARY VIEW") {
+ withTable("jt2") {
+ withView("testView") {
+ sql("CREATE OR REPLACE TEMPORARY VIEW testView AS SELECT id FROM jt")
checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i)))
- sql("DROP VIEW testView")
+
+ sql("CREATE OR REPLACE TEMPORARY VIEW testView AS SELECT id AS i, id AS j FROM jt")
+ // make sure the view has been changed.
+ checkAnswer(sql("SELECT * FROM testView ORDER BY i"), (1 to 9).map(i => Row(i, i)))
}
}
}
@@ -215,5 +304,4 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}
}
}
-
}