author     Andrew xia <junluan.xia@intel.com>  2013-04-09 13:02:50 +0800
committer  Andrew xia <junluan.xia@intel.com>  2013-04-09 13:02:50 +0800
commit     2f883c515fe4577f0105e62dd9f395d7de42bd68 (patch)
tree       cc92e28b84513a57325dc727af71e98be31c267e /core
parent     2b373dd07a7b3f2906607d910c869e3290ca9d05 (diff)
Continue to update code for Scala code style
1. Refactor braces for "class", "if", "while", "for", and "match". 2. Keep code lines under 100 characters. 3. Refactor class parameter and extends definitions.
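For readers unfamiliar with the conventions named above, here is a minimal, self-contained sketch (illustrative only; the class and trait names are hypothetical and not part of the Spark code base) of the style the diff below applies: opening braces on the same line as the declaration, lines kept under 100 characters, and constructor parameters and extends clauses split across lines.

// Illustrative sketch only -- hypothetical names, not code from this commit.

trait Schedulable {
  def weight: Int
}

trait Logging {
  def logInfo(msg: String): Unit = println(msg)
}

// Class parameters are placed one per line, and the extends/with clauses get
// their own indented lines instead of one long single-line declaration.
class ExamplePool(
    val poolName: String,
    initWeight: Int)
  extends Schedulable
  with Logging {

  override def weight: Int = initWeight

  // Braces for "if" and "for" open on the same line as the keyword,
  // and every line stays under 100 characters.
  def pickFirstPositive(offers: Seq[Int]): Option[Int] = {
    for (offer <- offers) {
      if (offer > 0) {
        logInfo("Accepted offer with %d cores".format(offer))
        return Some(offer)
      }
    }
    None
  }
}

The hunks below apply this same pattern to SparkContext, the FIFO and fair task-set queue managers, Pool, and the scheduling algorithms.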
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/SparkContext.scala                               |  12
-rw-r--r--  core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala                |   3
-rw-r--r--  core/src/main/scala/spark/scheduler/TaskSet.scala                          |   9
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala         |   6
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/FIFOTaskSetQueuesManager.scala |  10
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/FairTaskSetQueuesManager.scala | 139
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/Pool.scala                     |  46
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/Schedulable.scala              |   3
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/SchedulingAlgorithm.scala      |  30
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/SchedulingMode.scala           |   5
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala           |   6
11 files changed, 122 insertions, 147 deletions
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index ed5f686379..7c96ae637b 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -146,7 +146,8 @@ class SparkContext(
case SPARK_REGEX(sparkUrl) =>
val scheduler = new ClusterScheduler(this)
val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, appName)
- val taskSetQueuesManager = Class.forName(System.getProperty("spark.cluster.taskscheduler")).newInstance().asInstanceOf[TaskSetQueuesManager]
+ val taskSetQueuesManager = Class.forName(System.getProperty("spark.cluster.taskscheduler")).
+ newInstance().asInstanceOf[TaskSetQueuesManager]
scheduler.initialize(backend, taskSetQueuesManager)
scheduler
@@ -166,7 +167,8 @@ class SparkContext(
numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt)
val sparkUrl = localCluster.start()
val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, appName)
- val taskSetQueuesManager = Class.forName(System.getProperty("spark.cluster.taskscheduler")).newInstance().asInstanceOf[TaskSetQueuesManager]
+ val taskSetQueuesManager = Class.forName(System.getProperty("spark.cluster.taskscheduler")).
+ newInstance().asInstanceOf[TaskSetQueuesManager]
scheduler.initialize(backend, taskSetQueuesManager)
backend.shutdownCallback = (backend: SparkDeploySchedulerBackend) => {
localCluster.stop()
@@ -186,7 +188,8 @@ class SparkContext(
} else {
new MesosSchedulerBackend(scheduler, this, masterWithoutProtocol, appName)
}
- val taskSetQueuesManager = Class.forName(System.getProperty("spark.cluster.taskscheduler")).newInstance().asInstanceOf[TaskSetQueuesManager]
+ val taskSetQueuesManager = Class.forName(System.getProperty("spark.cluster.taskscheduler")).
+ newInstance().asInstanceOf[TaskSetQueuesManager]
scheduler.initialize(backend, taskSetQueuesManager)
scheduler
}
@@ -602,7 +605,8 @@ class SparkContext(
val callSite = Utils.getSparkCallSite
logInfo("Starting job: " + callSite)
val start = System.nanoTime
- val result = dagScheduler.runJob(rdd, func, partitions, callSite, allowLocal, resultHandler,localProperties.value)
+ val result = dagScheduler.runJob(rdd, func, partitions, callSite, allowLocal, resultHandler
+ ,localProperties.value)
logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
rdd.doCheckpoint()
result
diff --git a/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala
index 6f4e5cd83e..11fec568c6 100644
--- a/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala
+++ b/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala
@@ -23,7 +23,8 @@ private[spark] case class JobSubmitted(
partitions: Array[Int],
allowLocal: Boolean,
callSite: String,
- listener: JobListener, properties: Properties = null)
+ listener: JobListener,
+ properties: Properties = null)
extends DAGSchedulerEvent
private[spark] case class CompletionEvent(
diff --git a/core/src/main/scala/spark/scheduler/TaskSet.scala b/core/src/main/scala/spark/scheduler/TaskSet.scala
index 2498e8a5aa..e4b5fcaedb 100644
--- a/core/src/main/scala/spark/scheduler/TaskSet.scala
+++ b/core/src/main/scala/spark/scheduler/TaskSet.scala
@@ -6,8 +6,13 @@ import java.util.Properties
* A set of tasks submitted together to the low-level TaskScheduler, usually representing
* missing partitions of a particular stage.
*/
-private[spark] class TaskSet(val tasks: Array[Task[_]], val stageId: Int, val attempt: Int, val priority: Int, val properties: Properties) {
- val id: String = stageId + "." + attempt
+private[spark] class TaskSet(
+ val tasks: Array[Task[_]],
+ val stageId: Int,
+ val attempt: Int,
+ val priority: Int,
+ val properties: Properties) {
+ val id: String = stageId + "." + attempt
override def toString: String = "TaskSet " + id
}
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
index be0d480aa0..2ddac0ff30 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
@@ -155,13 +155,11 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
// Build a list of tasks to assign to each slave
val tasks = offers.map(o => new ArrayBuffer[TaskDescription](o.cores))
val availableCpus = offers.map(o => o.cores).toArray
- for (i <- 0 until offers.size)
- {
+ for (i <- 0 until offers.size){
var launchedTask = true
val execId = offers(i).executorId
val host = offers(i).hostname
- while (availableCpus(i) > 0 && launchedTask)
- {
+ while (availableCpus(i) > 0 && launchedTask){
launchedTask = false
taskSetQueuesManager.receiveOffer(execId,host,availableCpus(i)) match {
case Some(task) =>
diff --git a/core/src/main/scala/spark/scheduler/cluster/FIFOTaskSetQueuesManager.scala b/core/src/main/scala/spark/scheduler/cluster/FIFOTaskSetQueuesManager.scala
index 5949ee773f..62d3130341 100644
--- a/core/src/main/scala/spark/scheduler/cluster/FIFOTaskSetQueuesManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/FIFOTaskSetQueuesManager.scala
@@ -28,13 +28,11 @@ private[spark] class FIFOTaskSetQueuesManager extends TaskSetQueuesManager with
activeTaskSetsQueue.foreach(_.executorLost(executorId, host))
}
- override def receiveOffer(execId:String, host:String,avaiableCpus:Double):Option[TaskDescription] =
- {
- for(manager <- activeTaskSetsQueue.sortWith(tasksetSchedulingAlgorithm.comparator))
- {
+ override def receiveOffer(execId:String, host:String,avaiableCpus:Double):Option[TaskDescription] = {
+
+ for (manager <- activeTaskSetsQueue.sortWith(tasksetSchedulingAlgorithm.comparator)) {
val task = manager.slaveOffer(execId,host,avaiableCpus)
- if (task != None)
- {
+ if (task != None) {
return task
}
}
diff --git a/core/src/main/scala/spark/scheduler/cluster/FairTaskSetQueuesManager.scala b/core/src/main/scala/spark/scheduler/cluster/FairTaskSetQueuesManager.scala
index 0609600f35..89b74fbb47 100644
--- a/core/src/main/scala/spark/scheduler/cluster/FairTaskSetQueuesManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/FairTaskSetQueuesManager.scala
@@ -38,71 +38,17 @@ private[spark] class FairTaskSetQueuesManager extends TaskSetQueuesManager with
loadPoolProperties()
- def loadPoolProperties() {
- //first check if the file exists
- val file = new File(schedulerAllocFile)
- if(file.exists())
- {
- val xml = XML.loadFile(file)
- for (poolNode <- (xml \\ POOL_POOLS_PROPERTY)) {
-
- val poolName = (poolNode \ POOL_POOL_NAME_PROPERTY).text
- var schedulingMode = POOL_DEFAULT_SCHEDULING_MODE
- var minShares = POOL_DEFAULT_MINIMUM_SHARES
- var weight = POOL_DEFAULT_WEIGHT
-
- val xmlSchedulingMode = (poolNode \ POOL_SCHEDULING_MODE_PROPERTY).text
- if( xmlSchedulingMode != "")
- {
- try
- {
- schedulingMode = SchedulingMode.withName(xmlSchedulingMode)
- }
- catch{
- case e:Exception => logInfo("Error xml schedulingMode, using default schedulingMode")
- }
- }
-
- val xmlMinShares = (poolNode \ POOL_MINIMUM_SHARES_PROPERTY).text
- if(xmlMinShares != "")
- {
- minShares = xmlMinShares.toInt
- }
-
- val xmlWeight = (poolNode \ POOL_WEIGHT_PROPERTY).text
- if(xmlWeight != "")
- {
- weight = xmlWeight.toInt
- }
-
- val pool = new Pool(poolName,schedulingMode,minShares,weight)
- pools += pool
- poolNameToPool(poolName) = pool
- logInfo("Create new pool with name:%s,schedulingMode:%s,minShares:%d,weight:%d".format(poolName,schedulingMode,minShares,weight))
- }
- }
-
- if(!poolNameToPool.contains(POOL_DEFAULT_POOL_NAME))
- {
- val pool = new Pool(POOL_DEFAULT_POOL_NAME, POOL_DEFAULT_SCHEDULING_MODE,POOL_DEFAULT_MINIMUM_SHARES,POOL_DEFAULT_WEIGHT)
- pools += pool
- poolNameToPool(POOL_DEFAULT_POOL_NAME) = pool
- logInfo("Create default pool with name:%s,schedulingMode:%s,minShares:%d,weight:%d".format(POOL_DEFAULT_POOL_NAME,POOL_DEFAULT_SCHEDULING_MODE,POOL_DEFAULT_MINIMUM_SHARES,POOL_DEFAULT_WEIGHT))
- }
- }
-
override def addTaskSetManager(manager: TaskSetManager) {
var poolName = POOL_DEFAULT_POOL_NAME
- if(manager.taskSet.properties != null)
- {
+ if (manager.taskSet.properties != null) {
poolName = manager.taskSet.properties.getProperty(POOL_FAIR_SCHEDULER_PROPERTIES,POOL_DEFAULT_POOL_NAME)
- if(!poolNameToPool.contains(poolName))
- {
+ if (!poolNameToPool.contains(poolName)) {
//we will create a new pool that user has configured in app instead of being defined in xml file
val pool = new Pool(poolName,POOL_DEFAULT_SCHEDULING_MODE,POOL_DEFAULT_MINIMUM_SHARES,POOL_DEFAULT_WEIGHT)
pools += pool
poolNameToPool(poolName) = pool
- logInfo("Create pool with name:%s,schedulingMode:%s,minShares:%d,weight:%d".format(poolName,POOL_DEFAULT_SCHEDULING_MODE,POOL_DEFAULT_MINIMUM_SHARES,POOL_DEFAULT_WEIGHT))
+ logInfo("Create pool with name:%s,schedulingMode:%s,minShares:%d,weight:%d".format(
+ poolName,POOL_DEFAULT_SCHEDULING_MODE,POOL_DEFAULT_MINIMUM_SHARES,POOL_DEFAULT_WEIGHT))
}
}
poolNameToPool(poolName).addTaskSetManager(manager)
@@ -110,10 +56,8 @@ private[spark] class FairTaskSetQueuesManager extends TaskSetQueuesManager with
}
override def removeTaskSetManager(manager: TaskSetManager) {
-
var poolName = POOL_DEFAULT_POOL_NAME
- if(manager.taskSet.properties != null)
- {
+ if (manager.taskSet.properties != null) {
poolName = manager.taskSet.properties.getProperty(POOL_FAIR_SCHEDULER_PROPERTIES,POOL_DEFAULT_POOL_NAME)
}
logInfo("Remove TaskSet %s from pool %s".format(manager.taskSet.id,poolName))
@@ -124,8 +68,7 @@ private[spark] class FairTaskSetQueuesManager extends TaskSetQueuesManager with
override def taskFinished(manager: TaskSetManager) {
var poolName = POOL_DEFAULT_POOL_NAME
- if(manager.taskSet.properties != null)
- {
+ if (manager.taskSet.properties != null) {
poolName = manager.taskSet.properties.getProperty(POOL_FAIR_SCHEDULER_PROPERTIES,POOL_DEFAULT_POOL_NAME)
}
val pool = poolNameToPool(poolName)
@@ -139,19 +82,15 @@ private[spark] class FairTaskSetQueuesManager extends TaskSetQueuesManager with
}
}
- override def receiveOffer(execId: String,host:String,avaiableCpus:Double):Option[TaskDescription] =
- {
-
+ override def receiveOffer(execId: String,host:String,avaiableCpus:Double):Option[TaskDescription] = {
val sortedPools = pools.sortWith(poolScheduleAlgorithm.comparator)
- for(pool <- sortedPools)
- {
- logDebug("poolName:%s,tasksetNum:%d,minShares:%d,runningTasks:%d".format(pool.poolName,pool.activeTaskSetsQueue.length,pool.minShare,pool.runningTasks))
+ for (pool <- sortedPools) {
+ logDebug("poolName:%s,tasksetNum:%d,minShares:%d,runningTasks:%d".format(
+ pool.poolName,pool.activeTaskSetsQueue.length,pool.minShare,pool.runningTasks))
}
- for (pool <- sortedPools)
- {
+ for (pool <- sortedPools) {
val task = pool.receiveOffer(execId,host,avaiableCpus)
- if(task != None)
- {
+ if(task != None) {
pool.runningTasks += 1
return task
}
@@ -159,14 +98,60 @@ private[spark] class FairTaskSetQueuesManager extends TaskSetQueuesManager with
return None
}
- override def checkSpeculatableTasks(): Boolean =
- {
+ override def checkSpeculatableTasks(): Boolean = {
var shouldRevive = false
- for (pool <- pools)
- {
+ for (pool <- pools) {
shouldRevive |= pool.checkSpeculatableTasks()
}
return shouldRevive
}
+ def loadPoolProperties() {
+ //first check if the file exists
+ val file = new File(schedulerAllocFile)
+ if (file.exists()) {
+ val xml = XML.loadFile(file)
+ for (poolNode <- (xml \\ POOL_POOLS_PROPERTY)) {
+
+ val poolName = (poolNode \ POOL_POOL_NAME_PROPERTY).text
+ var schedulingMode = POOL_DEFAULT_SCHEDULING_MODE
+ var minShares = POOL_DEFAULT_MINIMUM_SHARES
+ var weight = POOL_DEFAULT_WEIGHT
+
+ val xmlSchedulingMode = (poolNode \ POOL_SCHEDULING_MODE_PROPERTY).text
+ if (xmlSchedulingMode != "") {
+ try{
+ schedulingMode = SchedulingMode.withName(xmlSchedulingMode)
+ }
+ catch{
+ case e:Exception => logInfo("Error xml schedulingMode, using default schedulingMode")
+ }
+ }
+
+ val xmlMinShares = (poolNode \ POOL_MINIMUM_SHARES_PROPERTY).text
+ if (xmlMinShares != "") {
+ minShares = xmlMinShares.toInt
+ }
+
+ val xmlWeight = (poolNode \ POOL_WEIGHT_PROPERTY).text
+ if (xmlWeight != "") {
+ weight = xmlWeight.toInt
+ }
+
+ val pool = new Pool(poolName,schedulingMode,minShares,weight)
+ pools += pool
+ poolNameToPool(poolName) = pool
+ logInfo("Create new pool with name:%s,schedulingMode:%s,minShares:%d,weight:%d".format(
+ poolName,schedulingMode,minShares,weight))
+ }
+ }
+
+ if (!poolNameToPool.contains(POOL_DEFAULT_POOL_NAME)) {
+ val pool = new Pool(POOL_DEFAULT_POOL_NAME, POOL_DEFAULT_SCHEDULING_MODE,POOL_DEFAULT_MINIMUM_SHARES,POOL_DEFAULT_WEIGHT)
+ pools += pool
+ poolNameToPool(POOL_DEFAULT_POOL_NAME) = pool
+ logInfo("Create default pool with name:%s,schedulingMode:%s,minShares:%d,weight:%d".format(
+ POOL_DEFAULT_POOL_NAME,POOL_DEFAULT_SCHEDULING_MODE,POOL_DEFAULT_MINIMUM_SHARES,POOL_DEFAULT_WEIGHT))
+ }
+ }
}
diff --git a/core/src/main/scala/spark/scheduler/cluster/Pool.scala b/core/src/main/scala/spark/scheduler/cluster/Pool.scala
index 8fdca5d2b4..e0917ca1ca 100644
--- a/core/src/main/scala/spark/scheduler/cluster/Pool.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/Pool.scala
@@ -7,8 +7,13 @@ import spark.scheduler.cluster.SchedulingMode.SchedulingMode
/**
* An Schedulable entity that represent collection of TaskSetManager
*/
-private[spark] class Pool(val poolName: String,val schedulingMode: SchedulingMode, initMinShare:Int, initWeight:Int) extends Schedulable with Logging
-{
+private[spark] class Pool(
+ val poolName: String,
+ val schedulingMode: SchedulingMode,
+ initMinShare:Int,
+ initWeight:Int)
+ extends Schedulable
+ with Logging {
var activeTaskSetsQueue = new ArrayBuffer[TaskSetManager]
@@ -19,10 +24,8 @@ private[spark] class Pool(val poolName: String,val schedulingMode: SchedulingMod
val priority = 0
val stageId = 0
- var taskSetSchedulingAlgorithm: SchedulingAlgorithm =
- {
- schedulingMode match
- {
+ var taskSetSchedulingAlgorithm: SchedulingAlgorithm = {
+ schedulingMode match {
case SchedulingMode.FAIR =>
val schedule = new FairSchedulingAlgorithm()
schedule
@@ -32,43 +35,36 @@ private[spark] class Pool(val poolName: String,val schedulingMode: SchedulingMod
}
}
- def addTaskSetManager(manager:TaskSetManager)
- {
+ def addTaskSetManager(manager:TaskSetManager) {
activeTaskSetsQueue += manager
}
- def removeTaskSetManager(manager:TaskSetManager)
- {
+ def removeTaskSetManager(manager:TaskSetManager) {
activeTaskSetsQueue -= manager
}
- def removeExecutor(executorId: String, host: String)
- {
+ def removeExecutor(executorId: String, host: String) {
activeTaskSetsQueue.foreach(_.executorLost(executorId,host))
}
- def checkSpeculatableTasks(): Boolean =
- {
+ def checkSpeculatableTasks(): Boolean = {
var shouldRevive = false
- for(ts <- activeTaskSetsQueue)
- {
+ for (ts <- activeTaskSetsQueue) {
shouldRevive |= ts.checkSpeculatableTasks()
}
return shouldRevive
}
- def receiveOffer(execId:String,host:String,availableCpus:Double):Option[TaskDescription] =
- {
+ def receiveOffer(execId:String,host:String,availableCpus:Double):Option[TaskDescription] = {
val sortedActiveTasksSetQueue = activeTaskSetsQueue.sortWith(taskSetSchedulingAlgorithm.comparator)
- for(manager <- sortedActiveTasksSetQueue)
- {
- logDebug("poolname:%s,taskSetId:%s,taskNum:%d,minShares:%d,weight:%d,runningTasks:%d".format(poolName,manager.taskSet.id,manager.numTasks,manager.minShare,manager.weight,manager.runningTasks))
+ for (manager <- sortedActiveTasksSetQueue) {
+ logDebug("poolname:%s,taskSetId:%s,taskNum:%d,minShares:%d,weight:%d,runningTasks:%d".format(
+ poolName,manager.taskSet.id,manager.numTasks,manager.minShare,manager.weight,manager.runningTasks))
}
- for(manager <- sortedActiveTasksSetQueue)
- {
+
+ for (manager <- sortedActiveTasksSetQueue) {
val task = manager.slaveOffer(execId,host,availableCpus)
- if (task != None)
- {
+ if (task != None) {
manager.runningTasks += 1
return task
}
diff --git a/core/src/main/scala/spark/scheduler/cluster/Schedulable.scala b/core/src/main/scala/spark/scheduler/cluster/Schedulable.scala
index 6f4f104f42..8dfc369c03 100644
--- a/core/src/main/scala/spark/scheduler/cluster/Schedulable.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/Schedulable.scala
@@ -1,7 +1,8 @@
package spark.scheduler.cluster
/**
- * An interface for schedulable entities, there are two type Schedulable entities(Pools and TaskSetManagers)
+ * An interface for schedulable entities.
+ * there are two type of Schedulable entities(Pools and TaskSetManagers)
*/
private[spark] trait Schedulable {
def weight:Int
diff --git a/core/src/main/scala/spark/scheduler/cluster/SchedulingAlgorithm.scala b/core/src/main/scala/spark/scheduler/cluster/SchedulingAlgorithm.scala
index 2f8123587f..ac2237a7ef 100644
--- a/core/src/main/scala/spark/scheduler/cluster/SchedulingAlgorithm.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/SchedulingAlgorithm.scala
@@ -9,34 +9,25 @@ private[spark] trait SchedulingAlgorithm {
def comparator(s1: Schedulable,s2: Schedulable): Boolean
}
-private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm
-{
- override def comparator(s1: Schedulable, s2: Schedulable): Boolean =
- {
+private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
+ override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
val priority1 = s1.priority
val priority2 = s2.priority
var res = Math.signum(priority1 - priority2)
- if (res == 0)
- {
+ if (res == 0) {
val stageId1 = s1.stageId
val stageId2 = s2.stageId
res = Math.signum(stageId1 - stageId2)
}
if (res < 0)
- {
return true
- }
else
- {
return false
- }
}
}
-private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm
-{
- def comparator(s1: Schedulable, s2:Schedulable): Boolean =
- {
+private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
+ def comparator(s1: Schedulable, s2:Schedulable): Boolean = {
val minShare1 = s1.minShare
val minShare2 = s2.minShare
val runningTasks1 = s1.runningTasks
@@ -49,22 +40,15 @@ private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm
val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble
var res:Boolean = true
- if(s1Needy && !s2Needy)
- {
+ if (s1Needy && !s2Needy)
res = true
- }
else if(!s1Needy && s2Needy)
- {
res = false
- }
else if (s1Needy && s2Needy)
- {
res = minShareRatio1 <= minShareRatio2
- }
else
- {
res = taskToWeightRatio1 <= taskToWeightRatio2
- }
+
return res
}
}
diff --git a/core/src/main/scala/spark/scheduler/cluster/SchedulingMode.scala b/core/src/main/scala/spark/scheduler/cluster/SchedulingMode.scala
index 480af2c1a3..6e0c6793e0 100644
--- a/core/src/main/scala/spark/scheduler/cluster/SchedulingMode.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/SchedulingMode.scala
@@ -1,8 +1,7 @@
package spark.scheduler.cluster
-object SchedulingMode extends Enumeration("FAIR","FIFO")
-{
- type SchedulingMode = Value
+object SchedulingMode extends Enumeration("FAIR","FIFO"){
+ type SchedulingMode = Value
val FAIR,FIFO = Value
}
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
index ddc4fa6642..7ec2f69da5 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
@@ -17,7 +17,11 @@ import java.nio.ByteBuffer
/**
* Schedules the tasks within a single TaskSet in the ClusterScheduler.
*/
-private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSet) extends Schedulable with Logging {
+private[spark] class TaskSetManager(
+ sched: ClusterScheduler,
+ val taskSet: TaskSet)
+ extends Schedulable
+ with Logging {
// Maximum time to wait to run a task in a preferred location (in ms)
val LOCALITY_WAIT = System.getProperty("spark.locality.wait", "3000").toLong