diff options
author | Michael Armbrust <michael@databricks.com> | 2014-06-24 19:04:29 -0700 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2014-06-24 19:04:29 -0700 |
commit | a162c9b337d99dd2a6102a80deb2a9707cdd93e9 (patch) | |
tree | 1ef7490fecaaed1e9ff25945b1c3fafa1b07012b | |
parent | 1978a9033ea0d47ac0e0a51d97c8515689f84d04 (diff) | |
download | spark-a162c9b337d99dd2a6102a80deb2a9707cdd93e9.tar.gz spark-a162c9b337d99dd2a6102a80deb2a9707cdd93e9.tar.bz2 spark-a162c9b337d99dd2a6102a80deb2a9707cdd93e9.zip |
[SPARK-2264][SQL] Fix failing CachedTableSuite
Author: Michael Armbrust <michael@databricks.com>
Closes #1201 from marmbrus/fixCacheTests and squashes the following commits:
9d87ed1 [Michael Armbrust] Use analyzer (which runs to fixed point) instead of manually removing analysis operators.
3 files changed, 29 insertions, 30 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 0bcfbf6f84..7195f9709d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -186,8 +186,8 @@ class SQLContext(@transient val sparkContext: SparkContext) /** Caches the specified table in-memory. */ def cacheTable(tableName: String): Unit = { - val currentTable = catalog.lookupRelation(None, tableName) - val asInMemoryRelation = EliminateAnalysisOperators(currentTable.logicalPlan) match { + val currentTable = table(tableName).queryExecution.analyzed + val asInMemoryRelation = currentTable match { case _: InMemoryRelation => currentTable.logicalPlan @@ -202,7 +202,7 @@ class SQLContext(@transient val sparkContext: SparkContext) /** Removes the specified table from the in-memory cache. */ def uncacheTable(tableName: String): Unit = { - EliminateAnalysisOperators(catalog.lookupRelation(None, tableName)) match { + table(tableName).queryExecution.analyzed match { // This is kind of a hack to make sure that if this was just an RDD registered as a table, // we reregister the RDD as a table. case inMem @ InMemoryRelation(_, _, e: ExistingRdd) => @@ -218,8 +218,8 @@ class SQLContext(@transient val sparkContext: SparkContext) /** Returns true if the table is currently cached in-memory. */ def isCached(tableName: String): Boolean = { - val relation = catalog.lookupRelation(None, tableName) - EliminateAnalysisOperators(relation) match { + val relation = table(tableName).queryExecution.analyzed + relation match { case _: InMemoryRelation => true case _ => false } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala index 128ddae11e..c3c0dcb1aa 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala @@ -20,13 +20,31 @@ package org.apache.spark.sql import org.apache.spark.sql.TestData._ import org.apache.spark.sql.columnar.{InMemoryRelation, InMemoryColumnarTableScan} import org.apache.spark.sql.test.TestSQLContext +import org.apache.spark.sql.test.TestSQLContext._ class CachedTableSuite extends QueryTest { TestData // Load test tables. - // NOTE: ALL TESTS ARE IGNORED PENDING SPARK-2264 + test("SPARK-1669: cacheTable should be idempotent") { + assume(!table("testData").logicalPlan.isInstanceOf[InMemoryRelation]) - ignore("read from cached table and uncache") { + cacheTable("testData") + table("testData").queryExecution.analyzed match { + case _: InMemoryRelation => + case _ => + fail("testData should be cached") + } + + cacheTable("testData") + table("testData").queryExecution.analyzed match { + case InMemoryRelation(_, _, _: InMemoryColumnarTableScan) => + fail("cacheTable is not idempotent") + + case _ => + } + } + + test("read from cached table and uncache") { TestSQLContext.cacheTable("testData") checkAnswer( @@ -53,20 +71,20 @@ class CachedTableSuite extends QueryTest { } } - ignore("correct error on uncache of non-cached table") { + test("correct error on uncache of non-cached table") { intercept[IllegalArgumentException] { TestSQLContext.uncacheTable("testData") } } - ignore("SELECT Star Cached Table") { + test("SELECT Star Cached Table") { TestSQLContext.sql("SELECT * FROM testData").registerAsTable("selectStar") TestSQLContext.cacheTable("selectStar") TestSQLContext.sql("SELECT * FROM selectStar WHERE key = 1").collect() TestSQLContext.uncacheTable("selectStar") } - ignore("Self-join cached") { + test("Self-join cached") { val unCachedAnswer = TestSQLContext.sql("SELECT * FROM testData a JOIN testData b ON a.key = b.key").collect() TestSQLContext.cacheTable("testData") @@ -76,7 +94,7 @@ class CachedTableSuite extends QueryTest { TestSQLContext.uncacheTable("testData") } - ignore("'CACHE TABLE' and 'UNCACHE TABLE' SQL statement") { + test("'CACHE TABLE' and 'UNCACHE TABLE' SQL statement") { TestSQLContext.sql("CACHE TABLE testData") TestSQLContext.table("testData").queryExecution.executedPlan match { case _: InMemoryColumnarTableScan => // Found evidence of caching diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index cca58c0063..bf7fafe952 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -406,23 +406,4 @@ class SQLQuerySuite extends QueryTest { ) clear() } - - test("SPARK-1669: cacheTable should be idempotent") { - assume(!table("testData").logicalPlan.isInstanceOf[InMemoryRelation]) - - cacheTable("testData") - EliminateAnalysisOperators(table("testData").logicalPlan) match { - case _: InMemoryRelation => - case _ => - fail("testData should be cached") - } - - cacheTable("testData") - EliminateAnalysisOperators(table("testData").logicalPlan) match { - case InMemoryRelation(_, _, _: InMemoryColumnarTableScan) => - fail("cacheTable is not idempotent") - - case _ => - } - } } |