path: root/yarn
author     jerryshao <sshao@hortonworks.com>      2015-11-16 11:43:18 -0800
committer  Marcelo Vanzin <vanzin@cloudera.com>   2015-11-16 11:43:18 -0800
commit     24477d2705bcf2a851acc241deb8376c5450dc73 (patch)
tree       ce7a35acdc88edc37dba68796aa7e40385e77264 /yarn
parent     ace0db47141ffd457c2091751038fc291f6d5a8b (diff)
[SPARK-11718][YARN][CORE] Fix explicitly killed executor dies silently issue
Currently, if dynamic allocation is enabled, explicitly killing an executor gets no response, so the executor metadata on the driver side becomes stale, which makes dynamic allocation on YARN fail to work. The problem is that `disableExecutor` returns false for executors that are pending kill when `onDisconnect` is detected, so nothing further is done.

One solution is to let these explicitly killed executors fall through to `super.onDisconnect` to remove the executor; this is simple. Another solution is to still query the loss reason for these explicitly killed executors. Since an executor may be killed and reported in the same AM-RM communication, the current approach of adding a pending loss-reason request does not work (the completed container has already been processed), so the loss reason should be stored for a later query. This PR chooses solution 2.

Please help to review. vanzin I think this part was changed by you previously, would you please help to review? Thanks a lot.

Author: jerryshao <sshao@hortonworks.com>

Closes #9684 from jerryshao/SPARK-11718.
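As a rough illustration of the chosen approach (solution 2), here is a minimal, self-contained Scala sketch, not taken from the patch itself: the names LossReason, ReasonStore, onContainerCompleted and queryLossReason are hypothetical stand-ins for Spark's ExecutorLossReason, the YarnAllocator bookkeeping maps, the completed-container processing and the loss-reason RPC handler. The idea is that a loss reason arriving before the driver's query is parked in a map keyed by executor id and replied from there when the query shows up later.

import scala.collection.mutable

object LossReasonSketch {
  // Hypothetical stand-in for Spark's ExecutorLossReason.
  final case class LossReason(message: String)

  class ReasonStore {
    // Loss-reason requests that arrived before the container-completed event.
    private val pending = mutable.HashMap.empty[String, mutable.Buffer[LossReason => Unit]]
    // Reasons for executors whose container completed before the query arrived
    // (the explicitly-killed case this patch fixes).
    private val released = mutable.HashMap.empty[String, LossReason]

    // Called when the AM-RM heartbeat reports a completed container.
    def onContainerCompleted(execId: String, reason: LossReason): Unit =
      pending.remove(execId) match {
        case Some(replies) => replies.foreach(_.apply(reason)) // a query is already waiting
        case None          => released.put(execId, reason)     // park it for a later query
      }

    // Called when the driver asks why an executor was lost.
    def queryLossReason(execId: String, stillRunning: Boolean)(reply: LossReason => Unit): Unit =
      if (stillRunning) {
        pending.getOrElseUpdate(execId, mutable.ArrayBuffer.empty[LossReason => Unit]) += reply
      } else {
        released.remove(execId).foreach(reply) // answer from the parked reason
      }
  }
}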
Diffstat (limited to 'yarn')
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala  30
1 file changed, 24 insertions(+), 6 deletions(-)
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 4d9e777cb4..7e39c3ea56 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -35,7 +35,7 @@ import org.apache.hadoop.yarn.util.RackResolver
import org.apache.log4j.{Level, Logger}
-import org.apache.spark.{Logging, SecurityManager, SparkConf}
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef}
import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason}
@@ -96,6 +96,10 @@ private[yarn] class YarnAllocator(
// was lost.
private val pendingLossReasonRequests = new HashMap[String, mutable.Buffer[RpcCallContext]]
+ // Maintain loss reasons for already released executors. A loss reason is added here when it
+ // is received from the AM-RM call, and removed once that loss reason has been queried.
+ private val releasedExecutorLossReasons = new HashMap[String, ExecutorLossReason]
+
// Keep track of which container is running which executor to remove the executors later
// Visible for testing.
private[yarn] val executorIdToContainer = new HashMap[String, Container]
@@ -202,8 +206,7 @@ private[yarn] class YarnAllocator(
*/
def killExecutor(executorId: String): Unit = synchronized {
if (executorIdToContainer.contains(executorId)) {
- val container = executorIdToContainer.remove(executorId).get
- containerIdToExecutorId.remove(container.getId)
+ val container = executorIdToContainer.get(executorId).get
internalReleaseContainer(container)
numExecutorsRunning -= 1
} else {
@@ -514,9 +517,18 @@ private[yarn] class YarnAllocator(
containerIdToExecutorId.remove(containerId).foreach { eid =>
executorIdToContainer.remove(eid)
- pendingLossReasonRequests.remove(eid).foreach { pendingRequests =>
- // Notify application of executor loss reasons so it can decide whether it should abort
- pendingRequests.foreach(_.reply(exitReason))
+ pendingLossReasonRequests.remove(eid) match {
+ case Some(pendingRequests) =>
+ // Notify application of executor loss reasons so it can decide whether it should abort
+ pendingRequests.foreach(_.reply(exitReason))
+
+ case None =>
+ // No pending loss-reason request exists for this executor, because the completed container
+ // is processed before the loss reason is queried. This usually happens when a container is
+ // explicitly killed: the result is returned in the same AM-RM communication, so the query
+ // RPC arrives after this completed-container processing. Store the reason for a later query.
+ releasedExecutorLossReasons.put(eid, exitReason)
}
if (!alreadyReleased) {
// The executor could have gone away (like no route to host, node failure, etc)
@@ -538,8 +550,14 @@ private[yarn] class YarnAllocator(
if (executorIdToContainer.contains(eid)) {
pendingLossReasonRequests
.getOrElseUpdate(eid, new ArrayBuffer[RpcCallContext]) += context
+ } else if (releasedExecutorLossReasons.contains(eid)) {
+ // The executor was already explicitly released before the loss reason was queried, so reply
+ // directly with the stored loss reason.
+ context.reply(releasedExecutorLossReasons.remove(eid).get)
} else {
logWarning(s"Tried to get the loss reason for non-existent executor $eid")
+ context.sendFailure(
+ new SparkException(s"Failed to find loss reason for non-existent executor $eid"))
}
}
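For completeness, a short usage illustration of the sketch above (again, not part of the patch; runnable in a Scala REPL or script): with an explicit kill, the completed container is reported in the same AM-RM heartbeat, so the store is populated before the driver's loss-reason query arrives, and the query is still answered instead of being dropped silently.

import LossReasonSketch._

val store = new ReasonStore
// Explicit kill: the completed container is processed first...
store.onContainerCompleted("1", LossReason("Container killed on request"))
// ...and the later loss-reason query is answered from the parked reason.
store.queryLossReason("1", stillRunning = false)(reason => println(reason.message))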