From c15b552dd547a129c7f0d082dab4eebbd64bee02 Mon Sep 17 00:00:00 2001
From: Imran Rashid
Date: Mon, 27 Jun 2016 16:38:03 -0500
Subject: [SPARK-16106][CORE] TaskSchedulerImpl should properly track executors added to existing hosts

## What changes were proposed in this pull request?

TaskSchedulerImpl previously set `newExecAvail` only when a new *host* was added, not when a new executor was added to an existing host. It also didn't update some of its internal state tracking live executors until a task was actually scheduled on that executor. This patch updates that state as soon as the scheduler learns about a new executor.

## How was this patch tested?

Added a unit test, and ran everything via Jenkins.

Author: Imran Rashid

Closes #13826 from squito/SPARK-16106_executorByHosts.
---
 .../apache/spark/scheduler/TaskSchedulerImpl.scala |   8 +-
 .../spark/scheduler/TaskSchedulerImplSuite.scala   | 168 +++++++++++++--------
 2 files changed, 111 insertions(+), 65 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 4282606589..821e3ee7f1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -266,7 +266,6 @@ private[spark] class TaskSchedulerImpl(
             taskIdToTaskSetManager(tid) = taskSet
             taskIdToExecutorId(tid) = execId
             executorIdToTaskCount(execId) += 1
-            executorsByHost(host) += execId
             availableCpus(i) -= CPUS_PER_TASK
             assert(availableCpus(i) >= 0)
             launchedTask = true
@@ -293,11 +292,14 @@ private[spark] class TaskSchedulerImpl(
     // Also track if new executor is added
     var newExecAvail = false
     for (o <- offers) {
-      executorIdToHost(o.executorId) = o.host
-      executorIdToTaskCount.getOrElseUpdate(o.executorId, 0)
       if (!executorsByHost.contains(o.host)) {
         executorsByHost(o.host) = new HashSet[String]()
+      }
+      if (!executorIdToTaskCount.contains(o.executorId)) {
+        executorsByHost(o.host) += o.executorId
         executorAdded(o.executorId, o.host)
+        executorIdToHost(o.executorId) = o.host
+        executorIdToTaskCount(o.executorId) = 0
         newExecAvail = true
       }
       for (rack <- getRackForHost(o.host)) {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index a09a602d13..34b8d55339 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.scheduler
 
+import org.scalatest.BeforeAndAfterEach
+
 import org.apache.spark._
 import org.apache.spark.internal.Logging
 
@@ -27,18 +29,63 @@ class FakeSchedulerBackend extends SchedulerBackend {
   def defaultParallelism(): Int = 1
 }
 
-class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with Logging {
+class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with BeforeAndAfterEach
+    with Logging {
 
-  test("Scheduler does not always schedule tasks on the same workers") {
+
+  var failedTaskSetException: Option[Throwable] = None
+  var failedTaskSetReason: String = null
+  var failedTaskSet = false
+
+  var taskScheduler: TaskSchedulerImpl = null
+  var dagScheduler: DAGScheduler = null
+
+  override def beforeEach(): Unit = {
+    super.beforeEach()
+    failedTaskSet = false
+    failedTaskSetException = None
+    failedTaskSetReason = null
+  }
+
+  override def afterEach(): Unit = {
+    super.afterEach()
+    if (taskScheduler != null) {
+      taskScheduler.stop()
+      taskScheduler = null
+    }
+    if (dagScheduler != null) {
+      dagScheduler.stop()
+      dagScheduler = null
+    }
+  }
+
+  def setupScheduler(confs: (String, String)*): TaskSchedulerImpl = {
     sc = new SparkContext("local", "TaskSchedulerImplSuite")
-    val taskScheduler = new TaskSchedulerImpl(sc)
+    confs.foreach { case (k, v) =>
+      sc.conf.set(k, v)
+    }
+    taskScheduler = new TaskSchedulerImpl(sc)
     taskScheduler.initialize(new FakeSchedulerBackend)
     // Need to initialize a DAGScheduler for the taskScheduler to use for callbacks.
-    new DAGScheduler(sc, taskScheduler) {
-      override def taskStarted(task: Task[_], taskInfo: TaskInfo) {}
-      override def executorAdded(execId: String, host: String) {}
+    dagScheduler = new DAGScheduler(sc, taskScheduler) {
+      override def taskStarted(task: Task[_], taskInfo: TaskInfo): Unit = {}
+      override def executorAdded(execId: String, host: String): Unit = {}
+      override def taskSetFailed(
+          taskSet: TaskSet,
+          reason: String,
+          exception: Option[Throwable]): Unit = {
+        // Normally the DAGScheduler puts this in the event loop, which will eventually fail
+        // dependent jobs
+        failedTaskSet = true
+        failedTaskSetReason = reason
+        failedTaskSetException = exception
+      }
     }
+    taskScheduler
+  }
 
+  test("Scheduler does not always schedule tasks on the same workers") {
+    val taskScheduler = setupScheduler()
     val numFreeCores = 1
     val workerOffers = Seq(new WorkerOffer("executor0", "host0", numFreeCores),
       new WorkerOffer("executor1", "host1", numFreeCores))
@@ -58,20 +105,12 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with L
     val count = selectedExecutorIds.count(_ == workerOffers(0).executorId)
     assert(count > 0)
     assert(count < numTrials)
+    assert(!failedTaskSet)
   }
 
   test("Scheduler correctly accounts for multiple CPUs per task") {
-    sc = new SparkContext("local", "TaskSchedulerImplSuite")
     val taskCpus = 2
-
-    sc.conf.set("spark.task.cpus", taskCpus.toString)
-    val taskScheduler = new TaskSchedulerImpl(sc)
-    taskScheduler.initialize(new FakeSchedulerBackend)
-    // Need to initialize a DAGScheduler for the taskScheduler to use for callbacks.
-    new DAGScheduler(sc, taskScheduler) {
-      override def taskStarted(task: Task[_], taskInfo: TaskInfo) {}
-      override def executorAdded(execId: String, host: String) {}
-    }
+    val taskScheduler = setupScheduler("spark.task.cpus" -> taskCpus.toString)
     // Give zero core offers. Should not generate any tasks
     val zeroCoreWorkerOffers = Seq(new WorkerOffer("executor0", "host0", 0),
       new WorkerOffer("executor1", "host1", 0))
@@ -96,22 +135,13 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with L
     taskDescriptions = taskScheduler.resourceOffers(multiCoreWorkerOffers).flatten
     assert(1 === taskDescriptions.length)
     assert("executor0" === taskDescriptions(0).executorId)
+    assert(!failedTaskSet)
   }
 
   test("Scheduler does not crash when tasks are not serializable") {
-    sc = new SparkContext("local", "TaskSchedulerImplSuite")
     val taskCpus = 2
-
-    sc.conf.set("spark.task.cpus", taskCpus.toString)
-    val taskScheduler = new TaskSchedulerImpl(sc)
-    taskScheduler.initialize(new FakeSchedulerBackend)
-    // Need to initialize a DAGScheduler for the taskScheduler to use for callbacks.
-    val dagScheduler = new DAGScheduler(sc, taskScheduler) {
-      override def taskStarted(task: Task[_], taskInfo: TaskInfo) {}
-      override def executorAdded(execId: String, host: String) {}
-    }
+    val taskScheduler = setupScheduler("spark.task.cpus" -> taskCpus.toString)
     val numFreeCores = 1
-    taskScheduler.setDAGScheduler(dagScheduler)
     val taskSet = new TaskSet(
       Array(new NotSerializableFakeTask(1, 0), new NotSerializableFakeTask(0, 1)), 0, 0, 0, null)
     val multiCoreWorkerOffers = Seq(new WorkerOffer("executor0", "host0", taskCpus),
@@ -119,26 +149,20 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with L
     taskScheduler.submitTasks(taskSet)
     var taskDescriptions = taskScheduler.resourceOffers(multiCoreWorkerOffers).flatten
     assert(0 === taskDescriptions.length)
+    assert(failedTaskSet)
+    assert(failedTaskSetReason.contains("Failed to serialize task"))
 
     // Now check that we can still submit tasks
-    // Even if one of the tasks has not-serializable tasks, the other task set should
+    // Even if one of the task sets has not-serializable tasks, the other task set should
     // still be processed without error
-    taskScheduler.submitTasks(taskSet)
     taskScheduler.submitTasks(FakeTask.createTaskSet(1))
+    taskScheduler.submitTasks(taskSet)
     taskDescriptions = taskScheduler.resourceOffers(multiCoreWorkerOffers).flatten
     assert(taskDescriptions.map(_.executorId) === Seq("executor0"))
   }
 
   test("refuse to schedule concurrent attempts for the same stage (SPARK-8103)") {
-    sc = new SparkContext("local", "TaskSchedulerImplSuite")
-    val taskScheduler = new TaskSchedulerImpl(sc)
-    taskScheduler.initialize(new FakeSchedulerBackend)
-    // Need to initialize a DAGScheduler for the taskScheduler to use for callbacks.
-    val dagScheduler = new DAGScheduler(sc, taskScheduler) {
-      override def taskStarted(task: Task[_], taskInfo: TaskInfo) {}
-      override def executorAdded(execId: String, host: String) {}
-    }
-    taskScheduler.setDAGScheduler(dagScheduler)
+    val taskScheduler = setupScheduler()
     val attempt1 = FakeTask.createTaskSet(1, 0)
     val attempt2 = FakeTask.createTaskSet(1, 1)
     taskScheduler.submitTasks(attempt1)
@@ -153,17 +177,11 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with L
     taskScheduler.taskSetManagerForAttempt(attempt2.stageId, attempt2.stageAttemptId)
       .get.isZombie = true
     taskScheduler.submitTasks(attempt3)
+    assert(!failedTaskSet)
   }
 
   test("don't schedule more tasks after a taskset is zombie") {
-    sc = new SparkContext("local", "TaskSchedulerImplSuite")
-    val taskScheduler = new TaskSchedulerImpl(sc)
-    taskScheduler.initialize(new FakeSchedulerBackend)
-    // Need to initialize a DAGScheduler for the taskScheduler to use for callbacks.
-    new DAGScheduler(sc, taskScheduler) {
-      override def taskStarted(task: Task[_], taskInfo: TaskInfo) {}
-      override def executorAdded(execId: String, host: String) {}
-    }
+    val taskScheduler = setupScheduler()
     val numFreeCores = 1
     val workerOffers = Seq(new WorkerOffer("executor0", "host0", numFreeCores))
@@ -191,17 +209,11 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with L
     assert(1 === taskDescriptions3.length)
     val mgr = taskScheduler.taskIdToTaskSetManager.get(taskDescriptions3(0).taskId).get
     assert(mgr.taskSet.stageAttemptId === 1)
+    assert(!failedTaskSet)
   }
 
   test("if a zombie attempt finishes, continue scheduling tasks for non-zombie attempts") {
-    sc = new SparkContext("local", "TaskSchedulerImplSuite")
-    val taskScheduler = new TaskSchedulerImpl(sc)
-    taskScheduler.initialize(new FakeSchedulerBackend)
-    // Need to initialize a DAGScheduler for the taskScheduler to use for callbacks.
-    new DAGScheduler(sc, taskScheduler) {
-      override def taskStarted(task: Task[_], taskInfo: TaskInfo) {}
-      override def executorAdded(execId: String, host: String) {}
-    }
+    val taskScheduler = setupScheduler()
     val numFreeCores = 10
     val workerOffers = Seq(new WorkerOffer("executor0", "host0", numFreeCores))
@@ -236,17 +248,11 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with L
       val mgr = taskScheduler.taskIdToTaskSetManager.get(task.taskId).get
       assert(mgr.taskSet.stageAttemptId === 1)
     }
+    assert(!failedTaskSet)
   }
 
   test("tasks are not re-scheduled while executor loss reason is pending") {
-    sc = new SparkContext("local", "TaskSchedulerImplSuite")
-    val taskScheduler = new TaskSchedulerImpl(sc)
-    taskScheduler.initialize(new FakeSchedulerBackend)
-    // Need to initialize a DAGScheduler for the taskScheduler to use for callbacks.
-    new DAGScheduler(sc, taskScheduler) {
-      override def taskStarted(task: Task[_], taskInfo: TaskInfo) {}
-      override def executorAdded(execId: String, host: String) {}
-    }
+    val taskScheduler = setupScheduler()
     val e0Offers = Seq(new WorkerOffer("executor0", "host0", 1))
     val e1Offers = Seq(new WorkerOffer("executor1", "host0", 1))
@@ -272,6 +278,44 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with L
     val taskDescriptions3 = taskScheduler.resourceOffers(e1Offers).flatten
     assert(1 === taskDescriptions3.length)
     assert("executor1" === taskDescriptions3(0).executorId)
+    assert(!failedTaskSet)
   }
 
+  test("SPARK-16106 locality levels updated if executor added to existing host") {
+    val taskScheduler = setupScheduler()
+
+    taskScheduler.submitTasks(FakeTask.createTaskSet(2, 0,
+      (0 until 2).map { _ => Seq(TaskLocation("host0", "executor2")) }: _*
+    ))
+
+    val taskDescs = taskScheduler.resourceOffers(Seq(
+      new WorkerOffer("executor0", "host0", 1),
+      new WorkerOffer("executor1", "host1", 1)
+    )).flatten
+    // only schedule one task because of locality
+    assert(taskDescs.size === 1)
+
+    val mgr = taskScheduler.taskIdToTaskSetManager.get(taskDescs(0).taskId).get
+    assert(mgr.myLocalityLevels.toSet === Set(TaskLocality.NODE_LOCAL, TaskLocality.ANY))
+    // we should know about both executors, even though we only scheduled tasks on one of them
+    assert(taskScheduler.getExecutorsAliveOnHost("host0") === Some(Set("executor0")))
+    assert(taskScheduler.getExecutorsAliveOnHost("host1") === Some(Set("executor1")))
+
+    // when executor2 is added, we should realize that we can run process-local tasks.
+    // And we should know it's alive on the host.
+    val secondTaskDescs = taskScheduler.resourceOffers(
+      Seq(new WorkerOffer("executor2", "host0", 1))).flatten
+    assert(secondTaskDescs.size === 1)
+    assert(mgr.myLocalityLevels.toSet ===
+      Set(TaskLocality.PROCESS_LOCAL, TaskLocality.NODE_LOCAL, TaskLocality.ANY))
+    assert(taskScheduler.getExecutorsAliveOnHost("host0") === Some(Set("executor0", "executor2")))
+    assert(taskScheduler.getExecutorsAliveOnHost("host1") === Some(Set("executor1")))
+
+    // And even if we don't have anything left to schedule, another resource offer on yet another
+    // executor should also update the set of live executors
+    val thirdTaskDescs = taskScheduler.resourceOffers(
+      Seq(new WorkerOffer("executor3", "host1", 1))).flatten
+    assert(thirdTaskDescs.size === 0)
+    assert(taskScheduler.getExecutorsAliveOnHost("host1") === Some(Set("executor1", "executor3")))
+  }
 }
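
The heart of the change is that new-executor detection is now keyed on the executor id rather than on the host. Below is a minimal, self-contained sketch of that bookkeeping outside of Spark; `Offer`, `registerOffers`, and `ExecutorTrackingSketch` are illustrative stand-ins, not the actual Spark API, and the real patch additionally fires the `executorAdded` callback inside the new-executor branch, which is omitted here.

import scala.collection.mutable.{HashMap, HashSet}

// Illustrative stand-in for Spark's WorkerOffer (executor id + host only).
case class Offer(executorId: String, host: String)

object ExecutorTrackingSketch {
  val executorsByHost = new HashMap[String, HashSet[String]]()
  val executorIdToHost = new HashMap[String, String]()
  val executorIdToTaskCount = new HashMap[String, Int]()

  // Mirrors the patched loop in TaskSchedulerImpl.resourceOffers: newness is
  // keyed on the executor id, so a second executor on an already-known host
  // is still registered and still reports a new executor.
  def registerOffers(offers: Seq[Offer]): Boolean = {
    var newExecAvail = false
    for (o <- offers) {
      if (!executorsByHost.contains(o.host)) {
        executorsByHost(o.host) = new HashSet[String]()
      }
      if (!executorIdToTaskCount.contains(o.executorId)) {
        executorsByHost(o.host) += o.executorId
        executorIdToHost(o.executorId) = o.host
        executorIdToTaskCount(o.executorId) = 0
        newExecAvail = true
      }
    }
    newExecAvail
  }

  def main(args: Array[String]): Unit = {
    // A brand-new host and executor is registered.
    assert(registerOffers(Seq(Offer("executor0", "host0"))))
    // Pre-patch, this second executor on the same host was missed, because the
    // check was executorsByHost.contains(host); keyed on executor id it works.
    assert(registerOffers(Seq(Offer("executor2", "host0"))))
    assert(executorsByHost("host0") == HashSet("executor0", "executor2"))
    // Re-offering an already-known executor registers nothing new.
    assert(!registerOffers(Seq(Offer("executor0", "host0"))))
  }
}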