aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorHarold Lim <harold@cs.duke.edu>2013-03-08 15:15:59 -0500
committerAndrew xia <junluan.xia@intel.com>2013-03-12 13:31:27 +0800
commitb5325182a3e92ff80185850c39cf70680fca46b7 (patch)
tree47fcbcc24965bf2c0c4e8b796acd44bb03537d18
parent54ed7c4af4591ebfec31bd168f830ef3ac01a41f (diff)
downloadspark-b5325182a3e92ff80185850c39cf70680fca46b7.tar.gz
spark-b5325182a3e92ff80185850c39cf70680fca46b7.tar.bz2
spark-b5325182a3e92ff80185850c39cf70680fca46b7.zip
Updated/Refactored the Fair Task Scheduler. It does not inherit ClusterScheduler anymore. Rather, ClusterScheduler internally uses TaskSetQueuesManager that handles the scheduling of taskset queues. This is the class that should be extended to support other scheduling policies
-rw-r--r--core/src/main/scala/spark/SparkContext.scala19
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala43
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/FIFOTaskSetQueuesManager.scala63
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/FairTaskSetQueuesManager.scala183
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala2
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/TaskSetQueuesManager.scala19
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/fair/FairClusterScheduler.scala341
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/fair/FairTaskSetManager.scala130
8 files changed, 312 insertions, 488 deletions
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index bd2261cf0d..f6ee399898 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -39,7 +39,7 @@ import spark.partial.PartialResult
import spark.rdd.{CheckpointRDD, HadoopRDD, NewHadoopRDD, UnionRDD, ParallelCollectionRDD}
import spark.scheduler._
import spark.scheduler.local.LocalScheduler
-import spark.scheduler.cluster.{SparkDeploySchedulerBackend, SchedulerBackend, ClusterScheduler}
+import spark.scheduler.cluster.{SparkDeploySchedulerBackend, SchedulerBackend, ClusterScheduler, TaskSetQueuesManager}
import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
import spark.storage.BlockManagerUI
import spark.util.{MetadataCleaner, TimeStampedHashMap}
@@ -77,7 +77,7 @@ class SparkContext(
//Set the default task scheduler
if (System.getProperty("spark.cluster.taskscheduler") == null) {
- System.setProperty("spark.cluster.taskscheduler", "spark.scheduler.cluster.ClusterScheduler")
+ System.setProperty("spark.cluster.taskscheduler", "spark.scheduler.cluster.FIFOTaskSetQueuesManager")
}
private val isLocal = (master == "local" || master.startsWith("local["))
@@ -144,9 +144,10 @@ class SparkContext(
new LocalScheduler(threads.toInt, maxFailures.toInt, this)
case SPARK_REGEX(sparkUrl) =>
- val scheduler = Class.forName(System.getProperty("spark.cluster.taskscheduler")).getConstructors()(0).newInstance(Array[AnyRef](this):_*).asInstanceOf[ClusterScheduler]
+ val scheduler = new ClusterScheduler(this)//Class.forName(System.getProperty("spark.cluster.taskscheduler")).getConstructors()(0).newInstance(Array[AnyRef](this):_*).asInstanceOf[ClusterScheduler]
val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, appName)
- scheduler.initialize(backend)
+ val taskSetQueuesManager = Class.forName(System.getProperty("spark.cluster.taskscheduler")).newInstance().asInstanceOf[TaskSetQueuesManager]
+ scheduler.initialize(backend, taskSetQueuesManager)
scheduler
case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
@@ -160,12 +161,13 @@ class SparkContext(
memoryPerSlaveInt, sparkMemEnvInt))
}
- val scheduler = Class.forName(System.getProperty("spark.cluster.taskscheduler")).getConstructors()(0).newInstance(Array[AnyRef](this):_*).asInstanceOf[ClusterScheduler]
+ val scheduler = new ClusterScheduler(this)//Class.forName(System.getProperty("spark.cluster.taskscheduler")).getConstructors()(0).newInstance(Array[AnyRef](this):_*).asInstanceOf[ClusterScheduler]
val localCluster = new LocalSparkCluster(
numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt)
val sparkUrl = localCluster.start()
val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, appName)
- scheduler.initialize(backend)
+ val taskSetQueuesManager = Class.forName(System.getProperty("spark.cluster.taskscheduler")).newInstance().asInstanceOf[TaskSetQueuesManager]
+ scheduler.initialize(backend, taskSetQueuesManager)
backend.shutdownCallback = (backend: SparkDeploySchedulerBackend) => {
localCluster.stop()
}
@@ -176,7 +178,7 @@ class SparkContext(
logWarning("Master %s does not match expected format, parsing as Mesos URL".format(master))
}
MesosNativeLibrary.load()
- val scheduler = Class.forName(System.getProperty("spark.cluster.taskscheduler")).getConstructors()(0).newInstance(Array[AnyRef](this):_*).asInstanceOf[ClusterScheduler]
+ val scheduler = new ClusterScheduler(this)//Class.forName(System.getProperty("spark.cluster.taskscheduler")).getConstructors()(0).newInstance(Array[AnyRef](this):_*).asInstanceOf[ClusterScheduler]
val coarseGrained = System.getProperty("spark.mesos.coarse", "false").toBoolean
val masterWithoutProtocol = master.replaceFirst("^mesos://", "") // Strip initial mesos://
val backend = if (coarseGrained) {
@@ -184,7 +186,8 @@ class SparkContext(
} else {
new MesosSchedulerBackend(scheduler, this, masterWithoutProtocol, appName)
}
- scheduler.initialize(backend)
+ val taskSetQueuesManager = Class.forName(System.getProperty("spark.cluster.taskscheduler")).newInstance().asInstanceOf[TaskSetQueuesManager]
+ scheduler.initialize(backend, taskSetQueuesManager)
scheduler
}
}
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
index 26fdef101b..0b5bf7a86c 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
@@ -27,7 +27,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
val STARVATION_TIMEOUT = System.getProperty("spark.starvation.timeout", "15000").toLong
val activeTaskSets = new HashMap[String, TaskSetManager]
- var activeTaskSetsQueue = new ArrayBuffer[TaskSetManager]
+ // var activeTaskSetsQueue = new ArrayBuffer[TaskSetManager]
val taskIdToTaskSetId = new HashMap[Long, String]
val taskIdToExecutorId = new HashMap[Long, String]
@@ -61,13 +61,16 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
var backend: SchedulerBackend = null
val mapOutputTracker = SparkEnv.get.mapOutputTracker
+
+ var taskSetQueuesManager: TaskSetQueuesManager = null
override def setListener(listener: TaskSchedulerListener) {
this.listener = listener
}
- def initialize(context: SchedulerBackend) {
+ def initialize(context: SchedulerBackend, taskSetQueuesManager: TaskSetQueuesManager) {
backend = context
+ this.taskSetQueuesManager = taskSetQueuesManager
}
def newTaskId(): Long = nextTaskId.getAndIncrement()
@@ -99,7 +102,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
this.synchronized {
val manager = new TaskSetManager(this, taskSet)
activeTaskSets(taskSet.id) = manager
- activeTaskSetsQueue += manager
+ taskSetQueuesManager.addTaskSetManager(manager)
taskSetTaskIds(taskSet.id) = new HashSet[Long]()
if (hasReceivedTask == false) {
@@ -122,13 +125,19 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
def taskSetFinished(manager: TaskSetManager) {
this.synchronized {
activeTaskSets -= manager.taskSet.id
- activeTaskSetsQueue -= manager
+ taskSetQueuesManager.removeTaskSetManager(manager)
taskIdToTaskSetId --= taskSetTaskIds(manager.taskSet.id)
taskIdToExecutorId --= taskSetTaskIds(manager.taskSet.id)
taskSetTaskIds.remove(manager.taskSet.id)
}
}
+ def taskFinished(manager: TaskSetManager) {
+ this.synchronized {
+ taskSetQueuesManager.taskFinished(manager)
+ }
+ }
+
/**
* Called by cluster manager to offer resources on slaves. We respond by asking our active task
* sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so
@@ -144,8 +153,26 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
executorsByHost(o.hostname) = new HashSet()
}
}
+
// Build a list of tasks to assign to each slave
val tasks = offers.map(o => new ArrayBuffer[TaskDescription](o.cores))
+ val taskSetIds = taskSetQueuesManager.receiveOffer(tasks, offers)
+ //We populate the necessary bookkeeping structures
+ for (i <- 0 until offers.size) {
+ val execId = offers(i).executorId
+ val host = offers(i).hostname
+ for(j <- 0 until tasks(i).size) {
+ val tid = tasks(i)(j).taskId
+ val taskSetid = taskSetIds(i)(j)
+ taskIdToTaskSetId(tid) = taskSetid
+ taskSetTaskIds(taskSetid) += tid
+ taskIdToExecutorId(tid) = execId
+ activeExecutorIds += execId
+ executorsByHost(host) += execId
+ }
+ }
+
+ /*val tasks = offers.map(o => new ArrayBuffer[TaskDescription](o.cores))
val availableCpus = offers.map(o => o.cores).toArray
var launchedTask = false
for (manager <- activeTaskSetsQueue.sortBy(m => (m.taskSet.priority, m.taskSet.stageId))) {
@@ -170,7 +197,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
}
}
} while (launchedTask)
- }
+ }*/
if (tasks.size > 0) {
hasLaunchedTask = true
}
@@ -264,9 +291,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
def checkSpeculatableTasks() {
var shouldRevive = false
synchronized {
- for (ts <- activeTaskSetsQueue) {
- shouldRevive |= ts.checkSpeculatableTasks()
- }
+ shouldRevive = taskSetQueuesManager.checkSpeculatableTasks()
}
if (shouldRevive) {
backend.reviveOffers()
@@ -309,6 +334,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
executorsByHost -= host
}
executorIdToHost -= executorId
- activeTaskSetsQueue.foreach(_.executorLost(executorId, host))
+ taskSetQueuesManager.removeExecutor(executorId, host)
}
}
diff --git a/core/src/main/scala/spark/scheduler/cluster/FIFOTaskSetQueuesManager.scala b/core/src/main/scala/spark/scheduler/cluster/FIFOTaskSetQueuesManager.scala
new file mode 100644
index 0000000000..99a9c94222
--- /dev/null
+++ b/core/src/main/scala/spark/scheduler/cluster/FIFOTaskSetQueuesManager.scala
@@ -0,0 +1,63 @@
+package spark.scheduler.cluster
+
+import scala.collection.mutable.ArrayBuffer
+
+import spark.Logging
+
+/**
+ * A FIFO Implementation of the TaskSetQueuesManager
+ */
+private[spark] class FIFOTaskSetQueuesManager extends TaskSetQueuesManager with Logging {
+
+ var activeTaskSetsQueue = new ArrayBuffer[TaskSetManager]
+
+ override def addTaskSetManager(manager: TaskSetManager) {
+ activeTaskSetsQueue += manager
+ }
+
+ override def removeTaskSetManager(manager: TaskSetManager) {
+ activeTaskSetsQueue -= manager
+ }
+
+ override def taskFinished(manager: TaskSetManager) {
+ //do nothing
+ }
+
+ override def removeExecutor(executorId: String, host: String) {
+ activeTaskSetsQueue.foreach(_.executorLost(executorId, host))
+ }
+
+ override def receiveOffer(tasks: Seq[ArrayBuffer[TaskDescription]], offers: Seq[WorkerOffer]): Seq[Seq[String]] = {
+ val taskSetIds = offers.map(o => new ArrayBuffer[String](o.cores))
+ val availableCpus = offers.map(o => o.cores).toArray
+ var launchedTask = false
+ for (manager <- activeTaskSetsQueue.sortBy(m => (m.taskSet.priority, m.taskSet.stageId))) {
+ do {
+ launchedTask = false
+ for (i <- 0 until offers.size) {
+ val execId = offers(i).executorId
+ val host = offers(i).hostname
+ manager.slaveOffer(execId, host, availableCpus(i)) match {
+ case Some(task) =>
+ tasks(i) += task
+ taskSetIds(i) += manager.taskSet.id
+ availableCpus(i) -= 1
+ launchedTask = true
+
+ case None => {}
+ }
+ }
+ } while (launchedTask)
+ }
+ return taskSetIds
+ }
+
+ override def checkSpeculatableTasks(): Boolean = {
+ var shouldRevive = false
+ for (ts <- activeTaskSetsQueue) {
+ shouldRevive |= ts.checkSpeculatableTasks()
+ }
+ return shouldRevive
+ }
+
+} \ No newline at end of file
diff --git a/core/src/main/scala/spark/scheduler/cluster/FairTaskSetQueuesManager.scala b/core/src/main/scala/spark/scheduler/cluster/FairTaskSetQueuesManager.scala
new file mode 100644
index 0000000000..ca308a5229
--- /dev/null
+++ b/core/src/main/scala/spark/scheduler/cluster/FairTaskSetQueuesManager.scala
@@ -0,0 +1,183 @@
+package spark.scheduler.cluster
+
+import java.io.{File, FileInputStream, FileOutputStream}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.HashMap
+import scala.collection.mutable.HashSet
+import scala.util.control.Breaks._
+import scala.xml._
+
+import spark.Logging
+
+/**
+ * A Fair Implementation of the TaskSetQueuesManager
+ *
+ * The current implementation makes the following assumptions: A pool has a fixed configuration of weight.
+ * Within a pool, it just uses FIFO.
+ * Also, currently we assume that pools are statically defined
+ * We currently don't support min shares
+ */
+private[spark] class FairTaskSetQueuesManager extends TaskSetQueuesManager with Logging {
+
+ val schedulerAllocFile = System.getProperty("spark.fairscheduler.allocation.file","unspecified")
+ val poolNameToPool= new HashMap[String, Pool]
+ var pools = new ArrayBuffer[Pool]
+
+ loadPoolProperties()
+
+ def loadPoolProperties() {
+ //first check if the file exists
+ val file = new File(schedulerAllocFile)
+ if(!file.exists()) {
+ //if file does not exist, we just create 1 pool, default
+ val pool = new Pool("default",100)
+ pools += pool
+ poolNameToPool("default") = pool
+ logInfo("Created a default pool with weight = 100")
+ }
+ else {
+ val xml = XML.loadFile(file)
+ for (poolNode <- (xml \\ "pool")) {
+ if((poolNode \ "weight").text != ""){
+ val pool = new Pool((poolNode \ "@name").text,(poolNode \ "weight").text.toInt)
+ pools += pool
+ poolNameToPool((poolNode \ "@name").text) = pool
+ logInfo("Created pool "+ pool.name +"with weight = "+pool.weight)
+ } else {
+ val pool = new Pool((poolNode \ "@name").text,100)
+ pools += pool
+ poolNameToPool((poolNode \ "@name").text) = pool
+ logInfo("Created pool "+ pool.name +"with weight = 100")
+ }
+ }
+ if(!poolNameToPool.contains("default")) {
+ val pool = new Pool("default", 100)
+ pools += pool
+ poolNameToPool("default") = pool
+ logInfo("Created a default pool with weight = 100")
+ }
+
+ }
+ }
+
+ override def addTaskSetManager(manager: TaskSetManager) {
+ var poolName = "default"
+ if(manager.taskSet.properties != null)
+ poolName = manager.taskSet.properties.getProperty("spark.scheduler.cluster.fair.pool","default")
+ if(poolNameToPool.contains(poolName))
+ poolNameToPool(poolName).activeTaskSetsQueue += manager
+ else
+ poolNameToPool("default").activeTaskSetsQueue += manager
+ logInfo("Added task set " + manager.taskSet.id + " tasks to pool "+poolName)
+
+ }
+
+ override def removeTaskSetManager(manager: TaskSetManager) {
+ var poolName = "default"
+ if(manager.taskSet.properties != null)
+ poolName = manager.taskSet.properties.getProperty("spark.scheduler.cluster.fair.pool","default")
+ if(poolNameToPool.contains(poolName))
+ poolNameToPool(poolName).activeTaskSetsQueue -= manager
+ else
+ poolNameToPool("default").activeTaskSetsQueue -= manager
+ }
+
+ override def taskFinished(manager: TaskSetManager) {
+ var poolName = "default"
+ if(manager.taskSet.properties != null)
+ poolName = manager.taskSet.properties.getProperty("spark.scheduler.cluster.fair.pool","default")
+ if(poolNameToPool.contains(poolName))
+ poolNameToPool(poolName).numRunningTasks -= 1
+ else
+ poolNameToPool("default").numRunningTasks -= 1
+ }
+
+ override def removeExecutor(executorId: String, host: String) {
+ for (pool <- pools) {
+ pool.activeTaskSetsQueue.foreach(_.executorLost(executorId, host))
+ }
+ }
+
+ /**
+ * This is the comparison function used for sorting to determine which
+ * pool to allocate next based on fairness.
+ * The algorithm is as follows: we sort by the pool's running tasks to weight ratio
+ * (pools number running tast / pool's weight)
+ */
+ def poolFairCompFn(pool1: Pool, pool2: Pool): Boolean = {
+ val tasksToWeightRatio1 = pool1.numRunningTasks.toDouble / pool1.weight.toDouble
+ val tasksToWeightRatio2 = pool2.numRunningTasks.toDouble / pool2.weight.toDouble
+ var res = Math.signum(tasksToWeightRatio1 - tasksToWeightRatio2)
+ if (res == 0) {
+ //Jobs are tied in fairness ratio. We break the tie by name
+ res = pool1.name.compareTo(pool2.name)
+ }
+ if (res < 0)
+ return true
+ else
+ return false
+ }
+
+ override def receiveOffer(tasks: Seq[ArrayBuffer[TaskDescription]], offers: Seq[WorkerOffer]): Seq[Seq[String]] = {
+ val taskSetIds = offers.map(o => new ArrayBuffer[String](o.cores))
+ val availableCpus = offers.map(o => o.cores).toArray
+ var launchedTask = false
+
+ for (i <- 0 until offers.size) { //we loop through the list of offers
+ val execId = offers(i).executorId
+ val host = offers(i).hostname
+ var breakOut = false
+ while(availableCpus(i) > 0 && !breakOut) {
+ breakable{
+ launchedTask = false
+ for (pool <- pools.sortWith(poolFairCompFn)) { //we loop through the list of pools
+ if(!pool.activeTaskSetsQueue.isEmpty) {
+ //sort the tasksetmanager in the pool
+ pool.activeTaskSetsQueue.sortBy(m => (m.taskSet.priority, m.taskSet.stageId))
+ for(manager <- pool.activeTaskSetsQueue) { //we loop through the activeTaskSets in this pool
+ //Make an offer
+ manager.slaveOffer(execId, host, availableCpus(i)) match {
+ case Some(task) =>
+ tasks(i) += task
+ taskSetIds(i) += manager.taskSet.id
+ availableCpus(i) -= 1
+ pool.numRunningTasks += 1
+ launchedTask = true
+ logInfo("launched task for pool"+pool.name);
+ break
+ case None => {}
+ }
+ }
+ }
+ }
+ //If there is not one pool that can assign the task then we have to exit the outer loop and continue to the next offer
+ if(!launchedTask){
+ breakOut = true
+ }
+ }
+ }
+ }
+ return taskSetIds
+ }
+
+ override def checkSpeculatableTasks(): Boolean = {
+ var shouldRevive = false
+ for (pool <- pools) {
+ for (ts <- pool.activeTaskSetsQueue) {
+ shouldRevive |= ts.checkSpeculatableTasks()
+ }
+ }
+ return shouldRevive
+ }
+}
+
+/**
+ * An internal representation of a pool. It contains an ArrayBuffer of TaskSets and also weight
+ */
+class Pool(val name: String, val weight: Int)
+{
+ var activeTaskSetsQueue = new ArrayBuffer[TaskSetManager]
+ var numRunningTasks: Int = 0
+} \ No newline at end of file
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
index c9f2c48804..015092b60b 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
@@ -253,6 +253,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
}
val index = info.index
info.markSuccessful()
+ sched.taskFinished(this)
if (!finished(index)) {
tasksFinished += 1
logInfo("Finished TID %s in %d ms (progress: %d/%d)".format(
@@ -281,6 +282,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
}
val index = info.index
info.markFailed()
+ sched.taskFinished(this)
if (!finished(index)) {
logInfo("Lost TID %s (task %s:%d)".format(tid, taskSet.id, index))
copiesRunning(index) -= 1
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetQueuesManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetQueuesManager.scala
new file mode 100644
index 0000000000..b0c30e9e8b
--- /dev/null
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetQueuesManager.scala
@@ -0,0 +1,19 @@
+package spark.scheduler.cluster
+
+import scala.collection.mutable.ArrayBuffer
+
+/**
+ * An interface for managing TaskSet queue/s that allows plugging different policy for
+ * offering tasks to resources
+ *
+ */
+private[spark] trait TaskSetQueuesManager {
+ def addTaskSetManager(manager: TaskSetManager): Unit
+ def removeTaskSetManager(manager: TaskSetManager): Unit
+ def taskFinished(manager: TaskSetManager): Unit
+ def removeExecutor(executorId: String, host: String): Unit
+ //The receiveOffers function, accepts tasks and offers. It populates the tasks to the actual task from TaskSet
+ //It returns a list of TaskSet ID that corresponds to each assigned tasks
+ def receiveOffer(tasks: Seq[ArrayBuffer[TaskDescription]], offers: Seq[WorkerOffer]): Seq[Seq[String]]
+ def checkSpeculatableTasks(): Boolean
+} \ No newline at end of file
diff --git a/core/src/main/scala/spark/scheduler/cluster/fair/FairClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/fair/FairClusterScheduler.scala
deleted file mode 100644
index 591736faa2..0000000000
--- a/core/src/main/scala/spark/scheduler/cluster/fair/FairClusterScheduler.scala
+++ /dev/null
@@ -1,341 +0,0 @@
-package spark.scheduler.cluster.fair
-
-import java.io.{File, FileInputStream, FileOutputStream}
-import java.util.{TimerTask, Timer}
-
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.HashSet
-import scala.util.control.Breaks._
-import scala.xml._
-
-import spark._
-import spark.TaskState.TaskState
-import spark.scheduler._
-import spark.scheduler.cluster._
-import java.nio.ByteBuffer
-import java.util.concurrent.atomic.AtomicLong
-import scala.io.Source
-
-/**
- * An implementation of a fair TaskScheduler, for running tasks on a cluster. Clients should first call
- * start(), then submit task sets through the runTasks method.
- *
- * The current implementation makes the following assumptions: A pool has a fixed configuration of weight.
- * Within a pool, it just uses FIFO.
- * Also, currently we assume that pools are statically defined
- * We currently don't support min shares
- */
-private[spark] class FairClusterScheduler(override val sc: SparkContext)
- extends ClusterScheduler(sc)
- with Logging {
-
-
- val schedulerAllocFile = System.getProperty("spark.fairscheduler.allocation.file","unspecified")
-
- val poolNameToPool= new HashMap[String, Pool]
- var pools = new ArrayBuffer[Pool]
-
- loadPoolProperties()
-
- def loadPoolProperties() {
- //first check if the file exists
- val file = new File(schedulerAllocFile)
- if(!file.exists()) {
- //if file does not exist, we just create 1 pool, default
- val pool = new Pool("default",100)
- pools += pool
- poolNameToPool("default") = pool
- logInfo("Created a default pool with weight = 100")
- }
- else {
- val xml = XML.loadFile(file)
- for (poolNode <- (xml \\ "pool")) {
- if((poolNode \ "weight").text != ""){
- val pool = new Pool((poolNode \ "@name").text,(poolNode \ "weight").text.toInt)
- pools += pool
- poolNameToPool((poolNode \ "@name").text) = pool
- logInfo("Created pool "+ pool.name +"with weight = "+pool.weight)
- } else {
- val pool = new Pool((poolNode \ "@name").text,100)
- pools += pool
- poolNameToPool((poolNode \ "@name").text) = pool
- logInfo("Created pool "+ pool.name +"with weight = 100")
- }
- }
- if(!poolNameToPool.contains("default")) {
- val pool = new Pool("default", 100)
- pools += pool
- poolNameToPool("default") = pool
- logInfo("Created a default pool with weight = 100")
- }
-
- }
- }
-
- def taskFinished(manager: TaskSetManager) {
- var poolName = "default"
- if(manager.taskSet.properties != null)
- poolName = manager.taskSet.properties.getProperty("spark.scheduler.cluster.fair.pool","default")
-
- this.synchronized {
- //have to check that poolName exists
- if(poolNameToPool.contains(poolName))
- {
- poolNameToPool(poolName).numRunningTasks -= 1
- }
- else
- {
- poolNameToPool("default").numRunningTasks -= 1
- }
- }
- }
-
- override def submitTasks(taskSet: TaskSet) {
- val tasks = taskSet.tasks
-
-
- var poolName = "default"
- if(taskSet.properties != null)
- poolName = taskSet.properties.getProperty("spark.scheduler.cluster.fair.pool","default")
-
- this.synchronized {
- if(poolNameToPool.contains(poolName))
- {
- val manager = new FairTaskSetManager(this, taskSet)
- poolNameToPool(poolName).activeTaskSetsQueue += manager
- activeTaskSets(taskSet.id) = manager
- //activeTaskSetsQueue += manager
- taskSetTaskIds(taskSet.id) = new HashSet[Long]()
- logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks to pool "+poolName)
- }
- else //If the pool name does not exists, where do we put them? We put them in default
- {
- val manager = new FairTaskSetManager(this, taskSet)
- poolNameToPool("default").activeTaskSetsQueue += manager
- activeTaskSets(taskSet.id) = manager
- //activeTaskSetsQueue += manager
- taskSetTaskIds(taskSet.id) = new HashSet[Long]()
- logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks to pool default")
- }
- if (hasReceivedTask == false) {
- starvationTimer.scheduleAtFixedRate(new TimerTask() {
- override def run() {
- if (!hasLaunchedTask) {
- logWarning("Initial job has not accepted any resources; " +
- "check your cluster UI to ensure that workers are registered")
- } else {
- this.cancel()
- }
- }
- }, STARVATION_TIMEOUT, STARVATION_TIMEOUT)
- }
- hasReceivedTask = true;
-
- }
- backend.reviveOffers()
- }
-
- override def taskSetFinished(manager: TaskSetManager) {
-
- var poolName = "default"
- if(manager.taskSet.properties != null)
- poolName = manager.taskSet.properties.getProperty("spark.scheduler.cluster.fair.pool","default")
-
-
- this.synchronized {
- //have to check that poolName exists
- if(poolNameToPool.contains(poolName))
- {
- poolNameToPool(poolName).activeTaskSetsQueue -= manager
- }
- else
- {
- poolNameToPool("default").activeTaskSetsQueue -= manager
- }
- //activeTaskSetsQueue -= manager
- activeTaskSets -= manager.taskSet.id
- taskIdToTaskSetId --= taskSetTaskIds(manager.taskSet.id)
- taskIdToExecutorId --= taskSetTaskIds(manager.taskSet.id)
- taskSetTaskIds.remove(manager.taskSet.id)
- }
- //backend.reviveOffers()
- }
-
- /**
- * This is the comparison function used for sorting to determine which
- * pool to allocate next based on fairness.
- * The algorithm is as follows: we sort by the pool's running tasks to weight ratio
- * (pools number running tast / pool's weight)
- */
- def poolFairCompFn(pool1: Pool, pool2: Pool): Boolean = {
- val tasksToWeightRatio1 = pool1.numRunningTasks.toDouble / pool1.weight.toDouble
- val tasksToWeightRatio2 = pool2.numRunningTasks.toDouble / pool2.weight.toDouble
- var res = Math.signum(tasksToWeightRatio1 - tasksToWeightRatio2)
- if (res == 0) {
- //Jobs are tied in fairness ratio. We break the tie by name
- res = pool1.name.compareTo(pool2.name)
- }
- if (res < 0)
- return true
- else
- return false
- }
-
- /**
- * Called by cluster manager to offer resources on slaves. We respond by asking our active task
- * sets for tasks in order of priority. We fill each node with tasks in a fair manner so
- * that tasks are balanced across the cluster.
- */
- override def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = {
- synchronized {
- SparkEnv.set(sc.env)
- // Mark each slave as alive and remember its hostname
- for (o <- offers) {
- executorIdToHost(o.executorId) = o.hostname
- if (!executorsByHost.contains(o.hostname)) {
- executorsByHost(o.hostname) = new HashSet()
- }
- }
- // Build a list of tasks to assign to each slave
- val tasks = offers.map(o => new ArrayBuffer[TaskDescription](o.cores))
- val availableCpus = offers.map(o => o.cores).toArray
- var launchedTask = false
-
- for (i <- 0 until offers.size) { //we loop through the list of offers
- val execId = offers(i).executorId
- val host = offers(i).hostname
- var breakOut = false
- while(availableCpus(i) > 0 && !breakOut) {
- breakable{
- launchedTask = false
- for (pool <- pools.sortWith(poolFairCompFn)) { //we loop through the list of pools
- if(!pool.activeTaskSetsQueue.isEmpty) {
- //sort the tasksetmanager in the pool
- pool.activeTaskSetsQueue.sortBy(m => (m.taskSet.priority, m.taskSet.stageId))
- for(manager <- pool.activeTaskSetsQueue) { //we loop through the activeTaskSets in this pool
-// val manager = pool.activeTaskSetsQueue.head
- //Make an offer
- manager.slaveOffer(execId, host, availableCpus(i)) match {
- case Some(task) =>
- tasks(i) += task
- val tid = task.taskId
- taskIdToTaskSetId(tid) = manager.taskSet.id
- taskSetTaskIds(manager.taskSet.id) += tid
- taskIdToExecutorId(tid) = execId
- activeExecutorIds += execId
- executorsByHost(host) += execId
- availableCpus(i) -= 1
- pool.numRunningTasks += 1
- launchedTask = true
- logInfo("launched task for pool"+pool.name);
- break
- case None => {}
- }
- }
- }
- }
- //If there is not one pool that can assign the task then we have to exit the outer loop and continue to the next offer
- if(!launchedTask){
- breakOut = true
- }
- }
- }
- }
- if (tasks.size > 0) {
- hasLaunchedTask = true
- }
- return tasks
- }
- }
-
- override def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
- var taskSetToUpdate: Option[TaskSetManager] = None
- var failedExecutor: Option[String] = None
- var taskFailed = false
- synchronized {
- try {
- if (state == TaskState.LOST && taskIdToExecutorId.contains(tid)) {
- // We lost this entire executor, so remember that it's gone
- val execId = taskIdToExecutorId(tid)
- if (activeExecutorIds.contains(execId)) {
- removeExecutor(execId)
- failedExecutor = Some(execId)
- }
- }
- taskIdToTaskSetId.get(tid) match {
- case Some(taskSetId) =>
- if (activeTaskSets.contains(taskSetId)) {
- taskSetToUpdate = Some(activeTaskSets(taskSetId))
- }
- if (TaskState.isFinished(state)) {
- taskIdToTaskSetId.remove(tid)
- if (taskSetTaskIds.contains(taskSetId)) {
- taskSetTaskIds(taskSetId) -= tid
- }
- taskIdToExecutorId.remove(tid)
- }
- if (state == TaskState.FAILED) {
- taskFailed = true
- }
- case None =>
- logInfo("Ignoring update from TID " + tid + " because its task set is gone")
- }
- } catch {
- case e: Exception => logError("Exception in statusUpdate", e)
- }
- }
- // Update the task set and DAGScheduler without holding a lock on this, since that can deadlock
- if (taskSetToUpdate != None) {
- taskSetToUpdate.get.statusUpdate(tid, state, serializedData)
- }
- if (failedExecutor != None) {
- listener.executorLost(failedExecutor.get)
- backend.reviveOffers()
- }
- if (taskFailed) {
- // Also revive offers if a task had failed for some reason other than host lost
- backend.reviveOffers()
- }
- }
-
- // Check for speculatable tasks in all our active jobs.
- override def checkSpeculatableTasks() {
- var shouldRevive = false
- synchronized {
- for (pool <- pools) {
- for (ts <- pool.activeTaskSetsQueue) {
- shouldRevive |= ts.checkSpeculatableTasks()
- }
- }
- }
- if (shouldRevive) {
- backend.reviveOffers()
- }
- }
-
- /** Remove an executor from all our data structures and mark it as lost */
- private def removeExecutor(executorId: String) {
- activeExecutorIds -= executorId
- val host = executorIdToHost(executorId)
- val execs = executorsByHost.getOrElse(host, new HashSet)
- execs -= executorId
- if (execs.isEmpty) {
- executorsByHost -= host
- }
- executorIdToHost -= executorId
- for (pool <- pools) {
- pool.activeTaskSetsQueue.foreach(_.executorLost(executorId, host))
- }
- }
-
-}
-
-/**
- * An internal representation of a pool. It contains an ArrayBuffer of TaskSets and also weight and minshare
- */
-class Pool(val name: String, val weight: Int)
-{
- var activeTaskSetsQueue = new ArrayBuffer[TaskSetManager]
- var numRunningTasks: Int = 0
-}
diff --git a/core/src/main/scala/spark/scheduler/cluster/fair/FairTaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/fair/FairTaskSetManager.scala
deleted file mode 100644
index 4b0277d2d5..0000000000
--- a/core/src/main/scala/spark/scheduler/cluster/fair/FairTaskSetManager.scala
+++ /dev/null
@@ -1,130 +0,0 @@
-package spark.scheduler.cluster.fair
-
-import scala.collection.mutable.ArrayBuffer
-
-import spark._
-import spark.scheduler._
-import spark.scheduler.cluster._
-import spark.TaskState.TaskState
-import java.nio.ByteBuffer
-
-/**
- * Schedules the tasks within a single TaskSet in the FairClusterScheduler.
- */
-private[spark] class FairTaskSetManager(sched: FairClusterScheduler, override val taskSet: TaskSet) extends TaskSetManager(sched, taskSet) with Logging {
-
- // Add a task to all the pending-task lists that it should be on.
- private def addPendingTask(index: Int) {
- val locations = tasks(index).preferredLocations.toSet & sched.hostsAlive
- if (locations.size == 0) {
- pendingTasksWithNoPrefs += index
- } else {
- for (host <- locations) {
- val list = pendingTasksForHost.getOrElseUpdate(host, ArrayBuffer())
- list += index
- }
- }
- allPendingTasks += index
- }
-
- override def taskFinished(tid: Long, state: TaskState, serializedData: ByteBuffer) {
- val info = taskInfos(tid)
- if (info.failed) {
- // We might get two task-lost messages for the same task in coarse-grained Mesos mode,
- // or even from Mesos itself when acks get delayed.
- return
- }
- val index = info.index
- info.markSuccessful()
- sched.taskFinished(this)
- if (!finished(index)) {
- tasksFinished += 1
- logInfo("Finished TID %s in %d ms (progress: %d/%d)".format(
- tid, info.duration, tasksFinished, numTasks))
- // Deserialize task result and pass it to the scheduler
- val result = ser.deserialize[TaskResult[_]](serializedData, getClass.getClassLoader)
- result.metrics.resultSize = serializedData.limit()
- sched.listener.taskEnded(tasks(index), Success, result.value, result.accumUpdates, info, result.metrics)
- // Mark finished and stop if we've finished all the tasks
- finished(index) = true
- if (tasksFinished == numTasks) {
- sched.taskSetFinished(this)
- }
- } else {
- logInfo("Ignoring task-finished event for TID " + tid +
- " because task " + index + " is already finished")
- }
- }
-
- override def taskLost(tid: Long, state: TaskState, serializedData: ByteBuffer) {
- val info = taskInfos(tid)
- if (info.failed) {
- // We might get two task-lost messages for the same task in coarse-grained Mesos mode,
- // or even from Mesos itself when acks get delayed.
- return
- }
- val index = info.index
- info.markFailed()
- //Bookkeeping necessary for the pools in the scheduler
- sched.taskFinished(this)
- if (!finished(index)) {
- logInfo("Lost TID %s (task %s:%d)".format(tid, taskSet.id, index))
- copiesRunning(index) -= 1
- // Check if the problem is a map output fetch failure. In that case, this
- // task will never succeed on any node, so tell the scheduler about it.
- if (serializedData != null && serializedData.limit() > 0) {
- val reason = ser.deserialize[TaskEndReason](serializedData, getClass.getClassLoader)
- reason match {
- case fetchFailed: FetchFailed =>
- logInfo("Loss was due to fetch failure from " + fetchFailed.bmAddress)
- sched.listener.taskEnded(tasks(index), fetchFailed, null, null, info, null)
- finished(index) = true
- tasksFinished += 1
- sched.taskSetFinished(this)
- return
-
- case ef: ExceptionFailure =>
- val key = ef.exception.toString
- val now = System.currentTimeMillis
- val (printFull, dupCount) = {
- if (recentExceptions.contains(key)) {
- val (dupCount, printTime) = recentExceptions(key)
- if (now - printTime > EXCEPTION_PRINT_INTERVAL) {
- recentExceptions(key) = (0, now)
- (true, 0)
- } else {
- recentExceptions(key) = (dupCount + 1, printTime)
- (false, dupCount + 1)
- }
- } else {
- recentExceptions(key) = (0, now)
- (true, 0)
- }
- }
- if (printFull) {
- val locs = ef.exception.getStackTrace.map(loc => "\tat %s".format(loc.toString))
- logInfo("Loss was due to %s\n%s".format(ef.exception.toString, locs.mkString("\n")))
- } else {
- logInfo("Loss was due to %s [duplicate %d]".format(ef.exception.toString, dupCount))
- }
-
- case _ => {}
- }
- }
- // On non-fetch failures, re-enqueue the task as pending for a max number of retries
- addPendingTask(index)
- // Count failed attempts only on FAILED and LOST state (not on KILLED)
- if (state == TaskState.FAILED || state == TaskState.LOST) {
- numFailures(index) += 1
- if (numFailures(index) > MAX_TASK_FAILURES) {
- logError("Task %s:%d failed more than %d times; aborting job".format(
- taskSet.id, index, MAX_TASK_FAILURES))
- abort("Task %s:%d failed more than %d times".format(taskSet.id, index, MAX_TASK_FAILURES))
- }
- }
- } else {
- logInfo("Ignoring task-lost event for TID " + tid +
- " because task " + index + " is already finished")
- }
- }
-} \ No newline at end of file