author    Andrew xia <junluan.xia@intel.com>    2013-05-21 06:49:23 +0800
committer Andrew xia <junluan.xia@intel.com>    2013-05-21 06:49:23 +0800
commit    ecd6d75c6a88232c40070baed3dd67bdf77f0c69 (patch)
tree      9200bb4f1a521ae460057233582487061a780484 /core
parent    3d4672eaa9ebda0a2510a84087bb5bc6d15fc99c (diff)
Fix unit test bug: ClusterSchedulerSuite now mixes in LocalSparkContext and builds a fresh SparkContext, ClusterScheduler, and TaskSet inside each test instead of sharing them at class construction time; trailing whitespace is also removed from ClusterScheduler.scala.
Diffstat (limited to 'core')
-rw-r--r--    core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala     6
-rw-r--r--    core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala        72
2 files changed, 43 insertions, 35 deletions
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
index 9547f4f6dd..053d4b8e4a 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
@@ -352,7 +352,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
executorsByHostPort(hostPort) += execId
availableCpus(i) -= 1
launchedTask = true
-
+
case None => {}
}
}
@@ -373,7 +373,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
}
} while (launchedTask)
}
-
+
if (tasks.size > 0) {
hasLaunchedTask = true
}
@@ -522,7 +522,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
hostPortsAlive -= hostPort
hostToAliveHostPorts.getOrElseUpdate(Utils.parseHostPort(hostPort)._1, new HashSet[String]).remove(hostPort)
}
-
+
val execs = executorsByHostPort.getOrElse(hostPort, new HashSet)
execs -= executorId
if (execs.isEmpty) {
diff --git a/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala b/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala
index 7af749fb5c..a39418b716 100644
--- a/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala
+++ b/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala
@@ -67,12 +67,6 @@ class DummyTaskSetManager(
return true
}
-// override def getSortedLeafSchedulable(): ArrayBuffer[Schedulable] = {
-// var leafSchedulableQueue = new ArrayBuffer[Schedulable]
-// leafSchedulableQueue += this
-// return leafSchedulableQueue
-// }
-
def taskFinished() {
decreaseRunningTasks(1)
tasksFinished +=1
@@ -94,17 +88,10 @@ class DummyTask(stageId: Int) extends Task[Int](stageId)
}
}
-class ClusterSchedulerSuite extends FunSuite with BeforeAndAfter {
-
- val sc = new SparkContext("local", "ClusterSchedulerSuite")
- val clusterScheduler = new ClusterScheduler(sc)
- var tasks = ArrayBuffer[Task[_]]()
- val task = new DummyTask(0)
- val taskSet = new TaskSet(tasks.toArray,0,0,0,null)
- tasks += task
+class ClusterSchedulerSuite extends FunSuite with LocalSparkContext {
- def createDummyTaskSetManager(priority: Int, stage: Int, numTasks: Int): DummyTaskSetManager = {
- new DummyTaskSetManager(priority, stage, numTasks, clusterScheduler, taskSet)
+ def createDummyTaskSetManager(priority: Int, stage: Int, numTasks: Int, cs: ClusterScheduler, taskSet: TaskSet): DummyTaskSetManager = {
+    new DummyTaskSetManager(priority, stage, numTasks, cs, taskSet)
}
def resourceOffer(rootPool: Pool): Int = {
@@ -125,13 +112,20 @@ class ClusterSchedulerSuite extends FunSuite with BeforeAndAfter {
}
test("FIFO Scheduler Test") {
+ sc = new SparkContext("local", "ClusterSchedulerSuite")
+ val clusterScheduler = new ClusterScheduler(sc)
+ var tasks = ArrayBuffer[Task[_]]()
+ val task = new DummyTask(0)
+ tasks += task
+ val taskSet = new TaskSet(tasks.toArray,0,0,0,null)
+
val rootPool = new Pool("", SchedulingMode.FIFO, 0, 0)
val schedulableBuilder = new FIFOSchedulableBuilder(rootPool)
schedulableBuilder.buildPools()
- val taskSetManager0 = createDummyTaskSetManager(0, 0, 2)
- val taskSetManager1 = createDummyTaskSetManager(0, 1, 2)
- val taskSetManager2 = createDummyTaskSetManager(0, 2, 2)
+ val taskSetManager0 = createDummyTaskSetManager(0, 0, 2, clusterScheduler, taskSet)
+ val taskSetManager1 = createDummyTaskSetManager(0, 1, 2, clusterScheduler, taskSet)
+ val taskSetManager2 = createDummyTaskSetManager(0, 2, 2, clusterScheduler, taskSet)
schedulableBuilder.addTaskSetManager(taskSetManager0, null)
schedulableBuilder.addTaskSetManager(taskSetManager1, null)
schedulableBuilder.addTaskSetManager(taskSetManager2, null)
@@ -145,6 +139,13 @@ class ClusterSchedulerSuite extends FunSuite with BeforeAndAfter {
}
test("Fair Scheduler Test") {
+ sc = new SparkContext("local", "ClusterSchedulerSuite")
+ val clusterScheduler = new ClusterScheduler(sc)
+ var tasks = ArrayBuffer[Task[_]]()
+ val task = new DummyTask(0)
+ tasks += task
+ val taskSet = new TaskSet(tasks.toArray,0,0,0,null)
+
val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile()
System.setProperty("spark.fairscheduler.allocation.file", xmlPath)
val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
@@ -167,15 +168,15 @@ class ClusterSchedulerSuite extends FunSuite with BeforeAndAfter {
val properties2 = new Properties()
properties2.setProperty("spark.scheduler.cluster.fair.pool","2")
- val taskSetManager10 = createDummyTaskSetManager(1, 0, 1)
- val taskSetManager11 = createDummyTaskSetManager(1, 1, 1)
- val taskSetManager12 = createDummyTaskSetManager(1, 2, 2)
+ val taskSetManager10 = createDummyTaskSetManager(1, 0, 1, clusterScheduler, taskSet)
+ val taskSetManager11 = createDummyTaskSetManager(1, 1, 1, clusterScheduler, taskSet)
+ val taskSetManager12 = createDummyTaskSetManager(1, 2, 2, clusterScheduler, taskSet)
schedulableBuilder.addTaskSetManager(taskSetManager10, properties1)
schedulableBuilder.addTaskSetManager(taskSetManager11, properties1)
schedulableBuilder.addTaskSetManager(taskSetManager12, properties1)
- val taskSetManager23 = createDummyTaskSetManager(2, 3, 2)
- val taskSetManager24 = createDummyTaskSetManager(2, 4, 2)
+ val taskSetManager23 = createDummyTaskSetManager(2, 3, 2, clusterScheduler, taskSet)
+ val taskSetManager24 = createDummyTaskSetManager(2, 4, 2, clusterScheduler, taskSet)
schedulableBuilder.addTaskSetManager(taskSetManager23, properties2)
schedulableBuilder.addTaskSetManager(taskSetManager24, properties2)
@@ -195,6 +196,13 @@ class ClusterSchedulerSuite extends FunSuite with BeforeAndAfter {
}
test("Nested Pool Test") {
+ sc = new SparkContext("local", "ClusterSchedulerSuite")
+ val clusterScheduler = new ClusterScheduler(sc)
+ var tasks = ArrayBuffer[Task[_]]()
+ val task = new DummyTask(0)
+ tasks += task
+ val taskSet = new TaskSet(tasks.toArray,0,0,0,null)
+
val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
val pool0 = new Pool("0", SchedulingMode.FAIR, 3, 1)
val pool1 = new Pool("1", SchedulingMode.FAIR, 4, 1)
@@ -211,23 +219,23 @@ class ClusterSchedulerSuite extends FunSuite with BeforeAndAfter {
pool1.addSchedulable(pool10)
pool1.addSchedulable(pool11)
- val taskSetManager000 = createDummyTaskSetManager(0, 0, 5)
- val taskSetManager001 = createDummyTaskSetManager(0, 1, 5)
+ val taskSetManager000 = createDummyTaskSetManager(0, 0, 5, clusterScheduler, taskSet)
+ val taskSetManager001 = createDummyTaskSetManager(0, 1, 5, clusterScheduler, taskSet)
pool00.addSchedulable(taskSetManager000)
pool00.addSchedulable(taskSetManager001)
- val taskSetManager010 = createDummyTaskSetManager(1, 2, 5)
- val taskSetManager011 = createDummyTaskSetManager(1, 3, 5)
+ val taskSetManager010 = createDummyTaskSetManager(1, 2, 5, clusterScheduler, taskSet)
+ val taskSetManager011 = createDummyTaskSetManager(1, 3, 5, clusterScheduler, taskSet)
pool01.addSchedulable(taskSetManager010)
pool01.addSchedulable(taskSetManager011)
- val taskSetManager100 = createDummyTaskSetManager(2, 4, 5)
- val taskSetManager101 = createDummyTaskSetManager(2, 5, 5)
+ val taskSetManager100 = createDummyTaskSetManager(2, 4, 5, clusterScheduler, taskSet)
+ val taskSetManager101 = createDummyTaskSetManager(2, 5, 5, clusterScheduler, taskSet)
pool10.addSchedulable(taskSetManager100)
pool10.addSchedulable(taskSetManager101)
- val taskSetManager110 = createDummyTaskSetManager(3, 6, 5)
- val taskSetManager111 = createDummyTaskSetManager(3, 7, 5)
+ val taskSetManager110 = createDummyTaskSetManager(3, 6, 5, clusterScheduler, taskSet)
+ val taskSetManager111 = createDummyTaskSetManager(3, 7, 5, clusterScheduler, taskSet)
pool11.addSchedulable(taskSetManager110)
pool11.addSchedulable(taskSetManager111)
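
The net effect of the test-side change is the per-test SparkContext pattern: instead of constructing one SparkContext when the suite class is built, each test creates its own context and the LocalSparkContext mix-in tears it down afterwards. The sketch below illustrates that pattern; the trait body, the LocalSparkContextSketch name, and ExampleSuite are illustrative assumptions, not code from this commit.

import org.scalatest.{BeforeAndAfterEach, FunSuite, Suite}
import spark.SparkContext

// Illustrative stand-in for the LocalSparkContext trait the suite now mixes in:
// it guarantees each test's context is stopped, so no context leaks across tests.
trait LocalSparkContextSketch extends BeforeAndAfterEach { self: Suite =>
  @transient var sc: SparkContext = _

  override def afterEach() {
    if (sc != null) {
      sc.stop()                                   // shut down the local context created by the test
      sc = null
    }
    System.clearProperty("spark.driver.port")     // let the next test bind a fresh driver port
    super.afterEach()
  }
}

class ExampleSuite extends FunSuite with LocalSparkContextSketch {
  test("each test owns its own SparkContext") {
    sc = new SparkContext("local", "ExampleSuite") // created inside the test, as in the patched suite
    assert(sc.parallelize(1 to 4).count() === 4)
  }
}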