aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2013-05-25 21:09:03 -0700
committerMatei Zaharia <matei@eecs.berkeley.edu>2013-05-25 21:09:03 -0700
commite8d4b6c296e7b38099e6df263d38e0f811d2a6d1 (patch)
tree104ad47df3ac73fa115304eb084d55b634c646fa
parent9a3c344679aebf96483794c3292abc6f5f91061d (diff)
parentecd6d75c6a88232c40070baed3dd67bdf77f0c69 (diff)
downloadspark-e8d4b6c296e7b38099e6df263d38e0f811d2a6d1.tar.gz
spark-e8d4b6c296e7b38099e6df263d38e0f811d2a6d1.tar.bz2
spark-e8d4b6c296e7b38099e6df263d38e0f811d2a6d1.zip
Merge pull request #529 from xiajunluan/master
[SPARK-663]Implement Fair Scheduler in Spark Cluster Scheduler
-rw-r--r--core/src/main/scala/spark/SparkContext.scala24
-rw-r--r--core/src/main/scala/spark/scheduler/ActiveJob.scala5
-rw-r--r--core/src/main/scala/spark/scheduler/DAGScheduler.scala30
-rw-r--r--core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala5
-rw-r--r--core/src/main/scala/spark/scheduler/Stage.scala4
-rw-r--r--core/src/main/scala/spark/scheduler/TaskSet.scala11
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala49
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/Pool.scala104
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/Schedulable.scala27
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/SchedulableBuilder.scala115
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/SchedulingAlgorithm.scala56
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/SchedulingMode.scala7
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala59
-rw-r--r--core/src/test/resources/fairscheduler.xml14
-rw-r--r--core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala247
15 files changed, 716 insertions, 41 deletions
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 69b4c5d20d..bc05d08fd6 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -2,12 +2,16 @@ package spark
import java.io._
import java.net.URI
+import java.util.Properties
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.JavaConversions._
import scala.collection.Map
import scala.collection.generic.Growable
+import scala.collection.mutable.HashMap
+import scala.collection.JavaConversions._
+import scala.util.DynamicVariable
import scala.collection.mutable.{ConcurrentMap, HashMap}
import akka.actor.Actor._
@@ -233,6 +237,19 @@ class SparkContext(
private[spark] var checkpointDir: Option[String] = None
+ // Thread Local variable that can be used by users to pass information down the stack
+ private val localProperties = new DynamicVariable[Properties](null)
+
+ def initLocalProperties() {
+ localProperties.value = new Properties()
+ }
+
+ def addLocalProperties(key: String, value: String) {
+ if(localProperties.value == null) {
+ localProperties.value = new Properties()
+ }
+ localProperties.value.setProperty(key,value)
+ }
// Post init
taskScheduler.postStartHook()
@@ -616,7 +633,7 @@ class SparkContext(
val callSite = Utils.getSparkCallSite
logInfo("Starting job: " + callSite)
val start = System.nanoTime
- val result = dagScheduler.runJob(rdd, func, partitions, callSite, allowLocal, resultHandler)
+ val result = dagScheduler.runJob(rdd, func, partitions, callSite, allowLocal, resultHandler, localProperties.value)
logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
rdd.doCheckpoint()
result
@@ -695,12 +712,11 @@ class SparkContext(
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
evaluator: ApproximateEvaluator[U, R],
- timeout: Long
- ): PartialResult[R] = {
+ timeout: Long): PartialResult[R] = {
val callSite = Utils.getSparkCallSite
logInfo("Starting job: " + callSite)
val start = System.nanoTime
- val result = dagScheduler.runApproximateJob(rdd, func, evaluator, callSite, timeout)
+ val result = dagScheduler.runApproximateJob(rdd, func, evaluator, callSite, timeout, localProperties.value)
logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
result
}
diff --git a/core/src/main/scala/spark/scheduler/ActiveJob.scala b/core/src/main/scala/spark/scheduler/ActiveJob.scala
index 5a4e9a582d..105eaecb22 100644
--- a/core/src/main/scala/spark/scheduler/ActiveJob.scala
+++ b/core/src/main/scala/spark/scheduler/ActiveJob.scala
@@ -2,6 +2,8 @@ package spark.scheduler
import spark.TaskContext
+import java.util.Properties
+
/**
* Tracks information about an active job in the DAGScheduler.
*/
@@ -11,7 +13,8 @@ private[spark] class ActiveJob(
val func: (TaskContext, Iterator[_]) => _,
val partitions: Array[Int],
val callSite: String,
- val listener: JobListener) {
+ val listener: JobListener,
+ val properties: Properties) {
val numPartitions = partitions.length
val finished = Array.fill[Boolean](numPartitions)(false)
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index b18248d2b5..7feeb97542 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -4,6 +4,7 @@ import cluster.TaskInfo
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit
+import java.util.Properties
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
@@ -93,6 +94,8 @@ class DAGScheduler(
// stray messages to detect.
val failedGeneration = new HashMap[String, Long]
+ val idToActiveJob = new HashMap[Int, ActiveJob]
+
val waiting = new HashSet[Stage] // Stages we need to run whose parents aren't done
val running = new HashSet[Stage] // Stages we are running right now
val failed = new HashSet[Stage] // Stages that must be resubmitted due to fetch failures
@@ -225,13 +228,14 @@ class DAGScheduler(
partitions: Seq[Int],
callSite: String,
allowLocal: Boolean,
- resultHandler: (Int, U) => Unit)
+ resultHandler: (Int, U) => Unit,
+ properties: Properties = null)
: (JobSubmitted, JobWaiter[U]) =
{
assert(partitions.size > 0)
val waiter = new JobWaiter(partitions.size, resultHandler)
val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
- val toSubmit = JobSubmitted(finalRdd, func2, partitions.toArray, allowLocal, callSite, waiter)
+ val toSubmit = JobSubmitted(finalRdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties)
return (toSubmit, waiter)
}
@@ -241,13 +245,14 @@ class DAGScheduler(
partitions: Seq[Int],
callSite: String,
allowLocal: Boolean,
- resultHandler: (Int, U) => Unit)
+ resultHandler: (Int, U) => Unit,
+ properties: Properties = null)
{
if (partitions.size == 0) {
return
}
val (toSubmit, waiter) = prepareJob(
- finalRdd, func, partitions, callSite, allowLocal, resultHandler)
+ finalRdd, func, partitions, callSite, allowLocal, resultHandler, properties)
eventQueue.put(toSubmit)
waiter.awaitResult() match {
case JobSucceeded => {}
@@ -262,13 +267,14 @@ class DAGScheduler(
func: (TaskContext, Iterator[T]) => U,
evaluator: ApproximateEvaluator[U, R],
callSite: String,
- timeout: Long)
+ timeout: Long,
+ properties: Properties = null)
: PartialResult[R] =
{
val listener = new ApproximateActionListener(rdd, func, evaluator, timeout)
val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
val partitions = (0 until rdd.partitions.size).toArray
- eventQueue.put(JobSubmitted(rdd, func2, partitions, false, callSite, listener))
+ eventQueue.put(JobSubmitted(rdd, func2, partitions, false, callSite, listener, properties))
return listener.awaitResult() // Will throw an exception if the job fails
}
@@ -278,10 +284,10 @@ class DAGScheduler(
*/
private[scheduler] def processEvent(event: DAGSchedulerEvent): Boolean = {
event match {
- case JobSubmitted(finalRDD, func, partitions, allowLocal, callSite, listener) =>
+ case JobSubmitted(finalRDD, func, partitions, allowLocal, callSite, listener, properties) =>
val runId = nextRunId.getAndIncrement()
val finalStage = newStage(finalRDD, None, runId)
- val job = new ActiveJob(runId, finalStage, func, partitions, callSite, listener)
+ val job = new ActiveJob(runId, finalStage, func, partitions, callSite, listener, properties)
clearCacheLocs()
logInfo("Got job " + job.runId + " (" + callSite + ") with " + partitions.length +
" output partitions (allowLocal=" + allowLocal + ")")
@@ -292,6 +298,7 @@ class DAGScheduler(
// Compute very short actions like first() or take() with no parent stages locally.
runLocally(job)
} else {
+ idToActiveJob(runId) = job
activeJobs += job
resultStageToJob(finalStage) = job
submitStage(finalStage)
@@ -333,7 +340,7 @@ class DAGScheduler(
submitStage(stage)
}
}
-
+
/**
* Check for waiting or failed stages which are now eligible for resubmission.
* Ordinarily run on every iteration of the event loop.
@@ -464,8 +471,9 @@ class DAGScheduler(
logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
myPending ++= tasks
logDebug("New pending tasks: " + myPending)
+ val properties = idToActiveJob(stage.priority).properties
taskSched.submitTasks(
- new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.priority))
+ new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.priority, properties))
if (!stage.submissionTime.isDefined) {
stage.submissionTime = Some(System.currentTimeMillis())
}
@@ -727,7 +735,7 @@ class DAGScheduler(
sizeBefore = shuffleToMapStage.size
shuffleToMapStage.clearOldValues(cleanupTime)
logInfo("shuffleToMapStage " + sizeBefore + " --> " + shuffleToMapStage.size)
-
+
sizeBefore = pendingTasks.size
pendingTasks.clearOldValues(cleanupTime)
logInfo("pendingTasks " + sizeBefore + " --> " + pendingTasks.size)
diff --git a/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala
index b46bb863f0..acad915f13 100644
--- a/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala
+++ b/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala
@@ -1,5 +1,7 @@
package spark.scheduler
+import java.util.Properties
+
import spark.scheduler.cluster.TaskInfo
import scala.collection.mutable.Map
@@ -20,7 +22,8 @@ private[spark] case class JobSubmitted(
partitions: Array[Int],
allowLocal: Boolean,
callSite: String,
- listener: JobListener)
+ listener: JobListener,
+ properties: Properties = null)
extends DAGSchedulerEvent
private[spark] case class CompletionEvent(
diff --git a/core/src/main/scala/spark/scheduler/Stage.scala b/core/src/main/scala/spark/scheduler/Stage.scala
index 552061e46b..7fc9e13fd9 100644
--- a/core/src/main/scala/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/spark/scheduler/Stage.scala
@@ -26,7 +26,7 @@ private[spark] class Stage(
val parents: List[Stage],
val priority: Int)
extends Logging {
-
+
val isShuffleMap = shuffleDep != None
val numPartitions = rdd.partitions.size
val outputLocs = Array.fill[List[MapStatus]](numPartitions)(Nil)
@@ -60,7 +60,7 @@ private[spark] class Stage(
numAvailableOutputs -= 1
}
}
-
+
def removeOutputsOnExecutor(execId: String) {
var becameUnavailable = false
for (partition <- 0 until numPartitions) {
diff --git a/core/src/main/scala/spark/scheduler/TaskSet.scala b/core/src/main/scala/spark/scheduler/TaskSet.scala
index a3002ca477..e4b5fcaedb 100644
--- a/core/src/main/scala/spark/scheduler/TaskSet.scala
+++ b/core/src/main/scala/spark/scheduler/TaskSet.scala
@@ -1,11 +1,18 @@
package spark.scheduler
+import java.util.Properties
+
/**
* A set of tasks submitted together to the low-level TaskScheduler, usually representing
* missing partitions of a particular stage.
*/
-private[spark] class TaskSet(val tasks: Array[Task[_]], val stageId: Int, val attempt: Int, val priority: Int) {
- val id: String = stageId + "." + attempt
+private[spark] class TaskSet(
+ val tasks: Array[Task[_]],
+ val stageId: Int,
+ val attempt: Int,
+ val priority: Int,
+ val properties: Properties) {
+ val id: String = stageId + "." + attempt
override def toString: String = "TaskSet " + id
}
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
index cf4483f144..053d4b8e4a 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
@@ -56,7 +56,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
val TASK_SCHEDULING_AGGRESSION = TaskLocality.parse(System.getProperty("spark.tasks.schedule.aggression", "NODE_LOCAL"))
val activeTaskSets = new HashMap[String, TaskSetManager]
- var activeTaskSetsQueue = new ArrayBuffer[TaskSetManager]
val taskIdToTaskSetId = new HashMap[Long, String]
val taskIdToExecutorId = new HashMap[Long, String]
@@ -96,12 +95,28 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
val mapOutputTracker = SparkEnv.get.mapOutputTracker
+ var schedulableBuilder: SchedulableBuilder = null
+ var rootPool: Pool = null
+
override def setListener(listener: TaskSchedulerListener) {
this.listener = listener
}
def initialize(context: SchedulerBackend) {
backend = context
+ //default scheduler is FIFO
+ val schedulingMode = System.getProperty("spark.cluster.schedulingmode", "FIFO")
+ //temporarily set rootPool name to empty
+ rootPool = new Pool("", SchedulingMode.withName(schedulingMode), 0, 0)
+ schedulableBuilder = {
+ schedulingMode match {
+ case "FIFO" =>
+ new FIFOSchedulableBuilder(rootPool)
+ case "FAIR" =>
+ new FairSchedulableBuilder(rootPool)
+ }
+ }
+ schedulableBuilder.buildPools()
// resolve executorId to hostPort mapping.
def executorToHostPort(executorId: String, defaultHostPort: String): String = {
executorIdToHostPort.getOrElse(executorId, defaultHostPort)
@@ -112,6 +127,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
SparkEnv.get.executorIdToHostPort = Some(executorToHostPort)
}
+
def newTaskId(): Long = nextTaskId.getAndIncrement()
override def start() {
@@ -163,7 +179,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
this.synchronized {
val manager = new TaskSetManager(this, taskSet)
activeTaskSets(taskSet.id) = manager
- activeTaskSetsQueue += manager
+ schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
taskSetTaskIds(taskSet.id) = new HashSet[Long]()
if (hasReceivedTask == false) {
@@ -186,7 +202,8 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
def taskSetFinished(manager: TaskSetManager) {
this.synchronized {
activeTaskSets -= manager.taskSet.id
- activeTaskSetsQueue -= manager
+ manager.parent.removeSchedulable(manager)
+ logInfo("Remove TaskSet %s from pool %s".format(manager.taskSet.id, manager.parent.name))
taskIdToTaskSetId --= taskSetTaskIds(manager.taskSet.id)
taskIdToExecutorId --= taskSetTaskIds(manager.taskSet.id)
taskSetTaskIds.remove(manager.taskSet.id)
@@ -235,9 +252,12 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
map
}
var launchedTask = false
-
-
- for (manager <- activeTaskSetsQueue.sortBy(m => (m.taskSet.priority, m.taskSet.stageId))) {
+ val sortedTaskSetQueue = rootPool.getSortedTaskSetQueue()
+ for (manager <- sortedTaskSetQueue)
+ {
+ logInfo("parentName:%s,name:%s,runningTasks:%s".format(manager.parent.name, manager.name, manager.runningTasks))
+ }
+ for (manager <- sortedTaskSetQueue) {
// Split offers based on node local, rack local and off-rack tasks.
val processLocalOffers = new HashMap[String, ArrayBuffer[Int]]()
@@ -332,10 +352,10 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
executorsByHostPort(hostPort) += execId
availableCpus(i) -= 1
launchedTask = true
-
+
case None => {}
+ }
}
- }
// Loop once more - when lastLoop = true, then we try to schedule task on all nodes irrespective of
// data locality (we still go in order of priority : but that would not change anything since
// if data local tasks had been available, we would have scheduled them already)
@@ -353,7 +373,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
}
} while (launchedTask)
}
-
+
if (tasks.size > 0) {
hasLaunchedTask = true
}
@@ -406,6 +426,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
backend.reviveOffers()
}
if (taskFailed) {
+
// Also revive offers if a task had failed for some reason other than host lost
backend.reviveOffers()
}
@@ -452,9 +473,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
def checkSpeculatableTasks() {
var shouldRevive = false
synchronized {
- for (ts <- activeTaskSetsQueue) {
- shouldRevive |= ts.checkSpeculatableTasks()
- }
+ shouldRevive = rootPool.checkSpeculatableTasks()
}
if (shouldRevive) {
backend.reviveOffers()
@@ -464,7 +483,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
// Check for pending tasks in all our active jobs.
def hasPendingTasks(): Boolean = {
synchronized {
- activeTaskSetsQueue.exists( _.hasPendingTasks() )
+ rootPool.hasPendingTasks()
}
}
@@ -503,14 +522,14 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
hostPortsAlive -= hostPort
hostToAliveHostPorts.getOrElseUpdate(Utils.parseHostPort(hostPort)._1, new HashSet[String]).remove(hostPort)
}
-
+
val execs = executorsByHostPort.getOrElse(hostPort, new HashSet)
execs -= executorId
if (execs.isEmpty) {
executorsByHostPort -= hostPort
}
executorIdToHostPort -= executorId
- activeTaskSetsQueue.foreach(_.executorLost(executorId, hostPort))
+ rootPool.executorLost(executorId, hostPort)
}
def executorGained(execId: String, hostPort: String) {
diff --git a/core/src/main/scala/spark/scheduler/cluster/Pool.scala b/core/src/main/scala/spark/scheduler/cluster/Pool.scala
new file mode 100644
index 0000000000..941ba7a3f1
--- /dev/null
+++ b/core/src/main/scala/spark/scheduler/cluster/Pool.scala
@@ -0,0 +1,104 @@
+package spark.scheduler.cluster
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.HashMap
+
+import spark.Logging
+import spark.scheduler.cluster.SchedulingMode.SchedulingMode
+
+/**
+ * An Schedulable entity that represent collection of Pools or TaskSetManagers
+ */
+
+private[spark] class Pool(
+ val poolName: String,
+ val schedulingMode: SchedulingMode,
+ initMinShare: Int,
+ initWeight: Int)
+ extends Schedulable
+ with Logging {
+
+ var schedulableQueue = new ArrayBuffer[Schedulable]
+ var schedulableNameToSchedulable = new HashMap[String, Schedulable]
+
+ var weight = initWeight
+ var minShare = initMinShare
+ var runningTasks = 0
+
+ var priority = 0
+ var stageId = 0
+ var name = poolName
+ var parent:Schedulable = null
+
+ var taskSetSchedulingAlgorithm: SchedulingAlgorithm = {
+ schedulingMode match {
+ case SchedulingMode.FAIR =>
+ new FairSchedulingAlgorithm()
+ case SchedulingMode.FIFO =>
+ new FIFOSchedulingAlgorithm()
+ }
+ }
+
+ override def addSchedulable(schedulable: Schedulable) {
+ schedulableQueue += schedulable
+ schedulableNameToSchedulable(schedulable.name) = schedulable
+ schedulable.parent= this
+ }
+
+ override def removeSchedulable(schedulable: Schedulable) {
+ schedulableQueue -= schedulable
+ schedulableNameToSchedulable -= schedulable.name
+ }
+
+ override def getSchedulableByName(schedulableName: String): Schedulable = {
+ if (schedulableNameToSchedulable.contains(schedulableName)) {
+ return schedulableNameToSchedulable(schedulableName)
+ }
+ for (schedulable <- schedulableQueue) {
+ var sched = schedulable.getSchedulableByName(schedulableName)
+ if (sched != null) {
+ return sched
+ }
+ }
+ return null
+ }
+
+ override def executorLost(executorId: String, host: String) {
+ schedulableQueue.foreach(_.executorLost(executorId, host))
+ }
+
+ override def checkSpeculatableTasks(): Boolean = {
+ var shouldRevive = false
+ for (schedulable <- schedulableQueue) {
+ shouldRevive |= schedulable.checkSpeculatableTasks()
+ }
+ return shouldRevive
+ }
+
+ override def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = {
+ var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
+ val sortedSchedulableQueue = schedulableQueue.sortWith(taskSetSchedulingAlgorithm.comparator)
+ for (schedulable <- sortedSchedulableQueue) {
+ sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue()
+ }
+ return sortedTaskSetQueue
+ }
+
+ override def increaseRunningTasks(taskNum: Int) {
+ runningTasks += taskNum
+ if (parent != null) {
+ parent.increaseRunningTasks(taskNum)
+ }
+ }
+
+ override def decreaseRunningTasks(taskNum: Int) {
+ runningTasks -= taskNum
+ if (parent != null) {
+ parent.decreaseRunningTasks(taskNum)
+ }
+ }
+
+ override def hasPendingTasks(): Boolean = {
+ schedulableQueue.exists(_.hasPendingTasks())
+ }
+}
diff --git a/core/src/main/scala/spark/scheduler/cluster/Schedulable.scala b/core/src/main/scala/spark/scheduler/cluster/Schedulable.scala
new file mode 100644
index 0000000000..2dd9c0564f
--- /dev/null
+++ b/core/src/main/scala/spark/scheduler/cluster/Schedulable.scala
@@ -0,0 +1,27 @@
+package spark.scheduler.cluster
+
+import scala.collection.mutable.ArrayBuffer
+
+/**
+ * An interface for schedulable entities.
+ * there are two type of Schedulable entities(Pools and TaskSetManagers)
+ */
+private[spark] trait Schedulable {
+ var parent: Schedulable
+ def weight: Int
+ def minShare: Int
+ def runningTasks: Int
+ def priority: Int
+ def stageId: Int
+ def name: String
+
+ def increaseRunningTasks(taskNum: Int): Unit
+ def decreaseRunningTasks(taskNum: Int): Unit
+ def addSchedulable(schedulable: Schedulable): Unit
+ def removeSchedulable(schedulable: Schedulable): Unit
+ def getSchedulableByName(name: String): Schedulable
+ def executorLost(executorId: String, host: String): Unit
+ def checkSpeculatableTasks(): Boolean
+ def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager]
+ def hasPendingTasks(): Boolean
+}
diff --git a/core/src/main/scala/spark/scheduler/cluster/SchedulableBuilder.scala b/core/src/main/scala/spark/scheduler/cluster/SchedulableBuilder.scala
new file mode 100644
index 0000000000..18cc15c2a5
--- /dev/null
+++ b/core/src/main/scala/spark/scheduler/cluster/SchedulableBuilder.scala
@@ -0,0 +1,115 @@
+package spark.scheduler.cluster
+
+import java.io.{File, FileInputStream, FileOutputStream}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.HashMap
+import scala.collection.mutable.HashSet
+import scala.util.control.Breaks._
+import scala.xml._
+
+import spark.Logging
+import spark.scheduler.cluster.SchedulingMode.SchedulingMode
+
+import java.util.Properties
+
+/**
+ * An interface to build Schedulable tree
+ * buildPools: build the tree nodes(pools)
+ * addTaskSetManager: build the leaf nodes(TaskSetManagers)
+ */
+private[spark] trait SchedulableBuilder {
+ def buildPools()
+ def addTaskSetManager(manager: Schedulable, properties: Properties)
+}
+
+private[spark] class FIFOSchedulableBuilder(val rootPool: Pool) extends SchedulableBuilder with Logging {
+
+ override def buildPools() {
+ //nothing
+ }
+
+ override def addTaskSetManager(manager: Schedulable, properties: Properties) {
+ rootPool.addSchedulable(manager)
+ }
+}
+
+private[spark] class FairSchedulableBuilder(val rootPool: Pool) extends SchedulableBuilder with Logging {
+
+ val schedulerAllocFile = System.getProperty("spark.fairscheduler.allocation.file","unspecified")
+ val FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.cluster.fair.pool"
+ val DEFAULT_POOL_NAME = "default"
+ val MINIMUM_SHARES_PROPERTY = "minShare"
+ val SCHEDULING_MODE_PROPERTY = "schedulingMode"
+ val WEIGHT_PROPERTY = "weight"
+ val POOL_NAME_PROPERTY = "@name"
+ val POOLS_PROPERTY = "pool"
+ val DEFAULT_SCHEDULING_MODE = SchedulingMode.FIFO
+ val DEFAULT_MINIMUM_SHARE = 2
+ val DEFAULT_WEIGHT = 1
+
+ override def buildPools() {
+ val file = new File(schedulerAllocFile)
+ if (file.exists()) {
+ val xml = XML.loadFile(file)
+ for (poolNode <- (xml \\ POOLS_PROPERTY)) {
+
+ val poolName = (poolNode \ POOL_NAME_PROPERTY).text
+ var schedulingMode = DEFAULT_SCHEDULING_MODE
+ var minShare = DEFAULT_MINIMUM_SHARE
+ var weight = DEFAULT_WEIGHT
+
+ val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text
+ if (xmlSchedulingMode != "") {
+ try {
+ schedulingMode = SchedulingMode.withName(xmlSchedulingMode)
+ } catch {
+ case e: Exception => logInfo("Error xml schedulingMode, using default schedulingMode")
+ }
+ }
+
+ val xmlMinShare = (poolNode \ MINIMUM_SHARES_PROPERTY).text
+ if (xmlMinShare != "") {
+ minShare = xmlMinShare.toInt
+ }
+
+ val xmlWeight = (poolNode \ WEIGHT_PROPERTY).text
+ if (xmlWeight != "") {
+ weight = xmlWeight.toInt
+ }
+
+ val pool = new Pool(poolName, schedulingMode, minShare, weight)
+ rootPool.addSchedulable(pool)
+ logInfo("Create new pool with name:%s,schedulingMode:%s,minShare:%d,weight:%d".format(
+ poolName, schedulingMode, minShare, weight))
+ }
+ }
+
+ //finally create "default" pool
+ if (rootPool.getSchedulableByName(DEFAULT_POOL_NAME) == null) {
+ val pool = new Pool(DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
+ rootPool.addSchedulable(pool)
+ logInfo("Create default pool with name:%s,schedulingMode:%s,minShare:%d,weight:%d".format(
+ DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
+ }
+ }
+
+ override def addTaskSetManager(manager: Schedulable, properties: Properties) {
+ var poolName = DEFAULT_POOL_NAME
+ var parentPool = rootPool.getSchedulableByName(poolName)
+ if (properties != null) {
+ poolName = properties.getProperty(FAIR_SCHEDULER_PROPERTIES, DEFAULT_POOL_NAME)
+ parentPool = rootPool.getSchedulableByName(poolName)
+ if (parentPool == null) {
+ //we will create a new pool that user has configured in app instead of being defined in xml file
+ parentPool = new Pool(poolName,DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
+ rootPool.addSchedulable(parentPool)
+ logInfo("Create pool with name:%s,schedulingMode:%s,minShare:%d,weight:%d".format(
+ poolName, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
+ }
+ }
+ parentPool.addSchedulable(manager)
+ logInfo("Added task set " + manager.name + " tasks to pool "+poolName)
+ }
+}
diff --git a/core/src/main/scala/spark/scheduler/cluster/SchedulingAlgorithm.scala b/core/src/main/scala/spark/scheduler/cluster/SchedulingAlgorithm.scala
new file mode 100644
index 0000000000..a5d6285c99
--- /dev/null
+++ b/core/src/main/scala/spark/scheduler/cluster/SchedulingAlgorithm.scala
@@ -0,0 +1,56 @@
+package spark.scheduler.cluster
+
+/**
+ * An interface for sort algorithm
+ * FIFO: FIFO algorithm between TaskSetManagers
+ * FS: FS algorithm between Pools, and FIFO or FS within Pools
+ */
+private[spark] trait SchedulingAlgorithm {
+ def comparator(s1: Schedulable, s2: Schedulable): Boolean
+}
+
+private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
+ override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
+ val priority1 = s1.priority
+ val priority2 = s2.priority
+ var res = Math.signum(priority1 - priority2)
+ if (res == 0) {
+ val stageId1 = s1.stageId
+ val stageId2 = s2.stageId
+ res = Math.signum(stageId1 - stageId2)
+ }
+ if (res < 0) {
+ return true
+ } else {
+ return false
+ }
+ }
+}
+
+private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
+ override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
+ val minShare1 = s1.minShare
+ val minShare2 = s2.minShare
+ val runningTasks1 = s1.runningTasks
+ val runningTasks2 = s2.runningTasks
+ val s1Needy = runningTasks1 < minShare1
+ val s2Needy = runningTasks2 < minShare2
+ val minShareRatio1 = runningTasks1.toDouble / Math.max(minShare1, 1.0).toDouble
+ val minShareRatio2 = runningTasks2.toDouble / Math.max(minShare2, 1.0).toDouble
+ val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
+ val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble
+ var res:Boolean = true
+
+ if (s1Needy && !s2Needy) {
+ res = true
+ } else if (!s1Needy && s2Needy) {
+ res = false
+ } else if (s1Needy && s2Needy) {
+ res = minShareRatio1 <= minShareRatio2
+ } else {
+ res = taskToWeightRatio1 <= taskToWeightRatio2
+ }
+ return res
+ }
+}
+
diff --git a/core/src/main/scala/spark/scheduler/cluster/SchedulingMode.scala b/core/src/main/scala/spark/scheduler/cluster/SchedulingMode.scala
new file mode 100644
index 0000000000..6e0c6793e0
--- /dev/null
+++ b/core/src/main/scala/spark/scheduler/cluster/SchedulingMode.scala
@@ -0,0 +1,7 @@
+package spark.scheduler.cluster
+
+object SchedulingMode extends Enumeration("FAIR","FIFO"){
+
+ type SchedulingMode = Value
+ val FAIR,FIFO = Value
+}
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
index db5869db63..18d105e0a4 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
@@ -54,7 +54,11 @@ private[spark] object TaskLocality extends Enumeration("PROCESS_LOCAL", "NODE_LO
/**
* Schedules the tasks within a single TaskSet in the ClusterScheduler.
*/
-private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSet) extends Logging {
+private[spark] class TaskSetManager(
+ sched: ClusterScheduler,
+ val taskSet: TaskSet)
+ extends Schedulable
+ with Logging {
// Maximum time to wait to run a task in a preferred location (in ms)
val LOCALITY_WAIT = System.getProperty("spark.locality.wait", "3000").toLong
@@ -72,7 +76,6 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
// Serializer for closures and tasks.
val ser = SparkEnv.get.closureSerializer.newInstance()
- val priority = taskSet.priority
val tasks = taskSet.tasks
val numTasks = tasks.length
val copiesRunning = new Array[Int](numTasks)
@@ -81,6 +84,14 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
val taskAttempts = Array.fill[List[TaskInfo]](numTasks)(Nil)
var tasksFinished = 0
+ var weight = 1
+ var minShare = 0
+ var runningTasks = 0
+ var priority = taskSet.priority
+ var stageId = taskSet.stageId
+ var name = "TaskSet_"+taskSet.stageId.toString
+ var parent:Schedulable = null
+
// Last time when we launched a preferred task (for delay scheduling)
var lastPreferredLaunchTime = System.currentTimeMillis
@@ -464,6 +475,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
val serializedTask = Task.serializeWithDependencies(
task, sched.sc.addedFiles, sched.sc.addedJars, ser)
val timeTaken = System.currentTimeMillis - startTime
+ increaseRunningTasks(1)
logInfo("Serialized task %s:%d as %d bytes in %d ms".format(
taskSet.id, index, serializedTask.limit, timeTaken))
val taskName = "task %s:%d".format(taskSet.id, index)
@@ -498,6 +510,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
}
val index = info.index
info.markSuccessful()
+ decreaseRunningTasks(1)
if (!finished(index)) {
tasksFinished += 1
logInfo("Finished TID %s in %d ms (progress: %d/%d)".format(
@@ -526,6 +539,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
}
val index = info.index
info.markFailed()
+ decreaseRunningTasks(1)
if (!finished(index)) {
logInfo("Lost TID %s (task %s:%d)".format(tid, taskSet.id, index))
copiesRunning(index) -= 1
@@ -540,6 +554,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
finished(index) = true
tasksFinished += 1
sched.taskSetFinished(this)
+ decreaseRunningTasks(runningTasks)
return
case taskResultTooBig: TaskResultTooBigFailure =>
@@ -604,10 +619,44 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
causeOfFailure = message
// TODO: Kill running tasks if we were not terminated due to a Mesos error
sched.listener.taskSetFailed(taskSet, message)
+ decreaseRunningTasks(runningTasks)
sched.taskSetFinished(this)
}
- def executorLost(execId: String, hostPort: String) {
+ override def increaseRunningTasks(taskNum: Int) {
+ runningTasks += taskNum
+ if (parent != null) {
+ parent.increaseRunningTasks(taskNum)
+ }
+ }
+
+ override def decreaseRunningTasks(taskNum: Int) {
+ runningTasks -= taskNum
+ if (parent != null) {
+ parent.decreaseRunningTasks(taskNum)
+ }
+ }
+
+ //TODO: for now we just find Pool not TaskSetManager, we can extend this function in future if needed
+ override def getSchedulableByName(name: String): Schedulable = {
+ return null
+ }
+
+ override def addSchedulable(schedulable:Schedulable) {
+ //nothing
+ }
+
+ override def removeSchedulable(schedulable:Schedulable) {
+ //nothing
+ }
+
+ override def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = {
+ var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
+ sortedTaskSetQueue += this
+ return sortedTaskSetQueue
+ }
+
+ override def executorLost(execId: String, hostPort: String) {
logInfo("Re-queueing tasks for " + execId + " from TaskSet " + taskSet.id)
// If some task has preferred locations only on hostname, and there are no more executors there,
@@ -653,7 +702,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
* TODO: To make this scale to large jobs, we need to maintain a list of running tasks, so that
* we don't scan the whole task set. It might also help to make this sorted by launch time.
*/
- def checkSpeculatableTasks(): Boolean = {
+ override def checkSpeculatableTasks(): Boolean = {
// Can't speculate if we only have one task, or if all tasks have finished.
if (numTasks == 1 || tasksFinished == numTasks) {
return false
@@ -685,7 +734,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
return foundTasks
}
- def hasPendingTasks(): Boolean = {
+ override def hasPendingTasks(): Boolean = {
numTasks > 0 && tasksFinished < numTasks
}
}
diff --git a/core/src/test/resources/fairscheduler.xml b/core/src/test/resources/fairscheduler.xml
new file mode 100644
index 0000000000..5a688b0ebb
--- /dev/null
+++ b/core/src/test/resources/fairscheduler.xml
@@ -0,0 +1,14 @@
+<allocations>
+<pool name="1">
+ <minShare>2</minShare>
+ <weight>1</weight>
+ <schedulingMode>FIFO</schedulingMode>
+</pool>
+<pool name="2">
+ <minShare>3</minShare>
+ <weight>1</weight>
+ <schedulingMode>FIFO</schedulingMode>
+</pool>
+<pool name="3">
+</pool>
+</allocations>
diff --git a/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala b/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala
new file mode 100644
index 0000000000..a39418b716
--- /dev/null
+++ b/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala
@@ -0,0 +1,247 @@
+package spark.scheduler
+
+import org.scalatest.FunSuite
+import org.scalatest.BeforeAndAfter
+
+import spark._
+import spark.scheduler._
+import spark.scheduler.cluster._
+import scala.collection.mutable.ArrayBuffer
+
+import java.util.Properties
+
+class DummyTaskSetManager(
+ initPriority: Int,
+ initStageId: Int,
+ initNumTasks: Int,
+ clusterScheduler: ClusterScheduler,
+ taskSet: TaskSet)
+ extends TaskSetManager(clusterScheduler,taskSet) {
+
+ parent = null
+ weight = 1
+ minShare = 2
+ runningTasks = 0
+ priority = initPriority
+ stageId = initStageId
+ name = "TaskSet_"+stageId
+ override val numTasks = initNumTasks
+ tasksFinished = 0
+
+ override def increaseRunningTasks(taskNum: Int) {
+ runningTasks += taskNum
+ if (parent != null) {
+ parent.increaseRunningTasks(taskNum)
+ }
+ }
+
+ override def decreaseRunningTasks(taskNum: Int) {
+ runningTasks -= taskNum
+ if (parent != null) {
+ parent.decreaseRunningTasks(taskNum)
+ }
+ }
+
+ override def addSchedulable(schedulable: Schedulable) {
+ }
+
+ override def removeSchedulable(schedulable: Schedulable) {
+ }
+
+ override def getSchedulableByName(name: String): Schedulable = {
+ return null
+ }
+
+ override def executorLost(executorId: String, host: String): Unit = {
+ }
+
+ override def slaveOffer(execId: String, host: String, avaiableCpus: Double, overrideLocality: TaskLocality.TaskLocality = null): Option[TaskDescription] = {
+ if (tasksFinished + runningTasks < numTasks) {
+ increaseRunningTasks(1)
+ return Some(new TaskDescription(0, execId, "task 0:0", null))
+ }
+ return None
+ }
+
+ override def checkSpeculatableTasks(): Boolean = {
+ return true
+ }
+
+ def taskFinished() {
+ decreaseRunningTasks(1)
+ tasksFinished +=1
+ if (tasksFinished == numTasks) {
+ parent.removeSchedulable(this)
+ }
+ }
+
+ def abort() {
+ decreaseRunningTasks(runningTasks)
+ parent.removeSchedulable(this)
+ }
+}
+
+class DummyTask(stageId: Int) extends Task[Int](stageId)
+{
+ def run(attemptId: Long): Int = {
+ return 0
+ }
+}
+
+class ClusterSchedulerSuite extends FunSuite with LocalSparkContext {
+
+ def createDummyTaskSetManager(priority: Int, stage: Int, numTasks: Int, cs: ClusterScheduler, taskSet: TaskSet): DummyTaskSetManager = {
+ new DummyTaskSetManager(priority, stage, numTasks, cs , taskSet)
+ }
+
+ def resourceOffer(rootPool: Pool): Int = {
+ val taskSetQueue = rootPool.getSortedTaskSetQueue()
+ for (taskSet <- taskSetQueue)
+ {
+ taskSet.slaveOffer("execId_1", "hostname_1", 1) match {
+ case Some(task) =>
+ return taskSet.stageId
+ case None => {}
+ }
+ }
+ -1
+ }
+
+ def checkTaskSetId(rootPool: Pool, expectedTaskSetId: Int) {
+ assert(resourceOffer(rootPool) === expectedTaskSetId)
+ }
+
+ test("FIFO Scheduler Test") {
+ sc = new SparkContext("local", "ClusterSchedulerSuite")
+ val clusterScheduler = new ClusterScheduler(sc)
+ var tasks = ArrayBuffer[Task[_]]()
+ val task = new DummyTask(0)
+ tasks += task
+ val taskSet = new TaskSet(tasks.toArray,0,0,0,null)
+
+ val rootPool = new Pool("", SchedulingMode.FIFO, 0, 0)
+ val schedulableBuilder = new FIFOSchedulableBuilder(rootPool)
+ schedulableBuilder.buildPools()
+
+ val taskSetManager0 = createDummyTaskSetManager(0, 0, 2, clusterScheduler, taskSet)
+ val taskSetManager1 = createDummyTaskSetManager(0, 1, 2, clusterScheduler, taskSet)
+ val taskSetManager2 = createDummyTaskSetManager(0, 2, 2, clusterScheduler, taskSet)
+ schedulableBuilder.addTaskSetManager(taskSetManager0, null)
+ schedulableBuilder.addTaskSetManager(taskSetManager1, null)
+ schedulableBuilder.addTaskSetManager(taskSetManager2, null)
+
+ checkTaskSetId(rootPool, 0)
+ resourceOffer(rootPool)
+ checkTaskSetId(rootPool, 1)
+ resourceOffer(rootPool)
+ taskSetManager1.abort()
+ checkTaskSetId(rootPool, 2)
+ }
+
+ test("Fair Scheduler Test") {
+ sc = new SparkContext("local", "ClusterSchedulerSuite")
+ val clusterScheduler = new ClusterScheduler(sc)
+ var tasks = ArrayBuffer[Task[_]]()
+ val task = new DummyTask(0)
+ tasks += task
+ val taskSet = new TaskSet(tasks.toArray,0,0,0,null)
+
+ val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile()
+ System.setProperty("spark.fairscheduler.allocation.file", xmlPath)
+ val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
+ val schedulableBuilder = new FairSchedulableBuilder(rootPool)
+ schedulableBuilder.buildPools()
+
+ assert(rootPool.getSchedulableByName("default") != null)
+ assert(rootPool.getSchedulableByName("1") != null)
+ assert(rootPool.getSchedulableByName("2") != null)
+ assert(rootPool.getSchedulableByName("3") != null)
+ assert(rootPool.getSchedulableByName("1").minShare === 2)
+ assert(rootPool.getSchedulableByName("1").weight === 1)
+ assert(rootPool.getSchedulableByName("2").minShare === 3)
+ assert(rootPool.getSchedulableByName("2").weight === 1)
+ assert(rootPool.getSchedulableByName("3").minShare === 2)
+ assert(rootPool.getSchedulableByName("3").weight === 1)
+
+ val properties1 = new Properties()
+ properties1.setProperty("spark.scheduler.cluster.fair.pool","1")
+ val properties2 = new Properties()
+ properties2.setProperty("spark.scheduler.cluster.fair.pool","2")
+
+ val taskSetManager10 = createDummyTaskSetManager(1, 0, 1, clusterScheduler, taskSet)
+ val taskSetManager11 = createDummyTaskSetManager(1, 1, 1, clusterScheduler, taskSet)
+ val taskSetManager12 = createDummyTaskSetManager(1, 2, 2, clusterScheduler, taskSet)
+ schedulableBuilder.addTaskSetManager(taskSetManager10, properties1)
+ schedulableBuilder.addTaskSetManager(taskSetManager11, properties1)
+ schedulableBuilder.addTaskSetManager(taskSetManager12, properties1)
+
+ val taskSetManager23 = createDummyTaskSetManager(2, 3, 2, clusterScheduler, taskSet)
+ val taskSetManager24 = createDummyTaskSetManager(2, 4, 2, clusterScheduler, taskSet)
+ schedulableBuilder.addTaskSetManager(taskSetManager23, properties2)
+ schedulableBuilder.addTaskSetManager(taskSetManager24, properties2)
+
+ checkTaskSetId(rootPool, 0)
+ checkTaskSetId(rootPool, 3)
+ checkTaskSetId(rootPool, 3)
+ checkTaskSetId(rootPool, 1)
+ checkTaskSetId(rootPool, 4)
+ checkTaskSetId(rootPool, 2)
+ checkTaskSetId(rootPool, 2)
+ checkTaskSetId(rootPool, 4)
+
+ taskSetManager12.taskFinished()
+ assert(rootPool.getSchedulableByName("1").runningTasks === 3)
+ taskSetManager24.abort()
+ assert(rootPool.getSchedulableByName("2").runningTasks === 2)
+ }
+
+ test("Nested Pool Test") {
+ sc = new SparkContext("local", "ClusterSchedulerSuite")
+ val clusterScheduler = new ClusterScheduler(sc)
+ var tasks = ArrayBuffer[Task[_]]()
+ val task = new DummyTask(0)
+ tasks += task
+ val taskSet = new TaskSet(tasks.toArray,0,0,0,null)
+
+ val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
+ val pool0 = new Pool("0", SchedulingMode.FAIR, 3, 1)
+ val pool1 = new Pool("1", SchedulingMode.FAIR, 4, 1)
+ rootPool.addSchedulable(pool0)
+ rootPool.addSchedulable(pool1)
+
+ val pool00 = new Pool("00", SchedulingMode.FAIR, 2, 2)
+ val pool01 = new Pool("01", SchedulingMode.FAIR, 1, 1)
+ pool0.addSchedulable(pool00)
+ pool0.addSchedulable(pool01)
+
+ val pool10 = new Pool("10", SchedulingMode.FAIR, 2, 2)
+ val pool11 = new Pool("11", SchedulingMode.FAIR, 2, 1)
+ pool1.addSchedulable(pool10)
+ pool1.addSchedulable(pool11)
+
+ val taskSetManager000 = createDummyTaskSetManager(0, 0, 5, clusterScheduler, taskSet)
+ val taskSetManager001 = createDummyTaskSetManager(0, 1, 5, clusterScheduler, taskSet)
+ pool00.addSchedulable(taskSetManager000)
+ pool00.addSchedulable(taskSetManager001)
+
+ val taskSetManager010 = createDummyTaskSetManager(1, 2, 5, clusterScheduler, taskSet)
+ val taskSetManager011 = createDummyTaskSetManager(1, 3, 5, clusterScheduler, taskSet)
+ pool01.addSchedulable(taskSetManager010)
+ pool01.addSchedulable(taskSetManager011)
+
+ val taskSetManager100 = createDummyTaskSetManager(2, 4, 5, clusterScheduler, taskSet)
+ val taskSetManager101 = createDummyTaskSetManager(2, 5, 5, clusterScheduler, taskSet)
+ pool10.addSchedulable(taskSetManager100)
+ pool10.addSchedulable(taskSetManager101)
+
+ val taskSetManager110 = createDummyTaskSetManager(3, 6, 5, clusterScheduler, taskSet)
+ val taskSetManager111 = createDummyTaskSetManager(3, 7, 5, clusterScheduler, taskSet)
+ pool11.addSchedulable(taskSetManager110)
+ pool11.addSchedulable(taskSetManager111)
+
+ checkTaskSetId(rootPool, 0)
+ checkTaskSetId(rootPool, 4)
+ checkTaskSetId(rootPool, 6)
+ checkTaskSetId(rootPool, 2)
+ }
+}