aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndrew xia <junluan.xia@intel.com>2013-05-17 05:10:38 +0800
committerAndrew xia <junluan.xia@intel.com>2013-05-17 05:10:38 +0800
commitc6e2770bfe940a4f4f26f75c9ba228faea7316f0 (patch)
tree02da2f9a2b2987d0641d4ffaf72357ecf264e319
parent8436bd5d4a96480ac1871330a28d9d712e64959d (diff)
downloadspark-c6e2770bfe940a4f4f26f75c9ba228faea7316f0.tar.gz
spark-c6e2770bfe940a4f4f26f75c9ba228faea7316f0.tar.bz2
spark-c6e2770bfe940a4f4f26f75c9ba228faea7316f0.zip
Fix ClusterScheduler bug to avoid allocating tasks to same slave
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala48
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/Pool.scala20
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/Schedulable.scala3
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala8
-rw-r--r--core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala46
5 files changed, 75 insertions, 50 deletions
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
index 1a300c9e8c..4caafcc1d3 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
@@ -164,27 +164,35 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
// Build a list of tasks to assign to each slave
val tasks = offers.map(o => new ArrayBuffer[TaskDescription](o.cores))
val availableCpus = offers.map(o => o.cores).toArray
- for (i <- 0 until offers.size) {
- var launchedTask = true
- val execId = offers(i).executorId
- val host = offers(i).hostname
- while (availableCpus(i) > 0 && launchedTask) {
+ var launchedTask = false
+ val sortedLeafSchedulable = rootPool.getSortedLeafSchedulable()
+ for (schedulable <- sortedLeafSchedulable)
+ {
+ logDebug("parentName:%s,name:%s,runningTasks:%s".format(schedulable.parent.name,schedulable.name,schedulable.runningTasks))
+ }
+ for (schedulable <- sortedLeafSchedulable) {
+ do {
launchedTask = false
- rootPool.receiveOffer(execId,host,availableCpus(i)) match {
- case Some(task) =>
- tasks(i) += task
- val tid = task.taskId
- taskIdToTaskSetId(tid) = task.taskSetId
- taskSetTaskIds(task.taskSetId) += tid
- taskIdToExecutorId(tid) = execId
- activeExecutorIds += execId
- executorsByHost(host) += execId
- availableCpus(i) -= 1
- launchedTask = true
-
- case None => {}
- }
- }
+ for (i <- 0 until offers.size) {
+ var launchedTask = true
+ val execId = offers(i).executorId
+ val host = offers(i).hostname
+ schedulable.slaveOffer(execId,host,availableCpus(i)) match {
+ case Some(task) =>
+ tasks(i) += task
+ val tid = task.taskId
+ taskIdToTaskSetId(tid) = task.taskSetId
+ taskSetTaskIds(task.taskSetId) += tid
+ taskIdToExecutorId(tid) = execId
+ activeExecutorIds += execId
+ executorsByHost(host) += execId
+ availableCpus(i) -= 1
+ launchedTask = true
+
+ case None => {}
+ }
+ }
+ } while(launchedTask)
}
if (tasks.size > 0) {
hasLaunchedTask = true
diff --git a/core/src/main/scala/spark/scheduler/cluster/Pool.scala b/core/src/main/scala/spark/scheduler/cluster/Pool.scala
index d5482f71ad..ae603e7dd9 100644
--- a/core/src/main/scala/spark/scheduler/cluster/Pool.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/Pool.scala
@@ -75,19 +75,17 @@ private[spark] class Pool(
return shouldRevive
}
- override def receiveOffer(execId: String, host: String, availableCpus: Double): Option[TaskDescription] = {
+ override def slaveOffer(execId: String, host: String, availableCpus: Double): Option[TaskDescription] = {
+ return None
+ }
+
+ override def getSortedLeafSchedulable(): ArrayBuffer[Schedulable] = {
+ var leafSchedulableQueue = new ArrayBuffer[Schedulable]
val sortedSchedulableQueue = schedulableQueue.sortWith(taskSetSchedulingAlgorithm.comparator)
- for (manager <- sortedSchedulableQueue) {
- logInfo("parentName:%s,schedulableName:%s,minShares:%d,weight:%d,runningTasks:%d".format(
- manager.parent.name, manager.name, manager.minShare, manager.weight, manager.runningTasks))
+ for (schedulable <- sortedSchedulableQueue) {
+ leafSchedulableQueue ++= schedulable.getSortedLeafSchedulable()
}
- for (manager <- sortedSchedulableQueue) {
- val task = manager.receiveOffer(execId, host, availableCpus)
- if (task != None) {
- return task
- }
- }
- return None
+ return leafSchedulableQueue
}
override def increaseRunningTasks(taskNum: Int) {
diff --git a/core/src/main/scala/spark/scheduler/cluster/Schedulable.scala b/core/src/main/scala/spark/scheduler/cluster/Schedulable.scala
index 54e8ae95f9..c620588e14 100644
--- a/core/src/main/scala/spark/scheduler/cluster/Schedulable.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/Schedulable.scala
@@ -21,6 +21,7 @@ private[spark] trait Schedulable {
def removeSchedulable(schedulable: Schedulable): Unit
def getSchedulableByName(name: String): Schedulable
def executorLost(executorId: String, host: String): Unit
- def receiveOffer(execId: String, host: String, avaiableCpus: Double): Option[TaskDescription]
+ def slaveOffer(execId: String, host: String, avaiableCpus: Double): Option[TaskDescription]
def checkSpeculatableTasks(): Boolean
+ def getSortedLeafSchedulable(): ArrayBuffer[Schedulable]
}
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
index baaaa41a37..80edbe77a1 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
@@ -198,7 +198,7 @@ private[spark] class TaskSetManager(
}
// Respond to an offer of a single slave from the scheduler by finding a task
- override def receiveOffer(execId: String, host: String, availableCpus: Double): Option[TaskDescription] = {
+ override def slaveOffer(execId: String, host: String, availableCpus: Double): Option[TaskDescription] = {
if (tasksFinished < numTasks && availableCpus >= CPUS_PER_TASK) {
val time = System.currentTimeMillis
val localOnly = (time - lastPreferredLaunchTime < LOCALITY_WAIT)
@@ -398,6 +398,12 @@ private[spark] class TaskSetManager(
//nothing
}
+ override def getSortedLeafSchedulable(): ArrayBuffer[Schedulable] = {
+ var leafSchedulableQueue = new ArrayBuffer[Schedulable]
+ leafSchedulableQueue += this
+ return leafSchedulableQueue
+ }
+
override def executorLost(execId: String, hostname: String) {
logInfo("Re-queueing tasks for " + execId + " from TaskSet " + taskSet.id)
val newHostsAlive = sched.hostsAlive
diff --git a/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala b/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala
index 2eda48196b..8426be7575 100644
--- a/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala
+++ b/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala
@@ -6,6 +6,7 @@ import org.scalatest.BeforeAndAfter
import spark._
import spark.scheduler._
import spark.scheduler.cluster._
+import scala.collection.mutable.ArrayBuffer
import java.util.Properties
@@ -25,34 +26,34 @@ class DummyTaskSetManager(
var numTasks = initNumTasks
var tasksFinished = 0
- def increaseRunningTasks(taskNum: Int) {
+ override def increaseRunningTasks(taskNum: Int) {
runningTasks += taskNum
if (parent != null) {
parent.increaseRunningTasks(taskNum)
}
}
- def decreaseRunningTasks(taskNum: Int) {
+ override def decreaseRunningTasks(taskNum: Int) {
runningTasks -= taskNum
if (parent != null) {
parent.decreaseRunningTasks(taskNum)
}
}
- def addSchedulable(schedulable: Schedulable) {
+ override def addSchedulable(schedulable: Schedulable) {
}
- def removeSchedulable(schedulable: Schedulable) {
+ override def removeSchedulable(schedulable: Schedulable) {
}
- def getSchedulableByName(name: String): Schedulable = {
+ override def getSchedulableByName(name: String): Schedulable = {
return null
}
- def executorLost(executorId: String, host: String): Unit = {
+ override def executorLost(executorId: String, host: String): Unit = {
}
- def receiveOffer(execId: String, host: String, avaiableCpus: Double): Option[TaskDescription] = {
+ override def slaveOffer(execId: String, host: String, avaiableCpus: Double): Option[TaskDescription] = {
if (tasksFinished + runningTasks < numTasks) {
increaseRunningTasks(1)
return Some(new TaskDescription(0, stageId.toString, execId, "task 0:0", null))
@@ -60,10 +61,16 @@ class DummyTaskSetManager(
return None
}
- def checkSpeculatableTasks(): Boolean = {
+ override def checkSpeculatableTasks(): Boolean = {
return true
}
+ override def getSortedLeafSchedulable(): ArrayBuffer[Schedulable] = {
+ var leafSchedulableQueue = new ArrayBuffer[Schedulable]
+ leafSchedulableQueue += this
+ return leafSchedulableQueue
+ }
+
def taskFinished() {
decreaseRunningTasks(1)
tasksFinished +=1
@@ -80,16 +87,21 @@ class DummyTaskSetManager(
class ClusterSchedulerSuite extends FunSuite with BeforeAndAfter {
- def receiveOffer(rootPool: Pool) : Option[TaskDescription] = {
- rootPool.receiveOffer("execId_1", "hostname_1", 1)
+ def resourceOffer(rootPool: Pool): Int = {
+ val taskSetQueue = rootPool.getSortedLeafSchedulable()
+ for (taskSet <- taskSetQueue)
+ {
+ taskSet.slaveOffer("execId_1", "hostname_1", 1) match {
+ case Some(task) =>
+ return task.taskSetId.toInt
+ case None => {}
+ }
+ }
+ -1
}
def checkTaskSetId(rootPool: Pool, expectedTaskSetId: Int) {
- receiveOffer(rootPool) match {
- case Some(task) =>
- assert(task.taskSetId.toInt === expectedTaskSetId)
- case _ =>
- }
+ assert(resourceOffer(rootPool) === expectedTaskSetId)
}
test("FIFO Scheduler Test") {
@@ -105,9 +117,9 @@ class ClusterSchedulerSuite extends FunSuite with BeforeAndAfter {
schedulableBuilder.addTaskSetManager(taskSetManager2, null)
checkTaskSetId(rootPool, 0)
- receiveOffer(rootPool)
+ resourceOffer(rootPool)
checkTaskSetId(rootPool, 1)
- receiveOffer(rootPool)
+ resourceOffer(rootPool)
taskSetManager1.abort()
checkTaskSetId(rootPool, 2)
}