author     gatorsmile <gatorsmile@gmail.com>    2016-06-16 10:01:59 -0700
committer  Cheng Lian <lian@databricks.com>     2016-06-16 10:01:59 -0700
commit     6451cf9270b55465d8ecea4c4031329a1058561a (patch)
tree       a85d5758f2be2ab562f75ae04ec3096d8f950120 /sql/core
parent     7c6c6926376c93acc42dd56a399d816f4838f28c (diff)
[SPARK-15862][SQL] Better Error Message When Having Database Name in CACHE TABLE AS SELECT
#### What changes were proposed in this pull request?
~~If the temp table already exists, we should not silently replace it when doing `CACHE TABLE AS SELECT`. This is inconsistent with the behavior of `CREATE VIEW` and `CREATE TABLE`. This PR fixes this silent drop.~~
~~Maybe we could also introduce new syntax for replacing an existing view. For example, Hive replaces a view with `ALTER VIEW AS SELECT` or `CREATE OR REPLACE VIEW AS SELECT`.~~
The table name in `CACHE TABLE AS SELECT` must NOT contain a database prefix such as `database.table`. This PR therefore catches such names in the parser and emits a clear error message, instead of reporting that the view already exists.
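For illustration, a hypothetical session showing the new behavior (`mydb.t` and `testData` are made-up names; the error text comes from the parser change in the diff below):

```scala
// Hypothetical example: after this patch, a qualified name in
// CACHE TABLE ... AS SELECT is rejected at parse time.
spark.sql("CACHE TABLE mydb.t AS SELECT key FROM testData")
// org.apache.spark.sql.catalyst.parser.ParseException:
// It is not allowed to add database prefix `mydb` to the table name in
// CACHE TABLE AS SELECT
```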
In addition, this PR refactors the parser to produce a `TableIdentifier` instead of returning the table name as a raw string.
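For reference, `TableIdentifier` carries the optional database alongside the table name, and its `quotedString` form (used by the cache commands in the diff below) backtick-quotes each part:

```scala
import org.apache.spark.sql.catalyst.TableIdentifier

val unqualified = TableIdentifier("tempTable")
unqualified.database        // None
unqualified.quotedString    // `tempTable`

val qualified = TableIdentifier("t", Some("mydb"))
qualified.database          // Some(mydb)
qualified.quotedString      // `mydb`.`t`
```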
#### How was this patch tested?
- Added a test case for caching and uncaching qualified table names
- Fixed a few test cases that did not drop their temp tables at the end, by wrapping them in `withTempTable` (see the sketch after this list)
- Added a test case for the issue resolved in this PR
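The wrapped tests rely on a `withTempTable` helper from `SQLTestUtils`; a minimal sketch of such a loan-pattern helper (the body shown here is an assumption, not the actual implementation):

```scala
// Sketch of a loan-pattern cleanup helper: run the test body, then drop the
// named temp views even if the body throws. The real helper lives in
// SQLTestUtils; dropTempView is the Spark 2.x catalog API assumed here.
protected def withTempTable(tableNames: String*)(f: => Unit): Unit = {
  try f finally tableNames.foreach(spark.catalog.dropTempView)
}
```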
Author: gatorsmile <gatorsmile@gmail.com>
Author: xiaoli <lixiao1983@gmail.com>
Author: Xiao Li <xiaoli@Xiaos-MacBook-Pro.local>
Closes #13572 from gatorsmile/cacheTableAsSelect.
Diffstat (limited to 'sql/core')
4 files changed, 58 insertions, 42 deletions
```diff
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index a0508ad601..154c25adfa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -221,14 +221,20 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
    */
   override def visitCacheTable(ctx: CacheTableContext): LogicalPlan = withOrigin(ctx) {
     val query = Option(ctx.query).map(plan)
-    CacheTableCommand(ctx.identifier.getText, query, ctx.LAZY != null)
+    val tableIdent = visitTableIdentifier(ctx.tableIdentifier)
+    if (query.isDefined && tableIdent.database.isDefined) {
+      val database = tableIdent.database.get
+      throw new ParseException(s"It is not allowed to add database prefix `$database` to " +
+        s"the table name in CACHE TABLE AS SELECT", ctx)
+    }
+    CacheTableCommand(tableIdent, query, ctx.LAZY != null)
   }
 
   /**
    * Create an [[UncacheTableCommand]] logical plan.
    */
   override def visitUncacheTable(ctx: UncacheTableContext): LogicalPlan = withOrigin(ctx) {
-    UncacheTableCommand(ctx.identifier.getText)
+    UncacheTableCommand(visitTableIdentifier(ctx.tableIdentifier))
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
index 5332366d24..697e2ff211 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
@@ -18,15 +18,17 @@ package org.apache.spark.sql.execution.command
 
 import org.apache.spark.sql.{Dataset, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 
 case class CacheTableCommand(
-    tableName: String,
-    plan: Option[LogicalPlan],
-    isLazy: Boolean)
-  extends RunnableCommand {
+    tableIdent: TableIdentifier,
+    plan: Option[LogicalPlan],
+    isLazy: Boolean) extends RunnableCommand {
+  require(plan.isEmpty || tableIdent.database.isEmpty,
+    "Database name is not allowed in CACHE TABLE AS SELECT")
 
   override protected def innerChildren: Seq[QueryPlan[_]] = {
     plan.toSeq
@@ -34,13 +36,13 @@ case class CacheTableCommand(
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     plan.foreach { logicalPlan =>
-      Dataset.ofRows(sparkSession, logicalPlan).createOrReplaceTempView(tableName)
+      Dataset.ofRows(sparkSession, logicalPlan).createTempView(tableIdent.quotedString)
     }
-    sparkSession.catalog.cacheTable(tableName)
+    sparkSession.catalog.cacheTable(tableIdent.quotedString)
 
     if (!isLazy) {
       // Performs eager caching
-      sparkSession.table(tableName).count()
+      sparkSession.table(tableIdent).count()
     }
 
     Seq.empty[Row]
@@ -50,10 +52,10 @@
 }
 
-case class UncacheTableCommand(tableName: String) extends RunnableCommand {
+case class UncacheTableCommand(tableIdent: TableIdentifier) extends RunnableCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
-    sparkSession.catalog.uncacheTable(tableName)
+    sparkSession.catalog.uncacheTable(tableIdent.quotedString)
     Seq.empty[Row]
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index b56c200e9e..088f684365 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -79,7 +79,7 @@ case class CreateViewCommand(
   if (isTemporary && tableDesc.identifier.database.isDefined) {
     val database = tableDesc.identifier.database.get
     throw new AnalysisException(
-      s"It is not allowed to add database prefix ${database} for the TEMPORARY view name.")
+      s"It is not allowed to add database prefix `$database` for the TEMPORARY view name.")
   }
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index d7df18ae1c..6f6abfa93c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -73,11 +73,13 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
   }
 
   test("cache temp table") {
-    testData.select('key).createOrReplaceTempView("tempTable")
-    assertCached(sql("SELECT COUNT(*) FROM tempTable"), 0)
-    spark.catalog.cacheTable("tempTable")
-    assertCached(sql("SELECT COUNT(*) FROM tempTable"))
-    spark.catalog.uncacheTable("tempTable")
+    withTempTable("tempTable") {
+      testData.select('key).createOrReplaceTempView("tempTable")
+      assertCached(sql("SELECT COUNT(*) FROM tempTable"), 0)
+      spark.catalog.cacheTable("tempTable")
+      assertCached(sql("SELECT COUNT(*) FROM tempTable"))
+      spark.catalog.uncacheTable("tempTable")
+    }
   }
 
   test("unpersist an uncached table will not raise exception") {
@@ -95,9 +97,11 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
   }
 
   test("cache table as select") {
-    sql("CACHE TABLE tempTable AS SELECT key FROM testData")
-    assertCached(sql("SELECT COUNT(*) FROM tempTable"))
-    spark.catalog.uncacheTable("tempTable")
+    withTempTable("tempTable") {
+      sql("CACHE TABLE tempTable AS SELECT key FROM testData")
+      assertCached(sql("SELECT COUNT(*) FROM tempTable"))
+      spark.catalog.uncacheTable("tempTable")
+    }
   }
 
   test("uncaching temp table") {
@@ -223,32 +227,36 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
   }
 
   test("CACHE TABLE tableName AS SELECT * FROM anotherTable") {
-    sql("CACHE TABLE testCacheTable AS SELECT * FROM testData")
-    assertCached(spark.table("testCacheTable"))
-
-    val rddId = rddIdOf("testCacheTable")
-    assert(
-      isMaterialized(rddId),
-      "Eagerly cached in-memory table should have already been materialized")
-
-    spark.catalog.uncacheTable("testCacheTable")
-    eventually(timeout(10 seconds)) {
-      assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
+    withTempTable("testCacheTable") {
+      sql("CACHE TABLE testCacheTable AS SELECT * FROM testData")
+      assertCached(spark.table("testCacheTable"))
+
+      val rddId = rddIdOf("testCacheTable")
+      assert(
+        isMaterialized(rddId),
+        "Eagerly cached in-memory table should have already been materialized")
+
+      spark.catalog.uncacheTable("testCacheTable")
+      eventually(timeout(10 seconds)) {
+        assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
+      }
     }
   }
 
   test("CACHE TABLE tableName AS SELECT ...") {
-    sql("CACHE TABLE testCacheTable AS SELECT key FROM testData LIMIT 10")
-    assertCached(spark.table("testCacheTable"))
-
-    val rddId = rddIdOf("testCacheTable")
-    assert(
-      isMaterialized(rddId),
-      "Eagerly cached in-memory table should have already been materialized")
-
-    spark.catalog.uncacheTable("testCacheTable")
-    eventually(timeout(10 seconds)) {
-      assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
+    withTempTable("testCacheTable") {
+      sql("CACHE TABLE testCacheTable AS SELECT key FROM testData LIMIT 10")
+      assertCached(spark.table("testCacheTable"))
+
+      val rddId = rddIdOf("testCacheTable")
+      assert(
+        isMaterialized(rddId),
+        "Eagerly cached in-memory table should have already been materialized")
+
+      spark.catalog.uncacheTable("testCacheTable")
+      eventually(timeout(10 seconds)) {
+        assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
+      }
     }
   }
```
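For contrast with the parser check above, plain `CACHE TABLE` and `UNCACHE TABLE` still accept a qualified name; the check only fires when an `AS SELECT` query is present, matching the `require(plan.isEmpty || tableIdent.database.isEmpty)` guard in `CacheTableCommand`. Names below are illustrative:

```scala
// Assumes database `mydb` and table `src` already exist. Only the AS SELECT
// form rejects a database prefix; qualified names still work here:
spark.sql("CACHE TABLE mydb.src")
spark.sql("UNCACHE TABLE mydb.src")
```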