aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMarcelo Vanzin <vanzin@cloudera.com>2015-11-04 09:07:22 -0800
committerMarcelo Vanzin <vanzin@cloudera.com>2015-11-04 09:07:22 -0800
commit8790ee6d69e50ca84eb849742be48f2476743b5b (patch)
treed99e02c1bc02a92135c7da2f5745e57fd04663af
parent9b214cea896056e7d0a69ae9d3c282e1f027d5b9 (diff)
downloadspark-8790ee6d69e50ca84eb849742be48f2476743b5b.tar.gz
spark-8790ee6d69e50ca84eb849742be48f2476743b5b.tar.bz2
spark-8790ee6d69e50ca84eb849742be48f2476743b5b.zip
[SPARK-10622][CORE][YARN] Differentiate dead from "mostly dead" executors.
In YARN mode, when preemption is enabled, we may leave executors in a zombie state while we wait to retrieve the reason for which the executor exited. This is so that we don't account for failed tasks that were running on a preempted executor. The issue is that while we wait for this information, the scheduler might decide to schedule tasks on the executor, which will never be able to run them. Other side effects include the block manager still considering the executor available to cache blocks, for example. So, when we know that an executor went down but we don't know why, stop everything related to the executor, except its running tasks. Only when we know the reason for the exit (or give up waiting for it) we do update the running tasks. This is achieved by a new `disableExecutor()` method in the `Schedulable` interface. For managers that do not behave like this (i.e. every one but YARN), the existing `executorLost()` method will behave the same way it did before. On top of that change, a few minor changes that made debugging easier, and fixed some other minor issues: - The cluster-mode AM was printing a misleading log message every time an executor disconnected from the driver (because the akka actor system was shared between driver and AM). - Avoid sending unnecessary requests for an executor's exit reason when we already know it was explicitly disabled / killed. This avoids both multiple requests, and unnecessary requests that would just cause warning messages on the AM (in the explicit kill case). - Tone down a log message about the executor being lost when it exited normally (e.g. preemption) - Wake up the AM monitor thread when requests for executor loss reasons arrive too, so that we can more quickly remove executors from this zombie state. Author: Marcelo Vanzin <vanzin@cloudera.com> Closes #8887 from vanzin/SPARK-10622.
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala9
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala32
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala37
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala9
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala36
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala79
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala5
7 files changed, 157 insertions, 50 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
index 33edf25043..47a5cbff49 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
@@ -40,6 +40,15 @@ private[spark] object ExecutorExited {
}
}
+/**
+ * A loss reason that means we don't yet know why the executor exited.
+ *
+ * This is used by the task scheduler to remove state associated with the executor, but
+ * not yet fail any tasks that were running in the executor before the real loss reason
+ * is known.
+ */
+private [spark] object LossReasonPending extends ExecutorLossReason("Pending loss reason.")
+
private[spark]
case class SlaveLost(_message: String = "Slave lost")
extends ExecutorLossReason(_message)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 1c7bfe89c0..43d7d80b7a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -468,11 +468,20 @@ private[spark] class TaskSchedulerImpl(
removeExecutor(executorId, reason)
failedExecutor = Some(executorId)
} else {
- // We may get multiple executorLost() calls with different loss reasons. For example, one
- // may be triggered by a dropped connection from the slave while another may be a report
- // of executor termination from Mesos. We produce log messages for both so we eventually
- // report the termination reason.
- logError("Lost an executor " + executorId + " (already removed): " + reason)
+ executorIdToHost.get(executorId) match {
+ case Some(_) =>
+ // If the host mapping still exists, it means we don't know the loss reason for the
+ // executor. So call removeExecutor() to update tasks running on that executor when
+ // the real loss reason is finally known.
+ removeExecutor(executorId, reason)
+
+ case None =>
+ // We may get multiple executorLost() calls with different loss reasons. For example,
+ // one may be triggered by a dropped connection from the slave while another may be a
+ // report of executor termination from Mesos. We produce log messages for both so we
+ // eventually report the termination reason.
+ logError("Lost an executor " + executorId + " (already removed): " + reason)
+ }
}
}
// Call dagScheduler.executorLost without holding the lock on this to prevent deadlock
@@ -482,7 +491,11 @@ private[spark] class TaskSchedulerImpl(
}
}
- /** Remove an executor from all our data structures and mark it as lost */
+ /**
+ * Remove an executor from all our data structures and mark it as lost. If the executor's loss
+ * reason is not yet known, do not yet remove its association with its host nor update the status
+ * of any running tasks, since the loss reason defines whether we'll fail those tasks.
+ */
private def removeExecutor(executorId: String, reason: ExecutorLossReason) {
activeExecutorIds -= executorId
val host = executorIdToHost(executorId)
@@ -497,8 +510,11 @@ private[spark] class TaskSchedulerImpl(
}
}
}
- executorIdToHost -= executorId
- rootPool.executorLost(executorId, host, reason)
+
+ if (reason != LossReasonPending) {
+ executorIdToHost -= executorId
+ rootPool.executorLost(executorId, host, reason)
+ }
}
def executorAdded(execId: String, host: String) {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index ebce5021b1..f71d98feac 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -73,6 +73,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// The number of pending tasks which is locality required
protected var localityAwareTasks = 0
+ // Executors that have been lost, but for which we don't yet know the real exit reason.
+ protected val executorsPendingLossReason = new HashSet[String]
+
class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
extends ThreadSafeRpcEndpoint with Logging {
@@ -184,7 +187,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// Make fake resource offers on all executors
private def makeOffers() {
// Filter out executors under killing
- val activeExecutors = executorDataMap.filterKeys(!executorsPendingToRemove.contains(_))
+ val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
val workOffers = activeExecutors.map { case (id, executorData) =>
new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
}.toSeq
@@ -202,7 +205,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// Make fake resource offers on just one executor
private def makeOffers(executorId: String) {
// Filter out executors under killing
- if (!executorsPendingToRemove.contains(executorId)) {
+ if (executorIsAlive(executorId)) {
val executorData = executorDataMap(executorId)
val workOffers = Seq(
new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores))
@@ -210,6 +213,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
}
}
+ private def executorIsAlive(executorId: String): Boolean = synchronized {
+ !executorsPendingToRemove.contains(executorId) &&
+ !executorsPendingLossReason.contains(executorId)
+ }
+
// Launch tasks returned by a set of resource offers
private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
for (task <- tasks.flatten) {
@@ -246,6 +254,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
addressToExecutorId -= executorInfo.executorAddress
executorDataMap -= executorId
executorsPendingToRemove -= executorId
+ executorsPendingLossReason -= executorId
}
totalCoreCount.addAndGet(-executorInfo.totalCores)
totalRegisteredExecutors.addAndGet(-1)
@@ -256,6 +265,30 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
}
}
+ /**
+ * Stop making resource offers for the given executor. The executor is marked as lost with
+ * the loss reason still pending.
+ *
+ * @return Whether executor was alive.
+ */
+ protected def disableExecutor(executorId: String): Boolean = {
+ val shouldDisable = CoarseGrainedSchedulerBackend.this.synchronized {
+ if (executorIsAlive(executorId)) {
+ executorsPendingLossReason += executorId
+ true
+ } else {
+ false
+ }
+ }
+
+ if (shouldDisable) {
+ logInfo(s"Disabling executor $executorId.")
+ scheduler.executorLost(executorId, LossReasonPending)
+ }
+
+ shouldDisable
+ }
+
override def onStop() {
reviveThread.shutdownNow()
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index d75d6f673e..80da37b09b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -115,15 +115,12 @@ private[spark] abstract class YarnSchedulerBackend(
* (e.g., preemption), according to the application master, then we pass that information down
* to the TaskSetManager to inform the TaskSetManager that tasks on that lost executor should
* not count towards a job failure.
- *
- * TODO there's a race condition where while we are querying the ApplicationMaster for
- * the executor loss reason, there is the potential that tasks will be scheduled on
- * the executor that failed. We should fix this by having this onDisconnected event
- * also "blacklist" executors so that tasks are not assigned to them.
*/
override def onDisconnected(rpcAddress: RpcAddress): Unit = {
addressToExecutorId.get(rpcAddress).foreach { executorId =>
- yarnSchedulerEndpoint.handleExecutorDisconnectedFromDriver(executorId, rpcAddress)
+ if (disableExecutor(executorId)) {
+ yarnSchedulerEndpoint.handleExecutorDisconnectedFromDriver(executorId, rpcAddress)
+ }
}
}
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index c2edd4c317..2afb595e6f 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -237,4 +237,40 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with L
}
}
+ test("tasks are not re-scheduled while executor loss reason is pending") {
+ sc = new SparkContext("local", "TaskSchedulerImplSuite")
+ val taskScheduler = new TaskSchedulerImpl(sc)
+ taskScheduler.initialize(new FakeSchedulerBackend)
+ // Need to initialize a DAGScheduler for the taskScheduler to use for callbacks.
+ new DAGScheduler(sc, taskScheduler) {
+ override def taskStarted(task: Task[_], taskInfo: TaskInfo) {}
+ override def executorAdded(execId: String, host: String) {}
+ }
+
+ val e0Offers = Seq(new WorkerOffer("executor0", "host0", 1))
+ val e1Offers = Seq(new WorkerOffer("executor1", "host0", 1))
+ val attempt1 = FakeTask.createTaskSet(1)
+
+ // submit attempt 1, offer resources, task gets scheduled
+ taskScheduler.submitTasks(attempt1)
+ val taskDescriptions = taskScheduler.resourceOffers(e0Offers).flatten
+ assert(1 === taskDescriptions.length)
+
+ // mark executor0 as dead but pending fail reason
+ taskScheduler.executorLost("executor0", LossReasonPending)
+
+ // offer some more resources on a different executor, nothing should change
+ val taskDescriptions2 = taskScheduler.resourceOffers(e1Offers).flatten
+ assert(0 === taskDescriptions2.length)
+
+ // provide the actual loss reason for executor0
+ taskScheduler.executorLost("executor0", SlaveLost("oops"))
+
+ // executor0's tasks should have failed now that the loss reason is known, so offering more
+ // resources should make them be scheduled on the new executor.
+ val taskDescriptions3 = taskScheduler.resourceOffers(e1Offers).flatten
+ assert(1 === taskDescriptions3.length)
+ assert("executor1" === taskDescriptions3(0).executorId)
+ }
+
}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 12ae350e4c..50ae7ffeec 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -87,8 +87,27 @@ private[spark] class ApplicationMaster(
@volatile private var reporterThread: Thread = _
@volatile private var allocator: YarnAllocator = _
+
+ // Lock for controlling the allocator (heartbeat) thread.
private val allocatorLock = new Object()
+ // Steady state heartbeat interval. We want to be reasonably responsive without causing too many
+ // requests to RM.
+ private val heartbeatInterval = {
+ // Ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses.
+ val expiryInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
+ math.max(0, math.min(expiryInterval / 2,
+ sparkConf.getTimeAsMs("spark.yarn.scheduler.heartbeat.interval-ms", "3s")))
+ }
+
+ // Initial wait interval before allocator poll, to allow for quicker ramp up when executors are
+ // being requested.
+ private val initialAllocationInterval = math.min(heartbeatInterval,
+ sparkConf.getTimeAsMs("spark.yarn.scheduler.initial-allocation.interval", "200ms"))
+
+ // Next wait interval before allocator poll.
+ private var nextAllocationInterval = initialAllocationInterval
+
// Fields used in client mode.
private var rpcEnv: RpcEnv = null
private var amEndpoint: RpcEndpointRef = _
@@ -332,19 +351,6 @@ private[spark] class ApplicationMaster(
}
private def launchReporterThread(): Thread = {
- // Ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses.
- val expiryInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
-
- // we want to be reasonably responsive without causing too many requests to RM.
- val heartbeatInterval = math.max(0, math.min(expiryInterval / 2,
- sparkConf.getTimeAsMs("spark.yarn.scheduler.heartbeat.interval-ms", "3s")))
-
- // we want to check more frequently for pending containers
- val initialAllocationInterval = math.min(heartbeatInterval,
- sparkConf.getTimeAsMs("spark.yarn.scheduler.initial-allocation.interval", "200ms"))
-
- var nextAllocationInterval = initialAllocationInterval
-
// The number of failures in a row until Reporter thread give up
val reporterMaxFailures = sparkConf.getInt("spark.yarn.scheduler.reporterThread.maxFailures", 5)
@@ -377,19 +383,19 @@ private[spark] class ApplicationMaster(
}
try {
val numPendingAllocate = allocator.getPendingAllocate.size
- val sleepInterval =
- if (numPendingAllocate > 0) {
- val currentAllocationInterval =
- math.min(heartbeatInterval, nextAllocationInterval)
- nextAllocationInterval = currentAllocationInterval * 2 // avoid overflow
- currentAllocationInterval
- } else {
- nextAllocationInterval = initialAllocationInterval
- heartbeatInterval
- }
- logDebug(s"Number of pending allocations is $numPendingAllocate. " +
- s"Sleeping for $sleepInterval.")
allocatorLock.synchronized {
+ val sleepInterval =
+ if (numPendingAllocate > 0 || allocator.getNumPendingLossReasonRequests > 0) {
+ val currentAllocationInterval =
+ math.min(heartbeatInterval, nextAllocationInterval)
+ nextAllocationInterval = currentAllocationInterval * 2 // avoid overflow
+ currentAllocationInterval
+ } else {
+ nextAllocationInterval = initialAllocationInterval
+ heartbeatInterval
+ }
+ logDebug(s"Number of pending allocations is $numPendingAllocate. " +
+ s"Sleeping for $sleepInterval.")
allocatorLock.wait(sleepInterval)
}
} catch {
@@ -560,6 +566,11 @@ private[spark] class ApplicationMaster(
userThread
}
+ private def resetAllocatorInterval(): Unit = allocatorLock.synchronized {
+ nextAllocationInterval = initialAllocationInterval
+ allocatorLock.notifyAll()
+ }
+
/**
* An [[RpcEndpoint]] that communicates with the driver's scheduler backend.
*/
@@ -581,11 +592,9 @@ private[spark] class ApplicationMaster(
case RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount) =>
Option(allocator) match {
case Some(a) =>
- allocatorLock.synchronized {
- if (a.requestTotalExecutorsWithPreferredLocalities(requestedTotal,
- localityAwareTasks, hostToLocalTaskCount)) {
- allocatorLock.notifyAll()
- }
+ if (a.requestTotalExecutorsWithPreferredLocalities(requestedTotal,
+ localityAwareTasks, hostToLocalTaskCount)) {
+ resetAllocatorInterval()
}
case None =>
@@ -603,17 +612,19 @@ private[spark] class ApplicationMaster(
case GetExecutorLossReason(eid) =>
Option(allocator) match {
- case Some(a) => a.enqueueGetLossReasonRequest(eid, context)
- case None => logWarning(s"Container allocator is not ready to find" +
- s" executor loss reasons yet.")
+ case Some(a) =>
+ a.enqueueGetLossReasonRequest(eid, context)
+ resetAllocatorInterval()
+ case None =>
+ logWarning("Container allocator is not ready to find executor loss reasons yet.")
}
}
override def onDisconnected(remoteAddress: RpcAddress): Unit = {
- logInfo(s"Driver terminated or disconnected! Shutting down. $remoteAddress")
// In cluster mode, do not rely on the disassociated event to exit
// This avoids potentially reporting incorrect exit codes if the driver fails
if (!isClusterMode) {
+ logInfo(s"Driver terminated or disconnected! Shutting down. $remoteAddress")
finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
}
}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index a0cf1b4aa4..4d9e777cb4 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -550,6 +550,10 @@ private[yarn] class YarnAllocator(
private[yarn] def getNumUnexpectedContainerRelease = numUnexpectedContainerRelease
+ private[yarn] def getNumPendingLossReasonRequests: Int = synchronized {
+ pendingLossReasonRequests.size
+ }
+
/**
* Split the pending container requests into 3 groups based on current localities of pending
* tasks.
@@ -582,6 +586,7 @@ private[yarn] class YarnAllocator(
(localityMatched.toSeq, localityUnMatched.toSeq, localityFree.toSeq)
}
+
}
private object YarnAllocator {