diff options
author | Andrew xia <junluan.xia@intel.com> | 2013-05-18 06:45:19 +0800 |
---|---|---|
committer | Andrew xia <junluan.xia@intel.com> | 2013-05-18 06:45:19 +0800 |
commit | d19753b9c78857acae441dce3133fbb6c5855f95 (patch) | |
tree | 6d430f9c768f2264dba32e2c62857e7ba279f21a /core/src/test/scala | |
parent | c6e2770bfe940a4f4f26f75c9ba228faea7316f0 (diff) | |
download | spark-d19753b9c78857acae441dce3133fbb6c5855f95.tar.gz spark-d19753b9c78857acae441dce3133fbb6c5855f95.tar.bz2 spark-d19753b9c78857acae441dce3133fbb6c5855f95.zip |
expose TaskSetManager type to resourceOffer function in ClusterScheduler
Diffstat (limited to 'core/src/test/scala')
-rw-r--r-- | core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala | 112 |
1 files changed, 66 insertions, 46 deletions
diff --git a/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala b/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala index 8426be7575..956cc7421c 100644 --- a/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala +++ b/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala @@ -13,18 +13,20 @@ import java.util.Properties class DummyTaskSetManager( initPriority: Int, initStageId: Int, - initNumTasks: Int) - extends Schedulable { - - var parent: Schedulable = null - var weight = 1 - var minShare = 2 - var runningTasks = 0 - var priority = initPriority - var stageId = initStageId - var name = "TaskSet_"+stageId - var numTasks = initNumTasks - var tasksFinished = 0 + initNumTasks: Int, + clusterScheduler: ClusterScheduler, + taskSet: TaskSet) + extends TaskSetManager(clusterScheduler,taskSet) { + + parent = null + weight = 1 + minShare = 2 + runningTasks = 0 + priority = initPriority + stageId = initStageId + name = "TaskSet_"+stageId + override val numTasks = initNumTasks + tasksFinished = 0 override def increaseRunningTasks(taskNum: Int) { runningTasks += taskNum @@ -41,11 +43,11 @@ class DummyTaskSetManager( } override def addSchedulable(schedulable: Schedulable) { - } - + } + override def removeSchedulable(schedulable: Schedulable) { } - + override def getSchedulableByName(name: String): Schedulable = { return null } @@ -65,11 +67,11 @@ class DummyTaskSetManager( return true } - override def getSortedLeafSchedulable(): ArrayBuffer[Schedulable] = { - var leafSchedulableQueue = new ArrayBuffer[Schedulable] - leafSchedulableQueue += this - return leafSchedulableQueue - } +// override def getSortedLeafSchedulable(): ArrayBuffer[Schedulable] = { +// var leafSchedulableQueue = new ArrayBuffer[Schedulable] +// leafSchedulableQueue += this +// return leafSchedulableQueue +// } def taskFinished() { decreaseRunningTasks(1) @@ -85,10 +87,28 @@ class DummyTaskSetManager( } } +class DummyTask(stageId: Int) extends Task[Int](stageId) +{ + def run(attemptId: Long): Int = { + return 0 + } +} + class ClusterSchedulerSuite extends FunSuite with BeforeAndAfter { - + + val sc = new SparkContext("local", "ClusterSchedulerSuite") + val clusterScheduler = new ClusterScheduler(sc) + var tasks = ArrayBuffer[Task[_]]() + val task = new DummyTask(0) + val taskSet = new TaskSet(tasks.toArray,0,0,0,null) + tasks += task + + def createDummyTaskSetManager(priority: Int, stage: Int, numTasks: Int): DummyTaskSetManager = { + new DummyTaskSetManager(priority, stage, numTasks, clusterScheduler, taskSet) + } + def resourceOffer(rootPool: Pool): Int = { - val taskSetQueue = rootPool.getSortedLeafSchedulable() + val taskSetQueue = rootPool.getSortedTaskSetQueue() for (taskSet <- taskSetQueue) { taskSet.slaveOffer("execId_1", "hostname_1", 1) match { @@ -109,13 +129,13 @@ class ClusterSchedulerSuite extends FunSuite with BeforeAndAfter { val schedulableBuilder = new FIFOSchedulableBuilder(rootPool) schedulableBuilder.buildPools() - val taskSetManager0 = new DummyTaskSetManager(0, 0, 2) - val taskSetManager1 = new DummyTaskSetManager(0, 1, 2) - val taskSetManager2 = new DummyTaskSetManager(0, 2, 2) + val taskSetManager0 = createDummyTaskSetManager(0, 0, 2) + val taskSetManager1 = createDummyTaskSetManager(0, 1, 2) + val taskSetManager2 = createDummyTaskSetManager(0, 2, 2) schedulableBuilder.addTaskSetManager(taskSetManager0, null) schedulableBuilder.addTaskSetManager(taskSetManager1, null) schedulableBuilder.addTaskSetManager(taskSetManager2, null) - + checkTaskSetId(rootPool, 0) resourceOffer(rootPool) checkTaskSetId(rootPool, 1) @@ -130,7 +150,7 @@ class ClusterSchedulerSuite extends FunSuite with BeforeAndAfter { val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0) val schedulableBuilder = new FairSchedulableBuilder(rootPool) schedulableBuilder.buildPools() - + assert(rootPool.getSchedulableByName("default") != null) assert(rootPool.getSchedulableByName("1") != null) assert(rootPool.getSchedulableByName("2") != null) @@ -146,16 +166,16 @@ class ClusterSchedulerSuite extends FunSuite with BeforeAndAfter { properties1.setProperty("spark.scheduler.cluster.fair.pool","1") val properties2 = new Properties() properties2.setProperty("spark.scheduler.cluster.fair.pool","2") - - val taskSetManager10 = new DummyTaskSetManager(1, 0, 1) - val taskSetManager11 = new DummyTaskSetManager(1, 1, 1) - val taskSetManager12 = new DummyTaskSetManager(1, 2, 2) + + val taskSetManager10 = createDummyTaskSetManager(1, 0, 1) + val taskSetManager11 = createDummyTaskSetManager(1, 1, 1) + val taskSetManager12 = createDummyTaskSetManager(1, 2, 2) schedulableBuilder.addTaskSetManager(taskSetManager10, properties1) schedulableBuilder.addTaskSetManager(taskSetManager11, properties1) schedulableBuilder.addTaskSetManager(taskSetManager12, properties1) - - val taskSetManager23 = new DummyTaskSetManager(2, 3, 2) - val taskSetManager24 = new DummyTaskSetManager(2, 4, 2) + + val taskSetManager23 = createDummyTaskSetManager(2, 3, 2) + val taskSetManager24 = createDummyTaskSetManager(2, 4, 2) schedulableBuilder.addTaskSetManager(taskSetManager23, properties2) schedulableBuilder.addTaskSetManager(taskSetManager24, properties2) @@ -190,27 +210,27 @@ class ClusterSchedulerSuite extends FunSuite with BeforeAndAfter { val pool11 = new Pool("11", SchedulingMode.FAIR, 2, 1) pool1.addSchedulable(pool10) pool1.addSchedulable(pool11) - - val taskSetManager000 = new DummyTaskSetManager(0, 0, 5) - val taskSetManager001 = new DummyTaskSetManager(0, 1, 5) + + val taskSetManager000 = createDummyTaskSetManager(0, 0, 5) + val taskSetManager001 = createDummyTaskSetManager(0, 1, 5) pool00.addSchedulable(taskSetManager000) pool00.addSchedulable(taskSetManager001) - - val taskSetManager010 = new DummyTaskSetManager(1, 2, 5) - val taskSetManager011 = new DummyTaskSetManager(1, 3, 5) + + val taskSetManager010 = createDummyTaskSetManager(1, 2, 5) + val taskSetManager011 = createDummyTaskSetManager(1, 3, 5) pool01.addSchedulable(taskSetManager010) pool01.addSchedulable(taskSetManager011) - - val taskSetManager100 = new DummyTaskSetManager(2, 4, 5) - val taskSetManager101 = new DummyTaskSetManager(2, 5, 5) + + val taskSetManager100 = createDummyTaskSetManager(2, 4, 5) + val taskSetManager101 = createDummyTaskSetManager(2, 5, 5) pool10.addSchedulable(taskSetManager100) pool10.addSchedulable(taskSetManager101) - val taskSetManager110 = new DummyTaskSetManager(3, 6, 5) - val taskSetManager111 = new DummyTaskSetManager(3, 7, 5) + val taskSetManager110 = createDummyTaskSetManager(3, 6, 5) + val taskSetManager111 = createDummyTaskSetManager(3, 7, 5) pool11.addSchedulable(taskSetManager110) pool11.addSchedulable(taskSetManager111) - + checkTaskSetId(rootPool, 0) checkTaskSetId(rootPool, 4) checkTaskSetId(rootPool, 6) |