Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala                     |  9
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala                      | 32
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala  | 37
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala           |  9
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala                 | 36
5 files changed, 107 insertions, 16 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
index 33edf25043..47a5cbff49 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
@@ -40,6 +40,15 @@ private[spark] object ExecutorExited {
}
}
+/**
+ * A loss reason that means we don't yet know why the executor exited.
+ *
+ * This is used by the task scheduler to remove state associated with the executor, but
+ * not to fail any tasks that were running on the executor until the real loss reason
+ * is known.
+ */
+private[spark] object LossReasonPending extends ExecutorLossReason("Pending loss reason.")
+
private[spark]
case class SlaveLost(_message: String = "Slave lost")
extends ExecutorLossReason(_message)
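The added sentinel sits alongside the concrete loss reasons already defined in this file. A minimal sketch (not part of this patch) of how a caller could distinguish them, assuming a value `reason: ExecutorLossReason`:

  reason match {
    case LossReasonPending =>
      // Real cause unknown yet: keep the executor's host mapping and running tasks.
    case SlaveLost(message) =>
      // A concrete reason (ExecutorExited is another): running tasks may now be failed.
    case _ =>
      // Other ExecutorLossReason subclasses carry their own messages.
  }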
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 1c7bfe89c0..43d7d80b7a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -468,11 +468,20 @@ private[spark] class TaskSchedulerImpl(
removeExecutor(executorId, reason)
failedExecutor = Some(executorId)
} else {
- // We may get multiple executorLost() calls with different loss reasons. For example, one
- // may be triggered by a dropped connection from the slave while another may be a report
- // of executor termination from Mesos. We produce log messages for both so we eventually
- // report the termination reason.
- logError("Lost an executor " + executorId + " (already removed): " + reason)
+ executorIdToHost.get(executorId) match {
+ case Some(_) =>
+          // If the host mapping still exists, the executor was removed earlier with a pending
+          // loss reason (LossReasonPending). Now that the real reason is known, call
+          // removeExecutor() again so the tasks that were running on that executor are updated.
+ removeExecutor(executorId, reason)
+
+ case None =>
+ // We may get multiple executorLost() calls with different loss reasons. For example,
+ // one may be triggered by a dropped connection from the slave while another may be a
+ // report of executor termination from Mesos. We produce log messages for both so we
+ // eventually report the termination reason.
+ logError("Lost an executor " + executorId + " (already removed): " + reason)
+ }
}
}
// Call dagScheduler.executorLost without holding the lock on this to prevent deadlock
@@ -482,7 +491,11 @@ private[spark] class TaskSchedulerImpl(
}
}
- /** Remove an executor from all our data structures and mark it as lost */
+ /**
+ * Remove an executor from all our data structures and mark it as lost. If the executor's loss
+ * reason is not yet known, do not yet remove its association with its host or update the status
+ * of any running tasks, since the loss reason determines whether we'll fail those tasks.
+ */
private def removeExecutor(executorId: String, reason: ExecutorLossReason) {
activeExecutorIds -= executorId
val host = executorIdToHost(executorId)
@@ -497,8 +510,11 @@ private[spark] class TaskSchedulerImpl(
}
}
}
- executorIdToHost -= executorId
- rootPool.executorLost(executorId, host, reason)
+
+ if (reason != LossReasonPending) {
+ executorIdToHost -= executorId
+ rootPool.executorLost(executorId, host, reason)
+ }
}
def executorAdded(execId: String, host: String) {
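The scheduler now tolerates being told about the same executor twice: once with the sentinel and once with the real reason. A minimal sketch (not part of this patch) of that two-call flow, mirroring the test added below and assuming a running TaskSchedulerImpl named `taskScheduler`:

  // First notification: the cause is unknown, so the executor stops receiving offers,
  // but its host mapping is kept and its running tasks are not failed.
  taskScheduler.executorLost("executor0", LossReasonPending)

  // Second notification with the real cause: the executor-to-host mapping is dropped,
  // rootPool.executorLost() runs, and the executor's tasks become eligible to re-run.
  taskScheduler.executorLost("executor0", SlaveLost("worker lost"))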
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index ebce5021b1..f71d98feac 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -73,6 +73,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// The number of pending tasks which is locality required
protected var localityAwareTasks = 0
+ // Executors that have been lost, but for which we don't yet know the real exit reason.
+ protected val executorsPendingLossReason = new HashSet[String]
+
class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
extends ThreadSafeRpcEndpoint with Logging {
@@ -184,7 +187,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// Make fake resource offers on all executors
private def makeOffers() {
// Filter out executors under killing
- val activeExecutors = executorDataMap.filterKeys(!executorsPendingToRemove.contains(_))
+ val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
val workOffers = activeExecutors.map { case (id, executorData) =>
new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
}.toSeq
@@ -202,7 +205,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// Make fake resource offers on just one executor
private def makeOffers(executorId: String) {
// Filter out executors under killing
- if (!executorsPendingToRemove.contains(executorId)) {
+ if (executorIsAlive(executorId)) {
val executorData = executorDataMap(executorId)
val workOffers = Seq(
new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores))
@@ -210,6 +213,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
}
}
+ private def executorIsAlive(executorId: String): Boolean = synchronized {
+ !executorsPendingToRemove.contains(executorId) &&
+ !executorsPendingLossReason.contains(executorId)
+ }
+
// Launch tasks returned by a set of resource offers
private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
for (task <- tasks.flatten) {
@@ -246,6 +254,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
addressToExecutorId -= executorInfo.executorAddress
executorDataMap -= executorId
executorsPendingToRemove -= executorId
+ executorsPendingLossReason -= executorId
}
totalCoreCount.addAndGet(-executorInfo.totalCores)
totalRegisteredExecutors.addAndGet(-1)
@@ -256,6 +265,30 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
}
}
+ /**
+ * Stop making resource offers for the given executor. The executor is marked as lost with
+ * the loss reason still pending.
+ *
+ * @return Whether the executor was alive.
+ */
+ protected def disableExecutor(executorId: String): Boolean = {
+ val shouldDisable = CoarseGrainedSchedulerBackend.this.synchronized {
+ if (executorIsAlive(executorId)) {
+ executorsPendingLossReason += executorId
+ true
+ } else {
+ false
+ }
+ }
+
+ if (shouldDisable) {
+ logInfo(s"Disabling executor $executorId.")
+ scheduler.executorLost(executorId, LossReasonPending)
+ }
+
+ shouldDisable
+ }
+
override def onStop() {
reviveThread.shutdownNow()
}
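disableExecutor() is the hook a backend calls when it suspects an executor is gone but cannot yet say why. A minimal sketch (not part of this patch) of the intended call pattern; `executorId` would come from addressToExecutorId, as in YarnSchedulerBackend.onDisconnected below:

  if (disableExecutor(executorId)) {
    // Returned true: the executor was still considered alive, so it is now excluded from
    // resource offers and the scheduler has been told executorLost(LossReasonPending).
    // The backend should later report the real ExecutorLossReason (for YARN, after asking
    // the application master why the executor exited).
  }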
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index d75d6f673e..80da37b09b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -115,15 +115,12 @@ private[spark] abstract class YarnSchedulerBackend(
* (e.g., preemption), according to the application master, then we pass that information down
* to the TaskSetManager to inform the TaskSetManager that tasks on that lost executor should
* not count towards a job failure.
- *
- * TODO there's a race condition where while we are querying the ApplicationMaster for
- * the executor loss reason, there is the potential that tasks will be scheduled on
- * the executor that failed. We should fix this by having this onDisconnected event
- * also "blacklist" executors so that tasks are not assigned to them.
*/
override def onDisconnected(rpcAddress: RpcAddress): Unit = {
addressToExecutorId.get(rpcAddress).foreach { executorId =>
- yarnSchedulerEndpoint.handleExecutorDisconnectedFromDriver(executorId, rpcAddress)
+ if (disableExecutor(executorId)) {
+ yarnSchedulerEndpoint.handleExecutorDisconnectedFromDriver(executorId, rpcAddress)
+ }
}
}
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index c2edd4c317..2afb595e6f 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -237,4 +237,40 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with L
}
}
+ test("tasks are not re-scheduled while executor loss reason is pending") {
+ sc = new SparkContext("local", "TaskSchedulerImplSuite")
+ val taskScheduler = new TaskSchedulerImpl(sc)
+ taskScheduler.initialize(new FakeSchedulerBackend)
+ // Need to initialize a DAGScheduler for the taskScheduler to use for callbacks.
+ new DAGScheduler(sc, taskScheduler) {
+ override def taskStarted(task: Task[_], taskInfo: TaskInfo) {}
+ override def executorAdded(execId: String, host: String) {}
+ }
+
+ val e0Offers = Seq(new WorkerOffer("executor0", "host0", 1))
+ val e1Offers = Seq(new WorkerOffer("executor1", "host0", 1))
+ val attempt1 = FakeTask.createTaskSet(1)
+
+ // submit attempt 1, offer resources, task gets scheduled
+ taskScheduler.submitTasks(attempt1)
+ val taskDescriptions = taskScheduler.resourceOffers(e0Offers).flatten
+ assert(1 === taskDescriptions.length)
+
+    // mark executor0 as dead, but with its loss reason still pending
+ taskScheduler.executorLost("executor0", LossReasonPending)
+
+ // offer some more resources on a different executor, nothing should change
+ val taskDescriptions2 = taskScheduler.resourceOffers(e1Offers).flatten
+ assert(0 === taskDescriptions2.length)
+
+ // provide the actual loss reason for executor0
+ taskScheduler.executorLost("executor0", SlaveLost("oops"))
+
+    // executor0's tasks should have failed now that the loss reason is known, so offering more
+    // resources should get them scheduled on the new executor.
+ val taskDescriptions3 = taskScheduler.resourceOffers(e1Offers).flatten
+ assert(1 === taskDescriptions3.length)
+ assert("executor1" === taskDescriptions3(0).executorId)
+ }
+
}