author     Cheng Hao <hao.cheng@intel.com>  2015-06-11 18:01:32 -0700
committer  Michael Armbrust <michael@databricks.com>  2015-06-11 18:01:47 -0700
commit     767cc94ca6d397ba19226996ccb3c8e57083c549 (patch)
tree       51137e263c24f768b0cb46894a5870e80ad59f1a
parent     337c16d57e40cb4967bf85269baae14745f161db (diff)
[SPARK-7158] [SQL] Fix bug where cached data cannot be used in collect() after cache()
When the df.cache() method is called, the `withCachedData` of `QueryExecution` has already been created, which means it will not look up the cached tables when an action method is called afterward.

Author: Cheng Hao <hao.cheng@intel.com>

Closes #5714 from chenghao-intel/SPARK-7158 and squashes the following commits:

58ea8aa [Cheng Hao] style issue
2bf740f [Cheng Hao] create new QueryExecution instance for CacheManager
a5647d9 [Cheng Hao] hide the queryExecution of DataFrame
fbfd3c5 [Cheng Hao] make the DataFrame.queryExecution mutable for cache/persist/unpersist
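The root cause is the memoization in QueryExecution: its phases are lazy vals, so once `withCachedData` has been forced, a cache entry registered afterwards is invisible to that instance. A minimal, self-contained sketch of that failure mode (all names here are illustrative, not Spark's actual API):

    // Sketch of the SPARK-7158 failure mode: a lazy val freezes the first
    // answer it computes, so a later cache registration goes unnoticed.
    object LazyMemoSketch extends App {
      var cacheRegistered = false  // stands in for CacheManager's cachedData

      class QueryExecutionSketch {
        // Mirrors the spirit of QueryExecution.withCachedData:
        // computed once, then memoized for the lifetime of this instance.
        lazy val withCachedData: String =
          if (cacheRegistered) "InMemoryRelation" else "original plan"
      }

      val qe = new QueryExecutionSketch
      println(qe.withCachedData) // forced before the entry exists => "original plan"
      cacheRegistered = true     // df.cache() registers the entry afterwards
      println(qe.withCachedData) // still "original plan": the memoized value wins
    }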
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala |  2 +-
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala          | 26 ++++++++++++++++++++++++++
2 files changed, 27 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index 5fcc48a679..a4b38d364d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -103,7 +103,7 @@ private[sql] class CacheManager(sqlContext: SQLContext) extends Logging {
           sqlContext.conf.useCompression,
           sqlContext.conf.columnBatchSize,
           storageLevel,
-          query.queryExecution.executedPlan,
+          sqlContext.executePlan(query.logicalPlan).executedPlan,
           tableName))
     }
   }
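With this change, CacheManager plans the InMemoryRelation from a fresh QueryExecution built off the logical plan, so caching no longer forces (and freezes) the DataFrame's own memoized `queryExecution`. A usage sketch against the Spark 1.4-era API (assumes a local SparkContext; the object name is illustrative):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    // Illustrative only: after the fix, the first action following cache()
    // resolves against the cached InMemoryRelation instead of recomputing.
    object CacheAfterFixSketch extends App {
      val sc = new SparkContext(
        new SparkConf().setMaster("local[2]").setAppName("spark-7158"))
      val sqlContext = new SQLContext(sc)
      import sqlContext.implicits._

      val df = Seq(1, 2, 3).toDF("i")
      df.cache()                   // registers the query; plans from a fresh QueryExecution
      println(df.collect().toSeq)  // served from the cache, not a recompute
      sc.stop()
    }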
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 3ca5ff347d..14ecd4e9a7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -123,6 +123,32 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll with SQLTestUtils {
     )
   }
 
+  test("SPARK-7158 collect and take return different results") {
+    import java.util.UUID
+    import org.apache.spark.sql.types._
+
+    val df = Seq(Tuple1(1), Tuple1(2), Tuple1(3)).toDF("index")
+    // we expect the id to be materialized only once
+    def id: () => String = () => { UUID.randomUUID().toString() }
+
+    val dfWithId = df.withColumn("id", callUDF(id, StringType))
+    // Make a new DataFrame (actually the same reference as the old one)
+    val cached = dfWithId.cache()
+    // Trigger the cache
+    val d0 = dfWithId.collect()
+    val d1 = cached.collect()
+    val d2 = cached.collect()
+
+    // Since the id is materialized only once, all of the records
+    // should come from the cache rather than being recomputed;
+    // otherwise the ids would differ.
+    assert(d0.map(_(0)) === d2.map(_(0)))
+    assert(d0.map(_(1)) === d2.map(_(1)))
+
+    assert(d1.map(_(0)) === d2.map(_(0)))
+    assert(d1.map(_(1)) === d2.map(_(1)))
+  }
+
test("grouping on nested fields") {
sqlContext.read.json(sqlContext.sparkContext.parallelize(
"""{"nested": {"attribute": 1}, "value": 2}""" :: Nil))