Diffstat (limited to 'yarn')
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala | 30
1 file changed, 24 insertions(+), 6 deletions(-)
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 4d9e777cb4..7e39c3ea56 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -35,7 +35,7 @@ import org.apache.hadoop.yarn.util.RackResolver
import org.apache.log4j.{Level, Logger}
-import org.apache.spark.{Logging, SecurityManager, SparkConf}
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef}
import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason}
@@ -96,6 +96,10 @@ private[yarn] class YarnAllocator(
// was lost.
private val pendingLossReasonRequests = new HashMap[String, mutable.Buffer[RpcCallContext]]
+ // Maintain loss reasons for already released executors; a reason is added when the executor
+ // loss reason is received from the AM-RM call and removed once it has been queried.
+ private val releasedExecutorLossReasons = new HashMap[String, ExecutorLossReason]
+
// Keep track of which container is running which executor to remove the executors later
// Visible for testing.
private[yarn] val executorIdToContainer = new HashMap[String, Container]
@@ -202,8 +206,7 @@ private[yarn] class YarnAllocator(
*/
def killExecutor(executorId: String): Unit = synchronized {
if (executorIdToContainer.contains(executorId)) {
- val container = executorIdToContainer.remove(executorId).get
- containerIdToExecutorId.remove(container.getId)
+ val container = executorIdToContainer.get(executorId).get
internalReleaseContainer(container)
numExecutorsRunning -= 1
} else {
@@ -514,9 +517,18 @@ private[yarn] class YarnAllocator(
containerIdToExecutorId.remove(containerId).foreach { eid =>
executorIdToContainer.remove(eid)
- pendingLossReasonRequests.remove(eid).foreach { pendingRequests =>
- // Notify application of executor loss reasons so it can decide whether it should abort
- pendingRequests.foreach(_.reply(exitReason))
+ pendingLossReasonRequests.remove(eid) match {
+ case Some(pendingRequests) =>
+ // Notify application of executor loss reasons so it can decide whether it should abort
+ pendingRequests.foreach(_.reply(exitReason))
+
+ case None =>
+ // There is no pending loss-reason request for this executor because the completed
+ // container was processed before the loss reason was queried, so store the reason
+ // for a later query. This typically happens when a container is killed explicitly:
+ // the result is returned in the same AM-RM communication, so the query RPC arrives
+ // after this completed container has been processed.
+ releasedExecutorLossReasons.put(eid, exitReason)
}
if (!alreadyReleased) {
// The executor could have gone away (like no route to host, node failure, etc)
@@ -538,8 +550,14 @@ private[yarn] class YarnAllocator(
if (executorIdToContainer.contains(eid)) {
pendingLossReasonRequests
.getOrElseUpdate(eid, new ArrayBuffer[RpcCallContext]) += context
+ } else if (releasedExecutorLossReasons.contains(eid)) {
+ // Executor was already released explicitly before the loss reason was queried, so reply
+ // directly with the pre-stored loss reason
+ context.reply(releasedExecutorLossReasons.remove(eid).get)
} else {
logWarning(s"Tried to get the loss reason for non-existent executor $eid")
+ context.sendFailure(
+ new SparkException(s"Fail to find loss reason for non-existent executor $eid"))
}
}
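
The logic above can be summarized as follows: when a completed container is reported before anyone has asked for its loss reason, the reason is parked in releasedExecutorLossReasons; when the query arrives later, it is answered from that map instead of being logged as a missing executor. The snippet below is a minimal, self-contained sketch of that bookkeeping, not the real YarnAllocator code: the LossReasonTracker class, its method names, and the use of plain String in place of RpcCallContext/ExecutorLossReason are illustrative assumptions only.

import scala.collection.mutable

// Hypothetical, simplified stand-in for the YarnAllocator bookkeeping shown in the diff.
class LossReasonTracker {
  // Loss reasons recorded for executors released before the reason was queried.
  private val releasedExecutorLossReasons = new mutable.HashMap[String, String]
  // Callers waiting for a loss reason that has not arrived yet.
  private val pendingLossReasonRequests =
    new mutable.HashMap[String, mutable.Buffer[String => Unit]]

  // Called when the AM-RM heartbeat reports a completed container.
  def onContainerCompleted(executorId: String, exitReason: String): Unit =
    pendingLossReasonRequests.remove(executorId) match {
      case Some(requests) =>
        // Someone already asked: answer all pending requests now.
        requests.foreach(reply => reply(exitReason))
      case None =>
        // Nobody asked yet: store the reason for a later query.
        releasedExecutorLossReasons.put(executorId, exitReason)
    }

  // Called when the driver asks why an executor was lost.
  def enqueueGetLossReasonRequest(executorId: String, isRunning: Boolean)(
      reply: String => Unit): Unit =
    if (isRunning) {
      // Reason not known yet; park the request until the container completes.
      pendingLossReasonRequests
        .getOrElseUpdate(executorId, mutable.ArrayBuffer.empty[String => Unit]) += reply
    } else {
      // The container may have completed first; answer from the stored reason if present.
      releasedExecutorLossReasons.remove(executorId) match {
        case Some(reason) => reply(reason)
        case None         => reply(s"Unknown loss reason for non-existent executor $executorId")
      }
    }
}

In this sketch the two maps play the same roles as pendingLossReasonRequests and releasedExecutorLossReasons in the patch: whichever side arrives second (the completed-container event or the loss-reason query) finds the data left behind by the side that arrived first.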