diff options
3 files changed, 29 insertions, 30 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 0bcfbf6f84..7195f9709d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -186,8 +186,8 @@ class SQLContext(@transient val sparkContext: SparkContext) /** Caches the specified table in-memory. */ def cacheTable(tableName: String): Unit = { - val currentTable = catalog.lookupRelation(None, tableName) - val asInMemoryRelation = EliminateAnalysisOperators(currentTable.logicalPlan) match { + val currentTable = table(tableName).queryExecution.analyzed + val asInMemoryRelation = currentTable match { case _: InMemoryRelation => currentTable.logicalPlan @@ -202,7 +202,7 @@ class SQLContext(@transient val sparkContext: SparkContext) /** Removes the specified table from the in-memory cache. */ def uncacheTable(tableName: String): Unit = { - EliminateAnalysisOperators(catalog.lookupRelation(None, tableName)) match { + table(tableName).queryExecution.analyzed match { // This is kind of a hack to make sure that if this was just an RDD registered as a table, // we reregister the RDD as a table. case inMem @ InMemoryRelation(_, _, e: ExistingRdd) => @@ -218,8 +218,8 @@ class SQLContext(@transient val sparkContext: SparkContext) /** Returns true if the table is currently cached in-memory. */ def isCached(tableName: String): Boolean = { - val relation = catalog.lookupRelation(None, tableName) - EliminateAnalysisOperators(relation) match { + val relation = table(tableName).queryExecution.analyzed + relation match { case _: InMemoryRelation => true case _ => false } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala index 128ddae11e..c3c0dcb1aa 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala @@ -20,13 +20,31 @@ package org.apache.spark.sql import org.apache.spark.sql.TestData._ import org.apache.spark.sql.columnar.{InMemoryRelation, InMemoryColumnarTableScan} import org.apache.spark.sql.test.TestSQLContext +import org.apache.spark.sql.test.TestSQLContext._ class CachedTableSuite extends QueryTest { TestData // Load test tables. - // NOTE: ALL TESTS ARE IGNORED PENDING SPARK-2264 + test("SPARK-1669: cacheTable should be idempotent") { + assume(!table("testData").logicalPlan.isInstanceOf[InMemoryRelation]) - ignore("read from cached table and uncache") { + cacheTable("testData") + table("testData").queryExecution.analyzed match { + case _: InMemoryRelation => + case _ => + fail("testData should be cached") + } + + cacheTable("testData") + table("testData").queryExecution.analyzed match { + case InMemoryRelation(_, _, _: InMemoryColumnarTableScan) => + fail("cacheTable is not idempotent") + + case _ => + } + } + + test("read from cached table and uncache") { TestSQLContext.cacheTable("testData") checkAnswer( @@ -53,20 +71,20 @@ class CachedTableSuite extends QueryTest { } } - ignore("correct error on uncache of non-cached table") { + test("correct error on uncache of non-cached table") { intercept[IllegalArgumentException] { TestSQLContext.uncacheTable("testData") } } - ignore("SELECT Star Cached Table") { + test("SELECT Star Cached Table") { TestSQLContext.sql("SELECT * FROM testData").registerAsTable("selectStar") TestSQLContext.cacheTable("selectStar") TestSQLContext.sql("SELECT * FROM selectStar WHERE key = 1").collect() TestSQLContext.uncacheTable("selectStar") } - ignore("Self-join cached") { + test("Self-join cached") { val unCachedAnswer = TestSQLContext.sql("SELECT * FROM testData a JOIN testData b ON a.key = b.key").collect() TestSQLContext.cacheTable("testData") @@ -76,7 +94,7 @@ class CachedTableSuite extends QueryTest { TestSQLContext.uncacheTable("testData") } - ignore("'CACHE TABLE' and 'UNCACHE TABLE' SQL statement") { + test("'CACHE TABLE' and 'UNCACHE TABLE' SQL statement") { TestSQLContext.sql("CACHE TABLE testData") TestSQLContext.table("testData").queryExecution.executedPlan match { case _: InMemoryColumnarTableScan => // Found evidence of caching diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index cca58c0063..bf7fafe952 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -406,23 +406,4 @@ class SQLQuerySuite extends QueryTest { ) clear() } - - test("SPARK-1669: cacheTable should be idempotent") { - assume(!table("testData").logicalPlan.isInstanceOf[InMemoryRelation]) - - cacheTable("testData") - EliminateAnalysisOperators(table("testData").logicalPlan) match { - case _: InMemoryRelation => - case _ => - fail("testData should be cached") - } - - cacheTable("testData") - EliminateAnalysisOperators(table("testData").logicalPlan) match { - case InMemoryRelation(_, _, _: InMemoryColumnarTableScan) => - fail("cacheTable is not idempotent") - - case _ => - } - } } |