diff options
12 files changed, 92 insertions, 31 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala index 34c0696bfc..ac09c6c497 100644 --- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala @@ -135,7 +135,7 @@ private[deploy] object DeployMessages { } case class ExecutorUpdated(id: Int, state: ExecutorState, message: Option[String], - exitStatus: Option[Int]) + exitStatus: Option[Int], workerLost: Boolean) case class ApplicationRemoved(message: String) diff --git a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala index 7a60f08aad..93f58ce637 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala @@ -174,12 +174,12 @@ private[spark] class StandaloneAppClient( cores)) listener.executorAdded(fullId, workerId, hostPort, cores, memory) - case ExecutorUpdated(id, state, message, exitStatus) => + case ExecutorUpdated(id, state, message, exitStatus, workerLost) => val fullId = appId + "/" + id val messageText = message.map(s => " (" + s + ")").getOrElse("") logInfo("Executor updated: %s is now %s%s".format(fullId, state, messageText)) if (ExecutorState.isFinished(state)) { - listener.executorRemoved(fullId, message.getOrElse(""), exitStatus) + listener.executorRemoved(fullId, message.getOrElse(""), exitStatus, workerLost) } case MasterChanged(masterRef, masterWebUiUrl) => diff --git a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala index 370b16ce42..64255ec92b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala @@ -36,5 +36,6 @@ private[spark] trait StandaloneAppClientListener { def executorAdded( fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int): Unit - def executorRemoved(fullId: String, message: String, exitStatus: Option[Int]): Unit + def executorRemoved( + fullId: String, message: String, exitStatus: Option[Int], workerLost: Boolean): Unit } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index dfffc47703..dcf41638e7 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -252,7 +252,7 @@ private[deploy] class Master( appInfo.resetRetryCount() } - exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus)) + exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus, false)) if (ExecutorState.isFinished(state)) { // Remove this executor from the worker and app @@ -766,7 +766,7 @@ private[deploy] class Master( for (exec <- worker.executors.values) { logInfo("Telling app of lost executor: " + exec.id) exec.application.driver.send(ExecutorUpdated( - exec.id, ExecutorState.LOST, Some("worker lost"), None)) + exec.id, ExecutorState.LOST, Some("worker lost"), None, workerLost = true)) exec.state = ExecutorState.LOST exec.application.removeExecutor(exec) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 4eb7c81f9e..dd47c1dbbe 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -239,8 +239,8 @@ class DAGScheduler( /** * Called by TaskScheduler implementation when an executor fails. */ - def executorLost(execId: String): Unit = { - eventProcessLoop.post(ExecutorLost(execId)) + def executorLost(execId: String, reason: ExecutorLossReason): Unit = { + eventProcessLoop.post(ExecutorLost(execId, reason)) } /** @@ -1281,7 +1281,7 @@ class DAGScheduler( // TODO: mark the executor as failed only if there were lots of fetch failures on it if (bmAddress != null) { - handleExecutorLost(bmAddress.executorId, fetchFailed = true, Some(task.epoch)) + handleExecutorLost(bmAddress.executorId, filesLost = true, Some(task.epoch)) } } @@ -1306,15 +1306,16 @@ class DAGScheduler( * modify the scheduler's internal state. Use executorLost() to post a loss event from outside. * * We will also assume that we've lost all shuffle blocks associated with the executor if the - * executor serves its own blocks (i.e., we're not using external shuffle) OR a FetchFailed - * occurred, in which case we presume all shuffle data related to this executor to be lost. + * executor serves its own blocks (i.e., we're not using external shuffle), the entire slave + * is lost (likely including the shuffle service), or a FetchFailed occurred, in which case we + * presume all shuffle data related to this executor to be lost. * * Optionally the epoch during which the failure was caught can be passed to avoid allowing * stray fetch failures from possibly retriggering the detection of a node as lost. */ private[scheduler] def handleExecutorLost( execId: String, - fetchFailed: Boolean, + filesLost: Boolean, maybeEpoch: Option[Long] = None) { val currentEpoch = maybeEpoch.getOrElse(mapOutputTracker.getEpoch) if (!failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch) { @@ -1322,7 +1323,8 @@ class DAGScheduler( logInfo("Executor lost: %s (epoch %d)".format(execId, currentEpoch)) blockManagerMaster.removeExecutor(execId) - if (!env.blockManager.externalShuffleServiceEnabled || fetchFailed) { + if (filesLost || !env.blockManager.externalShuffleServiceEnabled) { + logInfo("Shuffle files lost for executor: %s (epoch %d)".format(execId, currentEpoch)) // TODO: This will be really slow if we keep accumulating shuffle map stages for ((shuffleId, stage) <- shuffleIdToMapStage) { stage.removeOutputsOnExecutor(execId) @@ -1624,8 +1626,12 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler case ExecutorAdded(execId, host) => dagScheduler.handleExecutorAdded(execId, host) - case ExecutorLost(execId) => - dagScheduler.handleExecutorLost(execId, fetchFailed = false) + case ExecutorLost(execId, reason) => + val filesLost = reason match { + case SlaveLost(_, true) => true + case _ => false + } + dagScheduler.handleExecutorLost(execId, filesLost) case BeginEvent(task, taskInfo) => dagScheduler.handleBeginEvent(task, taskInfo) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala index 8c76112482..03781a2a2b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala @@ -77,7 +77,8 @@ private[scheduler] case class CompletionEvent( private[scheduler] case class ExecutorAdded(execId: String, host: String) extends DAGSchedulerEvent -private[scheduler] case class ExecutorLost(execId: String) extends DAGSchedulerEvent +private[scheduler] case class ExecutorLost(execId: String, reason: ExecutorLossReason) + extends DAGSchedulerEvent private[scheduler] case class TaskSetFailed(taskSet: TaskSet, reason: String, exception: Option[Throwable]) diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala index 642bf81ac0..46a35b6a2e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala @@ -51,6 +51,10 @@ private[spark] object ExecutorKilled extends ExecutorLossReason("Executor killed */ private [spark] object LossReasonPending extends ExecutorLossReason("Pending loss reason.") +/** + * @param _message human readable loss reason + * @param workerLost whether the worker is confirmed lost too (i.e. including shuffle service) + */ private[spark] -case class SlaveLost(_message: String = "Slave lost") +case class SlaveLost(_message: String = "Slave lost", workerLost: Boolean = false) extends ExecutorLossReason(_message) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 7d905538c6..ee5cbfeb47 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -346,6 +346,7 @@ private[spark] class TaskSchedulerImpl( def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) { var failedExecutor: Option[String] = None + var reason: Option[ExecutorLossReason] = None synchronized { try { if (state == TaskState.LOST && taskIdToExecutorId.contains(tid)) { @@ -353,8 +354,9 @@ private[spark] class TaskSchedulerImpl( val execId = taskIdToExecutorId(tid) if (executorIdToTaskCount.contains(execId)) { - removeExecutor(execId, + reason = Some( SlaveLost(s"Task $tid was lost, so marking the executor as lost as well.")) + removeExecutor(execId, reason.get) failedExecutor = Some(execId) } } @@ -387,7 +389,8 @@ private[spark] class TaskSchedulerImpl( } // Update the DAGScheduler without holding a lock on this, since that can deadlock if (failedExecutor.isDefined) { - dagScheduler.executorLost(failedExecutor.get) + assert(reason.isDefined) + dagScheduler.executorLost(failedExecutor.get, reason.get) backend.reviveOffers() } } @@ -513,7 +516,7 @@ private[spark] class TaskSchedulerImpl( } // Call dagScheduler.executorLost without holding the lock on this to prevent deadlock if (failedExecutor.isDefined) { - dagScheduler.executorLost(failedExecutor.get) + dagScheduler.executorLost(failedExecutor.get, reason) backend.reviveOffers() } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala index 5068bf2e66..04d40e2907 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala @@ -150,10 +150,11 @@ private[spark] class StandaloneSchedulerBackend( fullId, hostPort, cores, Utils.megabytesToString(memory))) } - override def executorRemoved(fullId: String, message: String, exitStatus: Option[Int]) { + override def executorRemoved( + fullId: String, message: String, exitStatus: Option[Int], workerLost: Boolean) { val reason: ExecutorLossReason = exitStatus match { case Some(code) => ExecutorExited(code, exitCausedByApp = true, message) - case None => SlaveLost(message) + case None => SlaveLost(message, workerLost = workerLost) } logInfo("Executor %s removed: %s".format(fullId, message)) removeExecutor(fullId.split("/")(1), reason) diff --git a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala index 416efaa75b..bc58fb2a36 100644 --- a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala @@ -210,7 +210,8 @@ class AppClientSuite execAddedList.add(id) } - def executorRemoved(id: String, message: String, exitStatus: Option[Int]): Unit = { + def executorRemoved( + id: String, message: String, exitStatus: Option[Int], workerLost: Boolean): Unit = { execRemovedList.add(id) } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 33824749ae..6787b30261 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -31,6 +31,7 @@ import org.apache.spark._ import org.apache.spark.broadcast.BroadcastManager import org.apache.spark.rdd.RDD import org.apache.spark.scheduler.SchedulingMode.SchedulingMode +import org.apache.spark.shuffle.MetadataFetchFailedException import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster} import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, CallSite, LongAccumulator, Utils} @@ -201,7 +202,11 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou override def beforeEach(): Unit = { super.beforeEach() - sc = new SparkContext("local", "DAGSchedulerSuite") + init(new SparkConf()) + } + + private def init(testConf: SparkConf): Unit = { + sc = new SparkContext("local", "DAGSchedulerSuite", testConf) sparkListener.submittedStageInfos.clear() sparkListener.successfulStages.clear() sparkListener.failedStages.clear() @@ -621,6 +626,46 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou assertDataStructuresEmpty() } + private val shuffleFileLossTests = Seq( + ("slave lost with shuffle service", SlaveLost("", false), true, false), + ("worker lost with shuffle service", SlaveLost("", true), true, true), + ("worker lost without shuffle service", SlaveLost("", true), false, true), + ("executor failure with shuffle service", ExecutorKilled, true, false), + ("executor failure without shuffle service", ExecutorKilled, false, true)) + + for ((eventDescription, event, shuffleServiceOn, expectFileLoss) <- shuffleFileLossTests) { + val maybeLost = if (expectFileLoss) { + "lost" + } else { + "not lost" + } + test(s"shuffle files $maybeLost when $eventDescription") { + // reset the test context with the right shuffle service config + afterEach() + val conf = new SparkConf() + conf.set("spark.shuffle.service.enabled", shuffleServiceOn.toString) + init(conf) + assert(sc.env.blockManager.externalShuffleServiceEnabled == shuffleServiceOn) + + val shuffleMapRdd = new MyRDD(sc, 2, Nil) + val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(1)) + val shuffleId = shuffleDep.shuffleId + val reduceRdd = new MyRDD(sc, 1, List(shuffleDep), tracker = mapOutputTracker) + submit(reduceRdd, Array(0)) + complete(taskSets(0), Seq( + (Success, makeMapStatus("hostA", 1)), + (Success, makeMapStatus("hostB", 1)))) + runEvent(ExecutorLost("exec-hostA", event)) + if (expectFileLoss) { + intercept[MetadataFetchFailedException] { + mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0) + } + } else { + assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet === + HashSet(makeBlockManagerId("hostA"), makeBlockManagerId("hostB"))) + } + } + } // Helper function to validate state when creating tests for task failures private def checkStageId(stageId: Int, attempt: Int, stageAttempt: TaskSet) { @@ -628,7 +673,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou assert(stageAttempt.stageAttemptId == attempt) } - // Helper functions to extract commonly used code in Fetch Failure test cases private def setupStageAbortTest(sc: SparkContext) { sc.listenerBus.addListener(new EndListener()) @@ -1110,7 +1154,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou // pretend we were told hostA went away val oldEpoch = mapOutputTracker.getEpoch - runEvent(ExecutorLost("exec-hostA")) + runEvent(ExecutorLost("exec-hostA", ExecutorKilled)) val newEpoch = mapOutputTracker.getEpoch assert(newEpoch > oldEpoch) @@ -1241,7 +1285,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou )) // then one executor dies, and a task fails in stage 1 - runEvent(ExecutorLost("exec-hostA")) + runEvent(ExecutorLost("exec-hostA", ExecutorKilled)) runEvent(makeCompletionEvent( taskSets(1).tasks(0), FetchFailed(null, firstShuffleId, 2, 0, "Fetch failed"), @@ -1339,7 +1383,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou makeMapStatus("hostA", reduceRdd.partitions.length))) // now that host goes down - runEvent(ExecutorLost("exec-hostA")) + runEvent(ExecutorLost("exec-hostA", ExecutorKilled)) // so we resubmit those tasks runEvent(makeCompletionEvent(taskSets(0).tasks(0), Resubmitted, null)) @@ -1532,7 +1576,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou submit(reduceRdd, Array(0)) // blockManagerMaster.removeExecutor("exec-hostA") // pretend we were told hostA went away - runEvent(ExecutorLost("exec-hostA")) + runEvent(ExecutorLost("exec-hostA", ExecutorKilled)) // DAGScheduler will immediately resubmit the stage after it appears to have no pending tasks // rather than marking it is as failed and waiting. complete(taskSets(0), Seq( @@ -1999,7 +2043,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou // Pretend host A was lost val oldEpoch = mapOutputTracker.getEpoch - runEvent(ExecutorLost("exec-hostA")) + runEvent(ExecutorLost("exec-hostA", ExecutorKilled)) val newEpoch = mapOutputTracker.getEpoch assert(newEpoch > oldEpoch) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 36d1c5690f..7d6ad08036 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -46,7 +46,7 @@ class FakeDAGScheduler(sc: SparkContext, taskScheduler: FakeTaskScheduler) override def executorAdded(execId: String, host: String) {} - override def executorLost(execId: String) {} + override def executorLost(execId: String, reason: ExecutorLossReason) {} override def taskSetFailed( taskSet: TaskSet, |