author      gatorsmile <gatorsmile@gmail.com>      2016-06-16 10:01:59 -0700
committer   Cheng Lian <lian@databricks.com>       2016-06-16 10:01:59 -0700
commit      6451cf9270b55465d8ecea4c4031329a1058561a (patch)
tree        a85d5758f2be2ab562f75ae04ec3096d8f950120 /sql
parent      7c6c6926376c93acc42dd56a399d816f4838f28c (diff)
[SPARK-15862][SQL] Better Error Message When Having Database Name in CACHE TABLE AS SELECT
#### What changes were proposed in this pull request?

~~If the temp table already exists, we should not silently replace it when doing `CACHE TABLE AS SELECT`. This is inconsistent with the behavior of `CREATE VIEW` or `CREATE TABLE`. This PR is to fix this silent drop.~~

~~Maybe we can also introduce new syntax for replacing an existing one. For example, in Hive, the syntax to replace a view is `ALTER VIEW AS SELECT` or `CREATE OR REPLACE VIEW AS SELECT`.~~

The table name in `CACHE TABLE AS SELECT` should NOT contain a database prefix like "database.table". This PR therefore catches the qualified name in the parser and emits a targeted error message, instead of reporting that the view already exists. In addition, it refactors the parser to produce table identifiers rather than raw table-name strings.

#### How was this patch tested?

- Added a test case for caching and uncaching qualified table names
- Fixed a few test cases that did not drop the temp table at the end
- Added a test case for the issue resolved in this PR

Author: gatorsmile <gatorsmile@gmail.com>
Author: xiaoli <lixiao1983@gmail.com>
Author: Xiao Li <xiaoli@Xiaos-MacBook-Pro.local>

Closes #13572 from gatorsmile/cacheTableAsSelect.
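To make the new contract concrete, here is a minimal Scala sketch of the behavior after this patch, assuming a local `SparkSession` named `spark`; the session setup and table names are illustrative, not part of the patch:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.parser.ParseException

// Illustrative local session; any existing SparkSession behaves the same way.
val spark = SparkSession.builder().master("local[*]").appName("spark-15862-demo").getOrCreate()

// Unqualified names still work: the query is registered as a temp view and cached.
spark.sql("CACHE TABLE cachedQuery AS SELECT 1 AS key")

// A qualified name is now rejected at parse time with a targeted message,
// instead of a confusing report that the view already exists.
try {
  spark.sql("CACHE TABLE somedb.cachedQuery AS SELECT 1 AS key")
} catch {
  case e: ParseException =>
    // Expected message: It is not allowed to add database prefix `somedb`
    // to the table name in CACHE TABLE AS SELECT
    println(e.getMessage)
}
```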
Diffstat (limited to 'sql')
-rw-r--r--  sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4   4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala   10
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala    20
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala    2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala           68
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala         2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala      79
7 files changed, 121 insertions(+), 64 deletions(-)
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 044f910388..b60319668c 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -114,8 +114,8 @@ statement
tableIdentifier partitionSpec? describeColName? #describeTable
| REFRESH TABLE tableIdentifier #refreshTable
| REFRESH .*? #refreshResource
- | CACHE LAZY? TABLE identifier (AS? query)? #cacheTable
- | UNCACHE TABLE identifier #uncacheTable
+ | CACHE LAZY? TABLE tableIdentifier (AS? query)? #cacheTable
+ | UNCACHE TABLE tableIdentifier #uncacheTable
| CLEAR CACHE #clearCache
| LOAD DATA LOCAL? INPATH path=STRING OVERWRITE? INTO TABLE
tableIdentifier partitionSpec? #loadData
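For context on the hunk above: moving `#cacheTable` and `#uncacheTable` from `identifier` to `tableIdentifier` means multi-part names now parse into a `TableIdentifier` whose database field is optional. A small sketch of that parse result via the public Catalyst parser (the literal `db.t` is illustrative):

```scala
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser

// `tableIdentifier` accepts an optional `database.` qualifier ...
val qualified = CatalystSqlParser.parseTableIdentifier("db.t")
assert(qualified.table == "t" && qualified.database == Some("db"))

// ... which visitCacheTable can then inspect and reject for AS SELECT.
val bare = CatalystSqlParser.parseTableIdentifier("t")
assert(bare.database.isEmpty)
```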
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index a0508ad601..154c25adfa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -221,14 +221,20 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
*/
override def visitCacheTable(ctx: CacheTableContext): LogicalPlan = withOrigin(ctx) {
val query = Option(ctx.query).map(plan)
- CacheTableCommand(ctx.identifier.getText, query, ctx.LAZY != null)
+ val tableIdent = visitTableIdentifier(ctx.tableIdentifier)
+ if (query.isDefined && tableIdent.database.isDefined) {
+ val database = tableIdent.database.get
+ throw new ParseException(s"It is not allowed to add database prefix `$database` to " +
+ s"the table name in CACHE TABLE AS SELECT", ctx)
+ }
+ CacheTableCommand(tableIdent, query, ctx.LAZY != null)
}
/**
* Create an [[UncacheTableCommand]] logical plan.
*/
override def visitUncacheTable(ctx: UncacheTableContext): LogicalPlan = withOrigin(ctx) {
- UncacheTableCommand(ctx.identifier.getText)
+ UncacheTableCommand(visitTableIdentifier(ctx.tableIdentifier))
}
/**
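Note that the guard above fires only when both a query and a database qualifier are present; plain `CACHE TABLE db.t` and `UNCACHE TABLE db.t` stay legal. A minimal model of the condition, using `Option[String]` stand-ins for the parsed query and the database qualifier (not the real parser types):

```scala
// Sketch: the check rejects exactly one of the four parser cases.
def rejected(query: Option[String], database: Option[String]): Boolean =
  query.isDefined && database.isDefined

assert(!rejected(query = None,             database = None))        // CACHE TABLE t
assert(!rejected(query = None,             database = Some("db")))  // CACHE TABLE db.t
assert(!rejected(query = Some("SELECT 1"), database = None))        // CACHE TABLE t AS SELECT 1
assert( rejected(query = Some("SELECT 1"), database = Some("db")))  // CACHE TABLE db.t AS SELECT 1
```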
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
index 5332366d24..697e2ff211 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
@@ -18,15 +18,17 @@
package org.apache.spark.sql.execution.command
import org.apache.spark.sql.{Dataset, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
case class CacheTableCommand(
- tableName: String,
- plan: Option[LogicalPlan],
- isLazy: Boolean)
- extends RunnableCommand {
+ tableIdent: TableIdentifier,
+ plan: Option[LogicalPlan],
+ isLazy: Boolean) extends RunnableCommand {
+ require(plan.isEmpty || tableIdent.database.isEmpty,
+ "Database name is not allowed in CACHE TABLE AS SELECT")
override protected def innerChildren: Seq[QueryPlan[_]] = {
plan.toSeq
@@ -34,13 +36,13 @@ case class CacheTableCommand(
override def run(sparkSession: SparkSession): Seq[Row] = {
plan.foreach { logicalPlan =>
- Dataset.ofRows(sparkSession, logicalPlan).createOrReplaceTempView(tableName)
+ Dataset.ofRows(sparkSession, logicalPlan).createTempView(tableIdent.quotedString)
}
- sparkSession.catalog.cacheTable(tableName)
+ sparkSession.catalog.cacheTable(tableIdent.quotedString)
if (!isLazy) {
// Performs eager caching
- sparkSession.table(tableName).count()
+ sparkSession.table(tableIdent).count()
}
Seq.empty[Row]
@@ -50,10 +52,10 @@ case class CacheTableCommand(
}
-case class UncacheTableCommand(tableName: String) extends RunnableCommand {
+case class UncacheTableCommand(tableIdent: TableIdentifier) extends RunnableCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
- sparkSession.catalog.uncacheTable(tableName)
+ sparkSession.catalog.uncacheTable(tableIdent.quotedString)
Seq.empty[Row]
}
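Two runtime details in the command above are easy to miss: the `AS SELECT` path now uses `createTempView` (so a name collision fails loudly rather than silently replacing the existing temp view), and `LAZY` still decides whether the eager `count()` runs. A hedged usage sketch, assuming a `SparkSession` named `spark` (table names illustrative):

```scala
// Eager (default): the count() in run() materializes the cache immediately.
spark.sql("CACHE TABLE eagerTable AS SELECT 1 AS key")

// Lazy: registered and marked cached, but scanned only on first access.
spark.sql("CACHE LAZY TABLE lazyTable AS SELECT 1 AS key")
spark.sql("SELECT * FROM lazyTable").collect()  // first access materializes it

spark.sql("UNCACHE TABLE eagerTable")
spark.sql("UNCACHE TABLE lazyTable")
```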
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index b56c200e9e..088f684365 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -79,7 +79,7 @@ case class CreateViewCommand(
if (isTemporary && tableDesc.identifier.database.isDefined) {
val database = tableDesc.identifier.database.get
throw new AnalysisException(
- s"It is not allowed to add database prefix ${database} for the TEMPORARY view name.")
+ s"It is not allowed to add database prefix `$database` for the TEMPORARY view name.")
}
override def run(sparkSession: SparkSession): Seq[Row] = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index d7df18ae1c..6f6abfa93c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -73,11 +73,13 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
}
test("cache temp table") {
- testData.select('key).createOrReplaceTempView("tempTable")
- assertCached(sql("SELECT COUNT(*) FROM tempTable"), 0)
- spark.catalog.cacheTable("tempTable")
- assertCached(sql("SELECT COUNT(*) FROM tempTable"))
- spark.catalog.uncacheTable("tempTable")
+ withTempTable("tempTable") {
+ testData.select('key).createOrReplaceTempView("tempTable")
+ assertCached(sql("SELECT COUNT(*) FROM tempTable"), 0)
+ spark.catalog.cacheTable("tempTable")
+ assertCached(sql("SELECT COUNT(*) FROM tempTable"))
+ spark.catalog.uncacheTable("tempTable")
+ }
}
test("unpersist an uncached table will not raise exception") {
@@ -95,9 +97,11 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
}
test("cache table as select") {
- sql("CACHE TABLE tempTable AS SELECT key FROM testData")
- assertCached(sql("SELECT COUNT(*) FROM tempTable"))
- spark.catalog.uncacheTable("tempTable")
+ withTempTable("tempTable") {
+ sql("CACHE TABLE tempTable AS SELECT key FROM testData")
+ assertCached(sql("SELECT COUNT(*) FROM tempTable"))
+ spark.catalog.uncacheTable("tempTable")
+ }
}
test("uncaching temp table") {
@@ -223,32 +227,36 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
}
test("CACHE TABLE tableName AS SELECT * FROM anotherTable") {
- sql("CACHE TABLE testCacheTable AS SELECT * FROM testData")
- assertCached(spark.table("testCacheTable"))
-
- val rddId = rddIdOf("testCacheTable")
- assert(
- isMaterialized(rddId),
- "Eagerly cached in-memory table should have already been materialized")
-
- spark.catalog.uncacheTable("testCacheTable")
- eventually(timeout(10 seconds)) {
- assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
+ withTempTable("testCacheTable") {
+ sql("CACHE TABLE testCacheTable AS SELECT * FROM testData")
+ assertCached(spark.table("testCacheTable"))
+
+ val rddId = rddIdOf("testCacheTable")
+ assert(
+ isMaterialized(rddId),
+ "Eagerly cached in-memory table should have already been materialized")
+
+ spark.catalog.uncacheTable("testCacheTable")
+ eventually(timeout(10 seconds)) {
+ assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
+ }
}
}
test("CACHE TABLE tableName AS SELECT ...") {
- sql("CACHE TABLE testCacheTable AS SELECT key FROM testData LIMIT 10")
- assertCached(spark.table("testCacheTable"))
-
- val rddId = rddIdOf("testCacheTable")
- assert(
- isMaterialized(rddId),
- "Eagerly cached in-memory table should have already been materialized")
-
- spark.catalog.uncacheTable("testCacheTable")
- eventually(timeout(10 seconds)) {
- assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
+ withTempTable("testCacheTable") {
+ sql("CACHE TABLE testCacheTable AS SELECT key FROM testData LIMIT 10")
+ assertCached(spark.table("testCacheTable"))
+
+ val rddId = rddIdOf("testCacheTable")
+ assert(
+ isMaterialized(rddId),
+ "Eagerly cached in-memory table should have already been materialized")
+
+ spark.catalog.uncacheTable("testCacheTable")
+ eventually(timeout(10 seconds)) {
+ assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
+ }
}
}
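The test rewrites above all wrap their bodies in `withTempTable`, which drops the temp views even when an assertion throws — exactly the leak the original tests had. Roughly, such a helper looks like the sketch below (the real fixture lives in `SQLTestUtils`; `spark.catalog.dropTempView` is one plausible cleanup call, assumed here):

```scala
// Sketch of a withTempTable-style fixture: run the body, then always clean up.
def withTempTable(tableNames: String*)(f: => Unit): Unit = {
  try f finally tableNames.foreach(spark.catalog.dropTempView)
}

// Usage: the view is dropped even if the assertions inside fail.
withTempTable("tempTable") {
  spark.sql("CACHE TABLE tempTable AS SELECT 1 AS key")
  assert(spark.catalog.isCached("tempTable"))
}
```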
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 1d1d5e3f7b..b45be0251d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -449,7 +449,7 @@ private[hive] class TestHiveQueryExecution(
override lazy val analyzed: LogicalPlan = {
val describedTables = logical match {
- case CacheTableCommand(tbl, _, _) => tbl :: Nil
+ case CacheTableCommand(tbl, _, _) => tbl.table :: Nil
case _ => Nil
}
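Because `CacheTableCommand` now carries a `TableIdentifier` instead of a `String`, call sites that pattern-match it — like `TestHiveQueryExecution` above — must project the table name explicitly. A small sketch (values illustrative, and assuming the command class is visible from the calling code):

```scala
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command.CacheTableCommand

// plan = None keeps the new require() happy even with a database set.
val cmd = CacheTableCommand(TableIdentifier("src", Some("db")), plan = None, isLazy = false)

cmd match {
  case CacheTableCommand(tbl, _, _) =>
    assert(tbl.table == "src")          // was the whole value when tbl was a String
    assert(tbl.database == Some("db"))
}
```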
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
index e35a71917f..f7c3e347b6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
@@ -21,6 +21,7 @@ import java.io.File
import org.apache.spark.sql.{AnalysisException, QueryTest, SaveMode}
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
+import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.test.SQLTestUtils
@@ -128,29 +129,33 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
}
test("CACHE TABLE tableName AS SELECT * FROM anotherTable") {
- sql("CACHE TABLE testCacheTable AS SELECT * FROM src")
- assertCached(table("testCacheTable"))
+ withTempTable("testCacheTable") {
+ sql("CACHE TABLE testCacheTable AS SELECT * FROM src")
+ assertCached(table("testCacheTable"))
- val rddId = rddIdOf("testCacheTable")
- assert(
- isMaterialized(rddId),
- "Eagerly cached in-memory table should have already been materialized")
+ val rddId = rddIdOf("testCacheTable")
+ assert(
+ isMaterialized(rddId),
+ "Eagerly cached in-memory table should have already been materialized")
- uncacheTable("testCacheTable")
- assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
+ uncacheTable("testCacheTable")
+ assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
+ }
}
test("CACHE TABLE tableName AS SELECT ...") {
- sql("CACHE TABLE testCacheTable AS SELECT key FROM src LIMIT 10")
- assertCached(table("testCacheTable"))
+ withTempTable("testCacheTable") {
+ sql("CACHE TABLE testCacheTable AS SELECT key FROM src LIMIT 10")
+ assertCached(table("testCacheTable"))
- val rddId = rddIdOf("testCacheTable")
- assert(
- isMaterialized(rddId),
- "Eagerly cached in-memory table should have already been materialized")
+ val rddId = rddIdOf("testCacheTable")
+ assert(
+ isMaterialized(rddId),
+ "Eagerly cached in-memory table should have already been materialized")
- uncacheTable("testCacheTable")
- assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
+ uncacheTable("testCacheTable")
+ assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
+ }
}
test("CACHE LAZY TABLE tableName") {
@@ -172,9 +177,11 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
}
test("CACHE TABLE with Hive UDF") {
- sql("CACHE TABLE udfTest AS SELECT * FROM src WHERE floor(key) = 1")
- assertCached(table("udfTest"))
- uncacheTable("udfTest")
+ withTempTable("udfTest") {
+ sql("CACHE TABLE udfTest AS SELECT * FROM src WHERE floor(key) = 1")
+ assertCached(table("udfTest"))
+ uncacheTable("udfTest")
+ }
}
test("REFRESH TABLE also needs to recache the data (data source tables)") {
@@ -267,6 +274,40 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
Utils.deleteRecursively(tempPath)
}
+ test("Cache/Uncache Qualified Tables") {
+ withTempDatabase { db =>
+ withTempTable("cachedTable") {
+ sql(s"CREATE TABLE $db.cachedTable STORED AS PARQUET AS SELECT 1")
+ sql(s"CACHE TABLE $db.cachedTable")
+ assertCached(spark.table(s"$db.cachedTable"))
+
+ activateDatabase(db) {
+ assertCached(spark.table("cachedTable"))
+ sql("UNCACHE TABLE cachedTable")
+ assert(!spark.catalog.isCached("cachedTable"), "Table 'cachedTable' should not be cached")
+ sql(s"CACHE TABLE cachedTable")
+ assert(spark.catalog.isCached("cachedTable"), "Table 'cachedTable' should be cached")
+ }
+
+ sql(s"UNCACHE TABLE $db.cachedTable")
+ assert(!spark.catalog.isCached(s"$db.cachedTable"),
+ "Table 'cachedTable' should not be cached")
+ }
+ }
+ }
+
+ test("Cache Table As Select - having database name") {
+ withTempDatabase { db =>
+ withTempTable("cachedTable") {
+ val e = intercept[ParseException] {
+ sql(s"CACHE TABLE $db.cachedTable AS SELECT 1")
+ }.getMessage
+ assert(e.contains("It is not allowed to add database prefix ") &&
+ e.contains("to the table name in CACHE TABLE AS SELECT"))
+ }
+ }
+ }
+
test("SPARK-11246 cache parquet table") {
sql("CREATE TABLE cachedTable STORED AS PARQUET AS SELECT 1")