aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache
diff options
context:
space:
mode:
authorzhonghaihua <793507405@qq.com>2016-04-01 16:23:14 -0500
committerTom Graves <tgraves@yahoo-inc.com>2016-04-01 16:23:14 -0500
commitbd7b91cefb0d192d808778e6182dcdd2c143e132 (patch)
treeed8f76bab3aa5042e7f3fa88b4ef2dcd5eb0ddcd /core/src/main/scala/org/apache
parent3e991dbc310a4a33eec7f3909adce50bf8268d04 (diff)
downloadspark-bd7b91cefb0d192d808778e6182dcdd2c143e132.tar.gz
spark-bd7b91cefb0d192d808778e6182dcdd2c143e132.tar.bz2
spark-bd7b91cefb0d192d808778e6182dcdd2c143e132.zip
[SPARK-12864][YARN] initialize executorIdCounter after ApplicationMaster killed for max n…
Currently, when max number of executor failures reached the `maxNumExecutorFailures`, `ApplicationMaster` will be killed and re-register another one.This time, `YarnAllocator` will be created a new instance. But, the value of property `executorIdCounter` in `YarnAllocator` will reset to `0`. Then the Id of new executor will starting from `1`. This will confuse with the executor has already created before, which will cause FetchFailedException. This situation is just in yarn client mode, so this is an issue in yarn client mode. For more details, [link to jira issues SPARK-12864](https://issues.apache.org/jira/browse/SPARK-12864) This PR introduce a mechanism to initialize `executorIdCounter` after `ApplicationMaster` killed. Author: zhonghaihua <793507405@qq.com> Closes #10794 from zhonghaihua/initExecutorIdCounterAfterAMKilled.
Diffstat (limited to 'core/src/main/scala/org/apache')
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala6
2 files changed, 8 insertions, 0 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index 8d5c11dc36..46a829114e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -30,6 +30,8 @@ private[spark] object CoarseGrainedClusterMessages {
case object RetrieveSparkProps extends CoarseGrainedClusterMessage
+ case object RetrieveLastAllocatedExecutorId extends CoarseGrainedClusterMessage
+
// Driver to executors
case class LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index eb4f5331d6..70470cc6d2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -79,6 +79,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// Executors that have been lost, but for which we don't yet know the real exit reason.
protected val executorsPendingLossReason = new HashSet[String]
+ // The num of current max ExecutorId used to re-register appMaster
+ protected var currentExecutorIdCounter = 0
+
class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
extends ThreadSafeRpcEndpoint with Logging {
@@ -156,6 +159,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// in this block are read when requesting executors
CoarseGrainedSchedulerBackend.this.synchronized {
executorDataMap.put(executorId, data)
+ if (currentExecutorIdCounter < executorId.toInt) {
+ currentExecutorIdCounter = executorId.toInt
+ }
if (numPendingExecutors > 0) {
numPendingExecutors -= 1
logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")