aboutsummaryrefslogtreecommitdiff
path: root/yarn/src
diff options
context:
space:
mode:
authorzhonghaihua <793507405@qq.com>2016-04-01 16:23:14 -0500
committerTom Graves <tgraves@yahoo-inc.com>2016-04-01 16:23:14 -0500
commitbd7b91cefb0d192d808778e6182dcdd2c143e132 (patch)
treeed8f76bab3aa5042e7f3fa88b4ef2dcd5eb0ddcd /yarn/src
parent3e991dbc310a4a33eec7f3909adce50bf8268d04 (diff)
downloadspark-bd7b91cefb0d192d808778e6182dcdd2c143e132.tar.gz
spark-bd7b91cefb0d192d808778e6182dcdd2c143e132.tar.bz2
spark-bd7b91cefb0d192d808778e6182dcdd2c143e132.zip
[SPARK-12864][YARN] initialize executorIdCounter after ApplicationMaster killed for max n…
Currently, when max number of executor failures reached the `maxNumExecutorFailures`, `ApplicationMaster` will be killed and re-register another one.This time, `YarnAllocator` will be created a new instance. But, the value of property `executorIdCounter` in `YarnAllocator` will reset to `0`. Then the Id of new executor will starting from `1`. This will confuse with the executor has already created before, which will cause FetchFailedException. This situation is just in yarn client mode, so this is an issue in yarn client mode. For more details, [link to jira issues SPARK-12864](https://issues.apache.org/jira/browse/SPARK-12864) This PR introduce a mechanism to initialize `executorIdCounter` after `ApplicationMaster` killed. Author: zhonghaihua <793507405@qq.com> Closes #10794 from zhonghaihua/initExecutorIdCounterAfterAMKilled.
Diffstat (limited to 'yarn/src')
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala20
-rw-r--r--yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala3
2 files changed, 21 insertions, 2 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 7d71a642f6..b0bfe855e9 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -40,6 +40,7 @@ import org.apache.spark.internal.config._
import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef}
import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RetrieveLastAllocatedExecutorId
import org.apache.spark.util.ThreadUtils
/**
@@ -83,8 +84,23 @@ private[yarn] class YarnAllocator(
new ConcurrentHashMap[ContainerId, java.lang.Boolean])
@volatile private var numExecutorsRunning = 0
- // Used to generate a unique ID per executor
- private var executorIdCounter = 0
+
+ /**
+ * Used to generate a unique ID per executor
+ *
+ * Init `executorIdCounter`. when AM restart, `executorIdCounter` will reset to 0. Then
+ * the id of new executor will start from 1, this will conflict with the executor has
+ * already created before. So, we should initialize the `executorIdCounter` by getting
+ * the max executorId from driver.
+ *
+ * And this situation of executorId conflict is just in yarn client mode, so this is an issue
+ * in yarn client mode. For more details, can check in jira.
+ *
+ * @see SPARK-12864
+ */
+ private var executorIdCounter: Int =
+ driverRef.askWithRetry[Int](RetrieveLastAllocatedExecutorId)
+
@volatile private var numExecutorsFailed = 0
@volatile private var targetNumExecutors =
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index a8781636f2..5aeaf44732 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -292,6 +292,9 @@ private[spark] abstract class YarnSchedulerBackend(
logWarning("Attempted to kill executors before the AM has registered!")
context.reply(false)
}
+
+ case RetrieveLastAllocatedExecutorId =>
+ context.reply(currentExecutorIdCounter)
}
override def onDisconnected(remoteAddress: RpcAddress): Unit = {