aboutsummaryrefslogtreecommitdiff
path: root/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala')
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala42
1 files changed, 40 insertions, 2 deletions
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 9274e01632..356e28dd19 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -80,7 +80,6 @@ class FakeTaskSetManager(
override def resourceOffer(
execId: String,
host: String,
- availableCpus: Int,
maxLocality: TaskLocality.TaskLocality)
: Option[TaskDescription] =
{
@@ -125,7 +124,7 @@ class TaskSchedulerImplSuite extends FunSuite with LocalSparkContext with Loggin
manager.parent.name, manager.parent.runningTasks, manager.name, manager.runningTasks))
}
for (taskSet <- taskSetQueue) {
- taskSet.resourceOffer("execId_1", "hostname_1", 1, TaskLocality.ANY) match {
+ taskSet.resourceOffer("execId_1", "hostname_1", TaskLocality.ANY) match {
case Some(task) =>
return taskSet.stageId
case None => {}
@@ -293,4 +292,43 @@ class TaskSchedulerImplSuite extends FunSuite with LocalSparkContext with Loggin
assert(count > 0)
assert(count < numTrials)
}
+
+ test("Scheduler correctly accounts for multiple CPUs per task") {
+ sc = new SparkContext("local", "TaskSchedulerImplSuite")
+ val taskCpus = 2
+
+ sc.conf.set("spark.task.cpus", taskCpus.toString)
+ val taskScheduler = new TaskSchedulerImpl(sc)
+ taskScheduler.initialize(new FakeSchedulerBackend)
+ // Need to initialize a DAGScheduler for the taskScheduler to use for callbacks.
+ val dagScheduler = new DAGScheduler(sc, taskScheduler) {
+ override def taskStarted(task: Task[_], taskInfo: TaskInfo) {}
+ override def executorAdded(execId: String, host: String) {}
+ }
+
+ // Give zero core offers. Should not generate any tasks
+ val zeroCoreWorkerOffers = Seq(new WorkerOffer("executor0", "host0", 0),
+ new WorkerOffer("executor1", "host1", 0))
+ val taskSet = FakeTask.createTaskSet(1)
+ taskScheduler.submitTasks(taskSet)
+ var taskDescriptions = taskScheduler.resourceOffers(zeroCoreWorkerOffers).flatten
+ assert(0 === taskDescriptions.length)
+
+ // No tasks should run as we only have 1 core free.
+ val numFreeCores = 1
+ val singleCoreWorkerOffers = Seq(new WorkerOffer("executor0", "host0", numFreeCores),
+ new WorkerOffer("executor1", "host1", numFreeCores))
+ taskScheduler.submitTasks(taskSet)
+ taskDescriptions = taskScheduler.resourceOffers(singleCoreWorkerOffers).flatten
+ assert(0 === taskDescriptions.length)
+
+ // Now change the offers to have 2 cores in one executor and verify if it
+ // is chosen.
+ val multiCoreWorkerOffers = Seq(new WorkerOffer("executor0", "host0", taskCpus),
+ new WorkerOffer("executor1", "host1", numFreeCores))
+ taskScheduler.submitTasks(taskSet)
+ taskDescriptions = taskScheduler.resourceOffers(multiCoreWorkerOffers).flatten
+ assert(1 === taskDescriptions.length)
+ assert("executor0" === taskDescriptions(0).executorId)
+ }
}