author     Takuya UESHIN <ueshin@happy-camper.st>    2016-06-12 16:37:44 -0700
committer  Wenchen Fan <wenchen@databricks.com>      2016-06-12 16:37:44 -0700
commit     caebd7f2622340fc081bb9a2ea6a0b246f1e3a3f (patch)
tree       7edef67ff3399c61e0bb12d7041db847b5da79cd /sql/core/src/test
parent     20b8f2c32af696c3856221c4c4fcd12c3f068af2 (diff)
[SPARK-15870][SQL] DataFrame can't execute after uncacheTable.
## What changes were proposed in this pull request?
If a cached `DataFrame` is executed more than once, with an `uncacheTable` call in between, like the following:
```
val selectStar = sql("SELECT * FROM testData WHERE key = 1")
selectStar.createOrReplaceTempView("selectStar")
spark.catalog.cacheTable("selectStar")
checkAnswer(
selectStar,
Seq(Row(1, "1")))
spark.catalog.uncacheTable("selectStar")
checkAnswer(
selectStar,
Seq(Row(1, "1")))
```
then the execution after `uncacheTable` fails with a `Task not serializable` exception:
```
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2038)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1897)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1912)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:884)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:357)
at org.apache.spark.rdd.RDD.collect(RDD.scala:883)
at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:290)
...
Caused by: java.lang.UnsupportedOperationException: Accumulator must be registered before send to executor
at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:153)
at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeWriteReplace(ObjectStreamClass.java:1118)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1136)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
...
```
Note that a `DataFrame` uncached with `DataFrame.unpersist()` still executes, but one uncached with `spark.catalog.uncacheTable` does not.
This PR reverts part of cf38fe0 so that the `batchStats` accumulator is no longer unregistered in `uncacheTable`; unregistering it there is unnecessary because `ContextCleaner` will do so once the accumulator is garbage-collected.
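For background on the mechanism, here is a minimal, self-contained sketch (not part of this commit; the object name, session setup, and accumulator names are illustrative) of the registration rule the stack trace above runs into: an `AccumulatorV2` that is not registered on the driver cannot be serialized into a task, which is exactly the state the `batchStats` accumulator was left in after the old `uncacheTable` path unregistered it.
```
import org.apache.spark.SparkException
import org.apache.spark.sql.SparkSession
import org.apache.spark.util.LongAccumulator

object AccumulatorRegistrationSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("accumulator-registration-sketch")
      .getOrCreate()
    val sc = spark.sparkContext

    // sc.longAccumulator both creates and registers the accumulator,
    // so tasks that capture it serialize and run fine.
    val registered = sc.longAccumulator("registered")
    sc.parallelize(1 to 10).foreach(n => registered.add(n))
    println(s"registered.value = ${registered.value}") // 55

    // A bare LongAccumulator is never registered; AccumulatorV2.writeReplace
    // rejects it during task serialization, producing the same
    // "Task not serializable" / "Accumulator must be registered before
    // send to executor" pair shown in the stack trace above.
    val unregistered = new LongAccumulator
    try {
      sc.parallelize(1 to 10).foreach(n => unregistered.add(n))
    } catch {
      case e: SparkException => println(s"failed as expected: $e")
    }

    spark.stop()
  }
}
```
This illustrates why keeping `batchStats` registered until `ContextCleaner` collects it restores re-execution: the cached plan still references the accumulator, and task serialization succeeds as long as the registration is intact.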
## How was this patch tested?
Added a test that checks whether a `DataFrame` can execute after `uncacheTable`, and ran the other existing tests.
The test that checks whether the accumulator is actually cleared is marked `ignore`, though, because it depends on GC timing and would be flaky.
Author: Takuya UESHIN <ueshin@happy-camper.st>
Closes #13596 from ueshin/issues/SPARK-15870.
Diffstat (limited to 'sql/core/src/test')
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala | 41 ++++++++++++++++++++++++++++++++++++++++-
1 file changed, 40 insertions(+), 1 deletion(-)
```
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index e08a9ab7e6..44bafa55bc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -17,11 +17,13 @@
 
 package org.apache.spark.sql
 
+import scala.collection.mutable.HashSet
 import scala.concurrent.duration._
 import scala.language.postfixOps
 
 import org.scalatest.concurrent.Eventually._
 
+import org.apache.spark.CleanerListener
 import org.apache.spark.sql.execution.RDDScanExec
 import org.apache.spark.sql.execution.columnar._
 import org.apache.spark.sql.execution.exchange.ShuffleExchange
@@ -321,7 +323,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
     assert(spark.sharedState.cacheManager.isEmpty)
   }
 
-  test("Clear accumulators when uncacheTable to prevent memory leaking") {
+  test("Ensure accumulators to be cleared after GC when uncacheTable") {
     sql("SELECT key FROM testData LIMIT 10").createOrReplaceTempView("t1")
     sql("SELECT key FROM testData LIMIT 5").createOrReplaceTempView("t2")
 
@@ -333,17 +335,39 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
     sql("SELECT * FROM t1").count()
     sql("SELECT * FROM t2").count()
 
+    val toBeCleanedAccIds = new HashSet[Long]
+
     val accId1 = spark.table("t1").queryExecution.withCachedData.collect {
       case i: InMemoryRelation => i.batchStats.id
     }.head
+    toBeCleanedAccIds += accId1
 
     val accId2 = spark.table("t1").queryExecution.withCachedData.collect {
       case i: InMemoryRelation => i.batchStats.id
     }.head
+    toBeCleanedAccIds += accId2
+
+    val cleanerListener = new CleanerListener {
+      def rddCleaned(rddId: Int): Unit = {}
+      def shuffleCleaned(shuffleId: Int): Unit = {}
+      def broadcastCleaned(broadcastId: Long): Unit = {}
+      def accumCleaned(accId: Long): Unit = {
+        toBeCleanedAccIds.synchronized { toBeCleanedAccIds -= accId }
+      }
+      def checkpointCleaned(rddId: Long): Unit = {}
+    }
+    spark.sparkContext.cleaner.get.attachListener(cleanerListener)
 
     spark.catalog.uncacheTable("t1")
     spark.catalog.uncacheTable("t2")
 
+    System.gc()
+
+    eventually(timeout(10 seconds)) {
+      assert(toBeCleanedAccIds.synchronized { toBeCleanedAccIds.isEmpty },
+        "batchStats accumulators should be cleared after GC when uncacheTable")
+    }
+
     assert(AccumulatorContext.get(accId1).isEmpty)
     assert(AccumulatorContext.get(accId2).isEmpty)
   }
@@ -513,4 +537,19 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
       spark.catalog.uncacheTable("t2")
     }
   }
+
+  test("SPARK-15870 DataFrame can't execute after uncacheTable") {
+    val selectStar = sql("SELECT * FROM testData WHERE key = 1")
+    selectStar.createOrReplaceTempView("selectStar")
+
+    spark.catalog.cacheTable("selectStar")
+    checkAnswer(
+      selectStar,
+      Seq(Row(1, "1")))
+
+    spark.catalog.uncacheTable("selectStar")
+    checkAnswer(
+      selectStar,
+      Seq(Row(1, "1")))
+  }
 }
```