From ecd6d75c6a88232c40070baed3dd67bdf77f0c69 Mon Sep 17 00:00:00 2001 From: Andrew xia Date: Tue, 21 May 2013 06:49:23 +0800 Subject: fix bug of unit tests --- .../spark/scheduler/cluster/ClusterScheduler.scala | 6 +- .../spark/scheduler/ClusterSchedulerSuite.scala | 72 ++++++++++++---------- 2 files changed, 43 insertions(+), 35 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala index 9547f4f6dd..053d4b8e4a 100644 --- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala +++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala @@ -352,7 +352,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext) executorsByHostPort(hostPort) += execId availableCpus(i) -= 1 launchedTask = true - + case None => {} } } @@ -373,7 +373,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext) } } while (launchedTask) } - + if (tasks.size > 0) { hasLaunchedTask = true } @@ -522,7 +522,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext) hostPortsAlive -= hostPort hostToAliveHostPorts.getOrElseUpdate(Utils.parseHostPort(hostPort)._1, new HashSet[String]).remove(hostPort) } - + val execs = executorsByHostPort.getOrElse(hostPort, new HashSet) execs -= executorId if (execs.isEmpty) { diff --git a/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala b/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala index 7af749fb5c..a39418b716 100644 --- a/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala +++ b/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala @@ -67,12 +67,6 @@ class DummyTaskSetManager( return true } -// override def getSortedLeafSchedulable(): ArrayBuffer[Schedulable] = { -// var leafSchedulableQueue = new ArrayBuffer[Schedulable] -// leafSchedulableQueue += this -// return leafSchedulableQueue -// } - def taskFinished() { decreaseRunningTasks(1) tasksFinished +=1 @@ -94,17 +88,10 @@ class DummyTask(stageId: Int) extends Task[Int](stageId) } } -class ClusterSchedulerSuite extends FunSuite with BeforeAndAfter { - - val sc = new SparkContext("local", "ClusterSchedulerSuite") - val clusterScheduler = new ClusterScheduler(sc) - var tasks = ArrayBuffer[Task[_]]() - val task = new DummyTask(0) - val taskSet = new TaskSet(tasks.toArray,0,0,0,null) - tasks += task +class ClusterSchedulerSuite extends FunSuite with LocalSparkContext { - def createDummyTaskSetManager(priority: Int, stage: Int, numTasks: Int): DummyTaskSetManager = { - new DummyTaskSetManager(priority, stage, numTasks, clusterScheduler, taskSet) + def createDummyTaskSetManager(priority: Int, stage: Int, numTasks: Int, cs: ClusterScheduler, taskSet: TaskSet): DummyTaskSetManager = { + new DummyTaskSetManager(priority, stage, numTasks, cs , taskSet) } def resourceOffer(rootPool: Pool): Int = { @@ -125,13 +112,20 @@ class ClusterSchedulerSuite extends FunSuite with BeforeAndAfter { } test("FIFO Scheduler Test") { + sc = new SparkContext("local", "ClusterSchedulerSuite") + val clusterScheduler = new ClusterScheduler(sc) + var tasks = ArrayBuffer[Task[_]]() + val task = new DummyTask(0) + tasks += task + val taskSet = new TaskSet(tasks.toArray,0,0,0,null) + val rootPool = new Pool("", SchedulingMode.FIFO, 0, 0) val schedulableBuilder = new FIFOSchedulableBuilder(rootPool) schedulableBuilder.buildPools() - val taskSetManager0 = createDummyTaskSetManager(0, 0, 2) - val taskSetManager1 = createDummyTaskSetManager(0, 1, 2) - val taskSetManager2 = createDummyTaskSetManager(0, 2, 2) + val taskSetManager0 = createDummyTaskSetManager(0, 0, 2, clusterScheduler, taskSet) + val taskSetManager1 = createDummyTaskSetManager(0, 1, 2, clusterScheduler, taskSet) + val taskSetManager2 = createDummyTaskSetManager(0, 2, 2, clusterScheduler, taskSet) schedulableBuilder.addTaskSetManager(taskSetManager0, null) schedulableBuilder.addTaskSetManager(taskSetManager1, null) schedulableBuilder.addTaskSetManager(taskSetManager2, null) @@ -145,6 +139,13 @@ class ClusterSchedulerSuite extends FunSuite with BeforeAndAfter { } test("Fair Scheduler Test") { + sc = new SparkContext("local", "ClusterSchedulerSuite") + val clusterScheduler = new ClusterScheduler(sc) + var tasks = ArrayBuffer[Task[_]]() + val task = new DummyTask(0) + tasks += task + val taskSet = new TaskSet(tasks.toArray,0,0,0,null) + val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile() System.setProperty("spark.fairscheduler.allocation.file", xmlPath) val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0) @@ -167,15 +168,15 @@ class ClusterSchedulerSuite extends FunSuite with BeforeAndAfter { val properties2 = new Properties() properties2.setProperty("spark.scheduler.cluster.fair.pool","2") - val taskSetManager10 = createDummyTaskSetManager(1, 0, 1) - val taskSetManager11 = createDummyTaskSetManager(1, 1, 1) - val taskSetManager12 = createDummyTaskSetManager(1, 2, 2) + val taskSetManager10 = createDummyTaskSetManager(1, 0, 1, clusterScheduler, taskSet) + val taskSetManager11 = createDummyTaskSetManager(1, 1, 1, clusterScheduler, taskSet) + val taskSetManager12 = createDummyTaskSetManager(1, 2, 2, clusterScheduler, taskSet) schedulableBuilder.addTaskSetManager(taskSetManager10, properties1) schedulableBuilder.addTaskSetManager(taskSetManager11, properties1) schedulableBuilder.addTaskSetManager(taskSetManager12, properties1) - val taskSetManager23 = createDummyTaskSetManager(2, 3, 2) - val taskSetManager24 = createDummyTaskSetManager(2, 4, 2) + val taskSetManager23 = createDummyTaskSetManager(2, 3, 2, clusterScheduler, taskSet) + val taskSetManager24 = createDummyTaskSetManager(2, 4, 2, clusterScheduler, taskSet) schedulableBuilder.addTaskSetManager(taskSetManager23, properties2) schedulableBuilder.addTaskSetManager(taskSetManager24, properties2) @@ -195,6 +196,13 @@ class ClusterSchedulerSuite extends FunSuite with BeforeAndAfter { } test("Nested Pool Test") { + sc = new SparkContext("local", "ClusterSchedulerSuite") + val clusterScheduler = new ClusterScheduler(sc) + var tasks = ArrayBuffer[Task[_]]() + val task = new DummyTask(0) + tasks += task + val taskSet = new TaskSet(tasks.toArray,0,0,0,null) + val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0) val pool0 = new Pool("0", SchedulingMode.FAIR, 3, 1) val pool1 = new Pool("1", SchedulingMode.FAIR, 4, 1) @@ -211,23 +219,23 @@ class ClusterSchedulerSuite extends FunSuite with BeforeAndAfter { pool1.addSchedulable(pool10) pool1.addSchedulable(pool11) - val taskSetManager000 = createDummyTaskSetManager(0, 0, 5) - val taskSetManager001 = createDummyTaskSetManager(0, 1, 5) + val taskSetManager000 = createDummyTaskSetManager(0, 0, 5, clusterScheduler, taskSet) + val taskSetManager001 = createDummyTaskSetManager(0, 1, 5, clusterScheduler, taskSet) pool00.addSchedulable(taskSetManager000) pool00.addSchedulable(taskSetManager001) - val taskSetManager010 = createDummyTaskSetManager(1, 2, 5) - val taskSetManager011 = createDummyTaskSetManager(1, 3, 5) + val taskSetManager010 = createDummyTaskSetManager(1, 2, 5, clusterScheduler, taskSet) + val taskSetManager011 = createDummyTaskSetManager(1, 3, 5, clusterScheduler, taskSet) pool01.addSchedulable(taskSetManager010) pool01.addSchedulable(taskSetManager011) - val taskSetManager100 = createDummyTaskSetManager(2, 4, 5) - val taskSetManager101 = createDummyTaskSetManager(2, 5, 5) + val taskSetManager100 = createDummyTaskSetManager(2, 4, 5, clusterScheduler, taskSet) + val taskSetManager101 = createDummyTaskSetManager(2, 5, 5, clusterScheduler, taskSet) pool10.addSchedulable(taskSetManager100) pool10.addSchedulable(taskSetManager101) - val taskSetManager110 = createDummyTaskSetManager(3, 6, 5) - val taskSetManager111 = createDummyTaskSetManager(3, 7, 5) + val taskSetManager110 = createDummyTaskSetManager(3, 6, 5, clusterScheduler, taskSet) + val taskSetManager111 = createDummyTaskSetManager(3, 7, 5, clusterScheduler, taskSet) pool11.addSchedulable(taskSetManager110) pool11.addSchedulable(taskSetManager111) -- cgit v1.2.3