diff options
author | Oleksii Kostyliev <etander@gmail.com> | 2015-05-15 11:19:56 -0700 |
---|---|---|
committer | Andrew Or <andrew@databricks.com> | 2015-05-15 11:21:06 -0700 |
commit | c58b9c6167133aa0a5ececcfb5e4a379ab7a3ef1 (patch) | |
tree | 780d440221a4502388fa647803a4d939f0939666 /core | |
parent | dfdae5800c6a4f9b8e941138f61b784b24b0b00b (diff) | |
download | spark-c58b9c6167133aa0a5ececcfb5e4a379ab7a3ef1.tar.gz spark-c58b9c6167133aa0a5ececcfb5e4a379ab7a3ef1.tar.bz2 spark-c58b9c6167133aa0a5ececcfb5e4a379ab7a3ef1.zip |
[SPARK-7233] [CORE] Detect REPL mode once
<h3>Description</h3>
Detect REPL mode once per JVM lifespan.
Previous behavior was to check presence of interpreter mode every time a job was submitted. In the case of execution of multiple short-living jobs this was causing massive mutual blocks between submission threads.
For more details please refer to https://issues.apache.org/jira/browse/SPARK-7233.
<h3>Notes</h3>
* I inverted the return value in case of catching an exception from `true` to `false`. It seems more logical to assume that if the REPL class is not found, we aren't in the interpreter mode.
* I'd personally would call `classForName` with just a Spark classloader (`org.apache.spark.util.Utils#getSparkClassLoader`) but `org.apache.spark.util.Utils#getContextOrSparkClassLoader` is said to be preferable.
* I struggled to come up with a concise, readable and clear unit test. Suggestions are welcome if you feel necessary.
Author: Oleksii Kostyliev <etander@gmail.com>
Author: Oleksii Kostyliev <okostyliev@thunderhead.com>
Closes #5835 from preeze/SPARK-7233 and squashes the following commits:
69bb9e4 [Oleksii Kostyliev] SPARK-7527: fixed explanatory comment to meet style-checker requirements
26dcc24 [Oleksii Kostyliev] SPARK-7527: fixed explanatory comment to meet style-checker requirements
c6f9685 [Oleksii Kostyliev] Merge remote-tracking branch 'remotes/upstream/master' into SPARK-7233
b78a983 [Oleksii Kostyliev] SPARK-7527: revert the fix and let it be addressed separately at a later stage
b64d441 [Oleksii Kostyliev] SPARK-7233: inline inInterpreter parameter into instantiateClass
86e2606 [Oleksii Kostyliev] SPARK-7233, SPARK-7527: Handle interpreter mode properly.
c7ee69c [Oleksii Kostyliev] Merge remote-tracking branch 'upstream/master' into SPARK-7233
d6c07fc [Oleksii Kostyliev] SPARK-7233: properly handle the inverted meaning of isInInterpreter
c319039 [Oleksii Kostyliev] SPARK-7233: move inInterpreter to Utils and make it lazy
(cherry picked from commit b1b9d5802e3d185f42711ab043a21c9d1eb4763f)
Signed-off-by: Andrew Or <andrew@databricks.com>
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala | 16 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/util/Utils.scala | 14 |
2 files changed, 17 insertions, 13 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala index 6fe32e469c..6f2966bd4f 100644 --- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala +++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala @@ -239,15 +239,6 @@ private[spark] object ClosureCleaner extends Logging { logDebug(s" + fields accessed by starting closure: " + accessedFields.size) accessedFields.foreach { f => logDebug(" " + f) } - val inInterpreter = { - try { - val interpClass = Class.forName("spark.repl.Main") - interpClass.getMethod("interp").invoke(null) != null - } catch { - case _: ClassNotFoundException => true - } - } - // List of outer (class, object) pairs, ordered from outermost to innermost // Note that all outer objects but the outermost one (first one in this list) must be closures var outerPairs: List[(Class[_], AnyRef)] = (outerClasses zip outerObjects).reverse @@ -274,7 +265,7 @@ private[spark] object ClosureCleaner extends Logging { // required fields from the original object. We need the parent here because the Java // language specification requires the first constructor parameter of any closure to be // its enclosing object. - val clone = instantiateClass(cls, parent, inInterpreter) + val clone = instantiateClass(cls, parent) for (fieldName <- accessedFields(cls)) { val field = cls.getDeclaredField(fieldName) field.setAccessible(true) @@ -327,9 +318,8 @@ private[spark] object ClosureCleaner extends Logging { private def instantiateClass( cls: Class[_], - enclosingObject: AnyRef, - inInterpreter: Boolean): AnyRef = { - if (!inInterpreter) { + enclosingObject: AnyRef): AnyRef = { + if (!Utils.isInInterpreter) { // This is a bona fide closure class, whose constructor has no effects // other than to set its fields, so use its constructor val cons = cls.getConstructors()(0) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 48843b4ae5..6a7d1fae33 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1795,6 +1795,20 @@ private[spark] object Utils extends Logging { } } + lazy val isInInterpreter: Boolean = { + try { + val interpClass = classForName("spark.repl.Main") + interpClass.getMethod("interp").invoke(null) != null + } catch { + // Returning true seems to be a mistake. + // Currently changing it to false causes tests failures in Streaming. + // For a more detailed discussion, please, refer to + // https://github.com/apache/spark/pull/5835#issuecomment-101042271 and subsequent comments. + // Addressing this changed is tracked as https://issues.apache.org/jira/browse/SPARK-7527 + case _: ClassNotFoundException => true + } + } + /** * Return a well-formed URI for the file described by a user input string. * |