diff options
author | Wenchen Fan <wenchen@databricks.com> | 2016-04-14 10:58:06 -0700 |
---|---|---|
committer | Andrew Or <andrew@databricks.com> | 2016-04-14 10:58:06 -0700 |
commit | 1d04c86fc575470e15f6667076377cea102552d7 (patch) | |
tree | 0c0b2cc924dcaf4099198e17b352a04183c9b0c1 /repl/scala-2.11 | |
parent | a46f98d3f4ba6a79f4ef789806fec80a7d4f342d (diff) | |
download | spark-1d04c86fc575470e15f6667076377cea102552d7.tar.gz spark-1d04c86fc575470e15f6667076377cea102552d7.tar.bz2 spark-1d04c86fc575470e15f6667076377cea102552d7.zip |
[SPARK-14558][CORE] In ClosureCleaner, clean the outer pointer if it's a REPL line object
## What changes were proposed in this pull request?
When we clean a closure, if its outermost parent is not a closure, we won't clone and clean it as cloning user's objects is dangerous. However, if it's a REPL line object, which may carry a lot of unnecessary references(like hadoop conf, spark conf, etc.), we should clean it as it's not a user object.
This PR improves the check for user's objects to exclude REPL line object.
## How was this patch tested?
existing tests.
Author: Wenchen Fan <wenchen@databricks.com>
Closes #12327 from cloud-fan/closure.
Diffstat (limited to 'repl/scala-2.11')
-rw-r--r-- | repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala | 27 |
1 files changed, 27 insertions, 0 deletions
diff --git a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala index 7e10f15226..d3dafe9c42 100644 --- a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala +++ b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala @@ -373,4 +373,31 @@ class ReplSuite extends SparkFunSuite { assertDoesNotContain("error:", output) assertDoesNotContain("Exception", output) } + + test("should clone and clean line object in ClosureCleaner") { + val output = runInterpreterInPasteMode("local-cluster[1,4,4096]", + """ + |import org.apache.spark.rdd.RDD + | + |val lines = sc.textFile("pom.xml") + |case class Data(s: String) + |val dataRDD = lines.map(line => Data(line.take(3))) + |dataRDD.cache.count + |val repartitioned = dataRDD.repartition(dataRDD.partitions.size) + |repartitioned.cache.count + | + |def getCacheSize(rdd: RDD[_]) = { + | sc.getRDDStorageInfo.filter(_.id == rdd.id).map(_.memSize).sum + |} + |val cacheSize1 = getCacheSize(dataRDD) + |val cacheSize2 = getCacheSize(repartitioned) + | + |// The cache size of dataRDD and the repartitioned one should be similar. + |val deviation = math.abs(cacheSize2 - cacheSize1).toDouble / cacheSize1 + |assert(deviation < 0.2, + | s"deviation too large: $deviation, first size: $cacheSize1, second size: $cacheSize2") + """.stripMargin) + assertDoesNotContain("AssertionError", output) + assertDoesNotContain("Exception", output) + } } |