author     Imran Rashid <irashid@cloudera.com>    2016-09-29 15:36:40 -0400
committer  Andrew Or <andrewor14@gmail.com>       2016-09-29 15:36:40 -0400
commit     7f779e7439127efa0e3611f7745e1c8423845198 (patch)
tree       2f9b734d0a9b310c5b623d519258abfb4d850132 /core/src
parent     958200497affb40f05e321c2b0e252d365ae02f4 (diff)
[SPARK-17648][CORE] TaskScheduler really needs offers to be an IndexedSeq
## What changes were proposed in this pull request?

The Seq[WorkerOffer] is accessed by index, so it really should be an IndexedSeq; otherwise an O(n) operation becomes O(n^2). In practice this hasn't been an issue, because where these offers are generated, the call to `.toSeq` just happens to create an IndexedSeq anyway. I got bitten by this in performance tests I was doing, and it's better for the types to be more precise so that, e.g., a change in Scala doesn't destroy performance.

## How was this patch tested?

Unit tests via jenkins.

Author: Imran Rashid <irashid@cloudera.com>

Closes #15221 from squito/SPARK-17648.
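As a side note on the complexity claim above (a minimal sketch using plain Scala collections, not code from this patch): indexed access on a linear `Seq` such as `List` costs O(i) per lookup, while an `IndexedSeq` such as `Vector` is effectively O(1), so an index-driven loop degrades from O(n) to O(n^2) on the wrong concrete type.

```scala
// Minimal sketch: why the static type matters for index-driven loops.
object IndexedAccessDemo {
  def sumByIndex(xs: Seq[Int]): Long = {
    val n = xs.size
    var total = 0L
    var i = 0
    while (i < n) {
      total += xs(i) // O(1) for an IndexedSeq, O(i) for a List -> O(n^2) overall
      i += 1
    }
    total
  }

  def main(args: Array[String]): Unit = {
    val n = 100000
    val asList: Seq[Int] = List.tabulate(n)(identity)            // linear access
    val asVector: IndexedSeq[Int] = Vector.tabulate(n)(identity) // indexed access
    // Same result, very different cost: the List run does ~n^2/2 pointer hops.
    assert(sumByIndex(asList) == sumByIndex(asVector))
  }
}
```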
Diffstat (limited to 'core/src')
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala                      |  4
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala  |  4
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala            |  2
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala              |  7
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala                 | 32
5 files changed, 24 insertions(+), 25 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 52a7186cbf..0ad4730fe2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -252,7 +252,7 @@ private[spark] class TaskSchedulerImpl(
maxLocality: TaskLocality,
shuffledOffers: Seq[WorkerOffer],
availableCpus: Array[Int],
- tasks: Seq[ArrayBuffer[TaskDescription]]) : Boolean = {
+ tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = {
var launchedTask = false
for (i <- 0 until shuffledOffers.size) {
val execId = shuffledOffers(i).executorId
@@ -286,7 +286,7 @@ private[spark] class TaskSchedulerImpl(
* sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so
* that tasks are balanced across the cluster.
*/
- def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
+ def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
// Mark each slave as alive and remember its hostname
// Also track if new executor is added
var newExecAvail = false
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index edc3c19937..2d09863166 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -216,7 +216,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
val workOffers = activeExecutors.map { case (id, executorData) =>
new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
- }.toSeq
+ }.toIndexedSeq
launchTasks(scheduler.resourceOffers(workOffers))
}
@@ -233,7 +233,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// Filter out executors under killing
if (executorIsAlive(executorId)) {
val executorData = executorDataMap(executorId)
- val workOffers = Seq(
+ val workOffers = IndexedSeq(
new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores))
launchTasks(scheduler.resourceOffers(workOffers))
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala
index e386052814..7a73e8ed8a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala
@@ -81,7 +81,7 @@ private[spark] class LocalEndpoint(
}
def reviveOffers() {
- val offers = Seq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores))
+ val offers = IndexedSeq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores))
for (task <- scheduler.resourceOffers(offers).flatten) {
freeCores -= scheduler.CPUS_PER_TASK
executor.launchTask(executorBackend, taskId = task.taskId, attemptNumber = task.attemptNumber,
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
index 14f52a6be9..5cd548bbc7 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
@@ -366,13 +366,13 @@ private[spark] abstract class MockBackend(
*/
def executorIdToExecutor: Map[String, ExecutorTaskStatus]
- private def generateOffers(): Seq[WorkerOffer] = {
+ private def generateOffers(): IndexedSeq[WorkerOffer] = {
executorIdToExecutor.values.filter { exec =>
exec.freeCores > 0
}.map { exec =>
WorkerOffer(executorId = exec.executorId, host = exec.host,
cores = exec.freeCores)
- }.toSeq
+ }.toIndexedSeq
}
/**
@@ -381,8 +381,7 @@ private[spark] abstract class MockBackend(
* scheduling.
*/
override def reviveOffers(): Unit = {
- val offers: Seq[WorkerOffer] = generateOffers()
- val newTaskDescriptions = taskScheduler.resourceOffers(offers).flatten
+ val newTaskDescriptions = taskScheduler.resourceOffers(generateOffers()).flatten
// get the task now, since that requires a lock on TaskSchedulerImpl, to prevent individual
// tests from introducing a race if they need it
val newTasks = taskScheduler.synchronized {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 100b15740c..61787b54f8 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -87,7 +87,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
test("Scheduler does not always schedule tasks on the same workers") {
val taskScheduler = setupScheduler()
val numFreeCores = 1
- val workerOffers = Seq(new WorkerOffer("executor0", "host0", numFreeCores),
+ val workerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", numFreeCores),
new WorkerOffer("executor1", "host1", numFreeCores))
// Repeatedly try to schedule a 1-task job, and make sure that it doesn't always
// get scheduled on the same executor. While there is a chance this test will fail
@@ -112,7 +112,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
val taskCpus = 2
val taskScheduler = setupScheduler("spark.task.cpus" -> taskCpus.toString)
// Give zero core offers. Should not generate any tasks
- val zeroCoreWorkerOffers = Seq(new WorkerOffer("executor0", "host0", 0),
+ val zeroCoreWorkerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", 0),
new WorkerOffer("executor1", "host1", 0))
val taskSet = FakeTask.createTaskSet(1)
taskScheduler.submitTasks(taskSet)
@@ -121,7 +121,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
// No tasks should run as we only have 1 core free.
val numFreeCores = 1
- val singleCoreWorkerOffers = Seq(new WorkerOffer("executor0", "host0", numFreeCores),
+ val singleCoreWorkerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", numFreeCores),
new WorkerOffer("executor1", "host1", numFreeCores))
taskScheduler.submitTasks(taskSet)
taskDescriptions = taskScheduler.resourceOffers(singleCoreWorkerOffers).flatten
@@ -129,7 +129,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
// Now change the offers to have 2 cores in one executor and verify if it
// is chosen.
- val multiCoreWorkerOffers = Seq(new WorkerOffer("executor0", "host0", taskCpus),
+ val multiCoreWorkerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", taskCpus),
new WorkerOffer("executor1", "host1", numFreeCores))
taskScheduler.submitTasks(taskSet)
taskDescriptions = taskScheduler.resourceOffers(multiCoreWorkerOffers).flatten
@@ -144,7 +144,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
val numFreeCores = 1
val taskSet = new TaskSet(
Array(new NotSerializableFakeTask(1, 0), new NotSerializableFakeTask(0, 1)), 0, 0, 0, null)
- val multiCoreWorkerOffers = Seq(new WorkerOffer("executor0", "host0", taskCpus),
+ val multiCoreWorkerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", taskCpus),
new WorkerOffer("executor1", "host1", numFreeCores))
taskScheduler.submitTasks(taskSet)
var taskDescriptions = taskScheduler.resourceOffers(multiCoreWorkerOffers).flatten
@@ -184,7 +184,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
val taskScheduler = setupScheduler()
val numFreeCores = 1
- val workerOffers = Seq(new WorkerOffer("executor0", "host0", numFreeCores))
+ val workerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", numFreeCores))
val attempt1 = FakeTask.createTaskSet(10)
// submit attempt 1, offer some resources, some tasks get scheduled
@@ -216,7 +216,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
val taskScheduler = setupScheduler()
val numFreeCores = 10
- val workerOffers = Seq(new WorkerOffer("executor0", "host0", numFreeCores))
+ val workerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", numFreeCores))
val attempt1 = FakeTask.createTaskSet(10)
// submit attempt 1, offer some resources, some tasks get scheduled
@@ -254,8 +254,8 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
test("tasks are not re-scheduled while executor loss reason is pending") {
val taskScheduler = setupScheduler()
- val e0Offers = Seq(new WorkerOffer("executor0", "host0", 1))
- val e1Offers = Seq(new WorkerOffer("executor1", "host0", 1))
+ val e0Offers = IndexedSeq(new WorkerOffer("executor0", "host0", 1))
+ val e1Offers = IndexedSeq(new WorkerOffer("executor1", "host0", 1))
val attempt1 = FakeTask.createTaskSet(1)
// submit attempt 1, offer resources, task gets scheduled
@@ -296,7 +296,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
taskScheduler.submitTasks(taskSet)
val tsm = taskScheduler.taskSetManagerForAttempt(taskSet.stageId, taskSet.stageAttemptId).get
- val firstTaskAttempts = taskScheduler.resourceOffers(Seq(
+ val firstTaskAttempts = taskScheduler.resourceOffers(IndexedSeq(
new WorkerOffer("executor0", "host0", 1),
new WorkerOffer("executor1", "host1", 1)
)).flatten
@@ -313,7 +313,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
// on that executor, and make sure that the other task (not the failed one) is assigned there
taskScheduler.executorLost("executor1", SlaveLost("oops"))
val nextTaskAttempts =
- taskScheduler.resourceOffers(Seq(new WorkerOffer("executor0", "host0", 1))).flatten
+ taskScheduler.resourceOffers(IndexedSeq(new WorkerOffer("executor0", "host0", 1))).flatten
// Note: Its OK if some future change makes this already realize the taskset has become
// unschedulable at this point (though in the current implementation, we're sure it will not)
assert(nextTaskAttempts.size === 1)
@@ -323,7 +323,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
// now we should definitely realize that our task set is unschedulable, because the only
// task left can't be scheduled on any executors due to the blacklist
- taskScheduler.resourceOffers(Seq(new WorkerOffer("executor0", "host0", 1)))
+ taskScheduler.resourceOffers(IndexedSeq(new WorkerOffer("executor0", "host0", 1)))
sc.listenerBus.waitUntilEmpty(100000)
assert(tsm.isZombie)
assert(failedTaskSet)
@@ -348,7 +348,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
taskScheduler.submitTasks(taskSet)
val tsm = taskScheduler.taskSetManagerForAttempt(taskSet.stageId, taskSet.stageAttemptId).get
- val offers = Seq(
+ val offers = IndexedSeq(
// each offer has more than enough free cores for the entire task set, so when combined
// with the locality preferences, we schedule all tasks on one executor
new WorkerOffer("executor0", "host0", 4),
@@ -380,7 +380,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
(0 until 2).map { _ => Seq(TaskLocation("host0", "executor2"))}: _*
))
- val taskDescs = taskScheduler.resourceOffers(Seq(
+ val taskDescs = taskScheduler.resourceOffers(IndexedSeq(
new WorkerOffer("executor0", "host0", 1),
new WorkerOffer("executor1", "host1", 1)
)).flatten
@@ -396,7 +396,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
// when executor2 is added, we should realize that we can run process-local tasks.
// And we should know its alive on the host.
val secondTaskDescs = taskScheduler.resourceOffers(
- Seq(new WorkerOffer("executor2", "host0", 1))).flatten
+ IndexedSeq(new WorkerOffer("executor2", "host0", 1))).flatten
assert(secondTaskDescs.size === 1)
assert(mgr.myLocalityLevels.toSet ===
Set(TaskLocality.PROCESS_LOCAL, TaskLocality.NODE_LOCAL, TaskLocality.ANY))
@@ -406,7 +406,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
// And even if we don't have anything left to schedule, another resource offer on yet another
// executor should also update the set of live executors
val thirdTaskDescs = taskScheduler.resourceOffers(
- Seq(new WorkerOffer("executor3", "host1", 1))).flatten
+ IndexedSeq(new WorkerOffer("executor3", "host1", 1))).flatten
assert(thirdTaskDescs.size === 0)
assert(taskScheduler.getExecutorsAliveOnHost("host1") === Some(Set("executor1", "executor3")))
}
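For context, a hedged sketch of the pattern the patch adopts (simplified and standalone; `WorkerOffer` here is a stand-in for Spark's case class, and `resourceOffers` is a toy version of the scheduler method): materializing offers with `.toIndexedSeq` makes the O(1)-access requirement part of the method signature instead of an accident of whatever `.toSeq` happens to return.

```scala
// Assumption: simplified stand-in for org.apache.spark.scheduler.WorkerOffer.
case class WorkerOffer(executorId: String, host: String, cores: Int)

object OfferDemo {
  // Toy analogue of TaskSchedulerImpl.resourceOffers: requiring IndexedSeq in
  // the signature guarantees cheap offers(i) access for every caller.
  def resourceOffers(offers: IndexedSeq[WorkerOffer]): Unit =
    for (i <- 0 until offers.size) {
      val o = offers(i) // cheap by construction, regardless of the caller's source
      println(s"offer $i: ${o.executorId}@${o.host}, ${o.cores} cores")
    }

  def main(args: Array[String]): Unit = {
    val freeCores = Map("executor0" -> 4, "executor1" -> 2)
    // .toIndexedSeq pins the representation, mirroring the backend changes above.
    val offers = freeCores.map { case (id, cores) =>
      WorkerOffer(id, s"host-$id", cores)
    }.toIndexedSeq
    resourceOffers(offers)
  }
}
```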