-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala                    |  9
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala                     | 32
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 37
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala          |  9
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala                | 36
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala                   | 79
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala                       |  5
7 files changed, 157 insertions(+), 50 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
index 33edf25043..47a5cbff49 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
@@ -40,6 +40,15 @@ private[spark] object ExecutorExited {
}
}
+/**
+ * A loss reason that means we don't yet know why the executor exited.
+ *
+ * This is used by the task scheduler to remove state associated with the executor, but
+ * not to fail any tasks that were running on the executor until the real loss reason
+ * is known.
+ */
+private[spark] object LossReasonPending extends ExecutorLossReason("Pending loss reason.")
+
private[spark]
case class SlaveLost(_message: String = "Slave lost")
extends ExecutorLossReason(_message)
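
The new object gives callers of TaskScheduler.executorLost() a two-step protocol: report the loss first with the pending reason, then again with the real reason once it is known. A minimal sketch of that calling sequence (the scheduler instance, executor id, and message text are placeholders; the reason values come from this file and the test added below):

    // Step 1: the connection to the executor dropped, but we do not yet know why.
    // Scheduler state for the executor is cleaned up, but its running tasks are not failed.
    taskScheduler.executorLost("executor-1", LossReasonPending)

    // Step 2: the cluster manager later reports why the executor died; now the
    // scheduler fails or resubmits the tasks that were running on it.
    taskScheduler.executorLost("executor-1", SlaveLost("Container disconnected"))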
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 1c7bfe89c0..43d7d80b7a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -468,11 +468,20 @@ private[spark] class TaskSchedulerImpl(
removeExecutor(executorId, reason)
failedExecutor = Some(executorId)
} else {
- // We may get multiple executorLost() calls with different loss reasons. For example, one
- // may be triggered by a dropped connection from the slave while another may be a report
- // of executor termination from Mesos. We produce log messages for both so we eventually
- // report the termination reason.
- logError("Lost an executor " + executorId + " (already removed): " + reason)
+ executorIdToHost.get(executorId) match {
+ case Some(_) =>
+ // If the host mapping still exists, it means we don't know the loss reason for the
+ // executor. So call removeExecutor() to update tasks running on that executor when
+ // the real loss reason is finally known.
+ removeExecutor(executorId, reason)
+
+ case None =>
+ // We may get multiple executorLost() calls with different loss reasons. For example,
+ // one may be triggered by a dropped connection from the slave while another may be a
+ // report of executor termination from Mesos. We produce log messages for both so we
+ // eventually report the termination reason.
+ logError("Lost an executor " + executorId + " (already removed): " + reason)
+ }
}
}
// Call dagScheduler.executorLost without holding the lock on this to prevent deadlock
@@ -482,7 +491,11 @@ private[spark] class TaskSchedulerImpl(
}
}
- /** Remove an executor from all our data structures and mark it as lost */
+ /**
+ * Remove an executor from all our data structures and mark it as lost. If the executor's loss
+ * reason is not yet known, do not yet remove its association with its host nor update the status
+ * of any running tasks, since the loss reason defines whether we'll fail those tasks.
+ */
private def removeExecutor(executorId: String, reason: ExecutorLossReason) {
activeExecutorIds -= executorId
val host = executorIdToHost(executorId)
@@ -497,8 +510,11 @@ private[spark] class TaskSchedulerImpl(
}
}
}
- executorIdToHost -= executorId
- rootPool.executorLost(executorId, host, reason)
+
+ if (reason != LossReasonPending) {
+ executorIdToHost -= executorId
+ rootPool.executorLost(executorId, host, reason)
+ }
}
def executorAdded(execId: String, host: String) {
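
The surviving executorIdToHost entry is what encodes "loss reason pending"; once it is gone, the executor is fully removed. A rough sketch of the scheduler state across the two executorLost() calls, using the field names above (executor and host values are invented for illustration):

    // After executorLost("exec-1", LossReasonPending):
    //   activeExecutorIds         no longer contains "exec-1"
    //   executorIdToHost          still maps "exec-1" -> "host-1"   (reason pending)
    //   rootPool.executorLost()   has not been called, so tasks running on
    //                             "exec-1" are neither failed nor resubmitted yet
    //
    // After executorLost("exec-1", SlaveLost("disconnected")):
    //   executorIdToHost.get("exec-1") is Some("host-1"), so removeExecutor()
    //   runs again, drops the host mapping and calls rootPool.executorLost(),
    //   which lets the TaskSetManagers handle the tasks that were running there.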
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index ebce5021b1..f71d98feac 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -73,6 +73,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// The number of pending tasks which is locality required
protected var localityAwareTasks = 0
+ // Executors that have been lost, but for which we don't yet know the real exit reason.
+ protected val executorsPendingLossReason = new HashSet[String]
+
class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
extends ThreadSafeRpcEndpoint with Logging {
@@ -184,7 +187,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// Make fake resource offers on all executors
private def makeOffers() {
// Filter out executors under killing
- val activeExecutors = executorDataMap.filterKeys(!executorsPendingToRemove.contains(_))
+ val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
val workOffers = activeExecutors.map { case (id, executorData) =>
new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
}.toSeq
@@ -202,7 +205,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// Make fake resource offers on just one executor
private def makeOffers(executorId: String) {
// Filter out executors under killing
- if (!executorsPendingToRemove.contains(executorId)) {
+ if (executorIsAlive(executorId)) {
val executorData = executorDataMap(executorId)
val workOffers = Seq(
new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores))
@@ -210,6 +213,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
}
}
+ private def executorIsAlive(executorId: String): Boolean = synchronized {
+ !executorsPendingToRemove.contains(executorId) &&
+ !executorsPendingLossReason.contains(executorId)
+ }
+
// Launch tasks returned by a set of resource offers
private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
for (task <- tasks.flatten) {
@@ -246,6 +254,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
addressToExecutorId -= executorInfo.executorAddress
executorDataMap -= executorId
executorsPendingToRemove -= executorId
+ executorsPendingLossReason -= executorId
}
totalCoreCount.addAndGet(-executorInfo.totalCores)
totalRegisteredExecutors.addAndGet(-1)
@@ -256,6 +265,30 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
}
}
+ /**
+ * Stop making resource offers for the given executor. The executor is marked as lost with
+ * the loss reason still pending.
+ *
+ * @return Whether the executor was alive.

+ */
+ protected def disableExecutor(executorId: String): Boolean = {
+ val shouldDisable = CoarseGrainedSchedulerBackend.this.synchronized {
+ if (executorIsAlive(executorId)) {
+ executorsPendingLossReason += executorId
+ true
+ } else {
+ false
+ }
+ }
+
+ if (shouldDisable) {
+ logInfo(s"Disabling executor $executorId.")
+ scheduler.executorLost(executorId, LossReasonPending)
+ }
+
+ shouldDisable
+ }
+
override def onStop() {
reviveThread.shutdownNow()
}
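
The point of the new set is easiest to see through executorIsAlive(): an executor can still be registered in executorDataMap while being excluded from resource offers. A hypothetical snapshot of the backend state after disableExecutor("exec-1") has run (identifiers are from this file, values invented):

    // executorDataMap            contains "exec-1"          (still registered)
    // executorsPendingToRemove   does not contain "exec-1"  (no kill requested)
    // executorsPendingLossReason contains "exec-1"          (disabled)
    //
    // executorIsAlive("exec-1") == false, so both makeOffers() overloads skip
    // the executor and no new tasks are launched on it while its fate is
    // still unknown.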
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index d75d6f673e..80da37b09b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -115,15 +115,12 @@ private[spark] abstract class YarnSchedulerBackend(
* (e.g., preemption), according to the application master, then we pass that information down
* to the TaskSetManager to inform the TaskSetManager that tasks on that lost executor should
* not count towards a job failure.
- *
- * TODO there's a race condition where while we are querying the ApplicationMaster for
- * the executor loss reason, there is the potential that tasks will be scheduled on
- * the executor that failed. We should fix this by having this onDisconnected event
- * also "blacklist" executors so that tasks are not assigned to them.
*/
override def onDisconnected(rpcAddress: RpcAddress): Unit = {
addressToExecutorId.get(rpcAddress).foreach { executorId =>
- yarnSchedulerEndpoint.handleExecutorDisconnectedFromDriver(executorId, rpcAddress)
+ if (disableExecutor(executorId)) {
+ yarnSchedulerEndpoint.handleExecutorDisconnectedFromDriver(executorId, rpcAddress)
+ }
}
}
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index c2edd4c317..2afb595e6f 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -237,4 +237,40 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with L
}
}
+ test("tasks are not re-scheduled while executor loss reason is pending") {
+ sc = new SparkContext("local", "TaskSchedulerImplSuite")
+ val taskScheduler = new TaskSchedulerImpl(sc)
+ taskScheduler.initialize(new FakeSchedulerBackend)
+ // Need to initialize a DAGScheduler for the taskScheduler to use for callbacks.
+ new DAGScheduler(sc, taskScheduler) {
+ override def taskStarted(task: Task[_], taskInfo: TaskInfo) {}
+ override def executorAdded(execId: String, host: String) {}
+ }
+
+ val e0Offers = Seq(new WorkerOffer("executor0", "host0", 1))
+ val e1Offers = Seq(new WorkerOffer("executor1", "host0", 1))
+ val attempt1 = FakeTask.createTaskSet(1)
+
+ // submit attempt 1, offer resources, task gets scheduled
+ taskScheduler.submitTasks(attempt1)
+ val taskDescriptions = taskScheduler.resourceOffers(e0Offers).flatten
+ assert(1 === taskDescriptions.length)
+
+ // mark executor0 as dead but pending fail reason
+ taskScheduler.executorLost("executor0", LossReasonPending)
+
+ // offer some more resources on a different executor, nothing should change
+ val taskDescriptions2 = taskScheduler.resourceOffers(e1Offers).flatten
+ assert(0 === taskDescriptions2.length)
+
+ // provide the actual loss reason for executor0
+ taskScheduler.executorLost("executor0", SlaveLost("oops"))
+
+ // executor0's tasks should have failed now that the loss reason is known, so offering more
+ // resources should make them be scheduled on the new executor.
+ val taskDescriptions3 = taskScheduler.resourceOffers(e1Offers).flatten
+ assert(1 === taskDescriptions3.length)
+ assert("executor1" === taskDescriptions3(0).executorId)
+ }
+
}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 12ae350e4c..50ae7ffeec 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -87,8 +87,27 @@ private[spark] class ApplicationMaster(
@volatile private var reporterThread: Thread = _
@volatile private var allocator: YarnAllocator = _
+
+ // Lock for controlling the allocator (heartbeat) thread.
private val allocatorLock = new Object()
+ // Steady state heartbeat interval. We want to be reasonably responsive without causing too many
+ // requests to RM.
+ private val heartbeatInterval = {
+ // Ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses.
+ val expiryInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
+ math.max(0, math.min(expiryInterval / 2,
+ sparkConf.getTimeAsMs("spark.yarn.scheduler.heartbeat.interval-ms", "3s")))
+ }
+
+ // Initial wait interval before allocator poll, to allow for quicker ramp up when executors are
+ // being requested.
+ private val initialAllocationInterval = math.min(heartbeatInterval,
+ sparkConf.getTimeAsMs("spark.yarn.scheduler.initial-allocation.interval", "200ms"))
+
+ // Next wait interval before allocator poll.
+ private var nextAllocationInterval = initialAllocationInterval
+
// Fields used in client mode.
private var rpcEnv: RpcEnv = null
private var amEndpoint: RpcEndpointRef = _
@@ -332,19 +351,6 @@ private[spark] class ApplicationMaster(
}
private def launchReporterThread(): Thread = {
- // Ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses.
- val expiryInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
-
- // we want to be reasonably responsive without causing too many requests to RM.
- val heartbeatInterval = math.max(0, math.min(expiryInterval / 2,
- sparkConf.getTimeAsMs("spark.yarn.scheduler.heartbeat.interval-ms", "3s")))
-
- // we want to check more frequently for pending containers
- val initialAllocationInterval = math.min(heartbeatInterval,
- sparkConf.getTimeAsMs("spark.yarn.scheduler.initial-allocation.interval", "200ms"))
-
- var nextAllocationInterval = initialAllocationInterval
-
// The number of failures in a row until the reporter thread gives up
val reporterMaxFailures = sparkConf.getInt("spark.yarn.scheduler.reporterThread.maxFailures", 5)
@@ -377,19 +383,19 @@ private[spark] class ApplicationMaster(
}
try {
val numPendingAllocate = allocator.getPendingAllocate.size
- val sleepInterval =
- if (numPendingAllocate > 0) {
- val currentAllocationInterval =
- math.min(heartbeatInterval, nextAllocationInterval)
- nextAllocationInterval = currentAllocationInterval * 2 // avoid overflow
- currentAllocationInterval
- } else {
- nextAllocationInterval = initialAllocationInterval
- heartbeatInterval
- }
- logDebug(s"Number of pending allocations is $numPendingAllocate. " +
- s"Sleeping for $sleepInterval.")
allocatorLock.synchronized {
+ val sleepInterval =
+ if (numPendingAllocate > 0 || allocator.getNumPendingLossReasonRequests > 0) {
+ val currentAllocationInterval =
+ math.min(heartbeatInterval, nextAllocationInterval)
+ nextAllocationInterval = currentAllocationInterval * 2 // avoid overflow
+ currentAllocationInterval
+ } else {
+ nextAllocationInterval = initialAllocationInterval
+ heartbeatInterval
+ }
+ logDebug(s"Number of pending allocations is $numPendingAllocate. " +
+ s"Sleeping for $sleepInterval.")
allocatorLock.wait(sleepInterval)
}
} catch {
@@ -560,6 +566,11 @@ private[spark] class ApplicationMaster(
userThread
}
+ private def resetAllocatorInterval(): Unit = allocatorLock.synchronized {
+ nextAllocationInterval = initialAllocationInterval
+ allocatorLock.notifyAll()
+ }
+
/**
* An [[RpcEndpoint]] that communicates with the driver's scheduler backend.
*/
@@ -581,11 +592,9 @@ private[spark] class ApplicationMaster(
case RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount) =>
Option(allocator) match {
case Some(a) =>
- allocatorLock.synchronized {
- if (a.requestTotalExecutorsWithPreferredLocalities(requestedTotal,
- localityAwareTasks, hostToLocalTaskCount)) {
- allocatorLock.notifyAll()
- }
+ if (a.requestTotalExecutorsWithPreferredLocalities(requestedTotal,
+ localityAwareTasks, hostToLocalTaskCount)) {
+ resetAllocatorInterval()
}
case None =>
@@ -603,17 +612,19 @@ private[spark] class ApplicationMaster(
case GetExecutorLossReason(eid) =>
Option(allocator) match {
- case Some(a) => a.enqueueGetLossReasonRequest(eid, context)
- case None => logWarning(s"Container allocator is not ready to find" +
- s" executor loss reasons yet.")
+ case Some(a) =>
+ a.enqueueGetLossReasonRequest(eid, context)
+ resetAllocatorInterval()
+ case None =>
+ logWarning("Container allocator is not ready to find executor loss reasons yet.")
}
}
override def onDisconnected(remoteAddress: RpcAddress): Unit = {
- logInfo(s"Driver terminated or disconnected! Shutting down. $remoteAddress")
// In cluster mode, do not rely on the disassociated event to exit
// This avoids potentially reporting incorrect exit codes if the driver fails
if (!isClusterMode) {
+ logInfo(s"Driver terminated or disconnected! Shutting down. $remoteAddress")
finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
}
}
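
To make the reporter thread's back-off concrete: with the defaults above (200ms initial allocation interval, 3s heartbeat) and allocations or loss-reason requests pending, the sleep interval doubles each round until it reaches the steady-state heartbeat. A small standalone sketch of that arithmetic, using the default values from this diff (it models only the "pending" branch, not the reset to the initial interval):

    object HeartbeatBackoffSketch {
      def main(args: Array[String]): Unit = {
        val heartbeatInterval = 3000L          // spark.yarn.scheduler.heartbeat.interval-ms default (ms)
        val initialAllocationInterval = 200L   // spark.yarn.scheduler.initial-allocation.interval default (ms)
        var nextAllocationInterval = initialAllocationInterval

        // Each round sleeps for the current interval and doubles the next one,
        // capped at the steady-state heartbeat interval.
        val sleeps = (1 to 6).map { _ =>
          val current = math.min(heartbeatInterval, nextAllocationInterval)
          nextAllocationInterval = current * 2
          current
        }
        println(sleeps.mkString(", "))  // 200, 400, 800, 1600, 3000, 3000
      }
    }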
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index a0cf1b4aa4..4d9e777cb4 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -550,6 +550,10 @@ private[yarn] class YarnAllocator(
private[yarn] def getNumUnexpectedContainerRelease = numUnexpectedContainerRelease
+ private[yarn] def getNumPendingLossReasonRequests: Int = synchronized {
+ pendingLossReasonRequests.size
+ }
+
/**
* Split the pending container requests into 3 groups based on current localities of pending
* tasks.
@@ -582,6 +586,7 @@ private[yarn] class YarnAllocator(
(localityMatched.toSeq, localityUnMatched.toSeq, localityFree.toSeq)
}
+
}
private object YarnAllocator {