aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2013-06-07 22:39:06 -0700
committerMatei Zaharia <matei@eecs.berkeley.edu>2013-06-07 22:39:06 -0700
commit1ae60bcb3673118c9838ecc6afac1d015fe79def (patch)
tree615ef01bcdf8d5a7ef142f20e05af7ea5ed0a272
parentfff3728552ccd2fc02fc4b7b750a9e3698869718 (diff)
parent606bb1b450064a2b909e4275ce45325dbbef4eca (diff)
downloadspark-1ae60bcb3673118c9838ecc6afac1d015fe79def.tar.gz
spark-1ae60bcb3673118c9838ecc6afac1d015fe79def.tar.bz2
spark-1ae60bcb3673118c9838ecc6afac1d015fe79def.zip
Merge pull request #634 from xiajunluan/master
[Spark-753] Fix ClusterSchedulSuite unit test failed
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/SchedulingAlgorithm.scala17
-rw-r--r--core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala9
2 files changed, 19 insertions, 7 deletions
diff --git a/core/src/main/scala/spark/scheduler/cluster/SchedulingAlgorithm.scala b/core/src/main/scala/spark/scheduler/cluster/SchedulingAlgorithm.scala
index a5d6285c99..13120edf63 100644
--- a/core/src/main/scala/spark/scheduler/cluster/SchedulingAlgorithm.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/SchedulingAlgorithm.scala
@@ -40,15 +40,24 @@ private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble
var res:Boolean = true
+ var compare:Int = 0
if (s1Needy && !s2Needy) {
- res = true
+ return true
} else if (!s1Needy && s2Needy) {
- res = false
+ return false
} else if (s1Needy && s2Needy) {
- res = minShareRatio1 <= minShareRatio2
+ compare = minShareRatio1.compareTo(minShareRatio2)
+ } else {
+ compare = taskToWeightRatio1.compareTo(taskToWeightRatio2)
+ }
+
+ if (compare < 0) {
+ res = true
+ } else if (compare > 0) {
+ res = false
} else {
- res = taskToWeightRatio1 <= taskToWeightRatio2
+ return s1.name < s2.name
}
return res
}
diff --git a/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala b/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala
index a39418b716..c861597c6b 100644
--- a/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala
+++ b/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala
@@ -88,7 +88,7 @@ class DummyTask(stageId: Int) extends Task[Int](stageId)
}
}
-class ClusterSchedulerSuite extends FunSuite with LocalSparkContext {
+class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging {
def createDummyTaskSetManager(priority: Int, stage: Int, numTasks: Int, cs: ClusterScheduler, taskSet: TaskSet): DummyTaskSetManager = {
new DummyTaskSetManager(priority, stage, numTasks, cs , taskSet)
@@ -96,8 +96,11 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext {
def resourceOffer(rootPool: Pool): Int = {
val taskSetQueue = rootPool.getSortedTaskSetQueue()
- for (taskSet <- taskSetQueue)
- {
+ /* Just for Test*/
+ for (manager <- taskSetQueue) {
+ logInfo("parentName:%s, parent running tasks:%d, name:%s,runningTasks:%d".format(manager.parent.name, manager.parent.runningTasks, manager.name, manager.runningTasks))
+ }
+ for (taskSet <- taskSetQueue) {
taskSet.slaveOffer("execId_1", "hostname_1", 1) match {
case Some(task) =>
return taskSet.stageId