author     Wenchen Fan <wenchen@databricks.com>   2016-04-14 10:58:06 -0700
committer  Andrew Or <andrew@databricks.com>      2016-04-14 10:58:06 -0700
commit     1d04c86fc575470e15f6667076377cea102552d7 (patch)
tree       0c0b2cc924dcaf4099198e17b352a04183c9b0c1
parent     a46f98d3f4ba6a79f4ef789806fec80a7d4f342d (diff)
[SPARK-14558][CORE] In ClosureCleaner, clean the outer pointer if it's a REPL line object
## What changes were proposed in this pull request?

When we clean a closure, if its outermost parent is not a closure, we won't clone and clean it, as cloning the user's objects is dangerous. However, if it's a REPL line object, which may carry a lot of unnecessary references (like the Hadoop conf, the Spark conf, etc.), we should clean it, as it's not a user object. This PR improves the check for user objects to exclude REPL line objects.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #12327 from cloud-fan/closure.
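For context, a rough sketch of why a REPL line object drags extra state into closures. All names below (`$line3`, `$read`, the vals) are simplified stand-ins for what the Scala REPL actually generates, not code from this patch:

```scala
// The Scala REPL compiles each input line into a synthetic wrapper object whose
// class name starts with "$line". A closure defined on a later line can end up
// referencing the whole wrapper, including vals it never uses; that is the extra
// baggage this patch lets ClosureCleaner strip out.
object $line3 {
  object $read {
    val sparkHome  = sys.env.getOrElse("SPARK_HOME", "/opt/spark") // incidental REPL state
    val heavyState = new Array[Byte](4 * 1024 * 1024)              // stands in for confs, caches, ...
    val takePrefix = (s: String) => s.take(3)                      // only needs its argument `s`
  }
}
```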
-rw-r--r--  core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala        | 53
-rw-r--r--  repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala  | 27
2 files changed, 50 insertions(+), 30 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
index 2f6924f7de..489688cb08 100644
--- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
@@ -19,7 +19,8 @@ package org.apache.spark.util
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
-import scala.collection.mutable.{Map, Set}
+import scala.collection.mutable.{Map, Set, Stack}
+import scala.language.existentials
import org.apache.xbean.asm5.{ClassReader, ClassVisitor, MethodVisitor, Type}
import org.apache.xbean.asm5.Opcodes._
@@ -77,35 +78,19 @@ private[spark] object ClosureCleaner extends Logging {
*/
private def getInnerClosureClasses(obj: AnyRef): List[Class[_]] = {
val seen = Set[Class[_]](obj.getClass)
- var stack = List[Class[_]](obj.getClass)
+ val stack = Stack[Class[_]](obj.getClass)
while (!stack.isEmpty) {
- val cr = getClassReader(stack.head)
- stack = stack.tail
+ val cr = getClassReader(stack.pop())
val set = Set[Class[_]]()
cr.accept(new InnerClosureFinder(set), 0)
for (cls <- set -- seen) {
seen += cls
- stack = cls :: stack
+ stack.push(cls)
}
}
(seen - obj.getClass).toList
}
- private def createNullValue(cls: Class[_]): AnyRef = {
- if (cls.isPrimitive) {
- cls match {
- case java.lang.Boolean.TYPE => new java.lang.Boolean(false)
- case java.lang.Character.TYPE => new java.lang.Character('\u0000')
- case java.lang.Void.TYPE =>
- // This should not happen because `Foo(void x) {}` does not compile.
- throw new IllegalStateException("Unexpected void parameter in constructor")
- case _ => new java.lang.Byte(0: Byte)
- }
- } else {
- null
- }
- }
-
/**
* Clean the given closure in place.
*
@@ -233,16 +218,24 @@ private[spark] object ClosureCleaner extends Logging {
// Note that all outer objects but the outermost one (first one in this list) must be closures
var outerPairs: List[(Class[_], AnyRef)] = (outerClasses zip outerObjects).reverse
var parent: AnyRef = null
- if (outerPairs.size > 0 && !isClosure(outerPairs.head._1)) {
- // The closure is ultimately nested inside a class; keep the object of that
- // class without cloning it since we don't want to clone the user's objects.
- // Note that we still need to keep around the outermost object itself because
- // we need it to clone its child closure later (see below).
- logDebug(s" + outermost object is not a closure, so do not clone it: ${outerPairs.head}")
- parent = outerPairs.head._2 // e.g. SparkContext
- outerPairs = outerPairs.tail
- } else if (outerPairs.size > 0) {
- logDebug(s" + outermost object is a closure, so we just keep it: ${outerPairs.head}")
+ if (outerPairs.size > 0) {
+ val (outermostClass, outermostObject) = outerPairs.head
+ if (isClosure(outermostClass)) {
+ logDebug(s" + outermost object is a closure, so we clone it: ${outerPairs.head}")
+ } else if (outermostClass.getName.startsWith("$line")) {
+ // SPARK-14558: if the outermost object is a REPL line object, we should clone and clean it
+      // as it may carry a lot of unnecessary information, e.g. hadoop conf, spark conf, etc.
+ logDebug(s" + outermost object is a REPL line object, so we clone it: ${outerPairs.head}")
+ } else {
+ // The closure is ultimately nested inside a class; keep the object of that
+ // class without cloning it since we don't want to clone the user's objects.
+ // Note that we still need to keep around the outermost object itself because
+ // we need it to clone its child closure later (see below).
+ logDebug(" + outermost object is not a closure or REPL line object, so do not clone it: " +
+ outerPairs.head)
+ parent = outermostObject // e.g. SparkContext
+ outerPairs = outerPairs.tail
+ }
} else {
logDebug(" + there are no enclosing objects!")
}
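Stripped of the surrounding diff, the new decision logic reads roughly as below. This is a sketch for readability; `classifyOutermost` and the `isClosure` parameter are illustrative names, and only the `getName.startsWith("$line")` test comes from the patch itself:

```scala
// Sketch of the three-way check the patched ClosureCleaner now applies to the
// outermost enclosing object of a closure.
def classifyOutermost(outermostClass: Class[_], isClosure: Class[_] => Boolean): String =
  if (isClosure(outermostClass)) {
    "closure: clone it and clean it"
  } else if (outermostClass.getName.startsWith("$line")) {
    // SPARK-14558: REPL line objects are generated wrappers, not user objects,
    // so they are safe to clone and clean as well.
    "REPL line object: clone it and clean it"
  } else {
    "user object: keep the reference as-is and do not clone it"
  }
```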
diff --git a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
index 7e10f15226..d3dafe9c42 100644
--- a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
+++ b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -373,4 +373,31 @@ class ReplSuite extends SparkFunSuite {
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
}
+
+ test("should clone and clean line object in ClosureCleaner") {
+ val output = runInterpreterInPasteMode("local-cluster[1,4,4096]",
+ """
+ |import org.apache.spark.rdd.RDD
+ |
+ |val lines = sc.textFile("pom.xml")
+ |case class Data(s: String)
+ |val dataRDD = lines.map(line => Data(line.take(3)))
+ |dataRDD.cache.count
+ |val repartitioned = dataRDD.repartition(dataRDD.partitions.size)
+ |repartitioned.cache.count
+ |
+ |def getCacheSize(rdd: RDD[_]) = {
+ | sc.getRDDStorageInfo.filter(_.id == rdd.id).map(_.memSize).sum
+ |}
+ |val cacheSize1 = getCacheSize(dataRDD)
+ |val cacheSize2 = getCacheSize(repartitioned)
+ |
+ |// The cache size of dataRDD and the repartitioned one should be similar.
+ |val deviation = math.abs(cacheSize2 - cacheSize1).toDouble / cacheSize1
+ |assert(deviation < 0.2,
+ | s"deviation too large: $deviation, first size: $cacheSize1, second size: $cacheSize2")
+ """.stripMargin)
+ assertDoesNotContain("AssertionError", output)
+ assertDoesNotContain("Exception", output)
+ }
}
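The test above appears to rely on records of the REPL-defined `Data` class carrying a reference back to the line object: if that object is not cleaned, shuffle serialization copies its state into every record, so the repartitioned RDD caches much larger. A standalone sketch of that effect using plain Java serialization (hypothetical names, not part of the Spark test suite):

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Illustrative only: shows how dragging a fat "line object" through serialization
// inflates the size of every record that references it.
object SerializedSizeSketch {
  case class LineObject(bigState: Array[Byte])            // stands in for a REPL line object
  case class Data(s: String, outer: LineObject)           // `outer` simulates the $outer pointer

  def serializedSize(o: AnyRef): Int = {
    val bytes = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bytes)
    oos.writeObject(o)
    oos.close()
    bytes.size()
  }

  def main(args: Array[String]): Unit = {
    val line = LineObject(Array.fill(1 << 20)(0: Byte))    // ~1 MB of incidental REPL state
    println(serializedSize(Data("abc", null)))             // small record
    println(serializedSize(Data("abc", line)))             // roughly 1 MB larger per record
  }
}
```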