diff options
author | KaiXinXiaoLei <huleilei1@huawei.com> | 2015-07-15 22:31:10 +0100 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2015-07-15 22:31:10 +0100 |
commit | 674eb2a4c3ff595760f990daf369ba75d2547593 (patch) | |
tree | cb56d32296d75749bddce0549bd6f9ded963b5a8 /core/src | |
parent | b9a922e260bec1b211437f020be37fab46a85db0 (diff) | |
download | spark-674eb2a4c3ff595760f990daf369ba75d2547593.tar.gz spark-674eb2a4c3ff595760f990daf369ba75d2547593.tar.bz2 spark-674eb2a4c3ff595760f990daf369ba75d2547593.zip |
[SPARK-8974] Catch exceptions in allocation schedule task.
I meet a problem. When I submit some tasks, the thread spark-dynamic-executor-allocation should seed the message about "requestTotalExecutors", and the new executor should start. But I meet a problem about this thread, like:
2015-07-14 19:02:17,461 | WARN | [spark-dynamic-executor-allocation] | Error sending message [message = RequestExecutors(1)] in 1 attempts
java.util.concurrent.TimeoutException: Futures timed out after [120 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:78)
at org.apache.spark.scheduler.cluster.YarnSchedulerBackend.doRequestTotalExecutors(YarnSchedulerBackend.scala:57)
at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.requestTotalExecutors(CoarseGrainedSchedulerBackend.scala:351)
at org.apache.spark.SparkContext.requestTotalExecutors(SparkContext.scala:1382)
at org.apache.spark.ExecutorAllocationManager.addExecutors(ExecutorAllocationManager.scala:343)
at org.apache.spark.ExecutorAllocationManager.updateAndSyncNumExecutorsTarget(ExecutorAllocationManager.scala:295)
at org.apache.spark.ExecutorAllocationManager.org$apache$spark$ExecutorAllocationManager$$schedule(ExecutorAllocationManager.scala:248)
when after some minutes, I find a new ApplicationMaster start, and tasks submitted start to run. The tasks Completed. And after long time (eg, ten minutes), the number of executor does not reduce to zero. I use the default value of "spark.dynamicAllocation.minExecutors".
Author: KaiXinXiaoLei <huleilei1@huawei.com>
Closes #7352 from KaiXinXiaoLei/dym and squashes the following commits:
3603631 [KaiXinXiaoLei] change logError to logWarning
efc4f24 [KaiXinXiaoLei] change file
Diffstat (limited to 'core/src')
-rw-r--r-- | core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala | 12 |
1 files changed, 11 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index 0c50b4002c..648bcfe28c 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -20,6 +20,7 @@ package org.apache.spark import java.util.concurrent.TimeUnit import scala.collection.mutable +import scala.util.control.ControlThrowable import com.codahale.metrics.{Gauge, MetricRegistry} @@ -211,7 +212,16 @@ private[spark] class ExecutorAllocationManager( listenerBus.addListener(listener) val scheduleTask = new Runnable() { - override def run(): Unit = Utils.logUncaughtExceptions(schedule()) + override def run(): Unit = { + try { + schedule() + } catch { + case ct: ControlThrowable => + throw ct + case t: Throwable => + logWarning(s"Uncaught exception in thread ${Thread.currentThread().getName}", t) + } + } } executor.scheduleAtFixedRate(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS) } |