| author | Mingfei <mingfei.shi@intel.com> | 2013-06-08 15:20:13 +0800 |
| --- | --- | --- |
| committer | Mingfei <mingfei.shi@intel.com> | 2013-06-08 15:20:13 +0800 |
| commit | 362f0f93acf38aefa872158fe9cc9d51a43d81a1 (patch) | |
| tree | 021d4fb6db4d050a4dff8ca354f17a77696382c2 | |
| parent | 1a4d93c025e5d3679257a622f49dfaade4ac18c2 (diff) | |
| parent | b58a29295b2e610cadf1cac44438337ce9b51537 (diff) | |
Merge branch 'master' of https://github.com/mesos/spark
3 files changed, 31 insertions, 20 deletions
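The substantive change in this merge is to FairSchedulingAlgorithm.comparator (first file below). The old version compared ratios with `<=`, which is not a strict ordering and could order tied pools inconsistently; the new version returns early for the needy/non-needy cases, compares the relevant ratio with compareTo, and breaks exact ties on pool name so the sort is deterministic. A standalone sketch of that logic follows; the trait and object here are pared-down stand-ins, not Spark's actual Schedulable API:

```scala
// Sketch of the new fair-scheduling comparison. `Schedulable` below is a
// minimal stand-in exposing only the fields the comparator reads.
trait Schedulable {
  def name: String
  def minShare: Int
  def weight: Int
  def runningTasks: Int
}

object FairComparatorSketch {
  /** True if s1 should run before s2. */
  def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    val s1Needy = s1.runningTasks < s1.minShare
    val s2Needy = s2.runningTasks < s2.minShare
    // A pool below its minimum share always beats one at or above it.
    if (s1Needy && !s2Needy) return true
    if (!s1Needy && s2Needy) return false

    val compare =
      if (s1Needy && s2Needy) {
        // Both needy: compare how close each pool is to its min share.
        val r1 = s1.runningTasks.toDouble / math.max(s1.minShare, 1).toDouble
        val r2 = s2.runningTasks.toDouble / math.max(s2.minShare, 1).toDouble
        r1.compareTo(r2)
      } else {
        // Neither needy: compare running tasks per unit of weight.
        (s1.runningTasks.toDouble / s1.weight)
          .compareTo(s2.runningTasks.toDouble / s2.weight)
      }
    // Tie-break on name so the result is a deterministic total order.
    if (compare != 0) compare < 0 else s1.name < s2.name
  }
}
```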
diff --git a/core/src/main/scala/spark/scheduler/cluster/SchedulingAlgorithm.scala b/core/src/main/scala/spark/scheduler/cluster/SchedulingAlgorithm.scala
index a5d6285c99..f33310a34a 100644
--- a/core/src/main/scala/spark/scheduler/cluster/SchedulingAlgorithm.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/SchedulingAlgorithm.scala
@@ -13,11 +13,11 @@ private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
   override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
     val priority1 = s1.priority
     val priority2 = s2.priority
-    var res = Math.signum(priority1 - priority2)
+    var res = math.signum(priority1 - priority2)
     if (res == 0) {
       val stageId1 = s1.stageId
       val stageId2 = s2.stageId
-      res = Math.signum(stageId1 - stageId2)
+      res = math.signum(stageId1 - stageId2)
     }
     if (res < 0) {
       return true
@@ -35,22 +35,30 @@ private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
     val runningTasks2 = s2.runningTasks
     val s1Needy = runningTasks1 < minShare1
     val s2Needy = runningTasks2 < minShare2
-    val minShareRatio1 = runningTasks1.toDouble / Math.max(minShare1, 1.0).toDouble
-    val minShareRatio2 = runningTasks2.toDouble / Math.max(minShare2, 1.0).toDouble
+    val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0).toDouble
+    val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0).toDouble
     val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
     val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble
 
     var res:Boolean = true
+    var compare:Int = 0
 
     if (s1Needy && !s2Needy) {
-      res = true
+      return true
     } else if (!s1Needy && s2Needy) {
-      res = false
+      return false
     } else if (s1Needy && s2Needy) {
-      res = minShareRatio1 <= minShareRatio2
+      compare = minShareRatio1.compareTo(minShareRatio2)
+    } else {
+      compare = taskToWeightRatio1.compareTo(taskToWeightRatio2)
+    }
+
+    if (compare < 0) {
+      return true
+    } else if (compare > 0) {
+      return false
     } else {
-      res = taskToWeightRatio1 <= taskToWeightRatio2
+      return s1.name < s2.name
     }
-    return res
   }
 }
diff --git a/core/src/main/scala/spark/storage/StorageUtils.scala b/core/src/main/scala/spark/storage/StorageUtils.scala
index 8f52168c24..950c0cdf35 100644
--- a/core/src/main/scala/spark/storage/StorageUtils.scala
+++ b/core/src/main/scala/spark/storage/StorageUtils.scala
@@ -55,21 +55,21 @@ object StorageUtils {
     }.mapValues(_.values.toArray)
 
     // For each RDD, generate an RDDInfo object
-    val rddInfos = groupedRddBlocks.map { case(rddKey, rddBlocks) =>
-
+    val rddInfos = groupedRddBlocks.map { case (rddKey, rddBlocks) =>
       // Add up memory and disk sizes
       val memSize = rddBlocks.map(_.memSize).reduce(_ + _)
       val diskSize = rddBlocks.map(_.diskSize).reduce(_ + _)
 
       // Find the id of the RDD, e.g. rdd_1 => 1
       val rddId = rddKey.split("_").last.toInt
-      // Get the friendly name for the rdd, if available.
-      val rdd = sc.persistentRdds(rddId)
-      val rddName = Option(rdd.name).getOrElse(rddKey)
-      val rddStorageLevel = rdd.getStorageLevel
-      RDDInfo(rddId, rddName, rddStorageLevel, rddBlocks.length, rdd.partitions.size, memSize, diskSize)
-    }.toArray
+      // Get the friendly name and storage level for the RDD, if available
+      sc.persistentRdds.get(rddId).map { r =>
+        val rddName = Option(r.name).getOrElse(rddKey)
+        val rddStorageLevel = r.getStorageLevel
+        RDDInfo(rddId, rddName, rddStorageLevel, rddBlocks.length, r.partitions.size, memSize, diskSize)
+      }
+    }.flatten.toArray
 
     scala.util.Sorting.quickSort(rddInfos)
diff --git a/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala b/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala
index a39418b716..c861597c6b 100644
--- a/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala
+++ b/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala
@@ -88,7 +88,7 @@ class DummyTask(stageId: Int) extends Task[Int](stageId)
   }
 }
 
-class ClusterSchedulerSuite extends FunSuite with LocalSparkContext {
+class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging {
 
   def createDummyTaskSetManager(priority: Int, stage: Int, numTasks: Int, cs: ClusterScheduler, taskSet: TaskSet): DummyTaskSetManager = {
     new DummyTaskSetManager(priority, stage, numTasks, cs , taskSet)
@@ -96,8 +96,11 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext {
 
   def resourceOffer(rootPool: Pool): Int = {
     val taskSetQueue = rootPool.getSortedTaskSetQueue()
-    for (taskSet <- taskSetQueue)
-    {
+    /* Just for Test*/
+    for (manager <- taskSetQueue) {
+      logInfo("parentName:%s, parent running tasks:%d, name:%s,runningTasks:%d".format(manager.parent.name, manager.parent.runningTasks, manager.name, manager.runningTasks))
+    }
+    for (taskSet <- taskSetQueue) {
       taskSet.slaveOffer("execId_1", "hostname_1", 1) match {
         case Some(task) =>
           return taskSet.stageId
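The StorageUtils hunk above replaces the throwing lookup sc.persistentRdds(rddId) with sc.persistentRdds.get(rddId), so an RDD that was unpersisted between grouping the blocks and building the report is skipped instead of raising NoSuchElementException. A minimal sketch of that map-over-Option-then-flatten pattern, with hypothetical stand-in types rather than Spark's RDD and RDDInfo:

```scala
// Illustrative types only; Spark's real RDDInfo also carries storage level,
// partition counts, and memory/disk sizes.
case class RddInfoSketch(id: Int, name: String)

object OptionLookupSketch {
  // Build one info record per id, dropping ids with no registered RDD.
  def rddInfos(persistentRdds: Map[Int, String], ids: Seq[Int]): Array[RddInfoSketch] =
    ids.map { id =>
      // Map#get returns an Option, so a missing id yields None instead of
      // the NoSuchElementException that persistentRdds(id) would throw.
      persistentRdds.get(id).map(name => RddInfoSketch(id, name))
    }.flatten.toArray
}
```

For example, rddInfos(Map(1 -> "events"), Seq(1, 2)) returns only the record for id 1; the unregistered id 2 contributes None and is dropped by flatten.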