diff options
4 files changed, 48 insertions, 12 deletions
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/SameResultSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/SameResultSuite.scala index e8a793d107..11e6831b24 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/SameResultSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/SameResultSuite.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} import org.apache.spark.sql.catalyst.util._ /** - * Provides helper methods for comparing plans. + * Tests for the sameResult function of [[LogicalPlan]]. */ class SameResultSuite extends FunSuite { val testRelation = LocalRelation('a.int, 'b.int, 'c.int) @@ -52,11 +52,15 @@ class SameResultSuite extends FunSuite { assertSameResult(testRelation.select('a, 'b), testRelation2.select('a, 'b)) assertSameResult(testRelation.select('b, 'a), testRelation2.select('b, 'a)) - assertSameResult(testRelation, testRelation2.select('a), false) - assertSameResult(testRelation.select('b, 'a), testRelation2.select('a, 'b), false) + assertSameResult(testRelation, testRelation2.select('a), result = false) + assertSameResult(testRelation.select('b, 'a), testRelation2.select('a, 'b), result = false) } test("filters") { assertSameResult(testRelation.where('a === 'b), testRelation2.where('a === 'b)) } + + test("sorts") { + assertSameResult(testRelation.orderBy('a.asc), testRelation2.orderBy('a.asc)) + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala index 5ab2b5316a..3ced11a5e6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala @@ -82,7 +82,7 @@ private[sql] trait CacheManager { private[sql] def cacheQuery( query: SchemaRDD, storageLevel: StorageLevel = MEMORY_AND_DISK): Unit = writeLock { - val planToCache = query.queryExecution.optimizedPlan + val planToCache = query.queryExecution.analyzed if (lookupCachedData(planToCache).nonEmpty) { logWarning("Asked to cache already cached data.") } else { @@ -96,8 +96,8 @@ private[sql] trait CacheManager { /** Removes the data for the given SchemaRDD from the cache */ private[sql] def uncacheQuery(query: SchemaRDD, blocking: Boolean = true): Unit = writeLock { - val planToCache = query.queryExecution.optimizedPlan - val dataIndex = cachedData.indexWhere(_.plan.sameResult(planToCache)) + val planToCache = query.queryExecution.analyzed + val dataIndex = cachedData.indexWhere(cd => planToCache.sameResult(cd.plan)) require(dataIndex >= 0, s"Table $query is not cached.") cachedData(dataIndex).cachedRepresentation.cachedColumnBuffers.unpersist(blocking) cachedData.remove(dataIndex) @@ -106,12 +106,12 @@ private[sql] trait CacheManager { /** Optionally returns cached data for the given SchemaRDD */ private[sql] def lookupCachedData(query: SchemaRDD): Option[CachedData] = readLock { - lookupCachedData(query.queryExecution.optimizedPlan) + lookupCachedData(query.queryExecution.analyzed) } /** Optionally returns cached data for the given LogicalPlan. */ private[sql] def lookupCachedData(plan: LogicalPlan): Option[CachedData] = readLock { - cachedData.find(_.plan.sameResult(plan)) + cachedData.find(cd => plan.sameResult(cd.plan)) } /** Replaces segments of the given logical plan with cached versions where possible. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 0e4a9ca60b..590dbf3cb8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -374,13 +374,13 @@ class SQLContext(@transient val sparkContext: SparkContext) def logical: LogicalPlan lazy val analyzed = ExtractPythonUdfs(analyzer(logical)) - lazy val optimizedPlan = optimizer(analyzed) - lazy val withCachedData = useCachedData(optimizedPlan) + lazy val withCachedData = useCachedData(analyzed) + lazy val optimizedPlan = optimizer(withCachedData) // TODO: Don't just pick the first one... lazy val sparkPlan = { SparkPlan.currentContext.set(self) - planner(withCachedData).next() + planner(optimizedPlan).next() } // executedPlan should not be used to initialize any SparkPlan. It should be // only used for execution. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala index 444bc95009..da5a358df3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala @@ -53,17 +53,47 @@ class CachedTableSuite extends QueryTest { sparkContext.env.blockManager.get(RDDBlockId(rddId, 0)).nonEmpty } + test("cache temp table") { + testData.select('key).registerTempTable("tempTable") + assertCached(sql("SELECT COUNT(*) FROM tempTable"), 0) + cacheTable("tempTable") + assertCached(sql("SELECT COUNT(*) FROM tempTable")) + uncacheTable("tempTable") + } + + test("cache table as select") { + sql("CACHE TABLE tempTable AS SELECT key FROM testData") + assertCached(sql("SELECT COUNT(*) FROM tempTable")) + uncacheTable("tempTable") + } + + test("uncaching temp table") { + testData.select('key).registerTempTable("tempTable1") + testData.select('key).registerTempTable("tempTable2") + cacheTable("tempTable1") + + assertCached(sql("SELECT COUNT(*) FROM tempTable1")) + assertCached(sql("SELECT COUNT(*) FROM tempTable2")) + + // Is this valid? + uncacheTable("tempTable2") + + // Should this be cached? + assertCached(sql("SELECT COUNT(*) FROM tempTable1"), 0) + } + test("too big for memory") { val data = "*" * 10000 sparkContext.parallelize(1 to 200000, 1).map(_ => BigData(data)).registerTempTable("bigData") table("bigData").persist(StorageLevel.MEMORY_AND_DISK) assert(table("bigData").count() === 200000L) - table("bigData").unpersist() + table("bigData").unpersist(blocking = true) } test("calling .cache() should use in-memory columnar caching") { table("testData").cache() assertCached(table("testData")) + table("testData").unpersist(blocking = true) } test("calling .unpersist() should drop in-memory columnar cache") { @@ -108,6 +138,8 @@ class CachedTableSuite extends QueryTest { case r @ InMemoryRelation(_, _, _, _, _: InMemoryColumnarTableScan) => r }.size } + + uncacheTable("testData") } test("read from cached table and uncache") { |