aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndrew xia <junluan.xia@intel.com>2013-04-09 13:02:50 +0800
committerAndrew xia <junluan.xia@intel.com>2013-04-09 13:02:50 +0800
commit2f883c515fe4577f0105e62dd9f395d7de42bd68 (patch)
treecc92e28b84513a57325dc727af71e98be31c267e
parent2b373dd07a7b3f2906607d910c869e3290ca9d05 (diff)
downloadspark-2f883c515fe4577f0105e62dd9f395d7de42bd68.tar.gz
spark-2f883c515fe4577f0105e62dd9f395d7de42bd68.tar.bz2
spark-2f883c515fe4577f0105e62dd9f395d7de42bd68.zip
Contiue to update codes for scala code style
1.refactor braces for "class" "if" "while" "for" "match" 2.make code lines less than 100 3.refactor class parameter and extends defination
-rw-r--r--core/src/main/scala/spark/SparkContext.scala12
-rw-r--r--core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala3
-rw-r--r--core/src/main/scala/spark/scheduler/TaskSet.scala9
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala6
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/FIFOTaskSetQueuesManager.scala10
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/FairTaskSetQueuesManager.scala139
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/Pool.scala46
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/Schedulable.scala3
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/SchedulingAlgorithm.scala30
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/SchedulingMode.scala5
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala6
11 files changed, 122 insertions, 147 deletions
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index ed5f686379..7c96ae637b 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -146,7 +146,8 @@ class SparkContext(
case SPARK_REGEX(sparkUrl) =>
val scheduler = new ClusterScheduler(this)
val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, appName)
- val taskSetQueuesManager = Class.forName(System.getProperty("spark.cluster.taskscheduler")).newInstance().asInstanceOf[TaskSetQueuesManager]
+ val taskSetQueuesManager = Class.forName(System.getProperty("spark.cluster.taskscheduler")).
+ newInstance().asInstanceOf[TaskSetQueuesManager]
scheduler.initialize(backend, taskSetQueuesManager)
scheduler
@@ -166,7 +167,8 @@ class SparkContext(
numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt)
val sparkUrl = localCluster.start()
val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, appName)
- val taskSetQueuesManager = Class.forName(System.getProperty("spark.cluster.taskscheduler")).newInstance().asInstanceOf[TaskSetQueuesManager]
+ val taskSetQueuesManager = Class.forName(System.getProperty("spark.cluster.taskscheduler")).
+ newInstance().asInstanceOf[TaskSetQueuesManager]
scheduler.initialize(backend, taskSetQueuesManager)
backend.shutdownCallback = (backend: SparkDeploySchedulerBackend) => {
localCluster.stop()
@@ -186,7 +188,8 @@ class SparkContext(
} else {
new MesosSchedulerBackend(scheduler, this, masterWithoutProtocol, appName)
}
- val taskSetQueuesManager = Class.forName(System.getProperty("spark.cluster.taskscheduler")).newInstance().asInstanceOf[TaskSetQueuesManager]
+ val taskSetQueuesManager = Class.forName(System.getProperty("spark.cluster.taskscheduler")).
+ newInstance().asInstanceOf[TaskSetQueuesManager]
scheduler.initialize(backend, taskSetQueuesManager)
scheduler
}
@@ -602,7 +605,8 @@ class SparkContext(
val callSite = Utils.getSparkCallSite
logInfo("Starting job: " + callSite)
val start = System.nanoTime
- val result = dagScheduler.runJob(rdd, func, partitions, callSite, allowLocal, resultHandler,localProperties.value)
+ val result = dagScheduler.runJob(rdd, func, partitions, callSite, allowLocal, resultHandler
+ ,localProperties.value)
logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
rdd.doCheckpoint()
result
diff --git a/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala
index 6f4e5cd83e..11fec568c6 100644
--- a/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala
+++ b/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala
@@ -23,7 +23,8 @@ private[spark] case class JobSubmitted(
partitions: Array[Int],
allowLocal: Boolean,
callSite: String,
- listener: JobListener, properties: Properties = null)
+ listener: JobListener,
+ properties: Properties = null)
extends DAGSchedulerEvent
private[spark] case class CompletionEvent(
diff --git a/core/src/main/scala/spark/scheduler/TaskSet.scala b/core/src/main/scala/spark/scheduler/TaskSet.scala
index 2498e8a5aa..e4b5fcaedb 100644
--- a/core/src/main/scala/spark/scheduler/TaskSet.scala
+++ b/core/src/main/scala/spark/scheduler/TaskSet.scala
@@ -6,8 +6,13 @@ import java.util.Properties
* A set of tasks submitted together to the low-level TaskScheduler, usually representing
* missing partitions of a particular stage.
*/
-private[spark] class TaskSet(val tasks: Array[Task[_]], val stageId: Int, val attempt: Int, val priority: Int, val properties: Properties) {
- val id: String = stageId + "." + attempt
+private[spark] class TaskSet(
+ val tasks: Array[Task[_]],
+ val stageId: Int,
+ val attempt: Int,
+ val priority: Int,
+ val properties: Properties) {
+ val id: String = stageId + "." + attempt
override def toString: String = "TaskSet " + id
}
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
index be0d480aa0..2ddac0ff30 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
@@ -155,13 +155,11 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
// Build a list of tasks to assign to each slave
val tasks = offers.map(o => new ArrayBuffer[TaskDescription](o.cores))
val availableCpus = offers.map(o => o.cores).toArray
- for (i <- 0 until offers.size)
- {
+ for (i <- 0 until offers.size){
var launchedTask = true
val execId = offers(i).executorId
val host = offers(i).hostname
- while (availableCpus(i) > 0 && launchedTask)
- {
+ while (availableCpus(i) > 0 && launchedTask){
launchedTask = false
taskSetQueuesManager.receiveOffer(execId,host,availableCpus(i)) match {
case Some(task) =>
diff --git a/core/src/main/scala/spark/scheduler/cluster/FIFOTaskSetQueuesManager.scala b/core/src/main/scala/spark/scheduler/cluster/FIFOTaskSetQueuesManager.scala
index 5949ee773f..62d3130341 100644
--- a/core/src/main/scala/spark/scheduler/cluster/FIFOTaskSetQueuesManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/FIFOTaskSetQueuesManager.scala
@@ -28,13 +28,11 @@ private[spark] class FIFOTaskSetQueuesManager extends TaskSetQueuesManager with
activeTaskSetsQueue.foreach(_.executorLost(executorId, host))
}
- override def receiveOffer(execId:String, host:String,avaiableCpus:Double):Option[TaskDescription] =
- {
- for(manager <- activeTaskSetsQueue.sortWith(tasksetSchedulingAlgorithm.comparator))
- {
+ override def receiveOffer(execId:String, host:String,avaiableCpus:Double):Option[TaskDescription] = {
+
+ for (manager <- activeTaskSetsQueue.sortWith(tasksetSchedulingAlgorithm.comparator)) {
val task = manager.slaveOffer(execId,host,avaiableCpus)
- if (task != None)
- {
+ if (task != None) {
return task
}
}
diff --git a/core/src/main/scala/spark/scheduler/cluster/FairTaskSetQueuesManager.scala b/core/src/main/scala/spark/scheduler/cluster/FairTaskSetQueuesManager.scala
index 0609600f35..89b74fbb47 100644
--- a/core/src/main/scala/spark/scheduler/cluster/FairTaskSetQueuesManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/FairTaskSetQueuesManager.scala
@@ -38,71 +38,17 @@ private[spark] class FairTaskSetQueuesManager extends TaskSetQueuesManager with
loadPoolProperties()
- def loadPoolProperties() {
- //first check if the file exists
- val file = new File(schedulerAllocFile)
- if(file.exists())
- {
- val xml = XML.loadFile(file)
- for (poolNode <- (xml \\ POOL_POOLS_PROPERTY)) {
-
- val poolName = (poolNode \ POOL_POOL_NAME_PROPERTY).text
- var schedulingMode = POOL_DEFAULT_SCHEDULING_MODE
- var minShares = POOL_DEFAULT_MINIMUM_SHARES
- var weight = POOL_DEFAULT_WEIGHT
-
- val xmlSchedulingMode = (poolNode \ POOL_SCHEDULING_MODE_PROPERTY).text
- if( xmlSchedulingMode != "")
- {
- try
- {
- schedulingMode = SchedulingMode.withName(xmlSchedulingMode)
- }
- catch{
- case e:Exception => logInfo("Error xml schedulingMode, using default schedulingMode")
- }
- }
-
- val xmlMinShares = (poolNode \ POOL_MINIMUM_SHARES_PROPERTY).text
- if(xmlMinShares != "")
- {
- minShares = xmlMinShares.toInt
- }
-
- val xmlWeight = (poolNode \ POOL_WEIGHT_PROPERTY).text
- if(xmlWeight != "")
- {
- weight = xmlWeight.toInt
- }
-
- val pool = new Pool(poolName,schedulingMode,minShares,weight)
- pools += pool
- poolNameToPool(poolName) = pool
- logInfo("Create new pool with name:%s,schedulingMode:%s,minShares:%d,weight:%d".format(poolName,schedulingMode,minShares,weight))
- }
- }
-
- if(!poolNameToPool.contains(POOL_DEFAULT_POOL_NAME))
- {
- val pool = new Pool(POOL_DEFAULT_POOL_NAME, POOL_DEFAULT_SCHEDULING_MODE,POOL_DEFAULT_MINIMUM_SHARES,POOL_DEFAULT_WEIGHT)
- pools += pool
- poolNameToPool(POOL_DEFAULT_POOL_NAME) = pool
- logInfo("Create default pool with name:%s,schedulingMode:%s,minShares:%d,weight:%d".format(POOL_DEFAULT_POOL_NAME,POOL_DEFAULT_SCHEDULING_MODE,POOL_DEFAULT_MINIMUM_SHARES,POOL_DEFAULT_WEIGHT))
- }
- }
-
override def addTaskSetManager(manager: TaskSetManager) {
var poolName = POOL_DEFAULT_POOL_NAME
- if(manager.taskSet.properties != null)
- {
+ if (manager.taskSet.properties != null) {
poolName = manager.taskSet.properties.getProperty(POOL_FAIR_SCHEDULER_PROPERTIES,POOL_DEFAULT_POOL_NAME)
- if(!poolNameToPool.contains(poolName))
- {
+ if (!poolNameToPool.contains(poolName)) {
//we will create a new pool that user has configured in app instead of being defined in xml file
val pool = new Pool(poolName,POOL_DEFAULT_SCHEDULING_MODE,POOL_DEFAULT_MINIMUM_SHARES,POOL_DEFAULT_WEIGHT)
pools += pool
poolNameToPool(poolName) = pool
- logInfo("Create pool with name:%s,schedulingMode:%s,minShares:%d,weight:%d".format(poolName,POOL_DEFAULT_SCHEDULING_MODE,POOL_DEFAULT_MINIMUM_SHARES,POOL_DEFAULT_WEIGHT))
+ logInfo("Create pool with name:%s,schedulingMode:%s,minShares:%d,weight:%d".format(
+ poolName,POOL_DEFAULT_SCHEDULING_MODE,POOL_DEFAULT_MINIMUM_SHARES,POOL_DEFAULT_WEIGHT))
}
}
poolNameToPool(poolName).addTaskSetManager(manager)
@@ -110,10 +56,8 @@ private[spark] class FairTaskSetQueuesManager extends TaskSetQueuesManager with
}
override def removeTaskSetManager(manager: TaskSetManager) {
-
var poolName = POOL_DEFAULT_POOL_NAME
- if(manager.taskSet.properties != null)
- {
+ if (manager.taskSet.properties != null) {
poolName = manager.taskSet.properties.getProperty(POOL_FAIR_SCHEDULER_PROPERTIES,POOL_DEFAULT_POOL_NAME)
}
logInfo("Remove TaskSet %s from pool %s".format(manager.taskSet.id,poolName))
@@ -124,8 +68,7 @@ private[spark] class FairTaskSetQueuesManager extends TaskSetQueuesManager with
override def taskFinished(manager: TaskSetManager) {
var poolName = POOL_DEFAULT_POOL_NAME
- if(manager.taskSet.properties != null)
- {
+ if (manager.taskSet.properties != null) {
poolName = manager.taskSet.properties.getProperty(POOL_FAIR_SCHEDULER_PROPERTIES,POOL_DEFAULT_POOL_NAME)
}
val pool = poolNameToPool(poolName)
@@ -139,19 +82,15 @@ private[spark] class FairTaskSetQueuesManager extends TaskSetQueuesManager with
}
}
- override def receiveOffer(execId: String,host:String,avaiableCpus:Double):Option[TaskDescription] =
- {
-
+ override def receiveOffer(execId: String,host:String,avaiableCpus:Double):Option[TaskDescription] = {
val sortedPools = pools.sortWith(poolScheduleAlgorithm.comparator)
- for(pool <- sortedPools)
- {
- logDebug("poolName:%s,tasksetNum:%d,minShares:%d,runningTasks:%d".format(pool.poolName,pool.activeTaskSetsQueue.length,pool.minShare,pool.runningTasks))
+ for (pool <- sortedPools) {
+ logDebug("poolName:%s,tasksetNum:%d,minShares:%d,runningTasks:%d".format(
+ pool.poolName,pool.activeTaskSetsQueue.length,pool.minShare,pool.runningTasks))
}
- for (pool <- sortedPools)
- {
+ for (pool <- sortedPools) {
val task = pool.receiveOffer(execId,host,avaiableCpus)
- if(task != None)
- {
+ if(task != None) {
pool.runningTasks += 1
return task
}
@@ -159,14 +98,60 @@ private[spark] class FairTaskSetQueuesManager extends TaskSetQueuesManager with
return None
}
- override def checkSpeculatableTasks(): Boolean =
- {
+ override def checkSpeculatableTasks(): Boolean = {
var shouldRevive = false
- for (pool <- pools)
- {
+ for (pool <- pools) {
shouldRevive |= pool.checkSpeculatableTasks()
}
return shouldRevive
}
+ def loadPoolProperties() {
+ //first check if the file exists
+ val file = new File(schedulerAllocFile)
+ if (file.exists()) {
+ val xml = XML.loadFile(file)
+ for (poolNode <- (xml \\ POOL_POOLS_PROPERTY)) {
+
+ val poolName = (poolNode \ POOL_POOL_NAME_PROPERTY).text
+ var schedulingMode = POOL_DEFAULT_SCHEDULING_MODE
+ var minShares = POOL_DEFAULT_MINIMUM_SHARES
+ var weight = POOL_DEFAULT_WEIGHT
+
+ val xmlSchedulingMode = (poolNode \ POOL_SCHEDULING_MODE_PROPERTY).text
+ if (xmlSchedulingMode != "") {
+ try{
+ schedulingMode = SchedulingMode.withName(xmlSchedulingMode)
+ }
+ catch{
+ case e:Exception => logInfo("Error xml schedulingMode, using default schedulingMode")
+ }
+ }
+
+ val xmlMinShares = (poolNode \ POOL_MINIMUM_SHARES_PROPERTY).text
+ if (xmlMinShares != "") {
+ minShares = xmlMinShares.toInt
+ }
+
+ val xmlWeight = (poolNode \ POOL_WEIGHT_PROPERTY).text
+ if (xmlWeight != "") {
+ weight = xmlWeight.toInt
+ }
+
+ val pool = new Pool(poolName,schedulingMode,minShares,weight)
+ pools += pool
+ poolNameToPool(poolName) = pool
+ logInfo("Create new pool with name:%s,schedulingMode:%s,minShares:%d,weight:%d".format(
+ poolName,schedulingMode,minShares,weight))
+ }
+ }
+
+ if (!poolNameToPool.contains(POOL_DEFAULT_POOL_NAME)) {
+ val pool = new Pool(POOL_DEFAULT_POOL_NAME, POOL_DEFAULT_SCHEDULING_MODE,POOL_DEFAULT_MINIMUM_SHARES,POOL_DEFAULT_WEIGHT)
+ pools += pool
+ poolNameToPool(POOL_DEFAULT_POOL_NAME) = pool
+ logInfo("Create default pool with name:%s,schedulingMode:%s,minShares:%d,weight:%d".format(
+ POOL_DEFAULT_POOL_NAME,POOL_DEFAULT_SCHEDULING_MODE,POOL_DEFAULT_MINIMUM_SHARES,POOL_DEFAULT_WEIGHT))
+ }
+ }
}
diff --git a/core/src/main/scala/spark/scheduler/cluster/Pool.scala b/core/src/main/scala/spark/scheduler/cluster/Pool.scala
index 8fdca5d2b4..e0917ca1ca 100644
--- a/core/src/main/scala/spark/scheduler/cluster/Pool.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/Pool.scala
@@ -7,8 +7,13 @@ import spark.scheduler.cluster.SchedulingMode.SchedulingMode
/**
* An Schedulable entity that represent collection of TaskSetManager
*/
-private[spark] class Pool(val poolName: String,val schedulingMode: SchedulingMode, initMinShare:Int, initWeight:Int) extends Schedulable with Logging
-{
+private[spark] class Pool(
+ val poolName: String,
+ val schedulingMode: SchedulingMode,
+ initMinShare:Int,
+ initWeight:Int)
+ extends Schedulable
+ with Logging {
var activeTaskSetsQueue = new ArrayBuffer[TaskSetManager]
@@ -19,10 +24,8 @@ private[spark] class Pool(val poolName: String,val schedulingMode: SchedulingMod
val priority = 0
val stageId = 0
- var taskSetSchedulingAlgorithm: SchedulingAlgorithm =
- {
- schedulingMode match
- {
+ var taskSetSchedulingAlgorithm: SchedulingAlgorithm = {
+ schedulingMode match {
case SchedulingMode.FAIR =>
val schedule = new FairSchedulingAlgorithm()
schedule
@@ -32,43 +35,36 @@ private[spark] class Pool(val poolName: String,val schedulingMode: SchedulingMod
}
}
- def addTaskSetManager(manager:TaskSetManager)
- {
+ def addTaskSetManager(manager:TaskSetManager) {
activeTaskSetsQueue += manager
}
- def removeTaskSetManager(manager:TaskSetManager)
- {
+ def removeTaskSetManager(manager:TaskSetManager) {
activeTaskSetsQueue -= manager
}
- def removeExecutor(executorId: String, host: String)
- {
+ def removeExecutor(executorId: String, host: String) {
activeTaskSetsQueue.foreach(_.executorLost(executorId,host))
}
- def checkSpeculatableTasks(): Boolean =
- {
+ def checkSpeculatableTasks(): Boolean = {
var shouldRevive = false
- for(ts <- activeTaskSetsQueue)
- {
+ for (ts <- activeTaskSetsQueue) {
shouldRevive |= ts.checkSpeculatableTasks()
}
return shouldRevive
}
- def receiveOffer(execId:String,host:String,availableCpus:Double):Option[TaskDescription] =
- {
+ def receiveOffer(execId:String,host:String,availableCpus:Double):Option[TaskDescription] = {
val sortedActiveTasksSetQueue = activeTaskSetsQueue.sortWith(taskSetSchedulingAlgorithm.comparator)
- for(manager <- sortedActiveTasksSetQueue)
- {
- logDebug("poolname:%s,taskSetId:%s,taskNum:%d,minShares:%d,weight:%d,runningTasks:%d".format(poolName,manager.taskSet.id,manager.numTasks,manager.minShare,manager.weight,manager.runningTasks))
+ for (manager <- sortedActiveTasksSetQueue) {
+ logDebug("poolname:%s,taskSetId:%s,taskNum:%d,minShares:%d,weight:%d,runningTasks:%d".format(
+ poolName,manager.taskSet.id,manager.numTasks,manager.minShare,manager.weight,manager.runningTasks))
}
- for(manager <- sortedActiveTasksSetQueue)
- {
+
+ for (manager <- sortedActiveTasksSetQueue) {
val task = manager.slaveOffer(execId,host,availableCpus)
- if (task != None)
- {
+ if (task != None) {
manager.runningTasks += 1
return task
}
diff --git a/core/src/main/scala/spark/scheduler/cluster/Schedulable.scala b/core/src/main/scala/spark/scheduler/cluster/Schedulable.scala
index 6f4f104f42..8dfc369c03 100644
--- a/core/src/main/scala/spark/scheduler/cluster/Schedulable.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/Schedulable.scala
@@ -1,7 +1,8 @@
package spark.scheduler.cluster
/**
- * An interface for schedulable entities, there are two type Schedulable entities(Pools and TaskSetManagers)
+ * An interface for schedulable entities.
+ * there are two type of Schedulable entities(Pools and TaskSetManagers)
*/
private[spark] trait Schedulable {
def weight:Int
diff --git a/core/src/main/scala/spark/scheduler/cluster/SchedulingAlgorithm.scala b/core/src/main/scala/spark/scheduler/cluster/SchedulingAlgorithm.scala
index 2f8123587f..ac2237a7ef 100644
--- a/core/src/main/scala/spark/scheduler/cluster/SchedulingAlgorithm.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/SchedulingAlgorithm.scala
@@ -9,34 +9,25 @@ private[spark] trait SchedulingAlgorithm {
def comparator(s1: Schedulable,s2: Schedulable): Boolean
}
-private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm
-{
- override def comparator(s1: Schedulable, s2: Schedulable): Boolean =
- {
+private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
+ override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
val priority1 = s1.priority
val priority2 = s2.priority
var res = Math.signum(priority1 - priority2)
- if (res == 0)
- {
+ if (res == 0) {
val stageId1 = s1.stageId
val stageId2 = s2.stageId
res = Math.signum(stageId1 - stageId2)
}
if (res < 0)
- {
return true
- }
else
- {
return false
- }
}
}
-private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm
-{
- def comparator(s1: Schedulable, s2:Schedulable): Boolean =
- {
+private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
+ def comparator(s1: Schedulable, s2:Schedulable): Boolean = {
val minShare1 = s1.minShare
val minShare2 = s2.minShare
val runningTasks1 = s1.runningTasks
@@ -49,22 +40,15 @@ private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm
val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble
var res:Boolean = true
- if(s1Needy && !s2Needy)
- {
+ if (s1Needy && !s2Needy)
res = true
- }
else if(!s1Needy && s2Needy)
- {
res = false
- }
else if (s1Needy && s2Needy)
- {
res = minShareRatio1 <= minShareRatio2
- }
else
- {
res = taskToWeightRatio1 <= taskToWeightRatio2
- }
+
return res
}
}
diff --git a/core/src/main/scala/spark/scheduler/cluster/SchedulingMode.scala b/core/src/main/scala/spark/scheduler/cluster/SchedulingMode.scala
index 480af2c1a3..6e0c6793e0 100644
--- a/core/src/main/scala/spark/scheduler/cluster/SchedulingMode.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/SchedulingMode.scala
@@ -1,8 +1,7 @@
package spark.scheduler.cluster
-object SchedulingMode extends Enumeration("FAIR","FIFO")
-{
- type SchedulingMode = Value
+object SchedulingMode extends Enumeration("FAIR","FIFO"){
+ type SchedulingMode = Value
val FAIR,FIFO = Value
}
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
index ddc4fa6642..7ec2f69da5 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
@@ -17,7 +17,11 @@ import java.nio.ByteBuffer
/**
* Schedules the tasks within a single TaskSet in the ClusterScheduler.
*/
-private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSet) extends Schedulable with Logging {
+private[spark] class TaskSetManager(
+ sched: ClusterScheduler,
+ val taskSet: TaskSet)
+ extends Schedulable
+ with Logging {
// Maximum time to wait to run a task in a preferred location (in ms)
val LOCALITY_WAIT = System.getProperty("spark.locality.wait", "3000").toLong