path: root/sql/core/src/test
author     Takuya UESHIN <ueshin@happy-camper.st>    2016-06-12 16:37:44 -0700
committer  Wenchen Fan <wenchen@databricks.com>      2016-06-12 16:37:44 -0700
commit     caebd7f2622340fc081bb9a2ea6a0b246f1e3a3f (patch)
tree       7edef67ff3399c61e0bb12d7041db847b5da79cd /sql/core/src/test
parent     20b8f2c32af696c3856221c4c4fcd12c3f068af2 (diff)
[SPARK-15870][SQL] DataFrame can't execute after uncacheTable.
## What changes were proposed in this pull request?

If a cached `DataFrame` is executed more than once and `uncacheTable` is then called, like the following:

```
val selectStar = sql("SELECT * FROM testData WHERE key = 1")
selectStar.createOrReplaceTempView("selectStar")

spark.catalog.cacheTable("selectStar")
checkAnswer(
  selectStar,
  Seq(Row(1, "1")))

spark.catalog.uncacheTable("selectStar")
checkAnswer(
  selectStar,
  Seq(Row(1, "1")))
```

then the uncached `DataFrame` can't execute because of a `Task not serializable` exception like:

```
org.apache.spark.SparkException: Task not serializable
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
  at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:2038)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1897)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1912)
  at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:884)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:357)
  at org.apache.spark.rdd.RDD.collect(RDD.scala:883)
  at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:290)
  ...
Caused by: java.lang.UnsupportedOperationException: Accumulator must be registered before send to executor
  at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:153)
  at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at java.io.ObjectStreamClass.invokeWriteReplace(ObjectStreamClass.java:1118)
  at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1136)
  at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
  at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
  at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
  ...
```

Notice that a `DataFrame` uncached with `DataFrame.unpersist()` still works, but one uncached with `spark.catalog.uncacheTable` does not.

This PR reverts a part of cf38fe0 so that the `batchStats` accumulator is no longer unregistered there; it does not need to be unregistered at that point because `ContextCleaner` will do it after the accumulator is collected by GC.

## How was this patch tested?

Added a test to check that a `DataFrame` can execute after `uncacheTable`, plus the other existing tests. I made the test that checks whether the accumulator was cleared an `ignore` test, because it would be flaky.

Author: Takuya UESHIN <ueshin@happy-camper.st>

Closes #13596 from ueshin/issues/SPARK-15870.
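For reference, the failure can also be reproduced outside the test suite. Below is a minimal, self-contained sketch of the same scenario as a standalone Spark application; the `Spark15870Repro` object name, the `local[2]` master, and the inline `testData` view (standing in for the suite's `testData` fixture and `checkAnswer` helper) are illustrative assumptions, not part of this commit. Before this fix, the second `collect()` fails with the `Task not serializable` error shown above.

```
import org.apache.spark.sql.SparkSession

object Spark15870Repro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("SPARK-15870 repro")
      .getOrCreate()
    import spark.implicits._

    // Stand-in for the suite's `testData` temp view: (key, value) pairs.
    Seq((1, "1"), (2, "2"), (3, "3")).toDF("key", "value")
      .createOrReplaceTempView("testData")

    val selectStar = spark.sql("SELECT * FROM testData WHERE key = 1")
    selectStar.createOrReplaceTempView("selectStar")

    spark.catalog.cacheTable("selectStar")
    selectStar.collect()   // runs against the cached InMemoryRelation

    spark.catalog.uncacheTable("selectStar")
    selectStar.collect()   // before this fix: SparkException "Task not serializable"

    spark.stop()
  }
}
```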
Diffstat (limited to 'sql/core/src/test')
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala  41
1 file changed, 40 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index e08a9ab7e6..44bafa55bc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -17,11 +17,13 @@
package org.apache.spark.sql
+import scala.collection.mutable.HashSet
import scala.concurrent.duration._
import scala.language.postfixOps
import org.scalatest.concurrent.Eventually._
+import org.apache.spark.CleanerListener
import org.apache.spark.sql.execution.RDDScanExec
import org.apache.spark.sql.execution.columnar._
import org.apache.spark.sql.execution.exchange.ShuffleExchange
@@ -321,7 +323,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
assert(spark.sharedState.cacheManager.isEmpty)
}
- test("Clear accumulators when uncacheTable to prevent memory leaking") {
+ test("Ensure accumulators to be cleared after GC when uncacheTable") {
sql("SELECT key FROM testData LIMIT 10").createOrReplaceTempView("t1")
sql("SELECT key FROM testData LIMIT 5").createOrReplaceTempView("t2")
@@ -333,17 +335,39 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
sql("SELECT * FROM t1").count()
sql("SELECT * FROM t2").count()
+ val toBeCleanedAccIds = new HashSet[Long]
+
val accId1 = spark.table("t1").queryExecution.withCachedData.collect {
case i: InMemoryRelation => i.batchStats.id
}.head
+ toBeCleanedAccIds += accId1
val accId2 = spark.table("t1").queryExecution.withCachedData.collect {
case i: InMemoryRelation => i.batchStats.id
}.head
+ toBeCleanedAccIds += accId2
+
+ val cleanerListener = new CleanerListener {
+ def rddCleaned(rddId: Int): Unit = {}
+ def shuffleCleaned(shuffleId: Int): Unit = {}
+ def broadcastCleaned(broadcastId: Long): Unit = {}
+ def accumCleaned(accId: Long): Unit = {
+ toBeCleanedAccIds.synchronized { toBeCleanedAccIds -= accId }
+ }
+ def checkpointCleaned(rddId: Long): Unit = {}
+ }
+ spark.sparkContext.cleaner.get.attachListener(cleanerListener)
spark.catalog.uncacheTable("t1")
spark.catalog.uncacheTable("t2")
+ System.gc()
+
+ eventually(timeout(10 seconds)) {
+ assert(toBeCleanedAccIds.synchronized { toBeCleanedAccIds.isEmpty },
+ "batchStats accumulators should be cleared after GC when uncacheTable")
+ }
+
assert(AccumulatorContext.get(accId1).isEmpty)
assert(AccumulatorContext.get(accId2).isEmpty)
}
@@ -513,4 +537,19 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
spark.catalog.uncacheTable("t2")
}
}
+
+ test("SPARK-15870 DataFrame can't execute after uncacheTable") {
+ val selectStar = sql("SELECT * FROM testData WHERE key = 1")
+ selectStar.createOrReplaceTempView("selectStar")
+
+ spark.catalog.cacheTable("selectStar")
+ checkAnswer(
+ selectStar,
+ Seq(Row(1, "1")))
+
+ spark.catalog.uncacheTable("selectStar")
+ checkAnswer(
+ selectStar,
+ Seq(Row(1, "1")))
+ }
}
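As background on the failure mode the new SPARK-15870 test guards against: an `AccumulatorV2` that is not registered in `AccumulatorContext` refuses Java serialization on the driver side, which is what happened to `batchStats` once `uncacheTable` unregistered it while the cached plan still referenced it. The sketch below is illustrative only and assumes Spark 2.0-era accumulator behavior; the `UnregisteredAccumulatorDemo` object and the plain `LongAccumulator` stand in for the internal `batchStats` accumulator and are not part of this commit.

```
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

import org.apache.spark.SparkContext
import org.apache.spark.util.LongAccumulator

object UnregisteredAccumulatorDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[2]", "unregistered-accumulator-demo")

    // An accumulator created through SparkContext is registered with
    // AccumulatorContext, so driver-side serialization succeeds (as it does
    // when such an accumulator is captured in a task closure).
    val registered = sc.longAccumulator("registered")
    serialize(registered)

    // A plain, never-registered accumulator fails the driver-side check in
    // AccumulatorV2.writeReplace with
    // "Accumulator must be registered before send to executor".
    val unregistered = new LongAccumulator
    try {
      serialize(unregistered)
    } catch {
      case e: UnsupportedOperationException => println(s"As expected: ${e.getMessage}")
    }

    sc.stop()
  }

  private def serialize(obj: AnyRef): Unit = {
    val oos = new ObjectOutputStream(new ByteArrayOutputStream())
    try oos.writeObject(obj) finally oos.close()
  }
}
```

The registered accumulator serializes because the driver-side check finds it in `AccumulatorContext`; the unregistered one hits the same `UnsupportedOperationException` seen in the stack trace in the commit message above.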