diff options
author | Cheng Hao <hao.cheng@intel.com> | 2015-06-11 18:01:32 -0700 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2015-06-11 18:01:47 -0700 |
commit | 767cc94ca6d397ba19226996ccb3c8e57083c549 (patch) | |
tree | 51137e263c24f768b0cb46894a5870e80ad59f1a | |
parent | 337c16d57e40cb4967bf85269baae14745f161db (diff) | |
download | spark-767cc94ca6d397ba19226996ccb3c8e57083c549.tar.gz spark-767cc94ca6d397ba19226996ccb3c8e57083c549.tar.bz2 spark-767cc94ca6d397ba19226996ccb3c8e57083c549.zip |
[SPARK-7158] [SQL] Fix bug of cached data cannot be used in collect() after cache()
When df.cache() method called, the `withCachedData` of `QueryExecution` has been created, which mean it will not look up the cached tables when action method called afterward.
Author: Cheng Hao <hao.cheng@intel.com>
Closes #5714 from chenghao-intel/SPARK-7158 and squashes the following commits:
58ea8aa [Cheng Hao] style issue
2bf740f [Cheng Hao] create new QueryExecution instance for CacheManager
a5647d9 [Cheng Hao] hide the queryExecution of DataFrame
fbfd3c5 [Cheng Hao] make the DataFrame.queryExecution mutable for cache/persist/unpersist
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala | 2 | ||||
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 26 |
2 files changed, 27 insertions, 1 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala index 5fcc48a679..a4b38d364d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala @@ -103,7 +103,7 @@ private[sql] class CacheManager(sqlContext: SQLContext) extends Logging { sqlContext.conf.useCompression, sqlContext.conf.columnBatchSize, storageLevel, - query.queryExecution.executedPlan, + sqlContext.executePlan(query.logicalPlan).executedPlan, tableName)) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 3ca5ff347d..14ecd4e9a7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -123,6 +123,32 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll with SQLTestUtils { ) } + test("SPARK-7158 collect and take return different results") { + import java.util.UUID + import org.apache.spark.sql.types._ + + val df = Seq(Tuple1(1), Tuple1(2), Tuple1(3)).toDF("index") + // we except the id is materialized once + def id: () => String = () => { UUID.randomUUID().toString() } + + val dfWithId = df.withColumn("id", callUDF(id, StringType)) + // Make a new DataFrame (actually the same reference to the old one) + val cached = dfWithId.cache() + // Trigger the cache + val d0 = dfWithId.collect() + val d1 = cached.collect() + val d2 = cached.collect() + + // Since the ID is only materialized once, then all of the records + // should come from the cache, not by re-computing. Otherwise, the ID + // will be different + assert(d0.map(_(0)) === d2.map(_(0))) + assert(d0.map(_(1)) === d2.map(_(1))) + + assert(d1.map(_(0)) === d2.map(_(0))) + assert(d1.map(_(1)) === d2.map(_(1))) + } + test("grouping on nested fields") { sqlContext.read.json(sqlContext.sparkContext.parallelize( """{"nested": {"attribute": 1}, "value": 2}""" :: Nil)) |