aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala1
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala6
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala30
3 files changed, 29 insertions, 8 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 43d7d80b7a..5f136690f4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -473,6 +473,7 @@ private[spark] class TaskSchedulerImpl(
// If the host mapping still exists, it means we don't know the loss reason for the
// executor. So call removeExecutor() to update tasks running on that executor when
// the real loss reason is finally known.
+ logError(s"Actual reason for lost executor $executorId: ${reason.message}")
removeExecutor(executorId, reason)
case None =>
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index f71d98feac..3373caf0d1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -269,7 +269,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
* Stop making resource offers for the given executor. The executor is marked as lost with
* the loss reason still pending.
*
- * @return Whether executor was alive.
+ * @return Whether the executor should be disabled.
*/
protected def disableExecutor(executorId: String): Boolean = {
val shouldDisable = CoarseGrainedSchedulerBackend.this.synchronized {
@@ -277,7 +277,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
executorsPendingLossReason += executorId
true
} else {
- false
+ // Return true for explicitly killed executors as well, since we still need to get their
+ // pending loss reasons; for all other executors, return false.
+ executorsPendingToRemove.contains(executorId)
}
}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 4d9e777cb4..7e39c3ea56 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -35,7 +35,7 @@ import org.apache.hadoop.yarn.util.RackResolver
import org.apache.log4j.{Level, Logger}
-import org.apache.spark.{Logging, SecurityManager, SparkConf}
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef}
import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason}
@@ -96,6 +96,10 @@ private[yarn] class YarnAllocator(
// was lost.
private val pendingLossReasonRequests = new HashMap[String, mutable.Buffer[RpcCallContext]]
+ // Maintain loss reasons for already released executors: an entry is added when the executor
+ // loss reason is received from the AM-RM call, and removed once that loss reason is queried.
+ private val releasedExecutorLossReasons = new HashMap[String, ExecutorLossReason]
+
// Keep track of which container is running which executor to remove the executors later
// Visible for testing.
private[yarn] val executorIdToContainer = new HashMap[String, Container]
@@ -202,8 +206,7 @@ private[yarn] class YarnAllocator(
*/
def killExecutor(executorId: String): Unit = synchronized {
if (executorIdToContainer.contains(executorId)) {
- val container = executorIdToContainer.remove(executorId).get
- containerIdToExecutorId.remove(container.getId)
+ val container = executorIdToContainer.get(executorId).get
internalReleaseContainer(container)
numExecutorsRunning -= 1
} else {
@@ -514,9 +517,18 @@ private[yarn] class YarnAllocator(
containerIdToExecutorId.remove(containerId).foreach { eid =>
executorIdToContainer.remove(eid)
- pendingLossReasonRequests.remove(eid).foreach { pendingRequests =>
- // Notify application of executor loss reasons so it can decide whether it should abort
- pendingRequests.foreach(_.reply(exitReason))
+ pendingLossReasonRequests.remove(eid) match {
+ case Some(pendingRequests) =>
+ // Notify application of executor loss reasons so it can decide whether it should abort
+ pendingRequests.foreach(_.reply(exitReason))
+
+ case None =>
+ // No pending loss-reason request was found for this executor, because the completed
+ // container was processed before the loss reason was queried, so store the reason
+ // for a later query. This usually happens when a container is explicitly killed:
+ // the result is returned in a single AM-RM communication, so the query RPC arrives
+ // after this completed container has been processed.
+ releasedExecutorLossReasons.put(eid, exitReason)
}
if (!alreadyReleased) {
// The executor could have gone away (like no route to host, node failure, etc)
@@ -538,8 +550,14 @@ private[yarn] class YarnAllocator(
if (executorIdToContainer.contains(eid)) {
pendingLossReasonRequests
.getOrElseUpdate(eid, new ArrayBuffer[RpcCallContext]) += context
+ } else if (releasedExecutorLossReasons.contains(eid)) {
+ // The executor was already explicitly released before its loss reason was requested,
+ // so directly send the pre-stored loss reason
+ context.reply(releasedExecutorLossReasons.remove(eid).get)
} else {
logWarning(s"Tried to get the loss reason for non-existent executor $eid")
+ context.sendFailure(
+ new SparkException(s"Fail to find loss reason for non-existent executor $eid"))
}
}