author     Mingfei <mingfei.shi@intel.com>  2013-06-08 15:20:13 +0800
committer  Mingfei <mingfei.shi@intel.com>  2013-06-08 15:20:13 +0800
commit     362f0f93acf38aefa872158fe9cc9d51a43d81a1 (patch)
tree       021d4fb6db4d050a4dff8ca354f17a77696382c2 /core/src
parent     1a4d93c025e5d3679257a622f49dfaade4ac18c2 (diff)
parent     b58a29295b2e610cadf1cac44438337ce9b51537 (diff)
Merge branch 'master' of https://github.com/mesos/spark
Diffstat (limited to 'core/src')
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/SchedulingAlgorithm.scala | 26
-rw-r--r--  core/src/main/scala/spark/storage/StorageUtils.scala                  | 16
-rw-r--r--  core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala       |  9
3 files changed, 31 insertions(+), 20 deletions(-)
diff --git a/core/src/main/scala/spark/scheduler/cluster/SchedulingAlgorithm.scala b/core/src/main/scala/spark/scheduler/cluster/SchedulingAlgorithm.scala
index a5d6285c99..f33310a34a 100644
--- a/core/src/main/scala/spark/scheduler/cluster/SchedulingAlgorithm.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/SchedulingAlgorithm.scala
@@ -13,11 +13,11 @@ private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
   override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
     val priority1 = s1.priority
     val priority2 = s2.priority
-    var res = Math.signum(priority1 - priority2)
+    var res = math.signum(priority1 - priority2)
     if (res == 0) {
       val stageId1 = s1.stageId
       val stageId2 = s2.stageId
-      res = Math.signum(stageId1 - stageId2)
+      res = math.signum(stageId1 - stageId2)
     }
     if (res < 0) {
       return true
@@ -35,22 +35,30 @@ private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
     val runningTasks2 = s2.runningTasks
     val s1Needy = runningTasks1 < minShare1
     val s2Needy = runningTasks2 < minShare2
-    val minShareRatio1 = runningTasks1.toDouble / Math.max(minShare1, 1.0).toDouble
-    val minShareRatio2 = runningTasks2.toDouble / Math.max(minShare2, 1.0).toDouble
+    val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0).toDouble
+    val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0).toDouble
     val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
     val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble
     var res:Boolean = true
+    var compare:Int = 0
     if (s1Needy && !s2Needy) {
-      res = true
+      return true
     } else if (!s1Needy && s2Needy) {
-      res = false
+      return false
     } else if (s1Needy && s2Needy) {
-      res = minShareRatio1 <= minShareRatio2
+      compare = minShareRatio1.compareTo(minShareRatio2)
+    } else {
+      compare = taskToWeightRatio1.compareTo(taskToWeightRatio2)
+    }
+
+    if (compare < 0) {
+      return true
+    } else if (compare > 0) {
+      return false
     } else {
-      res = taskToWeightRatio1 <= taskToWeightRatio2
+      return s1.name < s2.name
     }
-    return res
   }
 }
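
Note on the FairSchedulingAlgorithm change above: with the old `<=` comparisons, two schedulables with identical ratios compared as "less than" in both directions, so the comparator was not a strict ordering; the new code compares the ratios with `compareTo` and breaks exact ties deterministically by name. A minimal standalone sketch of the resulting logic (the `Job` case class is a hypothetical stand-in for the real `Schedulable` trait, not Spark API):

    // Hypothetical stand-in for Schedulable, for illustration only.
    case class Job(name: String, minShare: Int, weight: Int, runningTasks: Int)

    // Mirrors the new FairSchedulingAlgorithm.comparator above.
    def fairComparator(s1: Job, s2: Job): Boolean = {
      val s1Needy = s1.runningTasks < s1.minShare
      val s2Needy = s2.runningTasks < s2.minShare
      val minShareRatio1 = s1.runningTasks.toDouble / math.max(s1.minShare, 1.0)
      val minShareRatio2 = s2.runningTasks.toDouble / math.max(s2.minShare, 1.0)
      val taskToWeightRatio1 = s1.runningTasks.toDouble / s1.weight
      val taskToWeightRatio2 = s2.runningTasks.toDouble / s2.weight
      if (s1Needy && !s2Needy) return true
      if (!s1Needy && s2Needy) return false
      val compare =
        if (s1Needy && s2Needy) minShareRatio1.compareTo(minShareRatio2)
        else taskToWeightRatio1.compareTo(taskToWeightRatio2)
      if (compare != 0) compare < 0 else s1.name < s2.name
    }

    // Equal ratios now sort deterministically: exactly one direction holds.
    val a = Job("pool_a", minShare = 2, weight = 1, runningTasks = 1)
    val b = Job("pool_b", minShare = 2, weight = 1, runningTasks = 1)
    assert(fairComparator(a, b) && !fairComparator(b, a))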
diff --git a/core/src/main/scala/spark/storage/StorageUtils.scala b/core/src/main/scala/spark/storage/StorageUtils.scala
index 8f52168c24..950c0cdf35 100644
--- a/core/src/main/scala/spark/storage/StorageUtils.scala
+++ b/core/src/main/scala/spark/storage/StorageUtils.scala
@@ -55,21 +55,21 @@ object StorageUtils {
     }.mapValues(_.values.toArray)
     // For each RDD, generate an RDDInfo object
-    val rddInfos = groupedRddBlocks.map { case(rddKey, rddBlocks) =>
-
+    val rddInfos = groupedRddBlocks.map { case (rddKey, rddBlocks) =>
       // Add up memory and disk sizes
       val memSize = rddBlocks.map(_.memSize).reduce(_ + _)
       val diskSize = rddBlocks.map(_.diskSize).reduce(_ + _)
       // Find the id of the RDD, e.g. rdd_1 => 1
       val rddId = rddKey.split("_").last.toInt
-      // Get the friendly name for the rdd, if available.
-      val rdd = sc.persistentRdds(rddId)
-      val rddName = Option(rdd.name).getOrElse(rddKey)
-      val rddStorageLevel = rdd.getStorageLevel
-      RDDInfo(rddId, rddName, rddStorageLevel, rddBlocks.length, rdd.partitions.size, memSize, diskSize)
-    }.toArray
+      // Get the friendly name and storage level for the RDD, if available
+      sc.persistentRdds.get(rddId).map { r =>
+        val rddName = Option(r.name).getOrElse(rddKey)
+        val rddStorageLevel = r.getStorageLevel
+        RDDInfo(rddId, rddName, rddStorageLevel, rddBlocks.length, r.partitions.size, memSize, diskSize)
+      }
+    }.flatten.toArray
     scala.util.Sorting.quickSort(rddInfos)
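
Note on the StorageUtils change above: `sc.persistentRdds(rddId)` throws `NoSuchElementException` if the RDD was unpersisted (and thus removed from the map) after its blocks were grouped, whereas `get` returns an `Option` whose `None` entries are dropped by `flatten`. A minimal sketch of the pattern, assuming a plain `Map` in place of `sc.persistentRdds`:

    // Plain Map standing in for sc.persistentRdds (id -> RDD name here).
    val persistentRdds = Map(1 -> "one")

    // Block grouping saw ids 1 and 2, but rdd 2 was unpersisted since.
    val groupedIds = Seq(1, 2)

    // Old style: persistentRdds(2) throws NoSuchElementException.
    // New style: missing ids become None and are dropped by flatten.
    val infos = groupedIds.map { id =>
      persistentRdds.get(id).map(name => "info(%d, %s)".format(id, name))
    }.flatten.toArray

    assert(infos.sameElements(Array("info(1, one)")))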
diff --git a/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala b/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala
index a39418b716..c861597c6b 100644
--- a/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala
+++ b/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala
@@ -88,7 +88,7 @@ class DummyTask(stageId: Int) extends Task[Int](stageId)
   }
 }
-class ClusterSchedulerSuite extends FunSuite with LocalSparkContext {
+class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging {
   def createDummyTaskSetManager(priority: Int, stage: Int, numTasks: Int, cs: ClusterScheduler, taskSet: TaskSet): DummyTaskSetManager = {
     new DummyTaskSetManager(priority, stage, numTasks, cs , taskSet)
@@ -96,8 +96,11 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext {
   def resourceOffer(rootPool: Pool): Int = {
     val taskSetQueue = rootPool.getSortedTaskSetQueue()
-    for (taskSet <- taskSetQueue)
-    {
+    /* Just for Test*/
+    for (manager <- taskSetQueue) {
+      logInfo("parentName:%s, parent running tasks:%d, name:%s,runningTasks:%d".format(manager.parent.name, manager.parent.runningTasks, manager.name, manager.runningTasks))
+    }
+    for (taskSet <- taskSetQueue) {
       taskSet.slaveOffer("execId_1", "hostname_1", 1) match {
         case Some(task) =>
           return taskSet.stageId