From d19753b9c78857acae441dce3133fbb6c5855f95 Mon Sep 17 00:00:00 2001 From: Andrew xia Date: Sat, 18 May 2013 06:45:19 +0800 Subject: expose TaskSetManager type to resourceOffer function in ClusterScheduler --- .../spark/scheduler/cluster/ClusterScheduler.scala | 14 +-- .../main/scala/spark/scheduler/cluster/Pool.scala | 12 +-- .../spark/scheduler/cluster/Schedulable.scala | 3 +- .../spark/scheduler/cluster/TaskDescription.scala | 1 - .../spark/scheduler/cluster/TaskSetManager.scala | 12 +-- .../spark/scheduler/ClusterSchedulerSuite.scala | 112 ++++++++++++--------- 6 files changed, 84 insertions(+), 70 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala index 4caafcc1d3..e6399a3547 100644 --- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala +++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala @@ -165,24 +165,24 @@ private[spark] class ClusterScheduler(val sc: SparkContext) val tasks = offers.map(o => new ArrayBuffer[TaskDescription](o.cores)) val availableCpus = offers.map(o => o.cores).toArray var launchedTask = false - val sortedLeafSchedulable = rootPool.getSortedLeafSchedulable() - for (schedulable <- sortedLeafSchedulable) + val sortedTaskSetQueue = rootPool.getSortedTaskSetQueue() + for (manager <- sortedTaskSetQueue) { - logDebug("parentName:%s,name:%s,runningTasks:%s".format(schedulable.parent.name,schedulable.name,schedulable.runningTasks)) + logInfo("parentName:%s,name:%s,runningTasks:%s".format(manager.parent.name, manager.name, manager.runningTasks)) } - for (schedulable <- sortedLeafSchedulable) { + for (manager <- sortedTaskSetQueue) { do { launchedTask = false for (i <- 0 until offers.size) { var launchedTask = true val execId = offers(i).executorId val host = offers(i).hostname - schedulable.slaveOffer(execId,host,availableCpus(i)) match { + manager.slaveOffer(execId,host,availableCpus(i)) match { case Some(task) => tasks(i) += task val tid = task.taskId - taskIdToTaskSetId(tid) = task.taskSetId - taskSetTaskIds(task.taskSetId) += tid + taskIdToTaskSetId(tid) = manager.taskSet.id + taskSetTaskIds(manager.taskSet.id) += tid taskIdToExecutorId(tid) = execId activeExecutorIds += execId executorsByHost(host) += execId diff --git a/core/src/main/scala/spark/scheduler/cluster/Pool.scala b/core/src/main/scala/spark/scheduler/cluster/Pool.scala index ae603e7dd9..4dc15f413c 100644 --- a/core/src/main/scala/spark/scheduler/cluster/Pool.scala +++ b/core/src/main/scala/spark/scheduler/cluster/Pool.scala @@ -75,17 +75,13 @@ private[spark] class Pool( return shouldRevive } - override def slaveOffer(execId: String, host: String, availableCpus: Double): Option[TaskDescription] = { - return None - } - - override def getSortedLeafSchedulable(): ArrayBuffer[Schedulable] = { - var leafSchedulableQueue = new ArrayBuffer[Schedulable] + override def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = { + var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager] val sortedSchedulableQueue = schedulableQueue.sortWith(taskSetSchedulingAlgorithm.comparator) for (schedulable <- sortedSchedulableQueue) { - leafSchedulableQueue ++= schedulable.getSortedLeafSchedulable() + sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue() } - return leafSchedulableQueue + return sortedTaskSetQueue } override def increaseRunningTasks(taskNum: Int) { diff --git a/core/src/main/scala/spark/scheduler/cluster/Schedulable.scala b/core/src/main/scala/spark/scheduler/cluster/Schedulable.scala index c620588e14..6bb7525b49 100644 --- a/core/src/main/scala/spark/scheduler/cluster/Schedulable.scala +++ b/core/src/main/scala/spark/scheduler/cluster/Schedulable.scala @@ -21,7 +21,6 @@ private[spark] trait Schedulable { def removeSchedulable(schedulable: Schedulable): Unit def getSchedulableByName(name: String): Schedulable def executorLost(executorId: String, host: String): Unit - def slaveOffer(execId: String, host: String, avaiableCpus: Double): Option[TaskDescription] def checkSpeculatableTasks(): Boolean - def getSortedLeafSchedulable(): ArrayBuffer[Schedulable] + def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] } diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala b/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala index cdd004c94b..b41e951be9 100644 --- a/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala +++ b/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala @@ -5,7 +5,6 @@ import spark.util.SerializableBuffer private[spark] class TaskDescription( val taskId: Long, - val taskSetId: String, val executorId: String, val name: String, _serializedTask: ByteBuffer) diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala index 80edbe77a1..b9d2dbf487 100644 --- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala +++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala @@ -198,7 +198,7 @@ private[spark] class TaskSetManager( } // Respond to an offer of a single slave from the scheduler by finding a task - override def slaveOffer(execId: String, host: String, availableCpus: Double): Option[TaskDescription] = { + def slaveOffer(execId: String, host: String, availableCpus: Double): Option[TaskDescription] = { if (tasksFinished < numTasks && availableCpus >= CPUS_PER_TASK) { val time = System.currentTimeMillis val localOnly = (time - lastPreferredLaunchTime < LOCALITY_WAIT) @@ -234,7 +234,7 @@ private[spark] class TaskSetManager( logInfo("Serialized task %s:%d as %d bytes in %d ms".format( taskSet.id, index, serializedTask.limit, timeTaken)) val taskName = "task %s:%d".format(taskSet.id, index) - return Some(new TaskDescription(taskId, taskSet.id, execId, taskName, serializedTask)) + return Some(new TaskDescription(taskId, execId, taskName, serializedTask)) } case _ => } @@ -398,10 +398,10 @@ private[spark] class TaskSetManager( //nothing } - override def getSortedLeafSchedulable(): ArrayBuffer[Schedulable] = { - var leafSchedulableQueue = new ArrayBuffer[Schedulable] - leafSchedulableQueue += this - return leafSchedulableQueue + override def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = { + var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager] + sortedTaskSetQueue += this + return sortedTaskSetQueue } override def executorLost(execId: String, hostname: String) { diff --git a/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala b/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala index 8426be7575..956cc7421c 100644 --- a/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala +++ b/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala @@ -13,18 +13,20 @@ import java.util.Properties class DummyTaskSetManager( initPriority: Int, initStageId: Int, - initNumTasks: Int) - extends Schedulable { - - var parent: Schedulable = null - var weight = 1 - var minShare = 2 - var runningTasks = 0 - var priority = initPriority - var stageId = initStageId - var name = "TaskSet_"+stageId - var numTasks = initNumTasks - var tasksFinished = 0 + initNumTasks: Int, + clusterScheduler: ClusterScheduler, + taskSet: TaskSet) + extends TaskSetManager(clusterScheduler,taskSet) { + + parent = null + weight = 1 + minShare = 2 + runningTasks = 0 + priority = initPriority + stageId = initStageId + name = "TaskSet_"+stageId + override val numTasks = initNumTasks + tasksFinished = 0 override def increaseRunningTasks(taskNum: Int) { runningTasks += taskNum @@ -41,11 +43,11 @@ class DummyTaskSetManager( } override def addSchedulable(schedulable: Schedulable) { - } - + } + override def removeSchedulable(schedulable: Schedulable) { } - + override def getSchedulableByName(name: String): Schedulable = { return null } @@ -65,11 +67,11 @@ class DummyTaskSetManager( return true } - override def getSortedLeafSchedulable(): ArrayBuffer[Schedulable] = { - var leafSchedulableQueue = new ArrayBuffer[Schedulable] - leafSchedulableQueue += this - return leafSchedulableQueue - } +// override def getSortedLeafSchedulable(): ArrayBuffer[Schedulable] = { +// var leafSchedulableQueue = new ArrayBuffer[Schedulable] +// leafSchedulableQueue += this +// return leafSchedulableQueue +// } def taskFinished() { decreaseRunningTasks(1) @@ -85,10 +87,28 @@ class DummyTaskSetManager( } } +class DummyTask(stageId: Int) extends Task[Int](stageId) +{ + def run(attemptId: Long): Int = { + return 0 + } +} + class ClusterSchedulerSuite extends FunSuite with BeforeAndAfter { - + + val sc = new SparkContext("local", "ClusterSchedulerSuite") + val clusterScheduler = new ClusterScheduler(sc) + var tasks = ArrayBuffer[Task[_]]() + val task = new DummyTask(0) + val taskSet = new TaskSet(tasks.toArray,0,0,0,null) + tasks += task + + def createDummyTaskSetManager(priority: Int, stage: Int, numTasks: Int): DummyTaskSetManager = { + new DummyTaskSetManager(priority, stage, numTasks, clusterScheduler, taskSet) + } + def resourceOffer(rootPool: Pool): Int = { - val taskSetQueue = rootPool.getSortedLeafSchedulable() + val taskSetQueue = rootPool.getSortedTaskSetQueue() for (taskSet <- taskSetQueue) { taskSet.slaveOffer("execId_1", "hostname_1", 1) match { @@ -109,13 +129,13 @@ class ClusterSchedulerSuite extends FunSuite with BeforeAndAfter { val schedulableBuilder = new FIFOSchedulableBuilder(rootPool) schedulableBuilder.buildPools() - val taskSetManager0 = new DummyTaskSetManager(0, 0, 2) - val taskSetManager1 = new DummyTaskSetManager(0, 1, 2) - val taskSetManager2 = new DummyTaskSetManager(0, 2, 2) + val taskSetManager0 = createDummyTaskSetManager(0, 0, 2) + val taskSetManager1 = createDummyTaskSetManager(0, 1, 2) + val taskSetManager2 = createDummyTaskSetManager(0, 2, 2) schedulableBuilder.addTaskSetManager(taskSetManager0, null) schedulableBuilder.addTaskSetManager(taskSetManager1, null) schedulableBuilder.addTaskSetManager(taskSetManager2, null) - + checkTaskSetId(rootPool, 0) resourceOffer(rootPool) checkTaskSetId(rootPool, 1) @@ -130,7 +150,7 @@ class ClusterSchedulerSuite extends FunSuite with BeforeAndAfter { val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0) val schedulableBuilder = new FairSchedulableBuilder(rootPool) schedulableBuilder.buildPools() - + assert(rootPool.getSchedulableByName("default") != null) assert(rootPool.getSchedulableByName("1") != null) assert(rootPool.getSchedulableByName("2") != null) @@ -146,16 +166,16 @@ class ClusterSchedulerSuite extends FunSuite with BeforeAndAfter { properties1.setProperty("spark.scheduler.cluster.fair.pool","1") val properties2 = new Properties() properties2.setProperty("spark.scheduler.cluster.fair.pool","2") - - val taskSetManager10 = new DummyTaskSetManager(1, 0, 1) - val taskSetManager11 = new DummyTaskSetManager(1, 1, 1) - val taskSetManager12 = new DummyTaskSetManager(1, 2, 2) + + val taskSetManager10 = createDummyTaskSetManager(1, 0, 1) + val taskSetManager11 = createDummyTaskSetManager(1, 1, 1) + val taskSetManager12 = createDummyTaskSetManager(1, 2, 2) schedulableBuilder.addTaskSetManager(taskSetManager10, properties1) schedulableBuilder.addTaskSetManager(taskSetManager11, properties1) schedulableBuilder.addTaskSetManager(taskSetManager12, properties1) - - val taskSetManager23 = new DummyTaskSetManager(2, 3, 2) - val taskSetManager24 = new DummyTaskSetManager(2, 4, 2) + + val taskSetManager23 = createDummyTaskSetManager(2, 3, 2) + val taskSetManager24 = createDummyTaskSetManager(2, 4, 2) schedulableBuilder.addTaskSetManager(taskSetManager23, properties2) schedulableBuilder.addTaskSetManager(taskSetManager24, properties2) @@ -190,27 +210,27 @@ class ClusterSchedulerSuite extends FunSuite with BeforeAndAfter { val pool11 = new Pool("11", SchedulingMode.FAIR, 2, 1) pool1.addSchedulable(pool10) pool1.addSchedulable(pool11) - - val taskSetManager000 = new DummyTaskSetManager(0, 0, 5) - val taskSetManager001 = new DummyTaskSetManager(0, 1, 5) + + val taskSetManager000 = createDummyTaskSetManager(0, 0, 5) + val taskSetManager001 = createDummyTaskSetManager(0, 1, 5) pool00.addSchedulable(taskSetManager000) pool00.addSchedulable(taskSetManager001) - - val taskSetManager010 = new DummyTaskSetManager(1, 2, 5) - val taskSetManager011 = new DummyTaskSetManager(1, 3, 5) + + val taskSetManager010 = createDummyTaskSetManager(1, 2, 5) + val taskSetManager011 = createDummyTaskSetManager(1, 3, 5) pool01.addSchedulable(taskSetManager010) pool01.addSchedulable(taskSetManager011) - - val taskSetManager100 = new DummyTaskSetManager(2, 4, 5) - val taskSetManager101 = new DummyTaskSetManager(2, 5, 5) + + val taskSetManager100 = createDummyTaskSetManager(2, 4, 5) + val taskSetManager101 = createDummyTaskSetManager(2, 5, 5) pool10.addSchedulable(taskSetManager100) pool10.addSchedulable(taskSetManager101) - val taskSetManager110 = new DummyTaskSetManager(3, 6, 5) - val taskSetManager111 = new DummyTaskSetManager(3, 7, 5) + val taskSetManager110 = createDummyTaskSetManager(3, 6, 5) + val taskSetManager111 = createDummyTaskSetManager(3, 7, 5) pool11.addSchedulable(taskSetManager110) pool11.addSchedulable(taskSetManager111) - + checkTaskSetId(rootPool, 0) checkTaskSetId(rootPool, 4) checkTaskSetId(rootPool, 6) -- cgit v1.2.3