author     Reynold Xin <rxin@apache.org>    2013-10-20 21:03:51 -0700
committer  Reynold Xin <rxin@apache.org>    2013-10-20 21:03:51 -0700
commit     5b9380e0173b3d3d13235ae912e9ccc2a974b98b
tree       0492bcbcac5f87494a9a426ae9599e4c7cf9cd1b
parent     261bcf27b3ef157fd0844d0b8285f81229705f99
parent     7414805e4efdfa893805389f001634a989ae3bda
Merge pull request #89 from rxin/executor
Don't set up the uncaught exception handler in local mode. This avoids unit test failures for Spark Streaming such as the following:

java.util.concurrent.RejectedExecutionException: Task org.apache.spark.streaming.JobManager$JobHandler@38cf728d rejected from java.util.concurrent.ThreadPoolExecutor@3b69a41e[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 14]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2048)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:821)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1372)
    at org.apache.spark.streaming.JobManager.runJob(JobManager.scala:54)
    at org.apache.spark.streaming.Scheduler$$anonfun$generateJobs$2.apply(Scheduler.scala:108)
    at org.apache.spark.streaming.Scheduler$$anonfun$generateJobs$2.apply(Scheduler.scala:108)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.streaming.Scheduler.generateJobs(Scheduler.scala:108)
    at org.apache.spark.streaming.Scheduler$$anonfun$1.apply$mcVJ$sp(Scheduler.scala:41)
    at org.apache.spark.streaming.util.RecurringTimer.org$apache$spark$streaming$util$RecurringTimer$$loop(RecurringTimer.scala:66)
    at org.apache.spark.streaming.util.RecurringTimer$$anon$1.run(RecurringTimer.scala:34)
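The failure mode is easiest to see in a small standalone program. The sketch below is not Spark code (the object name, the way the isLocal flag is derived, and the exit code are made up for illustration); it only shows the mechanism: a JVM-wide default uncaught exception handler that calls System.exit will, in a shared JVM such as local mode, kill the driver and any thread pools that later jobs are submitted to, which surfaces as the RejectedExecutionException from a terminated ThreadPoolExecutor above. Guarding the handler with !isLocal, as the diff below does, leaves the local-mode test JVM alone while still terminating a standalone executor process on an uncaught exception.

// Hypothetical sketch, not Spark code: why a JVM-wide uncaught-exception
// handler that calls System.exit is only installed outside local mode.
object UncaughtHandlerSketch {
  def main(args: Array[String]): Unit = {
    // Stand-in for Spark's isLocal flag; pass "local" to simulate local mode.
    val isLocal = args.headOption.exists(_ == "local")

    if (!isLocal) {
      // Non-local: this JVM belongs to the executor alone, so killing the
      // whole process on an uncaught exception avoids silent stalls.
      Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
        override def uncaughtException(t: Thread, e: Throwable): Unit = {
          System.err.println(s"Uncaught exception in $t: $e")
          System.exit(1)
        }
      })
    }

    // A worker thread dies with an uncaught exception.
    new Thread(new Runnable {
      override def run(): Unit = throw new RuntimeException("boom")
    }).start()

    Thread.sleep(500)
    // In local mode (no handler installed) this line is reached; with the
    // handler installed, System.exit(1) would already have killed the shared
    // JVM, taking the driver and any test harness down with it.
    println("JVM still alive")
  }
}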
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/Executor.scala | 43
1 file changed, 23 insertions(+), 20 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 032eb04f43..b773346df3 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -74,30 +74,33 @@ private[spark] class Executor(
   private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader)
   Thread.currentThread.setContextClassLoader(replClassLoader)
-  // Make any thread terminations due to uncaught exceptions kill the entire
-  // executor process to avoid surprising stalls.
-  Thread.setDefaultUncaughtExceptionHandler(
-    new Thread.UncaughtExceptionHandler {
-      override def uncaughtException(thread: Thread, exception: Throwable) {
-        try {
-          logError("Uncaught exception in thread " + thread, exception)
-
-          // We may have been called from a shutdown hook. If so, we must not call System.exit().
-          // (If we do, we will deadlock.)
-          if (!Utils.inShutdown()) {
-            if (exception.isInstanceOf[OutOfMemoryError]) {
-              System.exit(ExecutorExitCode.OOM)
-            } else {
-              System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION)
+  if (!isLocal) {
+    // Setup an uncaught exception handler for non-local mode.
+    // Make any thread terminations due to uncaught exceptions kill the entire
+    // executor process to avoid surprising stalls.
+    Thread.setDefaultUncaughtExceptionHandler(
+      new Thread.UncaughtExceptionHandler {
+        override def uncaughtException(thread: Thread, exception: Throwable) {
+          try {
+            logError("Uncaught exception in thread " + thread, exception)
+
+            // We may have been called from a shutdown hook. If so, we must not call System.exit().
+            // (If we do, we will deadlock.)
+            if (!Utils.inShutdown()) {
+              if (exception.isInstanceOf[OutOfMemoryError]) {
+                System.exit(ExecutorExitCode.OOM)
+              } else {
+                System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION)
+              }
             }
+          } catch {
+            case oom: OutOfMemoryError => Runtime.getRuntime.halt(ExecutorExitCode.OOM)
+            case t: Throwable => Runtime.getRuntime.halt(ExecutorExitCode.UNCAUGHT_EXCEPTION_TWICE)
           }
-        } catch {
-          case oom: OutOfMemoryError => Runtime.getRuntime.halt(ExecutorExitCode.OOM)
-          case t: Throwable => Runtime.getRuntime.halt(ExecutorExitCode.UNCAUGHT_EXCEPTION_TWICE)
         }
       }
-    }
-  )
+    )
+  }
   val executorSource = new ExecutorSource(this, executorId)