From 7f779e7439127efa0e3611f7745e1c8423845198 Mon Sep 17 00:00:00 2001
From: Imran Rashid
Date: Thu, 29 Sep 2016 15:36:40 -0400
Subject: [SPARK-17648][CORE] TaskScheduler really needs offers to be an IndexedSeq

## What changes were proposed in this pull request?

The Seq[WorkerOffer] is accessed by index, so it really should be an IndexedSeq;
otherwise an O(n) operation becomes O(n^2). In practice this hasn't been an issue,
because where these offers are generated the call to `.toSeq` just happens to create
an IndexedSeq anyway. I got bitten by this in performance tests I was doing, and it's
better for the types to be more precise so that, e.g., a change in Scala doesn't
destroy performance.

## How was this patch tested?

Unit tests via jenkins.

Author: Imran Rashid

Closes #15221 from squito/SPARK-17648.
---
 .../apache/spark/scheduler/TaskSchedulerImpl.scala |  4 +--
 .../cluster/CoarseGrainedSchedulerBackend.scala    |  4 +--
 .../scheduler/local/LocalSchedulerBackend.scala    |  2 +-
 .../scheduler/SchedulerIntegrationSuite.scala      |  7 ++---
 .../spark/scheduler/TaskSchedulerImplSuite.scala   | 32 +++++++++++-----------
 5 files changed, 24 insertions(+), 25 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 52a7186cbf..0ad4730fe2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -252,7 +252,7 @@ private[spark] class TaskSchedulerImpl(
       maxLocality: TaskLocality,
       shuffledOffers: Seq[WorkerOffer],
       availableCpus: Array[Int],
-      tasks: Seq[ArrayBuffer[TaskDescription]]) : Boolean = {
+      tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = {
     var launchedTask = false
     for (i <- 0 until shuffledOffers.size) {
       val execId = shuffledOffers(i).executorId
@@ -286,7 +286,7 @@ private[spark] class TaskSchedulerImpl(
    * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so
    * that tasks are balanced across the cluster.
    */
-  def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
+  def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
     // Mark each slave as alive and remember its hostname
     // Also track if new executor is added
     var newExecAvail = false
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index edc3c19937..2d09863166 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -216,7 +216,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
       val workOffers = activeExecutors.map { case (id, executorData) =>
         new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
-      }.toSeq
+      }.toIndexedSeq
       launchTasks(scheduler.resourceOffers(workOffers))
     }

@@ -233,7 +233,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       // Filter out executors under killing
       if (executorIsAlive(executorId)) {
         val executorData = executorDataMap(executorId)
-        val workOffers = Seq(
+        val workOffers = IndexedSeq(
           new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores))
         launchTasks(scheduler.resourceOffers(workOffers))
       }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala
index e386052814..7a73e8ed8a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala
@@ -81,7 +81,7 @@ private[spark] class LocalEndpoint(
   }

   def reviveOffers() {
-    val offers = Seq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores))
+    val offers = IndexedSeq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores))
     for (task <- scheduler.resourceOffers(offers).flatten) {
       freeCores -= scheduler.CPUS_PER_TASK
       executor.launchTask(executorBackend, taskId = task.taskId, attemptNumber = task.attemptNumber,
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
index 14f52a6be9..5cd548bbc7 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
@@ -366,13 +366,13 @@ private[spark] abstract class MockBackend(
    */
   def executorIdToExecutor: Map[String, ExecutorTaskStatus]

-  private def generateOffers(): Seq[WorkerOffer] = {
+  private def generateOffers(): IndexedSeq[WorkerOffer] = {
     executorIdToExecutor.values.filter { exec =>
       exec.freeCores > 0
     }.map { exec =>
       WorkerOffer(executorId = exec.executorId, host = exec.host,
         cores = exec.freeCores)
-    }.toSeq
+    }.toIndexedSeq
   }

   /**
@@ -381,8 +381,7 @@ private[spark] abstract class MockBackend(
    * scheduling.
    */
   override def reviveOffers(): Unit = {
-    val offers: Seq[WorkerOffer] = generateOffers()
-    val newTaskDescriptions = taskScheduler.resourceOffers(offers).flatten
+    val newTaskDescriptions = taskScheduler.resourceOffers(generateOffers()).flatten
     // get the task now, since that requires a lock on TaskSchedulerImpl, to prevent individual
     // tests from introducing a race if they need it
     val newTasks = taskScheduler.synchronized {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 100b15740c..61787b54f8 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -87,7 +87,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
   test("Scheduler does not always schedule tasks on the same workers") {
     val taskScheduler = setupScheduler()
     val numFreeCores = 1
-    val workerOffers = Seq(new WorkerOffer("executor0", "host0", numFreeCores),
+    val workerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", numFreeCores),
       new WorkerOffer("executor1", "host1", numFreeCores))
     // Repeatedly try to schedule a 1-task job, and make sure that it doesn't always
     // get scheduled on the same executor. While there is a chance this test will fail
@@ -112,7 +112,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     val taskCpus = 2
     val taskScheduler = setupScheduler("spark.task.cpus" -> taskCpus.toString)
     // Give zero core offers. Should not generate any tasks
-    val zeroCoreWorkerOffers = Seq(new WorkerOffer("executor0", "host0", 0),
+    val zeroCoreWorkerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", 0),
       new WorkerOffer("executor1", "host1", 0))
     val taskSet = FakeTask.createTaskSet(1)
     taskScheduler.submitTasks(taskSet)
@@ -121,7 +121,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B

     // No tasks should run as we only have 1 core free.
     val numFreeCores = 1
-    val singleCoreWorkerOffers = Seq(new WorkerOffer("executor0", "host0", numFreeCores),
+    val singleCoreWorkerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", numFreeCores),
       new WorkerOffer("executor1", "host1", numFreeCores))
     taskScheduler.submitTasks(taskSet)
     taskDescriptions = taskScheduler.resourceOffers(singleCoreWorkerOffers).flatten
@@ -129,7 +129,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B

     // Now change the offers to have 2 cores in one executor and verify if it
     // is chosen.
-    val multiCoreWorkerOffers = Seq(new WorkerOffer("executor0", "host0", taskCpus),
+    val multiCoreWorkerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", taskCpus),
       new WorkerOffer("executor1", "host1", numFreeCores))
     taskScheduler.submitTasks(taskSet)
     taskDescriptions = taskScheduler.resourceOffers(multiCoreWorkerOffers).flatten
@@ -144,7 +144,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     val numFreeCores = 1
     val taskSet = new TaskSet(
       Array(new NotSerializableFakeTask(1, 0), new NotSerializableFakeTask(0, 1)), 0, 0, 0, null)
-    val multiCoreWorkerOffers = Seq(new WorkerOffer("executor0", "host0", taskCpus),
+    val multiCoreWorkerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", taskCpus),
       new WorkerOffer("executor1", "host1", numFreeCores))
     taskScheduler.submitTasks(taskSet)
     var taskDescriptions = taskScheduler.resourceOffers(multiCoreWorkerOffers).flatten
@@ -184,7 +184,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     val taskScheduler = setupScheduler()

     val numFreeCores = 1
-    val workerOffers = Seq(new WorkerOffer("executor0", "host0", numFreeCores))
+    val workerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", numFreeCores))
     val attempt1 = FakeTask.createTaskSet(10)

     // submit attempt 1, offer some resources, some tasks get scheduled
@@ -216,7 +216,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     val taskScheduler = setupScheduler()

     val numFreeCores = 10
-    val workerOffers = Seq(new WorkerOffer("executor0", "host0", numFreeCores))
+    val workerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", numFreeCores))
     val attempt1 = FakeTask.createTaskSet(10)

     // submit attempt 1, offer some resources, some tasks get scheduled
@@ -254,8 +254,8 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
   test("tasks are not re-scheduled while executor loss reason is pending") {
     val taskScheduler = setupScheduler()

-    val e0Offers = Seq(new WorkerOffer("executor0", "host0", 1))
-    val e1Offers = Seq(new WorkerOffer("executor1", "host0", 1))
+    val e0Offers = IndexedSeq(new WorkerOffer("executor0", "host0", 1))
+    val e1Offers = IndexedSeq(new WorkerOffer("executor1", "host0", 1))
     val attempt1 = FakeTask.createTaskSet(1)

     // submit attempt 1, offer resources, task gets scheduled
@@ -296,7 +296,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     taskScheduler.submitTasks(taskSet)
     val tsm = taskScheduler.taskSetManagerForAttempt(taskSet.stageId, taskSet.stageAttemptId).get

-    val firstTaskAttempts = taskScheduler.resourceOffers(Seq(
+    val firstTaskAttempts = taskScheduler.resourceOffers(IndexedSeq(
       new WorkerOffer("executor0", "host0", 1),
       new WorkerOffer("executor1", "host1", 1)
     )).flatten
@@ -313,7 +313,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     // on that executor, and make sure that the other task (not the failed one) is assigned there
     taskScheduler.executorLost("executor1", SlaveLost("oops"))
     val nextTaskAttempts =
-      taskScheduler.resourceOffers(Seq(new WorkerOffer("executor0", "host0", 1))).flatten
+      taskScheduler.resourceOffers(IndexedSeq(new WorkerOffer("executor0", "host0", 1))).flatten
     // Note: Its OK if some future change makes this already realize the taskset has become
     // unschedulable at this point (though in the current implementation, we're sure it will not)
     assert(nextTaskAttempts.size === 1)
@@ -323,7 +323,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B

     // now we should definitely realize that our task set is unschedulable, because the only
     // task left can't be scheduled on any executors due to the blacklist
-    taskScheduler.resourceOffers(Seq(new WorkerOffer("executor0", "host0", 1)))
+    taskScheduler.resourceOffers(IndexedSeq(new WorkerOffer("executor0", "host0", 1)))
     sc.listenerBus.waitUntilEmpty(100000)
     assert(tsm.isZombie)
     assert(failedTaskSet)
@@ -348,7 +348,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     taskScheduler.submitTasks(taskSet)
     val tsm = taskScheduler.taskSetManagerForAttempt(taskSet.stageId, taskSet.stageAttemptId).get

-    val offers = Seq(
+    val offers = IndexedSeq(
       // each offer has more than enough free cores for the entire task set, so when combined
       // with the locality preferences, we schedule all tasks on one executor
       new WorkerOffer("executor0", "host0", 4),
@@ -380,7 +380,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
       (0 until 2).map { _ => Seq(TaskLocation("host0", "executor2"))}: _*
     ))

-    val taskDescs = taskScheduler.resourceOffers(Seq(
+    val taskDescs = taskScheduler.resourceOffers(IndexedSeq(
       new WorkerOffer("executor0", "host0", 1),
       new WorkerOffer("executor1", "host1", 1)
     )).flatten
@@ -396,7 +396,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     // when executor2 is added, we should realize that we can run process-local tasks.
     // And we should know its alive on the host.
     val secondTaskDescs = taskScheduler.resourceOffers(
-      Seq(new WorkerOffer("executor2", "host0", 1))).flatten
+      IndexedSeq(new WorkerOffer("executor2", "host0", 1))).flatten
     assert(secondTaskDescs.size === 1)
     assert(mgr.myLocalityLevels.toSet ===
       Set(TaskLocality.PROCESS_LOCAL, TaskLocality.NODE_LOCAL, TaskLocality.ANY))
@@ -406,7 +406,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     // And even if we don't have anything left to schedule, another resource offer on yet another
     // executor should also update the set of live executors
     val thirdTaskDescs = taskScheduler.resourceOffers(
-      Seq(new WorkerOffer("executor3", "host1", 1))).flatten
+      IndexedSeq(new WorkerOffer("executor3", "host1", 1))).flatten
     assert(thirdTaskDescs.size === 0)
     assert(taskScheduler.getExecutorsAliveOnHost("host1") === Some(Set("executor1", "executor3")))
   }
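---

For readers skimming the patch, the snippet below is a minimal, self-contained sketch (not part of this commit) of the complexity argument in the commit message. `Offer` and `countCores` are hypothetical stand-ins for `WorkerOffer` and the index-based loop in `resourceOfferSingleTaskSet`; they show how `offers(i)` inside a loop over indices degrades from O(n) to O(n^2) when the `Seq` happens to be backed by a `List` instead of an `IndexedSeq` such as `Vector`.

```scala
// Standalone illustration (assumption: plain Scala, no Spark dependency).
object OfferIndexingSketch {
  // Hypothetical stand-in for WorkerOffer, only for this sketch.
  case class Offer(executorId: String, host: String, cores: Int)

  // Mirrors the scheduler's access pattern: offers(i) inside an index loop.
  def countCores(offers: Seq[Offer]): Int = {
    var total = 0
    for (i <- 0 until offers.size) {
      total += offers(i).cores // O(1) on an IndexedSeq, O(i) on a List
    }
    total
  }

  def main(args: Array[String]): Unit = {
    val n = 20000
    val asList: Seq[Offer] = List.tabulate(n)(i => Offer(s"exec$i", s"host$i", 1))
    val asVector: IndexedSeq[Offer] = Vector.tabulate(n)(i => Offer(s"exec$i", s"host$i", 1))

    def time[T](label: String)(body: => T): T = {
      val start = System.nanoTime()
      val result = body
      println(f"$label: ${(System.nanoTime() - start) / 1e6}%.1f ms")
      result
    }

    // The List-backed Seq pays O(i) per lookup, O(n^2) overall;
    // the Vector-backed IndexedSeq stays close to O(n).
    time("List  ")(countCores(asList))
    time("Vector")(countCores(asVector))
  }
}
```

Running this typically shows the List-backed version slowing down roughly quadratically as n grows, which is the behaviour the tighter `IndexedSeq` signatures in this patch rule out at compile time.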