 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/SameResultSuite.scala | 10
 sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala                       | 10
 sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala                         |  6
 sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala                   | 34
 4 files changed, 48 insertions(+), 12 deletions(-)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/SameResultSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/SameResultSuite.scala
index e8a793d107..11e6831b24 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/SameResultSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/SameResultSuite.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.util._
/**
- * Provides helper methods for comparing plans.
+ * Tests for the sameResult function of [[LogicalPlan]].
*/
class SameResultSuite extends FunSuite {
val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
@@ -52,11 +52,15 @@ class SameResultSuite extends FunSuite {
assertSameResult(testRelation.select('a, 'b), testRelation2.select('a, 'b))
assertSameResult(testRelation.select('b, 'a), testRelation2.select('b, 'a))
- assertSameResult(testRelation, testRelation2.select('a), false)
- assertSameResult(testRelation.select('b, 'a), testRelation2.select('a, 'b), false)
+ assertSameResult(testRelation, testRelation2.select('a), result = false)
+ assertSameResult(testRelation.select('b, 'a), testRelation2.select('a, 'b), result = false)
}
test("filters") {
assertSameResult(testRelation.where('a === 'b), testRelation2.where('a === 'b))
}
+
+ test("sorts") {
+ assertSameResult(testRelation.orderBy('a.asc), testRelation2.orderBy('a.asc))
+ }
}
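
For context, sameResult (exercised through the suite's assertSameResult helper) checks whether two logical plans compute the same result, ignoring cosmetic differences such as expression ids. A minimal sketch of how the assertions above read, written against the Catalyst DSL the suite uses; the exact imports are an assumption, since the file header is truncated in this hunk:

    import org.apache.spark.sql.catalyst.dsl.expressions._  // assumed import
    import org.apache.spark.sql.catalyst.dsl.plans._        // assumed import
    import org.apache.spark.sql.catalyst.plans.logical.LocalRelation

    val r1 = LocalRelation('a.int, 'b.int, 'c.int)
    val r2 = LocalRelation('a.int, 'b.int, 'c.int)

    // Equivalent projections over structurally identical (but distinct)
    // relations compare as the same result...
    r1.select('a, 'b).sameResult(r2.select('a, 'b))    // true
    // ...while projections of different columns do not.
    r1.select('b, 'a).sameResult(r2.select('a, 'b))    // false
    // The new test added above extends the same check to sort orders.
    r1.orderBy('a.asc).sameResult(r2.orderBy('a.asc))  // true
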
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
index 5ab2b5316a..3ced11a5e6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
@@ -82,7 +82,7 @@ private[sql] trait CacheManager {
private[sql] def cacheQuery(
query: SchemaRDD,
storageLevel: StorageLevel = MEMORY_AND_DISK): Unit = writeLock {
- val planToCache = query.queryExecution.optimizedPlan
+ val planToCache = query.queryExecution.analyzed
if (lookupCachedData(planToCache).nonEmpty) {
logWarning("Asked to cache already cached data.")
} else {
@@ -96,8 +96,8 @@ private[sql] trait CacheManager {
/** Removes the data for the given SchemaRDD from the cache */
private[sql] def uncacheQuery(query: SchemaRDD, blocking: Boolean = true): Unit = writeLock {
- val planToCache = query.queryExecution.optimizedPlan
- val dataIndex = cachedData.indexWhere(_.plan.sameResult(planToCache))
+ val planToCache = query.queryExecution.analyzed
+ val dataIndex = cachedData.indexWhere(cd => planToCache.sameResult(cd.plan))
require(dataIndex >= 0, s"Table $query is not cached.")
cachedData(dataIndex).cachedRepresentation.cachedColumnBuffers.unpersist(blocking)
cachedData.remove(dataIndex)
@@ -106,12 +106,12 @@ private[sql] trait CacheManager {
/** Optionally returns cached data for the given SchemaRDD */
private[sql] def lookupCachedData(query: SchemaRDD): Option[CachedData] = readLock {
- lookupCachedData(query.queryExecution.optimizedPlan)
+ lookupCachedData(query.queryExecution.analyzed)
}
/** Optionally returns cached data for the given LogicalPlan. */
private[sql] def lookupCachedData(plan: LogicalPlan): Option[CachedData] = readLock {
- cachedData.find(_.plan.sameResult(plan))
+ cachedData.find(cd => plan.sameResult(cd.plan))
}
/** Replaces segments of the given logical plan with cached versions where possible. */
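
Two things change in CacheManager: entries are now keyed by the analyzed plan rather than the optimized plan, presumably so a query can be matched against the cache before the optimizer has rewritten it, and the sameResult call is flipped so the incoming plan is the receiver. A minimal, Spark-free sketch of the lookup shape (Entry and lookup are illustrative names, not Spark's API):

    // Each entry pairs the logical plan it was cached under with its data;
    // lookup scans for the first result-equivalent plan, mirroring
    // cachedData.find(cd => plan.sameResult(cd.plan)) above.
    final case class Entry[P](plan: P, data: AnyRef)

    def lookup[P](cached: Seq[Entry[P]], plan: P)
                 (sameResult: (P, P) => Boolean): Option[Entry[P]] =
      cached.find(cd => sameResult(plan, cd.plan))
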
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 0e4a9ca60b..590dbf3cb8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -374,13 +374,13 @@ class SQLContext(@transient val sparkContext: SparkContext)
def logical: LogicalPlan
lazy val analyzed = ExtractPythonUdfs(analyzer(logical))
- lazy val optimizedPlan = optimizer(analyzed)
- lazy val withCachedData = useCachedData(optimizedPlan)
+ lazy val withCachedData = useCachedData(analyzed)
+ lazy val optimizedPlan = optimizer(withCachedData)
// TODO: Don't just pick the first one...
lazy val sparkPlan = {
SparkPlan.currentContext.set(self)
- planner(withCachedData).next()
+ planner(optimizedPlan).next()
}
// executedPlan should not be used to initialize any SparkPlan. It should only
// be used for execution.
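
The QueryExecution change reorders the lazy pipeline: cached-data substitution now happens on the analyzed plan, and the optimizer then runs over the substituted plan before physical planning. A self-contained sketch of the new ordering with stand-in phase functions (only the val names come from the diff; the rest is illustrative):

    class QueryPipeline[P](logical: P,
                           analyze: P => P,
                           substituteCache: P => P,
                           optimize: P => P,
                           plan: P => P) {
      lazy val analyzed       = analyze(logical)
      // Substitution moves ahead of optimization, so cache hits are found
      // before the optimizer can rewrite the plan away from its cached form.
      lazy val withCachedData = substituteCache(analyzed)
      lazy val optimizedPlan  = optimize(withCachedData)
      lazy val sparkPlan      = plan(optimizedPlan)
    }
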
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 444bc95009..da5a358df3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -53,17 +53,47 @@ class CachedTableSuite extends QueryTest {
sparkContext.env.blockManager.get(RDDBlockId(rddId, 0)).nonEmpty
}
+ test("cache temp table") {
+ testData.select('key).registerTempTable("tempTable")
+ assertCached(sql("SELECT COUNT(*) FROM tempTable"), 0)
+ cacheTable("tempTable")
+ assertCached(sql("SELECT COUNT(*) FROM tempTable"))
+ uncacheTable("tempTable")
+ }
+
+ test("cache table as select") {
+ sql("CACHE TABLE tempTable AS SELECT key FROM testData")
+ assertCached(sql("SELECT COUNT(*) FROM tempTable"))
+ uncacheTable("tempTable")
+ }
+
+ test("uncaching temp table") {
+ testData.select('key).registerTempTable("tempTable1")
+ testData.select('key).registerTempTable("tempTable2")
+ cacheTable("tempTable1")
+
+ assertCached(sql("SELECT COUNT(*) FROM tempTable1"))
+ assertCached(sql("SELECT COUNT(*) FROM tempTable2"))
+
+    // tempTable2 was never cached directly, but it shares its analyzed plan with tempTable1.
+ uncacheTable("tempTable2")
+
+    // Dropping the shared in-memory data via tempTable2 leaves tempTable1 uncached as well.
+ assertCached(sql("SELECT COUNT(*) FROM tempTable1"), 0)
+ }
+
test("too big for memory") {
val data = "*" * 10000
sparkContext.parallelize(1 to 200000, 1).map(_ => BigData(data)).registerTempTable("bigData")
table("bigData").persist(StorageLevel.MEMORY_AND_DISK)
assert(table("bigData").count() === 200000L)
- table("bigData").unpersist()
+ table("bigData").unpersist(blocking = true)
}
test("calling .cache() should use in-memory columnar caching") {
table("testData").cache()
assertCached(table("testData"))
+ table("testData").unpersist(blocking = true)
}
test("calling .unpersist() should drop in-memory columnar cache") {
@@ -108,6 +138,8 @@ class CachedTableSuite extends QueryTest {
case r @ InMemoryRelation(_, _, _, _, _: InMemoryColumnarTableScan) => r
}.size
}
+
+ uncacheTable("testData")
}
test("read from cached table and uncache") {