From 606bb1b450064a2b909e4275ce45325dbbef4eca Mon Sep 17 00:00:00 2001 From: Andrew xia Date: Fri, 31 May 2013 15:40:41 +0800 Subject: Fix schedulingAlgorithm bugs for unit test --- .../spark/scheduler/cluster/SchedulingAlgorithm.scala | 17 +++++++++++++---- .../scala/spark/scheduler/ClusterSchedulerSuite.scala | 9 ++++++--- 2 files changed, 19 insertions(+), 7 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/spark/scheduler/cluster/SchedulingAlgorithm.scala b/core/src/main/scala/spark/scheduler/cluster/SchedulingAlgorithm.scala index a5d6285c99..13120edf63 100644 --- a/core/src/main/scala/spark/scheduler/cluster/SchedulingAlgorithm.scala +++ b/core/src/main/scala/spark/scheduler/cluster/SchedulingAlgorithm.scala @@ -40,15 +40,24 @@ private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm { val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble var res:Boolean = true + var compare:Int = 0 if (s1Needy && !s2Needy) { - res = true + return true } else if (!s1Needy && s2Needy) { - res = false + return false } else if (s1Needy && s2Needy) { - res = minShareRatio1 <= minShareRatio2 + compare = minShareRatio1.compareTo(minShareRatio2) + } else { + compare = taskToWeightRatio1.compareTo(taskToWeightRatio2) + } + + if (compare < 0) { + res = true + } else if (compare > 0) { + res = false } else { - res = taskToWeightRatio1 <= taskToWeightRatio2 + return s1.name < s2.name } return res } diff --git a/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala b/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala index a39418b716..c861597c6b 100644 --- a/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala +++ b/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala @@ -88,7 +88,7 @@ class DummyTask(stageId: Int) extends Task[Int](stageId) } } -class ClusterSchedulerSuite extends FunSuite with LocalSparkContext { +class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging { def createDummyTaskSetManager(priority: Int, stage: Int, numTasks: Int, cs: ClusterScheduler, taskSet: TaskSet): DummyTaskSetManager = { new DummyTaskSetManager(priority, stage, numTasks, cs , taskSet) @@ -96,8 +96,11 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext { def resourceOffer(rootPool: Pool): Int = { val taskSetQueue = rootPool.getSortedTaskSetQueue() - for (taskSet <- taskSetQueue) - { + /* Just for Test*/ + for (manager <- taskSetQueue) { + logInfo("parentName:%s, parent running tasks:%d, name:%s,runningTasks:%d".format(manager.parent.name, manager.parent.runningTasks, manager.name, manager.runningTasks)) + } + for (taskSet <- taskSetQueue) { taskSet.slaveOffer("execId_1", "hostname_1", 1) match { case Some(task) => return taskSet.stageId -- cgit v1.2.3