about | summary | refs | log | tree | commit | diff
path: root/core
diff options
context:
space:
mode:
author: Andrew xia <junluan.xia@intel.com> 2013-05-18 06:45:19 +0800
committer: Andrew xia <junluan.xia@intel.com> 2013-05-18 06:45:19 +0800
commit: d19753b9c78857acae441dce3133fbb6c5855f95 (patch)
tree: 6d430f9c768f2264dba32e2c62857e7ba279f21a /core
parent: c6e2770bfe940a4f4f26f75c9ba228faea7316f0 (diff)
download: spark-d19753b9c78857acae441dce3133fbb6c5855f95.tar.gz
spark-d19753b9c78857acae441dce3133fbb6c5855f95.tar.bz2
spark-d19753b9c78857acae441dce3133fbb6c5855f95.zip
expose TaskSetManager type to resourceOffer function in ClusterScheduler
Diffstat (limited to 'core')
-rw-r--r-- core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala 14
-rw-r--r-- core/src/main/scala/spark/scheduler/cluster/Pool.scala 12
-rw-r--r-- core/src/main/scala/spark/scheduler/cluster/Schedulable.scala 3
-rw-r--r-- core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala 1
-rw-r--r-- core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala 12
-rw-r--r-- core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala 112
6 files changed, 84 insertions, 70 deletions
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
index 4caafcc1d3..e6399a3547 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
@@ -165,24 +165,24 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
val tasks = offers.map(o => new ArrayBuffer[TaskDescription](o.cores))
val availableCpus = offers.map(o => o.cores).toArray
var launchedTask = false
- val sortedLeafSchedulable = rootPool.getSortedLeafSchedulable()
- for (schedulable <- sortedLeafSchedulable)
+ val sortedTaskSetQueue = rootPool.getSortedTaskSetQueue()
+ for (manager <- sortedTaskSetQueue)
{
- logDebug("parentName:%s,name:%s,runningTasks:%s".format(schedulable.parent.name,schedulable.name,schedulable.runningTasks))
+ logInfo("parentName:%s,name:%s,runningTasks:%s".format(manager.parent.name, manager.name, manager.runningTasks))
}
- for (schedulable <- sortedLeafSchedulable) {
+ for (manager <- sortedTaskSetQueue) {
do {
launchedTask = false
for (i <- 0 until offers.size) {
var launchedTask = true
val execId = offers(i).executorId
val host = offers(i).hostname
- schedulable.slaveOffer(execId,host,availableCpus(i)) match {
+ manager.slaveOffer(execId,host,availableCpus(i)) match {
case Some(task) =>
tasks(i) += task
val tid = task.taskId
- taskIdToTaskSetId(tid) = task.taskSetId
- taskSetTaskIds(task.taskSetId) += tid
+ taskIdToTaskSetId(tid) = manager.taskSet.id
+ taskSetTaskIds(manager.taskSet.id) += tid
taskIdToExecutorId(tid) = execId
activeExecutorIds += execId
executorsByHost(host) += execId
diff --git a/core/src/main/scala/spark/scheduler/cluster/Pool.scala b/core/src/main/scala/spark/scheduler/cluster/Pool.scala
index ae603e7dd9..4dc15f413c 100644
--- a/core/src/main/scala/spark/scheduler/cluster/Pool.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/Pool.scala
@@ -75,17 +75,13 @@ private[spark] class Pool(
return shouldRevive
}
- override def slaveOffer(execId: String, host: String, availableCpus: Double): Option[TaskDescription] = {
- return None
- }
-
- override def getSortedLeafSchedulable(): ArrayBuffer[Schedulable] = {
- var leafSchedulableQueue = new ArrayBuffer[Schedulable]
+ override def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = {
+ var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
val sortedSchedulableQueue = schedulableQueue.sortWith(taskSetSchedulingAlgorithm.comparator)
for (schedulable <- sortedSchedulableQueue) {
- leafSchedulableQueue ++= schedulable.getSortedLeafSchedulable()
+ sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue()
}
- return leafSchedulableQueue
+ return sortedTaskSetQueue
}
override def increaseRunningTasks(taskNum: Int) {
diff --git a/core/src/main/scala/spark/scheduler/cluster/Schedulable.scala b/core/src/main/scala/spark/scheduler/cluster/Schedulable.scala
index c620588e14..6bb7525b49 100644
--- a/core/src/main/scala/spark/scheduler/cluster/Schedulable.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/Schedulable.scala
@@ -21,7 +21,6 @@ private[spark] trait Schedulable {
def removeSchedulable(schedulable: Schedulable): Unit
def getSchedulableByName(name: String): Schedulable
def executorLost(executorId: String, host: String): Unit
- def slaveOffer(execId: String, host: String, avaiableCpus: Double): Option[TaskDescription]
def checkSpeculatableTasks(): Boolean
- def getSortedLeafSchedulable(): ArrayBuffer[Schedulable]
+ def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager]
}
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala b/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala
index cdd004c94b..b41e951be9 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala
@@ -5,7 +5,6 @@ import spark.util.SerializableBuffer
private[spark] class TaskDescription(
val taskId: Long,
- val taskSetId: String,
val executorId: String,
val name: String,
_serializedTask: ByteBuffer)
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
index 80edbe77a1..b9d2dbf487 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
@@ -198,7 +198,7 @@ private[spark] class TaskSetManager(
}
// Respond to an offer of a single slave from the scheduler by finding a task
- override def slaveOffer(execId: String, host: String, availableCpus: Double): Option[TaskDescription] = {
+ def slaveOffer(execId: String, host: String, availableCpus: Double): Option[TaskDescription] = {
if (tasksFinished < numTasks && availableCpus >= CPUS_PER_TASK) {
val time = System.currentTimeMillis
val localOnly = (time - lastPreferredLaunchTime < LOCALITY_WAIT)
@@ -234,7 +234,7 @@ private[spark] class TaskSetManager(
logInfo("Serialized task %s:%d as %d bytes in %d ms".format(
taskSet.id, index, serializedTask.limit, timeTaken))
val taskName = "task %s:%d".format(taskSet.id, index)
- return Some(new TaskDescription(taskId, taskSet.id, execId, taskName, serializedTask))
+ return Some(new TaskDescription(taskId, execId, taskName, serializedTask))
}
case _ =>
}
@@ -398,10 +398,10 @@ private[spark] class TaskSetManager(
//nothing
}
- override def getSortedLeafSchedulable(): ArrayBuffer[Schedulable] = {
- var leafSchedulableQueue = new ArrayBuffer[Schedulable]
- leafSchedulableQueue += this
- return leafSchedulableQueue
+ override def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = {
+ var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
+ sortedTaskSetQueue += this
+ return sortedTaskSetQueue
}
override def executorLost(execId: String, hostname: String) {
diff --git a/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala b/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala
index 8426be7575..956cc7421c 100644
--- a/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala
+++ b/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala
@@ -13,18 +13,20 @@ import java.util.Properties
class DummyTaskSetManager(
initPriority: Int,
initStageId: Int,
- initNumTasks: Int)
- extends Schedulable {
-
- var parent: Schedulable = null
- var weight = 1
- var minShare = 2
- var runningTasks = 0
- var priority = initPriority
- var stageId = initStageId
- var name = "TaskSet_"+stageId
- var numTasks = initNumTasks
- var tasksFinished = 0
+ initNumTasks: Int,
+ clusterScheduler: ClusterScheduler,
+ taskSet: TaskSet)
+ extends TaskSetManager(clusterScheduler,taskSet) {
+
+ parent = null
+ weight = 1
+ minShare = 2
+ runningTasks = 0
+ priority = initPriority
+ stageId = initStageId
+ name = "TaskSet_"+stageId
+ override val numTasks = initNumTasks
+ tasksFinished = 0
override def increaseRunningTasks(taskNum: Int) {
runningTasks += taskNum
@@ -41,11 +43,11 @@ class DummyTaskSetManager(
}
override def addSchedulable(schedulable: Schedulable) {
- }
-
+ }
+
override def removeSchedulable(schedulable: Schedulable) {
}
-
+
override def getSchedulableByName(name: String): Schedulable = {
return null
}
@@ -65,11 +67,11 @@ class DummyTaskSetManager(
return true
}
- override def getSortedLeafSchedulable(): ArrayBuffer[Schedulable] = {
- var leafSchedulableQueue = new ArrayBuffer[Schedulable]
- leafSchedulableQueue += this
- return leafSchedulableQueue
- }
+// override def getSortedLeafSchedulable(): ArrayBuffer[Schedulable] = {
+// var leafSchedulableQueue = new ArrayBuffer[Schedulable]
+// leafSchedulableQueue += this
+// return leafSchedulableQueue
+// }
def taskFinished() {
decreaseRunningTasks(1)
@@ -85,10 +87,28 @@ class DummyTaskSetManager(
}
}
+class DummyTask(stageId: Int) extends Task[Int](stageId)
+{
+ def run(attemptId: Long): Int = {
+ return 0
+ }
+}
+
class ClusterSchedulerSuite extends FunSuite with BeforeAndAfter {
-
+
+ val sc = new SparkContext("local", "ClusterSchedulerSuite")
+ val clusterScheduler = new ClusterScheduler(sc)
+ var tasks = ArrayBuffer[Task[_]]()
+ val task = new DummyTask(0)
+ val taskSet = new TaskSet(tasks.toArray,0,0,0,null)
+ tasks += task
+
+ def createDummyTaskSetManager(priority: Int, stage: Int, numTasks: Int): DummyTaskSetManager = {
+ new DummyTaskSetManager(priority, stage, numTasks, clusterScheduler, taskSet)
+ }
+
def resourceOffer(rootPool: Pool): Int = {
- val taskSetQueue = rootPool.getSortedLeafSchedulable()
+ val taskSetQueue = rootPool.getSortedTaskSetQueue()
for (taskSet <- taskSetQueue)
{
taskSet.slaveOffer("execId_1", "hostname_1", 1) match {
@@ -109,13 +129,13 @@ class ClusterSchedulerSuite extends FunSuite with BeforeAndAfter {
val schedulableBuilder = new FIFOSchedulableBuilder(rootPool)
schedulableBuilder.buildPools()
- val taskSetManager0 = new DummyTaskSetManager(0, 0, 2)
- val taskSetManager1 = new DummyTaskSetManager(0, 1, 2)
- val taskSetManager2 = new DummyTaskSetManager(0, 2, 2)
+ val taskSetManager0 = createDummyTaskSetManager(0, 0, 2)
+ val taskSetManager1 = createDummyTaskSetManager(0, 1, 2)
+ val taskSetManager2 = createDummyTaskSetManager(0, 2, 2)
schedulableBuilder.addTaskSetManager(taskSetManager0, null)
schedulableBuilder.addTaskSetManager(taskSetManager1, null)
schedulableBuilder.addTaskSetManager(taskSetManager2, null)
-
+
checkTaskSetId(rootPool, 0)
resourceOffer(rootPool)
checkTaskSetId(rootPool, 1)
@@ -130,7 +150,7 @@ class ClusterSchedulerSuite extends FunSuite with BeforeAndAfter {
val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
val schedulableBuilder = new FairSchedulableBuilder(rootPool)
schedulableBuilder.buildPools()
-
+
assert(rootPool.getSchedulableByName("default") != null)
assert(rootPool.getSchedulableByName("1") != null)
assert(rootPool.getSchedulableByName("2") != null)
@@ -146,16 +166,16 @@ class ClusterSchedulerSuite extends FunSuite with BeforeAndAfter {
properties1.setProperty("spark.scheduler.cluster.fair.pool","1")
val properties2 = new Properties()
properties2.setProperty("spark.scheduler.cluster.fair.pool","2")
-
- val taskSetManager10 = new DummyTaskSetManager(1, 0, 1)
- val taskSetManager11 = new DummyTaskSetManager(1, 1, 1)
- val taskSetManager12 = new DummyTaskSetManager(1, 2, 2)
+
+ val taskSetManager10 = createDummyTaskSetManager(1, 0, 1)
+ val taskSetManager11 = createDummyTaskSetManager(1, 1, 1)
+ val taskSetManager12 = createDummyTaskSetManager(1, 2, 2)
schedulableBuilder.addTaskSetManager(taskSetManager10, properties1)
schedulableBuilder.addTaskSetManager(taskSetManager11, properties1)
schedulableBuilder.addTaskSetManager(taskSetManager12, properties1)
-
- val taskSetManager23 = new DummyTaskSetManager(2, 3, 2)
- val taskSetManager24 = new DummyTaskSetManager(2, 4, 2)
+
+ val taskSetManager23 = createDummyTaskSetManager(2, 3, 2)
+ val taskSetManager24 = createDummyTaskSetManager(2, 4, 2)
schedulableBuilder.addTaskSetManager(taskSetManager23, properties2)
schedulableBuilder.addTaskSetManager(taskSetManager24, properties2)
@@ -190,27 +210,27 @@ class ClusterSchedulerSuite extends FunSuite with BeforeAndAfter {
val pool11 = new Pool("11", SchedulingMode.FAIR, 2, 1)
pool1.addSchedulable(pool10)
pool1.addSchedulable(pool11)
-
- val taskSetManager000 = new DummyTaskSetManager(0, 0, 5)
- val taskSetManager001 = new DummyTaskSetManager(0, 1, 5)
+
+ val taskSetManager000 = createDummyTaskSetManager(0, 0, 5)
+ val taskSetManager001 = createDummyTaskSetManager(0, 1, 5)
pool00.addSchedulable(taskSetManager000)
pool00.addSchedulable(taskSetManager001)
-
- val taskSetManager010 = new DummyTaskSetManager(1, 2, 5)
- val taskSetManager011 = new DummyTaskSetManager(1, 3, 5)
+
+ val taskSetManager010 = createDummyTaskSetManager(1, 2, 5)
+ val taskSetManager011 = createDummyTaskSetManager(1, 3, 5)
pool01.addSchedulable(taskSetManager010)
pool01.addSchedulable(taskSetManager011)
-
- val taskSetManager100 = new DummyTaskSetManager(2, 4, 5)
- val taskSetManager101 = new DummyTaskSetManager(2, 5, 5)
+
+ val taskSetManager100 = createDummyTaskSetManager(2, 4, 5)
+ val taskSetManager101 = createDummyTaskSetManager(2, 5, 5)
pool10.addSchedulable(taskSetManager100)
pool10.addSchedulable(taskSetManager101)
- val taskSetManager110 = new DummyTaskSetManager(3, 6, 5)
- val taskSetManager111 = new DummyTaskSetManager(3, 7, 5)
+ val taskSetManager110 = createDummyTaskSetManager(3, 6, 5)
+ val taskSetManager111 = createDummyTaskSetManager(3, 7, 5)
pool11.addSchedulable(taskSetManager110)
pool11.addSchedulable(taskSetManager111)
-
+
checkTaskSetId(rootPool, 0)
checkTaskSetId(rootPool, 4)
checkTaskSetId(rootPool, 6)