diff options
author | Andrew xia <junluan.xia@intel.com> | 2013-05-17 05:10:38 +0800 |
---|---|---|
committer | Andrew xia <junluan.xia@intel.com> | 2013-05-17 05:10:38 +0800 |
commit | c6e2770bfe940a4f4f26f75c9ba228faea7316f0 (patch) | |
tree | 02da2f9a2b2987d0641d4ffaf72357ecf264e319 /core/src/test/scala | |
parent | 8436bd5d4a96480ac1871330a28d9d712e64959d (diff) | |
download | spark-c6e2770bfe940a4f4f26f75c9ba228faea7316f0.tar.gz spark-c6e2770bfe940a4f4f26f75c9ba228faea7316f0.tar.bz2 spark-c6e2770bfe940a4f4f26f75c9ba228faea7316f0.zip |
Fix ClusterScheduler bug to avoid allocating tasks to same slave
Diffstat (limited to 'core/src/test/scala')
-rw-r--r-- | core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala | 46 |
1 files changed, 29 insertions, 17 deletions
diff --git a/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala b/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala index 2eda48196b..8426be7575 100644 --- a/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala +++ b/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala @@ -6,6 +6,7 @@ import org.scalatest.BeforeAndAfter import spark._ import spark.scheduler._ import spark.scheduler.cluster._ +import scala.collection.mutable.ArrayBuffer import java.util.Properties @@ -25,34 +26,34 @@ class DummyTaskSetManager( var numTasks = initNumTasks var tasksFinished = 0 - def increaseRunningTasks(taskNum: Int) { + override def increaseRunningTasks(taskNum: Int) { runningTasks += taskNum if (parent != null) { parent.increaseRunningTasks(taskNum) } } - def decreaseRunningTasks(taskNum: Int) { + override def decreaseRunningTasks(taskNum: Int) { runningTasks -= taskNum if (parent != null) { parent.decreaseRunningTasks(taskNum) } } - def addSchedulable(schedulable: Schedulable) { + override def addSchedulable(schedulable: Schedulable) { } - def removeSchedulable(schedulable: Schedulable) { + override def removeSchedulable(schedulable: Schedulable) { } - def getSchedulableByName(name: String): Schedulable = { + override def getSchedulableByName(name: String): Schedulable = { return null } - def executorLost(executorId: String, host: String): Unit = { + override def executorLost(executorId: String, host: String): Unit = { } - def receiveOffer(execId: String, host: String, avaiableCpus: Double): Option[TaskDescription] = { + override def slaveOffer(execId: String, host: String, avaiableCpus: Double): Option[TaskDescription] = { if (tasksFinished + runningTasks < numTasks) { increaseRunningTasks(1) return Some(new TaskDescription(0, stageId.toString, execId, "task 0:0", null)) @@ -60,10 +61,16 @@ class DummyTaskSetManager( return None } - def checkSpeculatableTasks(): Boolean = { + override def checkSpeculatableTasks(): Boolean = { return true } + override def getSortedLeafSchedulable(): ArrayBuffer[Schedulable] = { + var leafSchedulableQueue = new ArrayBuffer[Schedulable] + leafSchedulableQueue += this + return leafSchedulableQueue + } + def taskFinished() { decreaseRunningTasks(1) tasksFinished +=1 @@ -80,16 +87,21 @@ class DummyTaskSetManager( class ClusterSchedulerSuite extends FunSuite with BeforeAndAfter { - def receiveOffer(rootPool: Pool) : Option[TaskDescription] = { - rootPool.receiveOffer("execId_1", "hostname_1", 1) + def resourceOffer(rootPool: Pool): Int = { + val taskSetQueue = rootPool.getSortedLeafSchedulable() + for (taskSet <- taskSetQueue) + { + taskSet.slaveOffer("execId_1", "hostname_1", 1) match { + case Some(task) => + return task.taskSetId.toInt + case None => {} + } + } + -1 } def checkTaskSetId(rootPool: Pool, expectedTaskSetId: Int) { - receiveOffer(rootPool) match { - case Some(task) => - assert(task.taskSetId.toInt === expectedTaskSetId) - case _ => - } + assert(resourceOffer(rootPool) === expectedTaskSetId) } test("FIFO Scheduler Test") { @@ -105,9 +117,9 @@ class ClusterSchedulerSuite extends FunSuite with BeforeAndAfter { schedulableBuilder.addTaskSetManager(taskSetManager2, null) checkTaskSetId(rootPool, 0) - receiveOffer(rootPool) + resourceOffer(rootPool) checkTaskSetId(rootPool, 1) - receiveOffer(rootPool) + resourceOffer(rootPool) taskSetManager1.abort() checkTaskSetId(rootPool, 2) } |