aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndrew xia <junluan.xia@intel.com>2013-03-29 08:20:35 +0800
committerAndrew xia <junluan.xia@intel.com>2013-03-29 08:20:35 +0800
commitdef3d1c84a3e0d1371239e9358294a4b4ad46b9f (patch)
tree613933e10dff80c716437d020997d463019d3bff
parentd1d9bdaabe24cc60097f843e0bef92e57b404941 (diff)
downloadspark-def3d1c84a3e0d1371239e9358294a4b4ad46b9f.tar.gz
spark-def3d1c84a3e0d1371239e9358294a4b4ad46b9f.tar.bz2
spark-def3d1c84a3e0d1371239e9358294a4b4ad46b9f.zip
1.remove redundant spacing in source code
2.replace get/set functions with val and var defination
-rw-r--r--core/src/main/scala/spark/scheduler/ActiveJob.scala2
-rw-r--r--core/src/main/scala/spark/scheduler/DAGScheduler.scala4
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala13
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/FIFOTaskSetQueuesManager.scala13
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/FairTaskSetQueuesManager.scala65
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/Pool.scala43
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/Schedulable.scala21
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/SchedulingAlgorithm.scala34
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/SchedulingMode.scala2
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala37
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/TaskSetQueuesManager.scala1
11 files changed, 91 insertions, 144 deletions
diff --git a/core/src/main/scala/spark/scheduler/ActiveJob.scala b/core/src/main/scala/spark/scheduler/ActiveJob.scala
index b6d3c2c089..105eaecb22 100644
--- a/core/src/main/scala/spark/scheduler/ActiveJob.scala
+++ b/core/src/main/scala/spark/scheduler/ActiveJob.scala
@@ -13,7 +13,7 @@ private[spark] class ActiveJob(
val func: (TaskContext, Iterator[_]) => _,
val partitions: Array[Int],
val callSite: String,
- val listener: JobListener,
+ val listener: JobListener,
val properties: Properties) {
val numPartitions = partitions.length
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index 717cc27739..0a64a4f041 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -241,7 +241,7 @@ class DAGScheduler(
partitions: Seq[Int],
callSite: String,
allowLocal: Boolean,
- resultHandler: (Int, U) => Unit,
+ resultHandler: (Int, U) => Unit,
properties: Properties = null)
{
if (partitions.size == 0) {
@@ -263,7 +263,7 @@ class DAGScheduler(
func: (TaskContext, Iterator[T]) => U,
evaluator: ApproximateEvaluator[U, R],
callSite: String,
- timeout: Long,
+ timeout: Long,
properties: Properties = null)
: PartialResult[R] =
{
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
index 092b0a0cfc..be0d480aa0 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
@@ -60,7 +60,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
var backend: SchedulerBackend = null
val mapOutputTracker = SparkEnv.get.mapOutputTracker
-
+
var taskSetQueuesManager: TaskSetQueuesManager = null
override def setListener(listener: TaskSchedulerListener) {
@@ -131,11 +131,11 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
}
}
- def taskFinished(manager: TaskSetManager) {
+ def taskFinished(manager: TaskSetManager) {
this.synchronized {
- taskSetQueuesManager.taskFinished(manager)
+ taskSetQueuesManager.taskFinished(manager)
}
- }
+ }
/**
* Called by cluster manager to offer resources on slaves. We respond by asking our active task
@@ -144,7 +144,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
*/
def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = {
synchronized {
-
SparkEnv.set(sc.env)
// Mark each slave as alive and remember its hostname
for (o <- offers) {
@@ -228,7 +227,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
taskSetToUpdate.get.statusUpdate(tid, state, serializedData)
}
if (failedExecutor != None) {
- listener.executorLost(failedExecutor.get)
+ listener.executorLost(failedExecutor.get)
backend.reviveOffers()
}
if (taskFailed) {
@@ -299,7 +298,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
}
// Call listener.executorLost without holding the lock on this to prevent deadlock
if (failedExecutor != None) {
- listener.executorLost(failedExecutor.get)
+ listener.executorLost(failedExecutor.get)
backend.reviveOffers()
}
}
diff --git a/core/src/main/scala/spark/scheduler/cluster/FIFOTaskSetQueuesManager.scala b/core/src/main/scala/spark/scheduler/cluster/FIFOTaskSetQueuesManager.scala
index 868b11c8d6..5949ee773f 100644
--- a/core/src/main/scala/spark/scheduler/cluster/FIFOTaskSetQueuesManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/FIFOTaskSetQueuesManager.scala
@@ -8,26 +8,26 @@ import spark.Logging
* A FIFO Implementation of the TaskSetQueuesManager
*/
private[spark] class FIFOTaskSetQueuesManager extends TaskSetQueuesManager with Logging {
-
+
var activeTaskSetsQueue = new ArrayBuffer[TaskSetManager]
val tasksetSchedulingAlgorithm = new FIFOSchedulingAlgorithm()
-
+
override def addTaskSetManager(manager: TaskSetManager) {
activeTaskSetsQueue += manager
}
-
+
override def removeTaskSetManager(manager: TaskSetManager) {
activeTaskSetsQueue -= manager
}
-
+
override def taskFinished(manager: TaskSetManager) {
//do nothing
}
-
+
override def removeExecutor(executorId: String, host: String) {
activeTaskSetsQueue.foreach(_.executorLost(executorId, host))
}
-
+
override def receiveOffer(execId:String, host:String,avaiableCpus:Double):Option[TaskDescription] =
{
for(manager <- activeTaskSetsQueue.sortWith(tasksetSchedulingAlgorithm.comparator))
@@ -48,5 +48,4 @@ private[spark] class FIFOTaskSetQueuesManager extends TaskSetQueuesManager with
}
return shouldRevive
}
-
}
diff --git a/core/src/main/scala/spark/scheduler/cluster/FairTaskSetQueuesManager.scala b/core/src/main/scala/spark/scheduler/cluster/FairTaskSetQueuesManager.scala
index 4e26cedfda..0609600f35 100644
--- a/core/src/main/scala/spark/scheduler/cluster/FairTaskSetQueuesManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/FairTaskSetQueuesManager.scala
@@ -14,15 +14,14 @@ import spark.scheduler.cluster.SchedulingMode.SchedulingMode
/**
* A Fair Implementation of the TaskSetQueuesManager
- *
+ *
* Currently we support minShare,weight for fair scheduler between pools
* Within a pool, it supports FIFO or FS
* Also, currently we could allocate pools dynamically
- *
*/
private[spark] class FairTaskSetQueuesManager extends TaskSetQueuesManager with Logging {
-
- val schedulerAllocFile = System.getProperty("spark.fairscheduler.allocation.file","unspecified")
+
+ val schedulerAllocFile = System.getProperty("spark.fairscheduler.allocation.file","unspecified")
val poolNameToPool= new HashMap[String, Pool]
var pools = new ArrayBuffer[Pool]
val poolScheduleAlgorithm = new FairSchedulingAlgorithm()
@@ -36,9 +35,9 @@ private[spark] class FairTaskSetQueuesManager extends TaskSetQueuesManager with
val POOL_DEFAULT_SCHEDULING_MODE = SchedulingMode.FIFO
val POOL_DEFAULT_MINIMUM_SHARES = 2
val POOL_DEFAULT_WEIGHT = 1
-
+
loadPoolProperties()
-
+
def loadPoolProperties() {
//first check if the file exists
val file = new File(schedulerAllocFile)
@@ -51,26 +50,25 @@ private[spark] class FairTaskSetQueuesManager extends TaskSetQueuesManager with
var schedulingMode = POOL_DEFAULT_SCHEDULING_MODE
var minShares = POOL_DEFAULT_MINIMUM_SHARES
var weight = POOL_DEFAULT_WEIGHT
-
-
+
val xmlSchedulingMode = (poolNode \ POOL_SCHEDULING_MODE_PROPERTY).text
if( xmlSchedulingMode != "")
{
- try
+ try
{
schedulingMode = SchedulingMode.withName(xmlSchedulingMode)
}
catch{
- case e:Exception => logInfo("Error xml schedulingMode, using default schedulingMode")
+ case e:Exception => logInfo("Error xml schedulingMode, using default schedulingMode")
}
}
-
+
val xmlMinShares = (poolNode \ POOL_MINIMUM_SHARES_PROPERTY).text
if(xmlMinShares != "")
{
minShares = xmlMinShares.toInt
}
-
+
val xmlWeight = (poolNode \ POOL_WEIGHT_PROPERTY).text
if(xmlWeight != "")
{
@@ -84,15 +82,15 @@ private[spark] class FairTaskSetQueuesManager extends TaskSetQueuesManager with
}
}
- if(!poolNameToPool.contains(POOL_DEFAULT_POOL_NAME))
+ if(!poolNameToPool.contains(POOL_DEFAULT_POOL_NAME))
{
val pool = new Pool(POOL_DEFAULT_POOL_NAME, POOL_DEFAULT_SCHEDULING_MODE,POOL_DEFAULT_MINIMUM_SHARES,POOL_DEFAULT_WEIGHT)
pools += pool
poolNameToPool(POOL_DEFAULT_POOL_NAME) = pool
logInfo("Create default pool with name:%s,schedulingMode:%s,minShares:%d,weight:%d".format(POOL_DEFAULT_POOL_NAME,POOL_DEFAULT_SCHEDULING_MODE,POOL_DEFAULT_MINIMUM_SHARES,POOL_DEFAULT_WEIGHT))
- }
+ }
}
-
+
override def addTaskSetManager(manager: TaskSetManager) {
var poolName = POOL_DEFAULT_POOL_NAME
if(manager.taskSet.properties != null)
@@ -100,19 +98,19 @@ private[spark] class FairTaskSetQueuesManager extends TaskSetQueuesManager with
poolName = manager.taskSet.properties.getProperty(POOL_FAIR_SCHEDULER_PROPERTIES,POOL_DEFAULT_POOL_NAME)
if(!poolNameToPool.contains(poolName))
{
- //we will create a new pool that user has configured in app,but not contained in xml file
+ //we will create a new pool that user has configured in app instead of being defined in xml file
val pool = new Pool(poolName,POOL_DEFAULT_SCHEDULING_MODE,POOL_DEFAULT_MINIMUM_SHARES,POOL_DEFAULT_WEIGHT)
pools += pool
poolNameToPool(poolName) = pool
- logInfo("Create pool with name:%s,schedulingMode:%s,minShares:%d,weight:%d".format(poolName,POOL_DEFAULT_SCHEDULING_MODE,POOL_DEFAULT_MINIMUM_SHARES,POOL_DEFAULT_WEIGHT))
+ logInfo("Create pool with name:%s,schedulingMode:%s,minShares:%d,weight:%d".format(poolName,POOL_DEFAULT_SCHEDULING_MODE,POOL_DEFAULT_MINIMUM_SHARES,POOL_DEFAULT_WEIGHT))
}
}
poolNameToPool(poolName).addTaskSetManager(manager)
- logInfo("Added task set " + manager.taskSet.id + " tasks to pool "+poolName)
+ logInfo("Added task set " + manager.taskSet.id + " tasks to pool "+poolName)
}
-
+
override def removeTaskSetManager(manager: TaskSetManager) {
-
+
var poolName = POOL_DEFAULT_POOL_NAME
if(manager.taskSet.properties != null)
{
@@ -121,10 +119,9 @@ private[spark] class FairTaskSetQueuesManager extends TaskSetQueuesManager with
logInfo("Remove TaskSet %s from pool %s".format(manager.taskSet.id,poolName))
val pool = poolNameToPool(poolName)
pool.removeTaskSetManager(manager)
- pool.setRunningTasks(pool.getRunningTasks() - manager.getRunningTasks())
-
+ pool.runningTasks -= manager.runningTasks
}
-
+
override def taskFinished(manager: TaskSetManager) {
var poolName = POOL_DEFAULT_POOL_NAME
if(manager.taskSet.properties != null)
@@ -132,40 +129,40 @@ private[spark] class FairTaskSetQueuesManager extends TaskSetQueuesManager with
poolName = manager.taskSet.properties.getProperty(POOL_FAIR_SCHEDULER_PROPERTIES,POOL_DEFAULT_POOL_NAME)
}
val pool = poolNameToPool(poolName)
- pool.setRunningTasks(pool.getRunningTasks() - 1)
- manager.setRunningTasks(manager.getRunningTasks() - 1)
+ pool.runningTasks -= 1
+ manager.runningTasks -=1
}
-
+
override def removeExecutor(executorId: String, host: String) {
for (pool <- pools) {
- pool.removeExecutor(executorId,host)
- }
+ pool.removeExecutor(executorId,host)
+ }
}
-
+
override def receiveOffer(execId: String,host:String,avaiableCpus:Double):Option[TaskDescription] =
{
val sortedPools = pools.sortWith(poolScheduleAlgorithm.comparator)
for(pool <- sortedPools)
{
- logDebug("poolName:%s,tasksetNum:%d,minShares:%d,runningTasks:%d".format(pool.poolName,pool.activeTaskSetsQueue.length,pool.getMinShare(),pool.getRunningTasks()))
+ logDebug("poolName:%s,tasksetNum:%d,minShares:%d,runningTasks:%d".format(pool.poolName,pool.activeTaskSetsQueue.length,pool.minShare,pool.runningTasks))
}
for (pool <- sortedPools)
{
val task = pool.receiveOffer(execId,host,avaiableCpus)
if(task != None)
{
- pool.setRunningTasks(pool.getRunningTasks() + 1)
+ pool.runningTasks += 1
return task
}
}
return None
}
-
- override def checkSpeculatableTasks(): Boolean =
+
+ override def checkSpeculatableTasks(): Boolean =
{
var shouldRevive = false
- for (pool <- pools)
+ for (pool <- pools)
{
shouldRevive |= pool.checkSpeculatableTasks()
}
diff --git a/core/src/main/scala/spark/scheduler/cluster/Pool.scala b/core/src/main/scala/spark/scheduler/cluster/Pool.scala
index 68e1d2a75a..8fdca5d2b4 100644
--- a/core/src/main/scala/spark/scheduler/cluster/Pool.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/Pool.scala
@@ -7,13 +7,21 @@ import spark.scheduler.cluster.SchedulingMode.SchedulingMode
/**
* An Schedulable entity that represent collection of TaskSetManager
*/
-private[spark] class Pool(val poolName: String, schedulingMode: SchedulingMode,val minShare:Int, val weight:Int) extends Schedulable with Logging {
-
+private[spark] class Pool(val poolName: String,val schedulingMode: SchedulingMode, initMinShare:Int, initWeight:Int) extends Schedulable with Logging
+{
+
var activeTaskSetsQueue = new ArrayBuffer[TaskSetManager]
- var numRunningTasks: Int = 0
- var taskSetSchedulingAlgorithm: SchedulingAlgorithm =
+
+ var weight = initWeight
+ var minShare = initMinShare
+ var runningTasks = 0
+
+ val priority = 0
+ val stageId = 0
+
+ var taskSetSchedulingAlgorithm: SchedulingAlgorithm =
{
- schedulingMode match
+ schedulingMode match
{
case SchedulingMode.FAIR =>
val schedule = new FairSchedulingAlgorithm()
@@ -23,26 +31,6 @@ private[spark] class Pool(val poolName: String, schedulingMode: SchedulingMode,v
schedule
}
}
-
- override def getMinShare():Int =
- {
- return minShare
- }
-
- override def getRunningTasks():Int =
- {
- return numRunningTasks
- }
-
- def setRunningTasks(taskNum : Int)
- {
- numRunningTasks = taskNum
- }
-
- override def getWeight(): Int =
- {
- return weight
- }
def addTaskSetManager(manager:TaskSetManager)
{
@@ -74,15 +62,14 @@ private[spark] class Pool(val poolName: String, schedulingMode: SchedulingMode,v
val sortedActiveTasksSetQueue = activeTaskSetsQueue.sortWith(taskSetSchedulingAlgorithm.comparator)
for(manager <- sortedActiveTasksSetQueue)
{
-
- logDebug("taskSetId:%s,taskNum:%d,minShares:%d,weight:%d,runningTasks:%d".format(manager.taskSet.id,manager.numTasks,manager.getMinShare(),manager.getWeight(),manager.getRunningTasks()))
+ logDebug("poolname:%s,taskSetId:%s,taskNum:%d,minShares:%d,weight:%d,runningTasks:%d".format(poolName,manager.taskSet.id,manager.numTasks,manager.minShare,manager.weight,manager.runningTasks))
}
for(manager <- sortedActiveTasksSetQueue)
{
val task = manager.slaveOffer(execId,host,availableCpus)
if (task != None)
{
- manager.setRunningTasks(manager.getRunningTasks() + 1)
+ manager.runningTasks += 1
return task
}
}
diff --git a/core/src/main/scala/spark/scheduler/cluster/Schedulable.scala b/core/src/main/scala/spark/scheduler/cluster/Schedulable.scala
index 837f9c4983..6f4f104f42 100644
--- a/core/src/main/scala/spark/scheduler/cluster/Schedulable.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/Schedulable.scala
@@ -1,21 +1,12 @@
package spark.scheduler.cluster
-import scala.collection.mutable.ArrayBuffer
-
/**
- * An interface for schedulable entities, there are two type Schedulable entities(Pools and TaskSetManagers)
+ * An interface for schedulable entities, there are two type Schedulable entities(Pools and TaskSetManagers)
*/
private[spark] trait Schedulable {
-
- def getMinShare(): Int
- def getRunningTasks(): Int
- def getPriority(): Int =
- {
- return 0
- }
- def getWeight(): Int
- def getStageId(): Int =
- {
- return 0
- }
+ def weight:Int
+ def minShare:Int
+ def runningTasks:Int
+ def priority:Int
+ def stageId:Int
}
diff --git a/core/src/main/scala/spark/scheduler/cluster/SchedulingAlgorithm.scala b/core/src/main/scala/spark/scheduler/cluster/SchedulingAlgorithm.scala
index f8919e7374..2f8123587f 100644
--- a/core/src/main/scala/spark/scheduler/cluster/SchedulingAlgorithm.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/SchedulingAlgorithm.scala
@@ -1,7 +1,7 @@
package spark.scheduler.cluster
/**
- * An interface for sort algorithm
+ * An interface for sort algorithm
* FIFO: FIFO algorithm for TaskSetManagers
* FS: FS algorithm for Pools, and FIFO or FS for TaskSetManagers
*/
@@ -13,13 +13,13 @@ private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm
{
override def comparator(s1: Schedulable, s2: Schedulable): Boolean =
{
- val priority1 = s1.getPriority()
- val priority2 = s2.getPriority()
+ val priority1 = s1.priority
+ val priority2 = s2.priority
var res = Math.signum(priority1 - priority2)
if (res == 0)
{
- val stageId1 = s1.getStageId()
- val stageId2 = s2.getStageId()
+ val stageId1 = s1.stageId
+ val stageId2 = s2.stageId
res = Math.signum(stageId1 - stageId2)
}
if (res < 0)
@@ -29,7 +29,7 @@ private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm
else
{
return false
- }
+ }
}
}
@@ -37,16 +37,18 @@ private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm
{
def comparator(s1: Schedulable, s2:Schedulable): Boolean =
{
- val minShare1 = s1.getMinShare()
- val minShare2 = s2.getMinShare()
- val s1Needy = s1.getRunningTasks() < minShare1
- val s2Needy = s2.getRunningTasks() < minShare2
- val minShareRatio1 = s1.getRunningTasks().toDouble / Math.max(minShare1,1.0).toDouble
- val minShareRatio2 = s2.getRunningTasks().toDouble / Math.max(minShare2,1.0).toDouble
- val taskToWeightRatio1 = s1.getRunningTasks().toDouble / s1.getWeight().toDouble
- val taskToWeightRatio2 = s2.getRunningTasks().toDouble / s2.getWeight().toDouble
+ val minShare1 = s1.minShare
+ val minShare2 = s2.minShare
+ val runningTasks1 = s1.runningTasks
+ val runningTasks2 = s2.runningTasks
+ val s1Needy = runningTasks1 < minShare1
+ val s2Needy = runningTasks2 < minShare2
+ val minShareRatio1 = runningTasks1.toDouble / Math.max(minShare1,1.0).toDouble
+ val minShareRatio2 = runningTasks2.toDouble / Math.max(minShare2,1.0).toDouble
+ val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
+ val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble
var res:Boolean = true
-
+
if(s1Needy && !s2Needy)
{
res = true
@@ -57,7 +59,7 @@ private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm
}
else if (s1Needy && s2Needy)
{
- res = minShareRatio1 <= minShareRatio2
+ res = minShareRatio1 <= minShareRatio2
}
else
{
diff --git a/core/src/main/scala/spark/scheduler/cluster/SchedulingMode.scala b/core/src/main/scala/spark/scheduler/cluster/SchedulingMode.scala
index 6be4f3cd84..480af2c1a3 100644
--- a/core/src/main/scala/spark/scheduler/cluster/SchedulingMode.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/SchedulingMode.scala
@@ -1,4 +1,4 @@
-package spark.scheduler.cluster
+package spark.scheduler.cluster
object SchedulingMode extends Enumeration("FAIR","FIFO")
{
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
index 064593f486..ddc4fa6642 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
@@ -29,7 +29,6 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
val MAX_TASK_FAILURES = 4
val TASKSET_MINIMUM_SHARES = 1
-
val TASKSET_WEIGHT = 1
// Quantile of tasks at which to start speculation
val SPECULATION_QUANTILE = System.getProperty("spark.speculation.quantile", "0.75").toDouble
@@ -38,7 +37,12 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
// Serializer for closures and tasks.
val ser = SparkEnv.get.closureSerializer.newInstance()
+ var weight = TASKSET_WEIGHT
+ var minShare = TASKSET_MINIMUM_SHARES
+ var runningTasks = 0
val priority = taskSet.priority
+ val stageId = taskSet.stageId
+
val tasks = taskSet.tasks
val numTasks = tasks.length
val copiesRunning = new Array[Int](numTasks)
@@ -46,7 +50,6 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
val numFailures = new Array[Int](numTasks)
val taskAttempts = Array.fill[List[TaskInfo]](numTasks)(Nil)
var tasksFinished = 0
- var numRunningTasks =0;
// Last time when we launched a preferred task (for delay scheduling)
var lastPreferredLaunchTime = System.currentTimeMillis
@@ -100,36 +103,6 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
addPendingTask(i)
}
- override def getMinShare(): Int =
- {
- return TASKSET_MINIMUM_SHARES
- }
-
- override def getRunningTasks(): Int =
- {
- return numRunningTasks
- }
-
- def setRunningTasks(taskNum :Int)
- {
- numRunningTasks = taskNum
- }
-
- override def getPriority(): Int =
- {
- return priority
- }
-
- override def getWeight(): Int =
- {
- return TASKSET_WEIGHT
- }
-
- override def getStageId(): Int =
- {
- return taskSet.stageId
- }
-
// Add a task to all the pending-task lists that it should be on.
private def addPendingTask(index: Int) {
val locations = tasks(index).preferredLocations.toSet & sched.hostsAlive
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetQueuesManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetQueuesManager.scala
index c117ee7a85..86971d47e6 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskSetQueuesManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetQueuesManager.scala
@@ -5,7 +5,6 @@ import scala.collection.mutable.ArrayBuffer
/**
* An interface for managing TaskSet queue/s that allows plugging different policy for
* offering tasks to resources
- *
*/
private[spark] trait TaskSetQueuesManager {
def addTaskSetManager(manager: TaskSetManager): Unit