aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorAndrew xia <junluan.xia@intel.com>2013-03-16 11:13:38 +0800
committerAndrew xia <junluan.xia@intel.com>2013-03-16 11:13:38 +0800
commit5892393140eb024a32585b6d5b51146ddde8f63a (patch)
tree2eeff8496102baa718d0cecc6e644ed5cf80bf06 /core
parent0b64e5f1ac0492aac6fca383c7877fbfce7d4cf1 (diff)
downloadspark-5892393140eb024a32585b6d5b51146ddde8f63a.tar.gz
spark-5892393140eb024a32585b6d5b51146ddde8f63a.tar.bz2
spark-5892393140eb024a32585b6d5b51146ddde8f63a.zip
refactor fair scheduler implementation
1.Chage "pool" properties to be the memeber of ActiveJob 2.Abstract the Schedulable of Pool and TaskSetManager 3.Abstract the FIFO and FS comparator algorithm 4.Miscellaneous changing of class define and construction
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/spark/scheduler/ActiveJob.scala5
-rw-r--r--core/src/main/scala/spark/scheduler/DAGScheduler.scala32
-rw-r--r--core/src/main/scala/spark/scheduler/Stage.scala4
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala48
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/FIFOTaskSetQueuesManager.scala37
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/FairTaskSetQueuesManager.scala248
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/Pool.scala92
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/Schedulable.scala21
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/SchedulingAlgorithm.scala69
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/SchedulingMode.scala8
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala1
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala38
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/TaskSetQueuesManager.scala6
13 files changed, 415 insertions, 194 deletions
diff --git a/core/src/main/scala/spark/scheduler/ActiveJob.scala b/core/src/main/scala/spark/scheduler/ActiveJob.scala
index 5a4e9a582d..b6d3c2c089 100644
--- a/core/src/main/scala/spark/scheduler/ActiveJob.scala
+++ b/core/src/main/scala/spark/scheduler/ActiveJob.scala
@@ -2,6 +2,8 @@ package spark.scheduler
import spark.TaskContext
+import java.util.Properties
+
/**
* Tracks information about an active job in the DAGScheduler.
*/
@@ -11,7 +13,8 @@ private[spark] class ActiveJob(
val func: (TaskContext, Iterator[_]) => _,
val partitions: Array[Int],
val callSite: String,
- val listener: JobListener) {
+ val listener: JobListener,
+ val properties: Properties) {
val numPartitions = partitions.length
val finished = Array.fill[Boolean](numPartitions)(false)
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index 2ad73f3232..717cc27739 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -89,6 +89,8 @@ class DAGScheduler(
// stray messages to detect.
val failedGeneration = new HashMap[String, Long]
+ val idToActiveJob = new HashMap[Int, ActiveJob]
+
val waiting = new HashSet[Stage] // Stages we need to run whose parents aren't done
val running = new HashSet[Stage] // Stages we are running right now
val failed = new HashSet[Stage] // Stages that must be resubmitted due to fetch failures
@@ -129,11 +131,11 @@ class DAGScheduler(
* The priority value passed in will be used if the stage doesn't already exist with
* a lower priority (we assume that priorities always increase across jobs for now).
*/
- private def getShuffleMapStage(shuffleDep: ShuffleDependency[_,_], priority: Int, properties: Properties): Stage = {
+ private def getShuffleMapStage(shuffleDep: ShuffleDependency[_,_], priority: Int): Stage = {
shuffleToMapStage.get(shuffleDep.shuffleId) match {
case Some(stage) => stage
case None =>
- val stage = newStage(shuffleDep.rdd, Some(shuffleDep), priority, properties)
+ val stage = newStage(shuffleDep.rdd, Some(shuffleDep), priority)
shuffleToMapStage(shuffleDep.shuffleId) = stage
stage
}
@@ -144,7 +146,7 @@ class DAGScheduler(
* as a result stage for the final RDD used directly in an action. The stage will also be given
* the provided priority.
*/
- private def newStage(rdd: RDD[_], shuffleDep: Option[ShuffleDependency[_,_]], priority: Int, properties: Properties): Stage = {
+ private def newStage(rdd: RDD[_], shuffleDep: Option[ShuffleDependency[_,_]], priority: Int): Stage = {
if (shuffleDep != None) {
// Kind of ugly: need to register RDDs with the cache and map output tracker here
// since we can't do it in the RDD constructor because # of partitions is unknown
@@ -152,7 +154,7 @@ class DAGScheduler(
mapOutputTracker.registerShuffle(shuffleDep.get.shuffleId, rdd.partitions.size)
}
val id = nextStageId.getAndIncrement()
- val stage = new Stage(id, rdd, shuffleDep, getParentStages(rdd, priority, properties), priority, properties)
+ val stage = new Stage(id, rdd, shuffleDep, getParentStages(rdd, priority), priority)
idToStage(id) = stage
stageToInfos(stage) = StageInfo(stage)
stage
@@ -162,7 +164,7 @@ class DAGScheduler(
* Get or create the list of parent stages for a given RDD. The stages will be assigned the
* provided priority if they haven't already been created with a lower priority.
*/
- private def getParentStages(rdd: RDD[_], priority: Int, properties: Properties): List[Stage] = {
+ private def getParentStages(rdd: RDD[_], priority: Int): List[Stage] = {
val parents = new HashSet[Stage]
val visited = new HashSet[RDD[_]]
def visit(r: RDD[_]) {
@@ -173,7 +175,7 @@ class DAGScheduler(
for (dep <- r.dependencies) {
dep match {
case shufDep: ShuffleDependency[_,_] =>
- parents += getShuffleMapStage(shufDep, priority, properties)
+ parents += getShuffleMapStage(shufDep, priority)
case _ =>
visit(dep.rdd)
}
@@ -194,7 +196,7 @@ class DAGScheduler(
for (dep <- rdd.dependencies) {
dep match {
case shufDep: ShuffleDependency[_,_] =>
- val mapStage = getShuffleMapStage(shufDep, stage.priority, stage.properties)
+ val mapStage = getShuffleMapStage(shufDep, stage.priority)
if (!mapStage.isAvailable) {
missing += mapStage
}
@@ -239,7 +241,8 @@ class DAGScheduler(
partitions: Seq[Int],
callSite: String,
allowLocal: Boolean,
- resultHandler: (Int, U) => Unit, properties: Properties = null)
+ resultHandler: (Int, U) => Unit,
+ properties: Properties = null)
{
if (partitions.size == 0) {
return
@@ -260,7 +263,8 @@ class DAGScheduler(
func: (TaskContext, Iterator[T]) => U,
evaluator: ApproximateEvaluator[U, R],
callSite: String,
- timeout: Long, properties: Properties = null)
+ timeout: Long,
+ properties: Properties = null)
: PartialResult[R] =
{
val listener = new ApproximateActionListener(rdd, func, evaluator, timeout)
@@ -278,8 +282,8 @@ class DAGScheduler(
event match {
case JobSubmitted(finalRDD, func, partitions, allowLocal, callSite, listener, properties) =>
val runId = nextRunId.getAndIncrement()
- val finalStage = newStage(finalRDD, None, runId, properties)
- val job = new ActiveJob(runId, finalStage, func, partitions, callSite, listener)
+ val finalStage = newStage(finalRDD, None, runId)
+ val job = new ActiveJob(runId, finalStage, func, partitions, callSite, listener, properties)
clearCacheLocs()
logInfo("Got job " + job.runId + " (" + callSite + ") with " + partitions.length +
" output partitions (allowLocal=" + allowLocal + ")")
@@ -290,6 +294,7 @@ class DAGScheduler(
// Compute very short actions like first() or take() with no parent stages locally.
runLocally(job)
} else {
+ idToActiveJob(runId) = job
activeJobs += job
resultStageToJob(finalStage) = job
submitStage(finalStage)
@@ -459,8 +464,9 @@ class DAGScheduler(
logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
myPending ++= tasks
logDebug("New pending tasks: " + myPending)
+ val properties = idToActiveJob(stage.priority).properties
taskSched.submitTasks(
- new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.priority, stage.properties))
+ new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.priority, properties))
if (!stage.submissionTime.isDefined) {
stage.submissionTime = Some(System.currentTimeMillis())
}
@@ -665,7 +671,7 @@ class DAGScheduler(
for (dep <- rdd.dependencies) {
dep match {
case shufDep: ShuffleDependency[_,_] =>
- val mapStage = getShuffleMapStage(shufDep, stage.priority, stage.properties)
+ val mapStage = getShuffleMapStage(shufDep, stage.priority)
if (!mapStage.isAvailable) {
visitedStages += mapStage
visit(mapStage.rdd)
diff --git a/core/src/main/scala/spark/scheduler/Stage.scala b/core/src/main/scala/spark/scheduler/Stage.scala
index 97afa27a60..bc54cd601d 100644
--- a/core/src/main/scala/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/spark/scheduler/Stage.scala
@@ -1,7 +1,6 @@
package spark.scheduler
import java.net.URI
-import java.util.Properties
import spark._
import spark.storage.BlockManagerId
@@ -26,8 +25,7 @@ private[spark] class Stage(
val rdd: RDD[_],
val shuffleDep: Option[ShuffleDependency[_,_]], // Output shuffle if stage is a map stage
val parents: List[Stage],
- val priority: Int,
- val properties: Properties = null)
+ val priority: Int)
extends Logging {
val isShuffleMap = shuffleDep != None
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
index 5e960eb59d..092b0a0cfc 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
@@ -143,7 +143,8 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
* that tasks are balanced across the cluster.
*/
def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = {
- synchronized {
+ synchronized {
+
SparkEnv.set(sc.env)
// Mark each slave as alive and remember its hostname
for (o <- offers) {
@@ -152,25 +153,33 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
executorsByHost(o.hostname) = new HashSet()
}
}
-
// Build a list of tasks to assign to each slave
val tasks = offers.map(o => new ArrayBuffer[TaskDescription](o.cores))
- val taskSetIds = taskSetQueuesManager.receiveOffer(tasks, offers)
- //We populate the necessary bookkeeping structures
- for (i <- 0 until offers.size) {
- val execId = offers(i).executorId
- val host = offers(i).hostname
- for(j <- 0 until tasks(i).size) {
- val tid = tasks(i)(j).taskId
- val taskSetid = taskSetIds(i)(j)
- taskIdToTaskSetId(tid) = taskSetid
- taskSetTaskIds(taskSetid) += tid
- taskIdToExecutorId(tid) = execId
- activeExecutorIds += execId
- executorsByHost(host) += execId
- }
+ val availableCpus = offers.map(o => o.cores).toArray
+ for (i <- 0 until offers.size)
+ {
+ var launchedTask = true
+ val execId = offers(i).executorId
+ val host = offers(i).hostname
+ while (availableCpus(i) > 0 && launchedTask)
+ {
+ launchedTask = false
+ taskSetQueuesManager.receiveOffer(execId,host,availableCpus(i)) match {
+ case Some(task) =>
+ tasks(i) += task
+ val tid = task.taskId
+ taskIdToTaskSetId(tid) = task.taskSetId
+ taskSetTaskIds(task.taskSetId) += tid
+ taskIdToExecutorId(tid) = execId
+ activeExecutorIds += execId
+ executorsByHost(host) += execId
+ availableCpus(i) -= 1
+ launchedTask = true
+
+ case None => {}
+ }
+ }
}
-
if (tasks.size > 0) {
hasLaunchedTask = true
}
@@ -219,10 +228,11 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
taskSetToUpdate.get.statusUpdate(tid, state, serializedData)
}
if (failedExecutor != None) {
- listener.executorLost(failedExecutor.get)
+ listener.executorLost(failedExecutor.get)
backend.reviveOffers()
}
if (taskFailed) {
+
// Also revive offers if a task had failed for some reason other than host lost
backend.reviveOffers()
}
@@ -289,7 +299,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
}
// Call listener.executorLost without holding the lock on this to prevent deadlock
if (failedExecutor != None) {
- listener.executorLost(failedExecutor.get)
+ listener.executorLost(failedExecutor.get)
backend.reviveOffers()
}
}
diff --git a/core/src/main/scala/spark/scheduler/cluster/FIFOTaskSetQueuesManager.scala b/core/src/main/scala/spark/scheduler/cluster/FIFOTaskSetQueuesManager.scala
index 99a9c94222..868b11c8d6 100644
--- a/core/src/main/scala/spark/scheduler/cluster/FIFOTaskSetQueuesManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/FIFOTaskSetQueuesManager.scala
@@ -10,6 +10,7 @@ import spark.Logging
private[spark] class FIFOTaskSetQueuesManager extends TaskSetQueuesManager with Logging {
var activeTaskSetsQueue = new ArrayBuffer[TaskSetManager]
+ val tasksetSchedulingAlgorithm = new FIFOSchedulingAlgorithm()
override def addTaskSetManager(manager: TaskSetManager) {
activeTaskSetsQueue += manager
@@ -27,31 +28,19 @@ private[spark] class FIFOTaskSetQueuesManager extends TaskSetQueuesManager with
activeTaskSetsQueue.foreach(_.executorLost(executorId, host))
}
- override def receiveOffer(tasks: Seq[ArrayBuffer[TaskDescription]], offers: Seq[WorkerOffer]): Seq[Seq[String]] = {
- val taskSetIds = offers.map(o => new ArrayBuffer[String](o.cores))
- val availableCpus = offers.map(o => o.cores).toArray
- var launchedTask = false
- for (manager <- activeTaskSetsQueue.sortBy(m => (m.taskSet.priority, m.taskSet.stageId))) {
- do {
- launchedTask = false
- for (i <- 0 until offers.size) {
- val execId = offers(i).executorId
- val host = offers(i).hostname
- manager.slaveOffer(execId, host, availableCpus(i)) match {
- case Some(task) =>
- tasks(i) += task
- taskSetIds(i) += manager.taskSet.id
- availableCpus(i) -= 1
- launchedTask = true
-
- case None => {}
- }
- }
- } while (launchedTask)
+ override def receiveOffer(execId:String, host:String,avaiableCpus:Double):Option[TaskDescription] =
+ {
+ for(manager <- activeTaskSetsQueue.sortWith(tasksetSchedulingAlgorithm.comparator))
+ {
+ val task = manager.slaveOffer(execId,host,avaiableCpus)
+ if (task != None)
+ {
+ return task
+ }
}
- return taskSetIds
+ return None
}
-
+
override def checkSpeculatableTasks(): Boolean = {
var shouldRevive = false
for (ts <- activeTaskSetsQueue) {
@@ -60,4 +49,4 @@ private[spark] class FIFOTaskSetQueuesManager extends TaskSetQueuesManager with
return shouldRevive
}
-} \ No newline at end of file
+}
diff --git a/core/src/main/scala/spark/scheduler/cluster/FairTaskSetQueuesManager.scala b/core/src/main/scala/spark/scheduler/cluster/FairTaskSetQueuesManager.scala
index ca308a5229..4e26cedfda 100644
--- a/core/src/main/scala/spark/scheduler/cluster/FairTaskSetQueuesManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/FairTaskSetQueuesManager.scala
@@ -10,174 +10,166 @@ import scala.util.control.Breaks._
import scala.xml._
import spark.Logging
+import spark.scheduler.cluster.SchedulingMode.SchedulingMode
/**
* A Fair Implementation of the TaskSetQueuesManager
*
- * The current implementation makes the following assumptions: A pool has a fixed configuration of weight.
- * Within a pool, it just uses FIFO.
- * Also, currently we assume that pools are statically defined
- * We currently don't support min shares
+ * Currently we support minShare,weight for fair scheduler between pools
+ * Within a pool, it supports FIFO or FS
+ * Also, currently we could allocate pools dynamically
+ *
*/
private[spark] class FairTaskSetQueuesManager extends TaskSetQueuesManager with Logging {
val schedulerAllocFile = System.getProperty("spark.fairscheduler.allocation.file","unspecified")
val poolNameToPool= new HashMap[String, Pool]
var pools = new ArrayBuffer[Pool]
+ val poolScheduleAlgorithm = new FairSchedulingAlgorithm()
+ val POOL_FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.cluster.fair.pool"
+ val POOL_DEFAULT_POOL_NAME = "default"
+ val POOL_MINIMUM_SHARES_PROPERTY = "minShares"
+ val POOL_SCHEDULING_MODE_PROPERTY = "schedulingMode"
+ val POOL_WEIGHT_PROPERTY = "weight"
+ val POOL_POOL_NAME_PROPERTY = "@name"
+ val POOL_POOLS_PROPERTY = "pool"
+ val POOL_DEFAULT_SCHEDULING_MODE = SchedulingMode.FIFO
+ val POOL_DEFAULT_MINIMUM_SHARES = 2
+ val POOL_DEFAULT_WEIGHT = 1
loadPoolProperties()
def loadPoolProperties() {
//first check if the file exists
val file = new File(schedulerAllocFile)
- if(!file.exists()) {
- //if file does not exist, we just create 1 pool, default
- val pool = new Pool("default",100)
- pools += pool
- poolNameToPool("default") = pool
- logInfo("Created a default pool with weight = 100")
- }
- else {
+ if(file.exists())
+ {
val xml = XML.loadFile(file)
- for (poolNode <- (xml \\ "pool")) {
- if((poolNode \ "weight").text != ""){
- val pool = new Pool((poolNode \ "@name").text,(poolNode \ "weight").text.toInt)
- pools += pool
- poolNameToPool((poolNode \ "@name").text) = pool
- logInfo("Created pool "+ pool.name +"with weight = "+pool.weight)
- } else {
- val pool = new Pool((poolNode \ "@name").text,100)
- pools += pool
- poolNameToPool((poolNode \ "@name").text) = pool
- logInfo("Created pool "+ pool.name +"with weight = 100")
+ for (poolNode <- (xml \\ POOL_POOLS_PROPERTY)) {
+
+ val poolName = (poolNode \ POOL_POOL_NAME_PROPERTY).text
+ var schedulingMode = POOL_DEFAULT_SCHEDULING_MODE
+ var minShares = POOL_DEFAULT_MINIMUM_SHARES
+ var weight = POOL_DEFAULT_WEIGHT
+
+
+ val xmlSchedulingMode = (poolNode \ POOL_SCHEDULING_MODE_PROPERTY).text
+ if( xmlSchedulingMode != "")
+ {
+ try
+ {
+ schedulingMode = SchedulingMode.withName(xmlSchedulingMode)
+ }
+ catch{
+ case e:Exception => logInfo("Error xml schedulingMode, using default schedulingMode")
+ }
}
- }
- if(!poolNameToPool.contains("default")) {
- val pool = new Pool("default", 100)
+
+ val xmlMinShares = (poolNode \ POOL_MINIMUM_SHARES_PROPERTY).text
+ if(xmlMinShares != "")
+ {
+ minShares = xmlMinShares.toInt
+ }
+
+ val xmlWeight = (poolNode \ POOL_WEIGHT_PROPERTY).text
+ if(xmlWeight != "")
+ {
+ weight = xmlWeight.toInt
+ }
+
+ val pool = new Pool(poolName,schedulingMode,minShares,weight)
pools += pool
- poolNameToPool("default") = pool
- logInfo("Created a default pool with weight = 100")
+ poolNameToPool(poolName) = pool
+ logInfo("Create new pool with name:%s,schedulingMode:%s,minShares:%d,weight:%d".format(poolName,schedulingMode,minShares,weight))
}
-
- }
+ }
+
+ if(!poolNameToPool.contains(POOL_DEFAULT_POOL_NAME))
+ {
+ val pool = new Pool(POOL_DEFAULT_POOL_NAME, POOL_DEFAULT_SCHEDULING_MODE,POOL_DEFAULT_MINIMUM_SHARES,POOL_DEFAULT_WEIGHT)
+ pools += pool
+ poolNameToPool(POOL_DEFAULT_POOL_NAME) = pool
+ logInfo("Create default pool with name:%s,schedulingMode:%s,minShares:%d,weight:%d".format(POOL_DEFAULT_POOL_NAME,POOL_DEFAULT_SCHEDULING_MODE,POOL_DEFAULT_MINIMUM_SHARES,POOL_DEFAULT_WEIGHT))
+ }
}
override def addTaskSetManager(manager: TaskSetManager) {
- var poolName = "default"
- if(manager.taskSet.properties != null)
- poolName = manager.taskSet.properties.getProperty("spark.scheduler.cluster.fair.pool","default")
- if(poolNameToPool.contains(poolName))
- poolNameToPool(poolName).activeTaskSetsQueue += manager
- else
- poolNameToPool("default").activeTaskSetsQueue += manager
- logInfo("Added task set " + manager.taskSet.id + " tasks to pool "+poolName)
-
+ var poolName = POOL_DEFAULT_POOL_NAME
+ if(manager.taskSet.properties != null)
+ {
+ poolName = manager.taskSet.properties.getProperty(POOL_FAIR_SCHEDULER_PROPERTIES,POOL_DEFAULT_POOL_NAME)
+ if(!poolNameToPool.contains(poolName))
+ {
+ //we will create a new pool that user has configured in app,but not contained in xml file
+ val pool = new Pool(poolName,POOL_DEFAULT_SCHEDULING_MODE,POOL_DEFAULT_MINIMUM_SHARES,POOL_DEFAULT_WEIGHT)
+ pools += pool
+ poolNameToPool(poolName) = pool
+ logInfo("Create pool with name:%s,schedulingMode:%s,minShares:%d,weight:%d".format(poolName,POOL_DEFAULT_SCHEDULING_MODE,POOL_DEFAULT_MINIMUM_SHARES,POOL_DEFAULT_WEIGHT))
+ }
+ }
+ poolNameToPool(poolName).addTaskSetManager(manager)
+ logInfo("Added task set " + manager.taskSet.id + " tasks to pool "+poolName)
}
override def removeTaskSetManager(manager: TaskSetManager) {
- var poolName = "default"
- if(manager.taskSet.properties != null)
- poolName = manager.taskSet.properties.getProperty("spark.scheduler.cluster.fair.pool","default")
- if(poolNameToPool.contains(poolName))
- poolNameToPool(poolName).activeTaskSetsQueue -= manager
- else
- poolNameToPool("default").activeTaskSetsQueue -= manager
+
+ var poolName = POOL_DEFAULT_POOL_NAME
+ if(manager.taskSet.properties != null)
+ {
+ poolName = manager.taskSet.properties.getProperty(POOL_FAIR_SCHEDULER_PROPERTIES,POOL_DEFAULT_POOL_NAME)
+ }
+ logInfo("Remove TaskSet %s from pool %s".format(manager.taskSet.id,poolName))
+ val pool = poolNameToPool(poolName)
+ pool.removeTaskSetManager(manager)
+ pool.setRunningTasks(pool.getRunningTasks() - manager.getRunningTasks())
+
}
override def taskFinished(manager: TaskSetManager) {
- var poolName = "default"
- if(manager.taskSet.properties != null)
- poolName = manager.taskSet.properties.getProperty("spark.scheduler.cluster.fair.pool","default")
- if(poolNameToPool.contains(poolName))
- poolNameToPool(poolName).numRunningTasks -= 1
- else
- poolNameToPool("default").numRunningTasks -= 1
+ var poolName = POOL_DEFAULT_POOL_NAME
+ if(manager.taskSet.properties != null)
+ {
+ poolName = manager.taskSet.properties.getProperty(POOL_FAIR_SCHEDULER_PROPERTIES,POOL_DEFAULT_POOL_NAME)
+ }
+ val pool = poolNameToPool(poolName)
+ pool.setRunningTasks(pool.getRunningTasks() - 1)
+ manager.setRunningTasks(manager.getRunningTasks() - 1)
}
override def removeExecutor(executorId: String, host: String) {
for (pool <- pools) {
- pool.activeTaskSetsQueue.foreach(_.executorLost(executorId, host))
+ pool.removeExecutor(executorId,host)
}
}
- /**
- * This is the comparison function used for sorting to determine which
- * pool to allocate next based on fairness.
- * The algorithm is as follows: we sort by the pool's running tasks to weight ratio
- * (pools number running tast / pool's weight)
- */
- def poolFairCompFn(pool1: Pool, pool2: Pool): Boolean = {
- val tasksToWeightRatio1 = pool1.numRunningTasks.toDouble / pool1.weight.toDouble
- val tasksToWeightRatio2 = pool2.numRunningTasks.toDouble / pool2.weight.toDouble
- var res = Math.signum(tasksToWeightRatio1 - tasksToWeightRatio2)
- if (res == 0) {
- //Jobs are tied in fairness ratio. We break the tie by name
- res = pool1.name.compareTo(pool2.name)
- }
- if (res < 0)
- return true
- else
- return false
+ override def receiveOffer(execId: String,host:String,avaiableCpus:Double):Option[TaskDescription] =
+ {
+
+ val sortedPools = pools.sortWith(poolScheduleAlgorithm.comparator)
+ for(pool <- sortedPools)
+ {
+ logDebug("poolName:%s,tasksetNum:%d,minShares:%d,runningTasks:%d".format(pool.poolName,pool.activeTaskSetsQueue.length,pool.getMinShare(),pool.getRunningTasks()))
+ }
+ for (pool <- sortedPools)
+ {
+ val task = pool.receiveOffer(execId,host,avaiableCpus)
+ if(task != None)
+ {
+ pool.setRunningTasks(pool.getRunningTasks() + 1)
+ return task
+ }
+ }
+ return None
}
- override def receiveOffer(tasks: Seq[ArrayBuffer[TaskDescription]], offers: Seq[WorkerOffer]): Seq[Seq[String]] = {
- val taskSetIds = offers.map(o => new ArrayBuffer[String](o.cores))
- val availableCpus = offers.map(o => o.cores).toArray
- var launchedTask = false
-
- for (i <- 0 until offers.size) { //we loop through the list of offers
- val execId = offers(i).executorId
- val host = offers(i).hostname
- var breakOut = false
- while(availableCpus(i) > 0 && !breakOut) {
- breakable{
- launchedTask = false
- for (pool <- pools.sortWith(poolFairCompFn)) { //we loop through the list of pools
- if(!pool.activeTaskSetsQueue.isEmpty) {
- //sort the tasksetmanager in the pool
- pool.activeTaskSetsQueue.sortBy(m => (m.taskSet.priority, m.taskSet.stageId))
- for(manager <- pool.activeTaskSetsQueue) { //we loop through the activeTaskSets in this pool
- //Make an offer
- manager.slaveOffer(execId, host, availableCpus(i)) match {
- case Some(task) =>
- tasks(i) += task
- taskSetIds(i) += manager.taskSet.id
- availableCpus(i) -= 1
- pool.numRunningTasks += 1
- launchedTask = true
- logInfo("launched task for pool"+pool.name);
- break
- case None => {}
- }
- }
- }
- }
- //If there is not one pool that can assign the task then we have to exit the outer loop and continue to the next offer
- if(!launchedTask){
- breakOut = true
- }
- }
- }
- }
- return taskSetIds
- }
-
- override def checkSpeculatableTasks(): Boolean = {
+ override def checkSpeculatableTasks(): Boolean =
+ {
var shouldRevive = false
- for (pool <- pools) {
- for (ts <- pool.activeTaskSetsQueue) {
- shouldRevive |= ts.checkSpeculatableTasks()
- }
+ for (pool <- pools)
+ {
+ shouldRevive |= pool.checkSpeculatableTasks()
}
return shouldRevive
}
-}
-/**
- * An internal representation of a pool. It contains an ArrayBuffer of TaskSets and also weight
- */
-class Pool(val name: String, val weight: Int)
-{
- var activeTaskSetsQueue = new ArrayBuffer[TaskSetManager]
- var numRunningTasks: Int = 0
-} \ No newline at end of file
+ }
diff --git a/core/src/main/scala/spark/scheduler/cluster/Pool.scala b/core/src/main/scala/spark/scheduler/cluster/Pool.scala
new file mode 100644
index 0000000000..7b58a99582
--- /dev/null
+++ b/core/src/main/scala/spark/scheduler/cluster/Pool.scala
@@ -0,0 +1,92 @@
+package spark.scheduler.cluster
+
+import scala.collection.mutable.ArrayBuffer
+
+import spark.Logging
+import spark.scheduler.cluster.SchedulingMode.SchedulingMode
+/**
+ * An interface for
+ *
+ */
+private[spark] class Pool(val poolName: String, schedulingMode: SchedulingMode,val minShare:Int, val weight:Int) extends Schedulable with Logging {
+
+ var activeTaskSetsQueue = new ArrayBuffer[TaskSetManager]
+ var numRunningTasks: Int = 0
+ var taskSetSchedulingAlgorithm: SchedulingAlgorithm =
+ {
+ schedulingMode match
+ {
+ case SchedulingMode.FAIR =>
+ val schedule = new FairSchedulingAlgorithm()
+ schedule
+ case SchedulingMode.FIFO =>
+ val schedule = new FIFOSchedulingAlgorithm()
+ schedule
+ }
+ }
+
+ override def getMinShare():Int =
+ {
+ return minShare
+ }
+
+ override def getRunningTasks():Int =
+ {
+ return numRunningTasks
+ }
+
+ def setRunningTasks(taskNum : Int)
+ {
+ numRunningTasks = taskNum
+ }
+
+ override def getWeight(): Int =
+ {
+ return weight
+ }
+
+ def addTaskSetManager(manager:TaskSetManager)
+ {
+ activeTaskSetsQueue += manager
+ }
+
+ def removeTaskSetManager(manager:TaskSetManager)
+ {
+ activeTaskSetsQueue -= manager
+ }
+
+ def removeExecutor(executorId: String, host: String)
+ {
+ activeTaskSetsQueue.foreach(_.executorLost(executorId,host))
+ }
+
+ def checkSpeculatableTasks(): Boolean =
+ {
+ var shouldRevive = false
+ for(ts <- activeTaskSetsQueue)
+ {
+ shouldRevive |= ts.checkSpeculatableTasks()
+ }
+ return shouldRevive
+ }
+
+ def receiveOffer(execId:String,host:String,availableCpus:Double):Option[TaskDescription] =
+ {
+ val sortedActiveTasksSetQueue = activeTaskSetsQueue.sortWith(taskSetSchedulingAlgorithm.comparator)
+ for(manager <- sortedActiveTasksSetQueue)
+ {
+
+ logDebug("taskSetId:%s,taskNum:%d,minShares:%d,weight:%d,runningTasks:%d".format(manager.taskSet.id,manager.numTasks,manager.getMinShare(),manager.getWeight(),manager.getRunningTasks()))
+ }
+ for(manager <- sortedActiveTasksSetQueue)
+ {
+ val task = manager.slaveOffer(execId,host,availableCpus)
+ if (task != None)
+ {
+ manager.setRunningTasks(manager.getRunningTasks() + 1)
+ return task
+ }
+ }
+ return None
+ }
+}
diff --git a/core/src/main/scala/spark/scheduler/cluster/Schedulable.scala b/core/src/main/scala/spark/scheduler/cluster/Schedulable.scala
new file mode 100644
index 0000000000..837f9c4983
--- /dev/null
+++ b/core/src/main/scala/spark/scheduler/cluster/Schedulable.scala
@@ -0,0 +1,21 @@
+package spark.scheduler.cluster
+
+import scala.collection.mutable.ArrayBuffer
+
+/**
+ * An interface for schedulable entities, there are two type Schedulable entities(Pools and TaskSetManagers)
+ */
+private[spark] trait Schedulable {
+
+ def getMinShare(): Int
+ def getRunningTasks(): Int
+ def getPriority(): Int =
+ {
+ return 0
+ }
+ def getWeight(): Int
+ def getStageId(): Int =
+ {
+ return 0
+ }
+}
diff --git a/core/src/main/scala/spark/scheduler/cluster/SchedulingAlgorithm.scala b/core/src/main/scala/spark/scheduler/cluster/SchedulingAlgorithm.scala
new file mode 100644
index 0000000000..f8919e7374
--- /dev/null
+++ b/core/src/main/scala/spark/scheduler/cluster/SchedulingAlgorithm.scala
@@ -0,0 +1,69 @@
+package spark.scheduler.cluster
+
+/**
+ * An interface for sort algorithm
+ * FIFO: FIFO algorithm for TaskSetManagers
+ * FS: FS algorithm for Pools, and FIFO or FS for TaskSetManagers
+ */
+private[spark] trait SchedulingAlgorithm {
+ def comparator(s1: Schedulable,s2: Schedulable): Boolean
+}
+
+private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm
+{
+ override def comparator(s1: Schedulable, s2: Schedulable): Boolean =
+ {
+ val priority1 = s1.getPriority()
+ val priority2 = s2.getPriority()
+ var res = Math.signum(priority1 - priority2)
+ if (res == 0)
+ {
+ val stageId1 = s1.getStageId()
+ val stageId2 = s2.getStageId()
+ res = Math.signum(stageId1 - stageId2)
+ }
+ if (res < 0)
+ {
+ return true
+ }
+ else
+ {
+ return false
+ }
+ }
+}
+
+private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm
+{
+ def comparator(s1: Schedulable, s2:Schedulable): Boolean =
+ {
+ val minShare1 = s1.getMinShare()
+ val minShare2 = s2.getMinShare()
+ val s1Needy = s1.getRunningTasks() < minShare1
+ val s2Needy = s2.getRunningTasks() < minShare2
+ val minShareRatio1 = s1.getRunningTasks().toDouble / Math.max(minShare1,1.0).toDouble
+ val minShareRatio2 = s2.getRunningTasks().toDouble / Math.max(minShare2,1.0).toDouble
+ val taskToWeightRatio1 = s1.getRunningTasks().toDouble / s1.getWeight().toDouble
+ val taskToWeightRatio2 = s2.getRunningTasks().toDouble / s2.getWeight().toDouble
+ var res:Boolean = true
+
+ if(s1Needy && !s2Needy)
+ {
+ res = true
+ }
+ else if(!s1Needy && s2Needy)
+ {
+ res = false
+ }
+ else if (s1Needy && s2Needy)
+ {
+ res = minShareRatio1 <= minShareRatio2
+ }
+ else
+ {
+ res = taskToWeightRatio1 <= taskToWeightRatio2
+ }
+ return res
+ }
+}
+
diff --git a/core/src/main/scala/spark/scheduler/cluster/SchedulingMode.scala b/core/src/main/scala/spark/scheduler/cluster/SchedulingMode.scala
new file mode 100644
index 0000000000..6be4f3cd84
--- /dev/null
+++ b/core/src/main/scala/spark/scheduler/cluster/SchedulingMode.scala
@@ -0,0 +1,8 @@
+package spark.scheduler.cluster
+
+object SchedulingMode extends Enumeration("FAIR","FIFO")
+{
+ type SchedulingMode = Value
+
+ val FAIR,FIFO = Value
+}
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala b/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala
index b41e951be9..cdd004c94b 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala
@@ -5,6 +5,7 @@ import spark.util.SerializableBuffer
private[spark] class TaskDescription(
val taskId: Long,
+ val taskSetId: String,
val executorId: String,
val name: String,
_serializedTask: ByteBuffer)
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
index 015092b60b..723c3b46bd 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
@@ -17,7 +17,7 @@ import java.nio.ByteBuffer
/**
* Schedules the tasks within a single TaskSet in the ClusterScheduler.
*/
-private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSet) extends Logging {
+private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSet) extends Schedulable with Logging {
// Maximum time to wait to run a task in a preferred location (in ms)
val LOCALITY_WAIT = System.getProperty("spark.locality.wait", "3000").toLong
@@ -28,6 +28,9 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
// Maximum times a task is allowed to fail before failing the job
val MAX_TASK_FAILURES = 4
+ val TASKSET_MINIMIUM_SHARES = 1
+
+ val TASKSET_WEIGHT = 1
// Quantile of tasks at which to start speculation
val SPECULATION_QUANTILE = System.getProperty("spark.speculation.quantile", "0.75").toDouble
val SPECULATION_MULTIPLIER = System.getProperty("spark.speculation.multiplier", "1.5").toDouble
@@ -43,6 +46,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
val numFailures = new Array[Int](numTasks)
val taskAttempts = Array.fill[List[TaskInfo]](numTasks)(Nil)
var tasksFinished = 0
+ var numRunningTasks =0;
// Last time when we launched a preferred task (for delay scheduling)
var lastPreferredLaunchTime = System.currentTimeMillis
@@ -96,6 +100,36 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
addPendingTask(i)
}
+ override def getMinShare(): Int =
+ {
+ return TASKSET_MINIMIUM_SHARES
+ }
+
+ override def getRunningTasks(): Int =
+ {
+ return numRunningTasks
+ }
+
+ def setRunningTasks(taskNum :Int)
+ {
+ numRunningTasks = taskNum
+ }
+
+ override def getPriority(): Int =
+ {
+ return priority
+ }
+
+ override def getWeight(): Int =
+ {
+ return TASKSET_WEIGHT
+ }
+
+ override def getStageId(): Int =
+ {
+ return taskSet.stageId
+ }
+
// Add a task to all the pending-task lists that it should be on.
private def addPendingTask(index: Int) {
val locations = tasks(index).preferredLocations.toSet & sched.hostsAlive
@@ -222,7 +256,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
logInfo("Serialized task %s:%d as %d bytes in %d ms".format(
taskSet.id, index, serializedTask.limit, timeTaken))
val taskName = "task %s:%d".format(taskSet.id, index)
- return Some(new TaskDescription(taskId, execId, taskName, serializedTask))
+ return Some(new TaskDescription(taskId,taskSet.id,execId, taskName, serializedTask))
}
case _ =>
}
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetQueuesManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetQueuesManager.scala
index b0c30e9e8b..c117ee7a85 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskSetQueuesManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetQueuesManager.scala
@@ -12,8 +12,6 @@ private[spark] trait TaskSetQueuesManager {
def removeTaskSetManager(manager: TaskSetManager): Unit
def taskFinished(manager: TaskSetManager): Unit
def removeExecutor(executorId: String, host: String): Unit
- //The receiveOffers function, accepts tasks and offers. It populates the tasks to the actual task from TaskSet
- //It returns a list of TaskSet ID that corresponds to each assigned tasks
- def receiveOffer(tasks: Seq[ArrayBuffer[TaskDescription]], offers: Seq[WorkerOffer]): Seq[Seq[String]]
+ def receiveOffer(execId: String, host:String, avaiableCpus:Double):Option[TaskDescription]
def checkSpeculatableTasks(): Boolean
-} \ No newline at end of file
+}