author    Wenchen Fan <wenchen@databricks.com>  2016-04-14 10:58:06 -0700
committer Andrew Or <andrew@databricks.com>     2016-04-14 10:58:06 -0700
commit    1d04c86fc575470e15f6667076377cea102552d7 (patch)
tree      0c0b2cc924dcaf4099198e17b352a04183c9b0c1 /repl
parent    a46f98d3f4ba6a79f4ef789806fec80a7d4f342d (diff)
download  spark-1d04c86fc575470e15f6667076377cea102552d7.tar.gz
          spark-1d04c86fc575470e15f6667076377cea102552d7.tar.bz2
          spark-1d04c86fc575470e15f6667076377cea102552d7.zip
[SPARK-14558][CORE] In ClosureCleaner, clean the outer pointer if it's a REPL line object
## What changes were proposed in this pull request?

When we clean a closure, if its outermost parent is not a closure, we won't clone and clean it, because cloning a user's object is dangerous. However, if the outermost parent is a REPL line object, which may carry a lot of unnecessary references (like the Hadoop conf, the Spark conf, etc.), we should clean it, as it's not a user object. This PR improves the check for user objects to exclude REPL line objects.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #12327 from cloud-fan/closure.
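To make the idea concrete, here is a minimal, self-contained sketch of the kind of check the commit message describes. This is not Spark's actual ClosureCleaner code: the object `ReplLineObjectCheck`, the helpers `isReplLineObject` and `mayBeCleaned`, and the `$line<N>...$read` / `$anonfun$` name patterns are illustrative assumptions about how the Scala 2.11 compiler and REPL name their generated classes.

```scala
// Hypothetical sketch, not Spark's implementation: decide whether an
// outer object is safe to clone and clean during closure cleaning.
object ReplLineObjectCheck {

  // A Scala 2.11 REPL line wrapper has a synthetic name such as
  // "$line14.$read$$iw$$iw". Matching on that naming convention is an
  // assumption about the REPL's code generation, not a stable API.
  def isReplLineObject(cls: Class[_]): Boolean =
    cls.getName.matches("""\$line\d+.*\$read.*""")

  // Closures (classes whose names contain "$anonfun$" under Scala 2.11)
  // and REPL line objects may be cloned and cleaned; arbitrary user
  // objects may not, since mutating a clone of a user object could
  // still violate the user's expectations.
  def mayBeCleaned(cls: Class[_]): Boolean =
    cls.getName.contains("$anonfun$") || isReplLineObject(cls)

  def main(args: Array[String]): Unit = {
    // A plain JDK/user class is left alone.
    println(mayBeCleaned(classOf[String])) // false
  }
}
```

The distinction matters because a line object is compiler-generated: cleaning its fields cannot break user code, while leaving it attached can drag large, irrelevant references (configs, earlier REPL results) into every serialized task.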
Diffstat (limited to 'repl')
-rw-r--r--  repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala | 27
1 file changed, 27 insertions(+), 0 deletions(-)
diff --git a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
index 7e10f15226..d3dafe9c42 100644
--- a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
+++ b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -373,4 +373,31 @@ class ReplSuite extends SparkFunSuite {
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
}
+
+ test("should clone and clean line object in ClosureCleaner") {
+ val output = runInterpreterInPasteMode("local-cluster[1,4,4096]",
+ """
+ |import org.apache.spark.rdd.RDD
+ |
+ |val lines = sc.textFile("pom.xml")
+ |case class Data(s: String)
+ |val dataRDD = lines.map(line => Data(line.take(3)))
+ |dataRDD.cache.count
+ |val repartitioned = dataRDD.repartition(dataRDD.partitions.size)
+ |repartitioned.cache.count
+ |
+ |def getCacheSize(rdd: RDD[_]) = {
+ | sc.getRDDStorageInfo.filter(_.id == rdd.id).map(_.memSize).sum
+ |}
+ |val cacheSize1 = getCacheSize(dataRDD)
+ |val cacheSize2 = getCacheSize(repartitioned)
+ |
+ |// The cache size of dataRDD and the repartitioned one should be similar.
+ |val deviation = math.abs(cacheSize2 - cacheSize1).toDouble / cacheSize1
+ |assert(deviation < 0.2,
+ | s"deviation too large: $deviation, first size: $cacheSize1, second size: $cacheSize2")
+ """.stripMargin)
+ assertDoesNotContain("AssertionError", output)
+ assertDoesNotContain("Exception", output)
+ }
}