author     Shivaram Venkataraman <shivaram@eecs.berkeley.edu>   2014-03-25 13:05:30 -0700
committer  Kay Ousterhout <kayousterhout@gmail.com>              2014-03-25 13:05:30 -0700
commit     f8111eaeb0e35f6aa9b1e3ec1173fff207174155 (patch)
tree       19ddfc891045437de89124055f6ff3344ae70530 /core
parent     71d4ed271bcbddb154643bd44297ed77190e75cf (diff)
SPARK-1319: Fix scheduler to account for tasks using > 1 CPUs.
Move CPUS_PER_TASK to TaskSchedulerImpl as the value is a constant and use it in both Mesos and CoarseGrained scheduler backends. Thanks @kayousterhout for the design discussion.

Author: Shivaram Venkataraman <shivaram@eecs.berkeley.edu>

Closes #219 from shivaram/multi-cpus and squashes the following commits:

5c7d685 [Shivaram Venkataraman] Don't pass availableCpus to TaskSetManager
260e4d5 [Shivaram Venkataraman] Add a check for non-zero CPUs in TaskSetManager
73fcf6f [Shivaram Venkataraman] Add documentation for spark.task.cpus
647bc45 [Shivaram Venkataraman] Fix scheduler to account for tasks using > 1 CPUs. Move CPUS_PER_TASK to TaskSchedulerImpl as the value is a constant and use it in both Mesos and CoarseGrained scheduler backends.
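For readers arriving at this change from the configuration side, here is a minimal, illustrative driver sketch showing where the newly documented spark.task.cpus setting plugs in. Everything except the property key (the object name, app name, master URL, and workload) is made up for illustration and assumes a Spark build that includes this patch:

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical driver program (not part of this patch); only the
// "spark.task.cpus" property key is taken from the commit above.
object MultiCpuTasksExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("multi-cpu-tasks-example")
      .setMaster("local[4]") // illustrative only; on a real cluster the master comes from the deployment
      // Reserve 2 cores per task. With this patch, TaskSchedulerImpl reads the
      // value once (CPUS_PER_TASK) and only assigns a task to an offer that
      // still has at least that many free cores; the CoarseGrained and Mesos
      // backends debit and credit the same amount as tasks launch and finish.
      .set("spark.task.cpus", "2")

    val sc = new SparkContext(conf)
    try {
      // Each task may now use two threads internally (for example a
      // multi-threaded native library) without oversubscribing executor cores.
      val processed = sc.parallelize(1 to 1000, 8).map(_ * 2).count()
      println("processed " + processed + " records")
    } finally {
      sc.stop()
    }
  }
}

On an executor with, say, 8 free cores this setting caps concurrency at four such tasks, which is exactly the accounting the new availableCpus(i) >= CPUS_PER_TASK check and the freeCores bookkeeping in the diffs below enforce.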
Diffstat (limited to 'core')

 -rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala                        25
 -rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala                            6
 -rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala     4
 -rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala       2
 -rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala                   42
 -rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala                      83

 6 files changed, 99 insertions(+), 63 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 30bceb47b9..a92922166f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -62,6 +62,9 @@ private[spark] class TaskSchedulerImpl(
// Threshold above which we warn user initial TaskSet may be starved
val STARVATION_TIMEOUT = conf.getLong("spark.starvation.timeout", 15000)
+ // CPUs to request per task
+ val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1)
+
// TaskSetManagers are not thread safe, so any access to one should be synchronized
// on this class.
val activeTaskSets = new HashMap[String, TaskSetManager]
@@ -228,16 +231,18 @@ private[spark] class TaskSchedulerImpl(
for (i <- 0 until shuffledOffers.size) {
val execId = shuffledOffers(i).executorId
val host = shuffledOffers(i).host
- for (task <- taskSet.resourceOffer(execId, host, availableCpus(i), maxLocality)) {
- tasks(i) += task
- val tid = task.taskId
- taskIdToTaskSetId(tid) = taskSet.taskSet.id
- taskIdToExecutorId(tid) = execId
- activeExecutorIds += execId
- executorsByHost(host) += execId
- availableCpus(i) -= taskSet.CPUS_PER_TASK
- assert (availableCpus(i) >= 0)
- launchedTask = true
+ if (availableCpus(i) >= CPUS_PER_TASK) {
+ for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
+ tasks(i) += task
+ val tid = task.taskId
+ taskIdToTaskSetId(tid) = taskSet.taskSet.id
+ taskIdToExecutorId(tid) = execId
+ activeExecutorIds += execId
+ executorsByHost(host) += execId
+ availableCpus(i) -= CPUS_PER_TASK
+ assert (availableCpus(i) >= 0)
+ launchedTask = true
+ }
}
}
} while (launchedTask)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index a73343c1c0..86d2050a03 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -56,9 +56,6 @@ private[spark] class TaskSetManager(
{
val conf = sched.sc.conf
- // CPUs to request per task
- val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1)
-
/*
* Sometimes if an executor is dead or in an otherwise invalid state, the driver
* does not realize right away leading to repeated task failures. If enabled,
@@ -384,11 +381,10 @@ private[spark] class TaskSetManager(
def resourceOffer(
execId: String,
host: String,
- availableCpus: Int,
maxLocality: TaskLocality.TaskLocality)
: Option[TaskDescription] =
{
- if (!isZombie && availableCpus >= CPUS_PER_TASK) {
+ if (!isZombie) {
val curTime = clock.getTime()
var allowedLocality = getAllowedLocalityLevel(curTime)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index fad0373157..990e01a3e7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -89,7 +89,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
scheduler.statusUpdate(taskId, state, data.value)
if (TaskState.isFinished(state)) {
if (executorActor.contains(executorId)) {
- freeCores(executorId) += 1
+ freeCores(executorId) += scheduler.CPUS_PER_TASK
makeOffers(executorId)
} else {
// Ignoring the update since we don't know about the executor.
@@ -140,7 +140,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
// Launch tasks returned by a set of resource offers
def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
for (task <- tasks.flatten) {
- freeCores(task.executorId) -= 1
+ freeCores(task.executorId) -= scheduler.CPUS_PER_TASK
executorActor(task.executorId) ! LaunchTask(task)
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index 4092dd04b1..dfdcafe19f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -246,7 +246,7 @@ private[spark] class MesosSchedulerBackend(
val cpuResource = Resource.newBuilder()
.setName("cpus")
.setType(Value.Type.SCALAR)
- .setScalar(Value.Scalar.newBuilder().setValue(1).build())
+ .setScalar(Value.Scalar.newBuilder().setValue(scheduler.CPUS_PER_TASK).build())
.build()
MesosTaskInfo.newBuilder()
.setTaskId(taskId)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 9274e01632..356e28dd19 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -80,7 +80,6 @@ class FakeTaskSetManager(
override def resourceOffer(
execId: String,
host: String,
- availableCpus: Int,
maxLocality: TaskLocality.TaskLocality)
: Option[TaskDescription] =
{
@@ -125,7 +124,7 @@ class TaskSchedulerImplSuite extends FunSuite with LocalSparkContext with Loggin
manager.parent.name, manager.parent.runningTasks, manager.name, manager.runningTasks))
}
for (taskSet <- taskSetQueue) {
- taskSet.resourceOffer("execId_1", "hostname_1", 1, TaskLocality.ANY) match {
+ taskSet.resourceOffer("execId_1", "hostname_1", TaskLocality.ANY) match {
case Some(task) =>
return taskSet.stageId
case None => {}
@@ -293,4 +292,43 @@ class TaskSchedulerImplSuite extends FunSuite with LocalSparkContext with Loggin
assert(count > 0)
assert(count < numTrials)
}
+
+ test("Scheduler correctly accounts for multiple CPUs per task") {
+ sc = new SparkContext("local", "TaskSchedulerImplSuite")
+ val taskCpus = 2
+
+ sc.conf.set("spark.task.cpus", taskCpus.toString)
+ val taskScheduler = new TaskSchedulerImpl(sc)
+ taskScheduler.initialize(new FakeSchedulerBackend)
+ // Need to initialize a DAGScheduler for the taskScheduler to use for callbacks.
+ val dagScheduler = new DAGScheduler(sc, taskScheduler) {
+ override def taskStarted(task: Task[_], taskInfo: TaskInfo) {}
+ override def executorAdded(execId: String, host: String) {}
+ }
+
+ // Give zero core offers. Should not generate any tasks
+ val zeroCoreWorkerOffers = Seq(new WorkerOffer("executor0", "host0", 0),
+ new WorkerOffer("executor1", "host1", 0))
+ val taskSet = FakeTask.createTaskSet(1)
+ taskScheduler.submitTasks(taskSet)
+ var taskDescriptions = taskScheduler.resourceOffers(zeroCoreWorkerOffers).flatten
+ assert(0 === taskDescriptions.length)
+
+ // No tasks should run as we only have 1 core free.
+ val numFreeCores = 1
+ val singleCoreWorkerOffers = Seq(new WorkerOffer("executor0", "host0", numFreeCores),
+ new WorkerOffer("executor1", "host1", numFreeCores))
+ taskScheduler.submitTasks(taskSet)
+ taskDescriptions = taskScheduler.resourceOffers(singleCoreWorkerOffers).flatten
+ assert(0 === taskDescriptions.length)
+
+ // Now change the offers to have 2 cores in one executor and verify if it
+ // is chosen.
+ val multiCoreWorkerOffers = Seq(new WorkerOffer("executor0", "host0", taskCpus),
+ new WorkerOffer("executor1", "host1", numFreeCores))
+ taskScheduler.submitTasks(taskSet)
+ taskDescriptions = taskScheduler.resourceOffers(multiCoreWorkerOffers).flatten
+ assert(1 === taskDescriptions.length)
+ assert("executor0" === taskDescriptions(0).executorId)
+ }
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 9af5d3a303..c92b6dc96c 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -93,19 +93,16 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
val taskSet = FakeTask.createTaskSet(1)
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
- // Offer a host with no CPUs
- assert(manager.resourceOffer("exec1", "host1", 0, ANY) === None)
-
// Offer a host with process-local as the constraint; this should work because the TaskSet
// above won't have any locality preferences
- val taskOption = manager.resourceOffer("exec1", "host1", 2, TaskLocality.PROCESS_LOCAL)
+ val taskOption = manager.resourceOffer("exec1", "host1", TaskLocality.PROCESS_LOCAL)
assert(taskOption.isDefined)
val task = taskOption.get
assert(task.executorId === "exec1")
assert(sched.startedTasks.contains(0))
// Re-offer the host -- now we should get no more tasks
- assert(manager.resourceOffer("exec1", "host1", 2, PROCESS_LOCAL) === None)
+ assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL) === None)
// Tell it the task has finished
manager.handleSuccessfulTask(0, createTaskResult(0))
@@ -121,7 +118,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
// First three offers should all find tasks
for (i <- 0 until 3) {
- val taskOption = manager.resourceOffer("exec1", "host1", 1, PROCESS_LOCAL)
+ val taskOption = manager.resourceOffer("exec1", "host1", PROCESS_LOCAL)
assert(taskOption.isDefined)
val task = taskOption.get
assert(task.executorId === "exec1")
@@ -129,7 +126,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
assert(sched.startedTasks.toSet === Set(0, 1, 2))
// Re-offer the host -- now we should get no more tasks
- assert(manager.resourceOffer("exec1", "host1", 1, PROCESS_LOCAL) === None)
+ assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL) === None)
// Finish the first two tasks
manager.handleSuccessfulTask(0, createTaskResult(0))
@@ -157,35 +154,35 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
// First offer host1, exec1: first task should be chosen
- assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0)
+ assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0)
// Offer host1, exec1 again: the last task, which has no prefs, should be chosen
- assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 3)
+ assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 3)
// Offer host1, exec1 again, at PROCESS_LOCAL level: nothing should get chosen
- assert(manager.resourceOffer("exec1", "host1", 1, PROCESS_LOCAL) === None)
+ assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL) === None)
clock.advance(LOCALITY_WAIT)
// Offer host1, exec1 again, at PROCESS_LOCAL level: nothing should get chosen
- assert(manager.resourceOffer("exec1", "host1", 1, PROCESS_LOCAL) === None)
+ assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL) === None)
// Offer host1, exec1 again, at NODE_LOCAL level: we should choose task 2
- assert(manager.resourceOffer("exec1", "host1", 1, NODE_LOCAL).get.index == 2)
+ assert(manager.resourceOffer("exec1", "host1", NODE_LOCAL).get.index == 2)
// Offer host1, exec1 again, at NODE_LOCAL level: nothing should get chosen
- assert(manager.resourceOffer("exec1", "host1", 1, NODE_LOCAL) === None)
+ assert(manager.resourceOffer("exec1", "host1", NODE_LOCAL) === None)
// Offer host1, exec1 again, at ANY level: nothing should get chosen
- assert(manager.resourceOffer("exec1", "host1", 1, ANY) === None)
+ assert(manager.resourceOffer("exec1", "host1", ANY) === None)
clock.advance(LOCALITY_WAIT)
// Offer host1, exec1 again, at ANY level: task 1 should get chosen
- assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 1)
+ assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 1)
// Offer host1, exec1 again, at ANY level: nothing should be chosen as we've launched all tasks
- assert(manager.resourceOffer("exec1", "host1", 1, ANY) === None)
+ assert(manager.resourceOffer("exec1", "host1", ANY) === None)
}
test("delay scheduling with fallback") {
@@ -203,29 +200,29 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
// First offer host1: first task should be chosen
- assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0)
+ assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0)
// Offer host1 again: nothing should get chosen
- assert(manager.resourceOffer("exec1", "host1", 1, ANY) === None)
+ assert(manager.resourceOffer("exec1", "host1", ANY) === None)
clock.advance(LOCALITY_WAIT)
// Offer host1 again: second task (on host2) should get chosen
- assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 1)
+ assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 1)
// Offer host1 again: third task (on host2) should get chosen
- assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 2)
+ assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 2)
// Offer host2: fifth task (also on host2) should get chosen
- assert(manager.resourceOffer("exec2", "host2", 1, ANY).get.index === 4)
+ assert(manager.resourceOffer("exec2", "host2", ANY).get.index === 4)
// Now that we've launched a local task, we should no longer launch the task for host3
- assert(manager.resourceOffer("exec2", "host2", 1, ANY) === None)
+ assert(manager.resourceOffer("exec2", "host2", ANY) === None)
clock.advance(LOCALITY_WAIT)
// After another delay, we can go ahead and launch that task non-locally
- assert(manager.resourceOffer("exec2", "host2", 1, ANY).get.index === 3)
+ assert(manager.resourceOffer("exec2", "host2", ANY).get.index === 3)
}
test("delay scheduling with failed hosts") {
@@ -240,24 +237,24 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
// First offer host1: first task should be chosen
- assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0)
+ assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0)
// Offer host1 again: third task should be chosen immediately because host3 is not up
- assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 2)
+ assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 2)
// After this, nothing should get chosen
- assert(manager.resourceOffer("exec1", "host1", 1, ANY) === None)
+ assert(manager.resourceOffer("exec1", "host1", ANY) === None)
// Now mark host2 as dead
sched.removeExecutor("exec2")
manager.executorLost("exec2", "host2")
// Task 1 should immediately be launched on host1 because its original host is gone
- assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 1)
+ assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 1)
// Now that all tasks have launched, nothing new should be launched anywhere else
- assert(manager.resourceOffer("exec1", "host1", 1, ANY) === None)
- assert(manager.resourceOffer("exec2", "host2", 1, ANY) === None)
+ assert(manager.resourceOffer("exec1", "host1", ANY) === None)
+ assert(manager.resourceOffer("exec2", "host2", ANY) === None)
}
test("task result lost") {
@@ -267,14 +264,14 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
val clock = new FakeClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
- assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0)
+ assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0)
// Tell it the task has finished but the result was lost.
manager.handleFailedTask(0, TaskState.FINISHED, TaskResultLost)
assert(sched.endedTasks(0) === TaskResultLost)
// Re-offer the host -- now we should get task 0 again.
- assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0)
+ assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0)
}
test("repeated failures lead to task set abortion") {
@@ -287,7 +284,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
// Fail the task MAX_TASK_FAILURES times, and check that the task set is aborted
// after the last failure.
(1 to manager.maxTaskFailures).foreach { index =>
- val offerResult = manager.resourceOffer("exec1", "host1", 1, ANY)
+ val offerResult = manager.resourceOffer("exec1", "host1", ANY)
assert(offerResult.isDefined,
"Expect resource offer on iteration %s to return a task".format(index))
assert(offerResult.get.index === 0)
@@ -317,7 +314,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
val manager = new TaskSetManager(sched, taskSet, 4, clock)
{
- val offerResult = manager.resourceOffer("exec1", "host1", 1, TaskLocality.PROCESS_LOCAL)
+ val offerResult = manager.resourceOffer("exec1", "host1", TaskLocality.PROCESS_LOCAL)
assert(offerResult.isDefined, "Expect resource offer to return a task")
assert(offerResult.get.index === 0)
@@ -328,15 +325,15 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
assert(!sched.taskSetsFailed.contains(taskSet.id))
// Ensure scheduling on exec1 fails after failure 1 due to blacklist
- assert(manager.resourceOffer("exec1", "host1", 1, TaskLocality.PROCESS_LOCAL).isEmpty)
- assert(manager.resourceOffer("exec1", "host1", 1, TaskLocality.NODE_LOCAL).isEmpty)
- assert(manager.resourceOffer("exec1", "host1", 1, TaskLocality.RACK_LOCAL).isEmpty)
- assert(manager.resourceOffer("exec1", "host1", 1, TaskLocality.ANY).isEmpty)
+ assert(manager.resourceOffer("exec1", "host1", TaskLocality.PROCESS_LOCAL).isEmpty)
+ assert(manager.resourceOffer("exec1", "host1", TaskLocality.NODE_LOCAL).isEmpty)
+ assert(manager.resourceOffer("exec1", "host1", TaskLocality.RACK_LOCAL).isEmpty)
+ assert(manager.resourceOffer("exec1", "host1", TaskLocality.ANY).isEmpty)
}
// Run the task on exec1.1 - should work, and then fail it on exec1.1
{
- val offerResult = manager.resourceOffer("exec1.1", "host1", 1, TaskLocality.NODE_LOCAL)
+ val offerResult = manager.resourceOffer("exec1.1", "host1", TaskLocality.NODE_LOCAL)
assert(offerResult.isDefined,
"Expect resource offer to return a task for exec1.1, offerResult = " + offerResult)
@@ -348,12 +345,12 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
assert(!sched.taskSetsFailed.contains(taskSet.id))
// Ensure scheduling on exec1.1 fails after failure 2 due to blacklist
- assert(manager.resourceOffer("exec1.1", "host1", 1, TaskLocality.NODE_LOCAL).isEmpty)
+ assert(manager.resourceOffer("exec1.1", "host1", TaskLocality.NODE_LOCAL).isEmpty)
}
// Run the task on exec2 - should work, and then fail it on exec2
{
- val offerResult = manager.resourceOffer("exec2", "host2", 1, TaskLocality.ANY)
+ val offerResult = manager.resourceOffer("exec2", "host2", TaskLocality.ANY)
assert(offerResult.isDefined, "Expect resource offer to return a task")
assert(offerResult.get.index === 0)
@@ -364,20 +361,20 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
assert(!sched.taskSetsFailed.contains(taskSet.id))
// Ensure scheduling on exec2 fails after failure 3 due to blacklist
- assert(manager.resourceOffer("exec2", "host2", 1, TaskLocality.ANY).isEmpty)
+ assert(manager.resourceOffer("exec2", "host2", TaskLocality.ANY).isEmpty)
}
// After reschedule delay, scheduling on exec1 should be possible.
clock.advance(rescheduleDelay)
{
- val offerResult = manager.resourceOffer("exec1", "host1", 1, TaskLocality.PROCESS_LOCAL)
+ val offerResult = manager.resourceOffer("exec1", "host1", TaskLocality.PROCESS_LOCAL)
assert(offerResult.isDefined, "Expect resource offer to return a task")
assert(offerResult.get.index === 0)
assert(offerResult.get.executorId === "exec1")
- assert(manager.resourceOffer("exec1", "host1", 1, TaskLocality.PROCESS_LOCAL).isEmpty)
+ assert(manager.resourceOffer("exec1", "host1", TaskLocality.PROCESS_LOCAL).isEmpty)
// Cause exec1 to fail : failure 4
manager.handleFailedTask(offerResult.get.taskId, TaskState.FINISHED, TaskResultLost)