aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorCharles Reiss <charles@eecs.berkeley.edu>2012-11-15 16:43:17 -0800
committerCharles Reiss <charles@eecs.berkeley.edu>2012-11-16 20:12:31 -0800
commit12c24e786c9f2eec02131a2bc7a5bb463797aa2a (patch)
tree1975b8da60a37295d035d8b0f125a6b3e32dcd39
parentc23a74df0ab1ab105a3ad6b70e93bc0aa614771d (diff)
downloadspark-12c24e786c9f2eec02131a2bc7a5bb463797aa2a.tar.gz
spark-12c24e786c9f2eec02131a2bc7a5bb463797aa2a.tar.bz2
spark-12c24e786c9f2eec02131a2bc7a5bb463797aa2a.zip
Set default uncaught exception handler to exit.
Among other things, should prevent OutOfMemoryErrors in some daemon threads (such as the network manager) from causing a spark executor to enter a state where it cannot make progress but does not report an error.
-rw-r--r--core/src/main/scala/spark/SparkEnv.scala1
-rw-r--r--core/src/main/scala/spark/executor/Executor.scala15
2 files changed, 15 insertions, 1 deletions
diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala
index 4c6ec6cc6e..9f2b0c42c7 100644
--- a/core/src/main/scala/spark/SparkEnv.scala
+++ b/core/src/main/scala/spark/SparkEnv.scala
@@ -68,7 +68,6 @@ object SparkEnv extends Logging {
isMaster: Boolean,
isLocal: Boolean
) : SparkEnv = {
-
val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, port)
// Bit of a hack: If this is the master and our port was 0 (meaning bind to any free port),
diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala
index dfdb22024e..cb29a6b8b4 100644
--- a/core/src/main/scala/spark/executor/Executor.scala
+++ b/core/src/main/scala/spark/executor/Executor.scala
@@ -43,6 +43,21 @@ private[spark] class Executor extends Logging {
urlClassLoader = createClassLoader()
Thread.currentThread.setContextClassLoader(urlClassLoader)
+ // Make any thread terminations due to uncaught exceptions kill the entire
+ // executor process to avoid surprising stalls.
+ Thread.setDefaultUncaughtExceptionHandler(
+ new Thread.UncaughtExceptionHandler {
+ override def uncaughtException(thread: Thread, exception: Throwable) {
+ try {
+ logError("Uncaught exception in thread " + thread, exception)
+ System.exit(1)
+ } catch {
+ case t: Throwable => System.exit(2)
+ }
+ }
+ }
+ )
+
// Initialize Spark environment (using system properties read above)
env = SparkEnv.createFromSystemProperties(slaveHostname, 0, false, false)
SparkEnv.set(env)