aboutsummaryrefslogtreecommitdiff
path: root/repl
diff options
context:
space:
mode:
Diffstat (limited to 'repl')
-rw-r--r--repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala27
1 files changed, 27 insertions, 0 deletions
diff --git a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
index 7e10f15226..d3dafe9c42 100644
--- a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
+++ b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -373,4 +373,31 @@ class ReplSuite extends SparkFunSuite {
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
}
+
+ test("should clone and clean line object in ClosureCleaner") {
+ val output = runInterpreterInPasteMode("local-cluster[1,4,4096]",
+ """
+ |import org.apache.spark.rdd.RDD
+ |
+ |val lines = sc.textFile("pom.xml")
+ |case class Data(s: String)
+ |val dataRDD = lines.map(line => Data(line.take(3)))
+ |dataRDD.cache.count
+ |val repartitioned = dataRDD.repartition(dataRDD.partitions.size)
+ |repartitioned.cache.count
+ |
+ |def getCacheSize(rdd: RDD[_]) = {
+ | sc.getRDDStorageInfo.filter(_.id == rdd.id).map(_.memSize).sum
+ |}
+ |val cacheSize1 = getCacheSize(dataRDD)
+ |val cacheSize2 = getCacheSize(repartitioned)
+ |
+ |// The cache size of dataRDD and the repartitioned one should be similar.
+ |val deviation = math.abs(cacheSize2 - cacheSize1).toDouble / cacheSize1
+ |assert(deviation < 0.2,
+ | s"deviation too large: $deviation, first size: $cacheSize1, second size: $cacheSize2")
+ """.stripMargin)
+ assertDoesNotContain("AssertionError", output)
+ assertDoesNotContain("Exception", output)
+ }
}