 core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala                         |  2
 core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala            |  4
 core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala    |  3
 core/src/main/scala/org/apache/spark/deploy/master/Master.scala                         |  4
 core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala                       | 24
 core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala                  |  3
 core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala                 |  6
 core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala                  |  9
 core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala |  5
 core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala                 |  3
 core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala                  | 58
 core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala                |  2
 12 files changed, 92 insertions(+), 31 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index 34c0696bfc..ac09c6c497 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -135,7 +135,7 @@ private[deploy] object DeployMessages {
}
case class ExecutorUpdated(id: Int, state: ExecutorState, message: Option[String],
- exitStatus: Option[Int])
+ exitStatus: Option[Int], workerLost: Boolean)
case class ApplicationRemoved(message: String)
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala
index 7a60f08aad..93f58ce637 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala
@@ -174,12 +174,12 @@ private[spark] class StandaloneAppClient(
cores))
listener.executorAdded(fullId, workerId, hostPort, cores, memory)
- case ExecutorUpdated(id, state, message, exitStatus) =>
+ case ExecutorUpdated(id, state, message, exitStatus, workerLost) =>
val fullId = appId + "/" + id
val messageText = message.map(s => " (" + s + ")").getOrElse("")
logInfo("Executor updated: %s is now %s%s".format(fullId, state, messageText))
if (ExecutorState.isFinished(state)) {
- listener.executorRemoved(fullId, message.getOrElse(""), exitStatus)
+ listener.executorRemoved(fullId, message.getOrElse(""), exitStatus, workerLost)
}
case MasterChanged(masterRef, masterWebUiUrl) =>
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala
index 370b16ce42..64255ec92b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala
@@ -36,5 +36,6 @@ private[spark] trait StandaloneAppClientListener {
def executorAdded(
fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int): Unit
- def executorRemoved(fullId: String, message: String, exitStatus: Option[Int]): Unit
+ def executorRemoved(
+ fullId: String, message: String, exitStatus: Option[Int], workerLost: Boolean): Unit
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index dfffc47703..dcf41638e7 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -252,7 +252,7 @@ private[deploy] class Master(
appInfo.resetRetryCount()
}
- exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus))
+ exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus, false))
if (ExecutorState.isFinished(state)) {
// Remove this executor from the worker and app
@@ -766,7 +766,7 @@ private[deploy] class Master(
for (exec <- worker.executors.values) {
logInfo("Telling app of lost executor: " + exec.id)
exec.application.driver.send(ExecutorUpdated(
- exec.id, ExecutorState.LOST, Some("worker lost"), None))
+ exec.id, ExecutorState.LOST, Some("worker lost"), None, workerLost = true))
exec.state = ExecutorState.LOST
exec.application.removeExecutor(exec)
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 4eb7c81f9e..dd47c1dbbe 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -239,8 +239,8 @@ class DAGScheduler(
/**
* Called by TaskScheduler implementation when an executor fails.
*/
- def executorLost(execId: String): Unit = {
- eventProcessLoop.post(ExecutorLost(execId))
+ def executorLost(execId: String, reason: ExecutorLossReason): Unit = {
+ eventProcessLoop.post(ExecutorLost(execId, reason))
}
/**
@@ -1281,7 +1281,7 @@ class DAGScheduler(
// TODO: mark the executor as failed only if there were lots of fetch failures on it
if (bmAddress != null) {
- handleExecutorLost(bmAddress.executorId, fetchFailed = true, Some(task.epoch))
+ handleExecutorLost(bmAddress.executorId, filesLost = true, Some(task.epoch))
}
}
@@ -1306,15 +1306,16 @@ class DAGScheduler(
* modify the scheduler's internal state. Use executorLost() to post a loss event from outside.
*
* We will also assume that we've lost all shuffle blocks associated with the executor if the
- * executor serves its own blocks (i.e., we're not using external shuffle) OR a FetchFailed
- * occurred, in which case we presume all shuffle data related to this executor to be lost.
+ * executor serves its own blocks (i.e., we're not using external shuffle), the entire slave
+ * is lost (likely including the shuffle service), or a FetchFailed occurred, in which case we
+ * presume all shuffle data related to this executor to be lost.
*
* Optionally the epoch during which the failure was caught can be passed to avoid allowing
* stray fetch failures from possibly retriggering the detection of a node as lost.
*/
private[scheduler] def handleExecutorLost(
execId: String,
- fetchFailed: Boolean,
+ filesLost: Boolean,
maybeEpoch: Option[Long] = None) {
val currentEpoch = maybeEpoch.getOrElse(mapOutputTracker.getEpoch)
if (!failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch) {
@@ -1322,7 +1323,8 @@ class DAGScheduler(
logInfo("Executor lost: %s (epoch %d)".format(execId, currentEpoch))
blockManagerMaster.removeExecutor(execId)
- if (!env.blockManager.externalShuffleServiceEnabled || fetchFailed) {
+ if (filesLost || !env.blockManager.externalShuffleServiceEnabled) {
+ logInfo("Shuffle files lost for executor: %s (epoch %d)".format(execId, currentEpoch))
// TODO: This will be really slow if we keep accumulating shuffle map stages
for ((shuffleId, stage) <- shuffleIdToMapStage) {
stage.removeOutputsOnExecutor(execId)
@@ -1624,8 +1626,12 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler
case ExecutorAdded(execId, host) =>
dagScheduler.handleExecutorAdded(execId, host)
- case ExecutorLost(execId) =>
- dagScheduler.handleExecutorLost(execId, fetchFailed = false)
+ case ExecutorLost(execId, reason) =>
+ val filesLost = reason match {
+ case SlaveLost(_, true) => true
+ case _ => false
+ }
+ dagScheduler.handleExecutorLost(execId, filesLost)
case BeginEvent(task, taskInfo) =>
dagScheduler.handleBeginEvent(task, taskInfo)
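
The core of the DAGScheduler change is that shuffle-file loss is now derived from the executor loss reason rather than only from fetch failures: files are treated as lost when the whole worker is reported gone, or when no external shuffle service is serving them. The following is a minimal standalone sketch of that decision, using simplified stand-in types rather than the actual Spark classes.

    // Standalone sketch with simplified stand-in types; not the real Spark classes.
    sealed trait ExecutorLossReason
    case class SlaveLost(message: String = "Slave lost", workerLost: Boolean = false)
      extends ExecutorLossReason
    case object ExecutorKilled extends ExecutorLossReason

    object ShuffleFileLossDemo {
      // Mirrors the ExecutorLost handling above: only a confirmed worker loss
      // sets filesLost; otherwise files survive iff an external shuffle service
      // is serving them.
      def shuffleFilesLost(reason: ExecutorLossReason, externalShuffleService: Boolean): Boolean = {
        val filesLost = reason match {
          case SlaveLost(_, true) => true
          case _ => false
        }
        filesLost || !externalShuffleService
      }

      def main(args: Array[String]): Unit = {
        assert(shuffleFilesLost(SlaveLost("worker lost", workerLost = true), externalShuffleService = true))
        assert(!shuffleFilesLost(ExecutorKilled, externalShuffleService = true))
        assert(shuffleFilesLost(ExecutorKilled, externalShuffleService = false))
      }
    }

The three assertions correspond to the combinations exercised by the shuffleFileLossTests table added to DAGSchedulerSuite below.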
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
index 8c76112482..03781a2a2b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
@@ -77,7 +77,8 @@ private[scheduler] case class CompletionEvent(
private[scheduler] case class ExecutorAdded(execId: String, host: String) extends DAGSchedulerEvent
-private[scheduler] case class ExecutorLost(execId: String) extends DAGSchedulerEvent
+private[scheduler] case class ExecutorLost(execId: String, reason: ExecutorLossReason)
+ extends DAGSchedulerEvent
private[scheduler]
case class TaskSetFailed(taskSet: TaskSet, reason: String, exception: Option[Throwable])
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
index 642bf81ac0..46a35b6a2e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
@@ -51,6 +51,10 @@ private[spark] object ExecutorKilled extends ExecutorLossReason("Executor killed
*/
private [spark] object LossReasonPending extends ExecutorLossReason("Pending loss reason.")
+/**
+ * @param _message human readable loss reason
+ * @param workerLost whether the worker is confirmed lost too (i.e. including shuffle service)
+ */
private[spark]
-case class SlaveLost(_message: String = "Slave lost")
+case class SlaveLost(_message: String = "Slave lost", workerLost: Boolean = false)
extends ExecutorLossReason(_message)
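
Because workerLost defaults to false, pre-existing SlaveLost call sites keep compiling unchanged; only the paths that know the entire worker is gone need to pass the flag. A small illustration, again with a simplified stand-in rather than the real class:

    // Simplified stand-in, not the real class; illustrates the defaulted flag only.
    class ExecutorLossReason(val message: String)
    case class SlaveLost(_message: String = "Slave lost", workerLost: Boolean = false)
      extends ExecutorLossReason(_message)

    object SlaveLostDefaultDemo {
      def main(args: Array[String]): Unit = {
        // Old-style call site: no flag given, so workerLost stays false.
        val executorOnly = SlaveLost("Task 12 was lost, so marking the executor as lost as well.")
        // New-style call site used when a whole worker disappears.
        val wholeWorker = SlaveLost("worker lost", workerLost = true)
        assert(!executorOnly.workerLost)
        assert(wholeWorker.workerLost)
      }
    }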
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 7d905538c6..ee5cbfeb47 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -346,6 +346,7 @@ private[spark] class TaskSchedulerImpl(
def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
var failedExecutor: Option[String] = None
+ var reason: Option[ExecutorLossReason] = None
synchronized {
try {
if (state == TaskState.LOST && taskIdToExecutorId.contains(tid)) {
@@ -353,8 +354,9 @@ private[spark] class TaskSchedulerImpl(
val execId = taskIdToExecutorId(tid)
if (executorIdToTaskCount.contains(execId)) {
- removeExecutor(execId,
+ reason = Some(
SlaveLost(s"Task $tid was lost, so marking the executor as lost as well."))
+ removeExecutor(execId, reason.get)
failedExecutor = Some(execId)
}
}
@@ -387,7 +389,8 @@ private[spark] class TaskSchedulerImpl(
}
// Update the DAGScheduler without holding a lock on this, since that can deadlock
if (failedExecutor.isDefined) {
- dagScheduler.executorLost(failedExecutor.get)
+ assert(reason.isDefined)
+ dagScheduler.executorLost(failedExecutor.get, reason.get)
backend.reviveOffers()
}
}
@@ -513,7 +516,7 @@ private[spark] class TaskSchedulerImpl(
}
// Call dagScheduler.executorLost without holding the lock on this to prevent deadlock
if (failedExecutor.isDefined) {
- dagScheduler.executorLost(failedExecutor.get)
+ dagScheduler.executorLost(failedExecutor.get, reason)
backend.reviveOffers()
}
}
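
TaskSchedulerImpl now has to remember why the executor failed so it can hand the reason to the DAGScheduler after releasing its own lock; as the in-code comments note, the notification deliberately happens outside the synchronized block to avoid deadlock. Below is a minimal sketch of that capture-then-notify pattern, using placeholder names rather than the real scheduler API.

    // Placeholder sketch of the capture-inside-lock / notify-outside-lock pattern;
    // names and types are illustrative, not the real TaskSchedulerImpl API.
    object CaptureThenNotifyDemo {
      private val lock = new Object
      @volatile private var lastNotified: Option[(String, String)] = None

      // Stand-in for dagScheduler.executorLost(execId, reason).
      private def notifyDagScheduler(execId: String, reason: String): Unit =
        lastNotified = Some((execId, reason))

      def statusUpdate(taskLost: Boolean): Unit = {
        var failedExecutor: Option[String] = None
        var reason: Option[String] = None
        lock.synchronized {
          if (taskLost) {
            reason = Some("Task 7 was lost, so marking the executor as lost as well.")
            failedExecutor = Some("exec-1")
            // removeExecutor(...) would run here, still under the lock.
          }
        }
        // The downstream callback runs without holding the lock, so it cannot
        // deadlock if it calls back into this scheduler.
        if (failedExecutor.isDefined) {
          assert(reason.isDefined)
          notifyDagScheduler(failedExecutor.get, reason.get)
        }
      }

      def main(args: Array[String]): Unit = {
        statusUpdate(taskLost = true)
        assert(lastNotified.exists(_._1 == "exec-1"))
      }
    }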
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index 5068bf2e66..04d40e2907 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -150,10 +150,11 @@ private[spark] class StandaloneSchedulerBackend(
fullId, hostPort, cores, Utils.megabytesToString(memory)))
}
- override def executorRemoved(fullId: String, message: String, exitStatus: Option[Int]) {
+ override def executorRemoved(
+ fullId: String, message: String, exitStatus: Option[Int], workerLost: Boolean) {
val reason: ExecutorLossReason = exitStatus match {
case Some(code) => ExecutorExited(code, exitCausedByApp = true, message)
- case None => SlaveLost(message)
+ case None => SlaveLost(message, workerLost = workerLost)
}
logInfo("Executor %s removed: %s".format(fullId, message))
removeExecutor(fullId.split("/")(1), reason)
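
In the standalone backend the new flag only matters when the executor disappears without an exit code; if an exit code is present, the executor process itself exited and the ExecutorExited reason is used instead. A minimal sketch of that mapping with simplified stand-in types, not the real backend:

    // Simplified stand-in types; not the real StandaloneSchedulerBackend.
    sealed trait ExecutorLossReason
    case class ExecutorExited(exitCode: Int, exitCausedByApp: Boolean, reason: String)
      extends ExecutorLossReason
    case class SlaveLost(message: String = "Slave lost", workerLost: Boolean = false)
      extends ExecutorLossReason

    object ExecutorRemovedDemo {
      def lossReason(message: String, exitStatus: Option[Int], workerLost: Boolean): ExecutorLossReason =
        exitStatus match {
          case Some(code) => ExecutorExited(code, exitCausedByApp = true, message)
          case None => SlaveLost(message, workerLost = workerLost)
        }

      def main(args: Array[String]): Unit = {
        assert(lossReason("Command exited with code 1", Some(1), workerLost = false)
          .isInstanceOf[ExecutorExited])
        assert(lossReason("worker lost", None, workerLost = true) ==
          SlaveLost("worker lost", workerLost = true))
      }
    }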
diff --git a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
index 416efaa75b..bc58fb2a36 100644
--- a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
@@ -210,7 +210,8 @@ class AppClientSuite
execAddedList.add(id)
}
- def executorRemoved(id: String, message: String, exitStatus: Option[Int]): Unit = {
+ def executorRemoved(
+ id: String, message: String, exitStatus: Option[Int], workerLost: Boolean): Unit = {
execRemovedList.add(id)
}
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 33824749ae..6787b30261 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -31,6 +31,7 @@ import org.apache.spark._
import org.apache.spark.broadcast.BroadcastManager
import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
+import org.apache.spark.shuffle.MetadataFetchFailedException
import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster}
import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, CallSite, LongAccumulator, Utils}
@@ -201,7 +202,11 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
override def beforeEach(): Unit = {
super.beforeEach()
- sc = new SparkContext("local", "DAGSchedulerSuite")
+ init(new SparkConf())
+ }
+
+ private def init(testConf: SparkConf): Unit = {
+ sc = new SparkContext("local", "DAGSchedulerSuite", testConf)
sparkListener.submittedStageInfos.clear()
sparkListener.successfulStages.clear()
sparkListener.failedStages.clear()
@@ -621,6 +626,46 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
assertDataStructuresEmpty()
}
+ private val shuffleFileLossTests = Seq(
+ ("slave lost with shuffle service", SlaveLost("", false), true, false),
+ ("worker lost with shuffle service", SlaveLost("", true), true, true),
+ ("worker lost without shuffle service", SlaveLost("", true), false, true),
+ ("executor failure with shuffle service", ExecutorKilled, true, false),
+ ("executor failure without shuffle service", ExecutorKilled, false, true))
+
+ for ((eventDescription, event, shuffleServiceOn, expectFileLoss) <- shuffleFileLossTests) {
+ val maybeLost = if (expectFileLoss) {
+ "lost"
+ } else {
+ "not lost"
+ }
+ test(s"shuffle files $maybeLost when $eventDescription") {
+ // reset the test context with the right shuffle service config
+ afterEach()
+ val conf = new SparkConf()
+ conf.set("spark.shuffle.service.enabled", shuffleServiceOn.toString)
+ init(conf)
+ assert(sc.env.blockManager.externalShuffleServiceEnabled == shuffleServiceOn)
+
+ val shuffleMapRdd = new MyRDD(sc, 2, Nil)
+ val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(1))
+ val shuffleId = shuffleDep.shuffleId
+ val reduceRdd = new MyRDD(sc, 1, List(shuffleDep), tracker = mapOutputTracker)
+ submit(reduceRdd, Array(0))
+ complete(taskSets(0), Seq(
+ (Success, makeMapStatus("hostA", 1)),
+ (Success, makeMapStatus("hostB", 1))))
+ runEvent(ExecutorLost("exec-hostA", event))
+ if (expectFileLoss) {
+ intercept[MetadataFetchFailedException] {
+ mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0)
+ }
+ } else {
+ assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet ===
+ HashSet(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")))
+ }
+ }
+ }
// Helper function to validate state when creating tests for task failures
private def checkStageId(stageId: Int, attempt: Int, stageAttempt: TaskSet) {
@@ -628,7 +673,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
assert(stageAttempt.stageAttemptId == attempt)
}
-
// Helper functions to extract commonly used code in Fetch Failure test cases
private def setupStageAbortTest(sc: SparkContext) {
sc.listenerBus.addListener(new EndListener())
@@ -1110,7 +1154,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
// pretend we were told hostA went away
val oldEpoch = mapOutputTracker.getEpoch
- runEvent(ExecutorLost("exec-hostA"))
+ runEvent(ExecutorLost("exec-hostA", ExecutorKilled))
val newEpoch = mapOutputTracker.getEpoch
assert(newEpoch > oldEpoch)
@@ -1241,7 +1285,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
))
// then one executor dies, and a task fails in stage 1
- runEvent(ExecutorLost("exec-hostA"))
+ runEvent(ExecutorLost("exec-hostA", ExecutorKilled))
runEvent(makeCompletionEvent(
taskSets(1).tasks(0),
FetchFailed(null, firstShuffleId, 2, 0, "Fetch failed"),
@@ -1339,7 +1383,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
makeMapStatus("hostA", reduceRdd.partitions.length)))
// now that host goes down
- runEvent(ExecutorLost("exec-hostA"))
+ runEvent(ExecutorLost("exec-hostA", ExecutorKilled))
// so we resubmit those tasks
runEvent(makeCompletionEvent(taskSets(0).tasks(0), Resubmitted, null))
@@ -1532,7 +1576,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
submit(reduceRdd, Array(0))
// blockManagerMaster.removeExecutor("exec-hostA")
// pretend we were told hostA went away
- runEvent(ExecutorLost("exec-hostA"))
+ runEvent(ExecutorLost("exec-hostA", ExecutorKilled))
// DAGScheduler will immediately resubmit the stage after it appears to have no pending tasks
// rather than marking it is as failed and waiting.
complete(taskSets(0), Seq(
@@ -1999,7 +2043,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
// Pretend host A was lost
val oldEpoch = mapOutputTracker.getEpoch
- runEvent(ExecutorLost("exec-hostA"))
+ runEvent(ExecutorLost("exec-hostA", ExecutorKilled))
val newEpoch = mapOutputTracker.getEpoch
assert(newEpoch > oldEpoch)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 36d1c5690f..7d6ad08036 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -46,7 +46,7 @@ class FakeDAGScheduler(sc: SparkContext, taskScheduler: FakeTaskScheduler)
override def executorAdded(execId: String, host: String) {}
- override def executorLost(execId: String) {}
+ override def executorLost(execId: String, reason: ExecutorLossReason) {}
override def taskSetFailed(
taskSet: TaskSet,