author     Andrew xia <junluan.xia@intel.com>    2013-07-30 10:57:26 +0800
committer  Andrew xia <junluan.xia@intel.com>    2013-07-30 10:57:26 +0800
commit     614ee16cc4c63260f13d0c7494fbaafa8a061e95 (patch)
tree       2c20fda3668b19dfbd6a98d1389fab5a186e1536 /core
parent     2080e250060975a876a388eb785e7f2b3cf2c0cd (diff)
Refactor job UI with pool information
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/SparkContext.scala | 16
-rw-r--r--  core/src/main/scala/spark/scheduler/SparkListener.scala | 2
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala | 12
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala | 7
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/Schedulable.scala | 2
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/SchedulableBuilder.scala | 6
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/SchedulingMode.scala | 6
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala | 1
-rw-r--r--  core/src/main/scala/spark/scheduler/local/LocalScheduler.scala | 5
-rw-r--r--  core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala | 7
-rw-r--r--  core/src/main/scala/spark/ui/UIWorkloadGenerator.scala | 35
-rw-r--r--  core/src/main/scala/spark/ui/jobs/IndexPage.scala | 28
-rw-r--r--  core/src/main/scala/spark/ui/jobs/JobProgressListener.scala | 62
-rw-r--r--  core/src/main/scala/spark/ui/jobs/JobProgressUI.scala | 7
-rw-r--r--  core/src/main/scala/spark/ui/jobs/PoolPage.scala | 17
-rw-r--r--  core/src/main/scala/spark/ui/jobs/PoolTable.scala | 23
-rw-r--r--  core/src/main/scala/spark/ui/jobs/StageTable.scala | 16
17 files changed, 116 insertions, 136 deletions
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index b5225d5681..375636071d 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -45,13 +45,13 @@ import spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
import spark.partial.{ApproximateEvaluator, PartialResult}
import spark.rdd.{CheckpointRDD, HadoopRDD, NewHadoopRDD, UnionRDD, ParallelCollectionRDD}
import spark.scheduler.{DAGScheduler, ResultTask, ShuffleMapTask, SparkListener, SplitInfo, Stage, StageInfo, TaskScheduler, ActiveJob}
-import spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend, ClusterScheduler, Schedulable}
+import spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend, ClusterScheduler, Schedulable,
+SchedulingMode}
import spark.scheduler.local.LocalScheduler
import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
import spark.storage.{StorageStatus, StorageUtils, RDDInfo}
import spark.util.{MetadataCleaner, TimeStampedHashMap}
import ui.{SparkUI}
-import spark.scheduler.cluster.SchedulingMode.SchedulingMode
/**
* Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -542,17 +542,25 @@ class SparkContext(
env.blockManager.master.getStorageStatus
}
- def getPoolsInfo: ArrayBuffer[Schedulable] = {
+ /**
+ * Return pools for the fair scheduler.
+ * TODO: nested pools are not yet taken into account.
+ */
+ def getPools: ArrayBuffer[Schedulable] = {
taskScheduler.rootPool.schedulableQueue
}
- def getSchedulingMode: SchedulingMode = {
+ /**
+ * Return current scheduling mode
+ */
+ def getSchedulingMode: SchedulingMode.SchedulingMode = {
taskScheduler.schedulingMode
}
def getPoolNameToPool: HashMap[String, Schedulable] = {
taskScheduler.rootPool.schedulableNameToSchedulable
}
+
/**
* Clear the job's list of files added by `addFile` so that they do not get downloaded to
* any new nodes.
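The renamed accessors above give the UI read-only access to scheduler state. A minimal sketch of a caller, not part of this patch, assuming an existing SparkContext named sc and that Schedulable exposes name, minShare and weight as the Pool constructor suggests:

import spark.scheduler.cluster.SchedulingMode

// Under FAIR scheduling, list the top-level pools (nested pools are not handled yet).
if (sc.getSchedulingMode == SchedulingMode.FAIR) {
  for (pool <- sc.getPools) {
    println("pool=%s minShare=%d weight=%d".format(pool.name, pool.minShare, pool.weight))
  }
}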
diff --git a/core/src/main/scala/spark/scheduler/SparkListener.scala b/core/src/main/scala/spark/scheduler/SparkListener.scala
index 94fdad9b98..07372ee786 100644
--- a/core/src/main/scala/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/spark/scheduler/SparkListener.scala
@@ -8,7 +8,7 @@ import spark.executor.TaskMetrics
sealed trait SparkListenerEvents
-case class SparkListenerStageSubmitted(stage: Stage, taskSize: Int, properties: Properties = null) extends SparkListenerEvents
+case class SparkListenerStageSubmitted(stage: Stage, taskSize: Int, properties: Properties) extends SparkListenerEvents
case class StageCompleted(val stageInfo: StageInfo) extends SparkListenerEvents
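Dropping the default null forces every caller to pass the submitting job's Properties explicitly, so listeners can read them uniformly. A hedged sketch of such a listener, not part of this patch (the property key is the one JobProgressListener uses below; the class itself is illustrative):

class PoolLoggingListener extends SparkListener {
  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) = {
    // properties can still be null when a job sets no local properties
    val poolName = Option(stageSubmitted.properties)
      .map(_.getProperty("spark.scheduler.cluster.fair.pool", "default"))
      .getOrElse("default")
    println("Stage %d submitted to pool %s".format(stageSubmitted.stage.id, poolName))
  }
}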
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
index 1b23fd6cef..74b3e43d2b 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
@@ -98,8 +98,9 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
var schedulableBuilder: SchedulableBuilder = null
var rootPool: Pool = null
- //default scheduler is FIFO
- val schedulingMode: SchedulingMode = SchedulingMode.withName(System.getProperty("spark.cluster.schedulingmode", "FIFO"))
+ // default scheduler is FIFO
+ val schedulingMode: SchedulingMode = SchedulingMode.withName(
+ System.getProperty("spark.cluster.schedulingmode", "FIFO"))
override def setListener(listener: TaskSchedulerListener) {
this.listener = listener
@@ -107,7 +108,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
def initialize(context: SchedulerBackend) {
backend = context
- //temporarily set rootPool name to empty
+ // temporarily set rootPool name to empty
rootPool = new Pool("", schedulingMode, 0, 0)
schedulableBuilder = {
schedulingMode match {
@@ -254,10 +255,11 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
}
var launchedTask = false
val sortedTaskSetQueue = rootPool.getSortedTaskSetQueue()
- for (manager <- sortedTaskSetQueue)
- {
+
+ for (manager <- sortedTaskSetQueue) {
logInfo("parentName:%s,name:%s,runningTasks:%s".format(manager.parent.name, manager.name, manager.runningTasks))
}
+
for (manager <- sortedTaskSetQueue) {
// Split offers based on node local, rack local and off-rack tasks.
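Both the cluster and local schedulers now parse the same system property, so enabling fair scheduling is a configuration change rather than a code change. A minimal sketch, with the property name taken from the code above (the master URL and app name are illustrative):

// Must be set before the SparkContext, and thus the scheduler, is created; the default is FIFO.
System.setProperty("spark.cluster.schedulingmode", "FAIR")
val sc = new SparkContext("local[4]", "fair-scheduling-demo")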
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
index 7a6a6b7826..4d11b0959a 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
@@ -91,7 +91,6 @@ private[spark] class ClusterTaskSetManager(
var stageId = taskSet.stageId
var name = "TaskSet_"+taskSet.stageId.toString
var parent: Schedulable = null
- var schedulableQueue :ArrayBuffer[Schedulable] = null
// Last time when we launched a preferred task (for delay scheduling)
var lastPreferredLaunchTime = System.currentTimeMillis
@@ -645,17 +644,17 @@ private[spark] class ClusterTaskSetManager(
}
}
- //TODO: for now we just find Pool not TaskSetManager, we can extend this function in future if needed
+ // TODO: for now we just find Pool not TaskSetManager, we can extend this function in future if needed
override def getSchedulableByName(name: String): Schedulable = {
return null
}
override def addSchedulable(schedulable:Schedulable) {
- //nothing
+ // nothing
}
override def removeSchedulable(schedulable:Schedulable) {
- //nothing
+ // nothing
}
override def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = {
diff --git a/core/src/main/scala/spark/scheduler/cluster/Schedulable.scala b/core/src/main/scala/spark/scheduler/cluster/Schedulable.scala
index 2e4f14c11f..c410af8af4 100644
--- a/core/src/main/scala/spark/scheduler/cluster/Schedulable.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/Schedulable.scala
@@ -9,7 +9,7 @@ import scala.collection.mutable.ArrayBuffer
*/
private[spark] trait Schedulable {
var parent: Schedulable
- //childrens
+ // child queues
def schedulableQueue: ArrayBuffer[Schedulable]
def schedulingMode: SchedulingMode
def weight: Int
diff --git a/core/src/main/scala/spark/scheduler/cluster/SchedulableBuilder.scala b/core/src/main/scala/spark/scheduler/cluster/SchedulableBuilder.scala
index 18cc15c2a5..a2fa80aa36 100644
--- a/core/src/main/scala/spark/scheduler/cluster/SchedulableBuilder.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/SchedulableBuilder.scala
@@ -27,7 +27,7 @@ private[spark] trait SchedulableBuilder {
private[spark] class FIFOSchedulableBuilder(val rootPool: Pool) extends SchedulableBuilder with Logging {
override def buildPools() {
- //nothing
+ // nothing
}
override def addTaskSetManager(manager: Schedulable, properties: Properties) {
@@ -86,7 +86,7 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool) extends Schedula
}
}
- //finally create "default" pool
+ // finally create "default" pool
if (rootPool.getSchedulableByName(DEFAULT_POOL_NAME) == null) {
val pool = new Pool(DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
rootPool.addSchedulable(pool)
@@ -102,7 +102,7 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool) extends Schedula
poolName = properties.getProperty(FAIR_SCHEDULER_PROPERTIES, DEFAULT_POOL_NAME)
parentPool = rootPool.getSchedulableByName(poolName)
if (parentPool == null) {
- //we will create a new pool that user has configured in app instead of being defined in xml file
+ // create a new pool for a name the app configured that is not defined in the XML file
parentPool = new Pool(poolName,DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
rootPool.addSchedulable(parentPool)
logInfo("Create pool with name:%s,schedulingMode:%s,minShare:%d,weight:%d".format(
diff --git a/core/src/main/scala/spark/scheduler/cluster/SchedulingMode.scala b/core/src/main/scala/spark/scheduler/cluster/SchedulingMode.scala
index c5c7ee3b22..a7f0f6f393 100644
--- a/core/src/main/scala/spark/scheduler/cluster/SchedulingMode.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/SchedulingMode.scala
@@ -1,6 +1,10 @@
package spark.scheduler.cluster
-object SchedulingMode extends Enumeration("FAIR", "FIFO", "NONE"){
+/**
+ * "FAIR" and "FIFO" determines which policy is used to order tasks amongst a Schedulable's sub-queues
+ * "NONE" is used when the a Schedulable has no sub-queues.
+ */
+object SchedulingMode extends Enumeration("FAIR", "FIFO", "NONE") {
type SchedulingMode = Value
val FAIR,FIFO,NONE = Value
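Since SchedulingMode is a plain Enumeration seeded with those names, the string value of the system property round-trips through withName, and the resulting values pattern-match cleanly. A small sketch, not part of this patch:

import spark.scheduler.cluster.SchedulingMode

val mode = SchedulingMode.withName("FAIR") // e.g. the parsed system property
mode match {
  case SchedulingMode.FAIR => println("tasks ordered fairly across pools")
  case SchedulingMode.FIFO => println("tasks ordered in a single FIFO queue")
  case SchedulingMode.NONE => println("leaf schedulable with no sub-queues")
}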
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
index 472e01b227..4e6bc51278 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
@@ -6,6 +6,7 @@ import spark.TaskState.TaskState
import java.nio.ByteBuffer
private[spark] trait TaskSetManager extends Schedulable {
+ def schedulableQueue = null
def schedulingMode = SchedulingMode.NONE
def taskSet: TaskSet
def slaveOffer(execId: String, hostPort: String, availableCpus: Double,
diff --git a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
index 19a48895e3..f4411582f1 100644
--- a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
@@ -64,7 +64,8 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
var schedulableBuilder: SchedulableBuilder = null
var rootPool: Pool = null
- val schedulingMode: SchedulingMode = SchedulingMode.withName(System.getProperty("spark.cluster.schedulingmode", "FIFO"))
+ val schedulingMode: SchedulingMode = SchedulingMode.withName(
+ System.getProperty("spark.cluster.schedulingmode", "FIFO"))
val activeTaskSets = new HashMap[String, TaskSetManager]
val taskIdToTaskSetId = new HashMap[Long, String]
val taskSetTaskIds = new HashMap[String, HashSet[Long]]
@@ -72,7 +73,7 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
var localActor: ActorRef = null
override def start() {
- //temporarily set rootPool name to empty
+ // temporarily set rootPool name to empty
rootPool = new Pool("", schedulingMode, 0, 0)
schedulableBuilder = {
schedulingMode match {
diff --git a/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala b/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala
index 8954f40ea9..cc27f1ecca 100644
--- a/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala
@@ -14,7 +14,6 @@ import spark.scheduler.cluster._
private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: TaskSet) extends TaskSetManager with Logging {
var parent: Schedulable = null
- var schedulableQueue :ArrayBuffer[Schedulable] = null
var weight: Int = 1
var minShare: Int = 0
var runningTasks: Int = 0
@@ -48,11 +47,11 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
}
def addSchedulable(schedulable: Schedulable): Unit = {
- //nothing
+ // nothing
}
def removeSchedulable(schedulable: Schedulable): Unit = {
- //nothing
+ // nothing
}
def getSchedulableByName(name: String): Schedulable = {
@@ -60,7 +59,7 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
}
def executorLost(executorId: String, host: String): Unit = {
- //nothing
+ // nothing
}
def checkSpeculatableTasks(): Boolean = {
diff --git a/core/src/main/scala/spark/ui/UIWorkloadGenerator.scala b/core/src/main/scala/spark/ui/UIWorkloadGenerator.scala
index 8bbc6ce88e..840ac9773e 100644
--- a/core/src/main/scala/spark/ui/UIWorkloadGenerator.scala
+++ b/core/src/main/scala/spark/ui/UIWorkloadGenerator.scala
@@ -4,7 +4,8 @@ import scala.util.Random
import spark.SparkContext
import spark.SparkContext._
-
+import spark.scheduler.cluster.SchedulingMode
+import spark.scheduler.cluster.SchedulingMode.SchedulingMode
/**
* Continuously generates jobs that expose various features of the WebUI (internal testing tool).
*
@@ -15,8 +16,17 @@ private[spark] object UIWorkloadGenerator {
val INTER_JOB_WAIT_MS = 500
def main(args: Array[String]) {
+ if (args.length < 2) {
+ println("usage: ./run spark.ui.UIWorkloadGenerator [master] [FIFO|FAIR]")
+ System.exit(1)
+ }
val master = args(0)
+ val schedulingMode = SchedulingMode.withName(args(1))
val appName = "Spark UI Tester"
+
+ if (schedulingMode == SchedulingMode.FAIR) {
+ System.setProperty("spark.cluster.schedulingmode", "FAIR")
+ }
val sc = new SparkContext(master, appName)
// NOTE: Right now there is no easy way for us to show spark.job.annotation for a given phase,
@@ -56,14 +66,21 @@ private[spark] object UIWorkloadGenerator {
while (true) {
for ((desc, job) <- jobs) {
- try {
- setName(desc)
- job()
- println("Job funished: " + desc)
- } catch {
- case e: Exception =>
- println("Job Failed: " + desc)
- }
+ new Thread {
+ override def run() {
+ if (schedulingMode == SchedulingMode.FAIR) {
+ sc.addLocalProperties("spark.scheduler.cluster.fair.pool", desc)
+ }
+ try {
+ setName(desc)
+ job()
+ println("Job funished: " + desc)
+ } catch {
+ case e: Exception =>
+ println("Job Failed: " + desc)
+ }
+ }
+ }.start
Thread.sleep(INTER_JOB_WAIT_MS)
}
}
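The thread-per-job launcher above matters because the pool assignment travels through a thread-local property: it has to be set on the thread that actually submits the job. A condensed sketch of the same pattern, using the addLocalProperties call and property key from the diff (the pool name "pool_a" is illustrative):

new Thread {
  override def run() {
    // Local properties are per-thread, so set the pool before submitting any work.
    sc.addLocalProperties("spark.scheduler.cluster.fair.pool", "pool_a")
    sc.parallelize(1 to 1000, 10).count()
  }
}.start()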
diff --git a/core/src/main/scala/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/spark/ui/jobs/IndexPage.scala
index e765cecb01..abef683791 100644
--- a/core/src/main/scala/spark/ui/jobs/IndexPage.scala
+++ b/core/src/main/scala/spark/ui/jobs/IndexPage.scala
@@ -4,38 +4,30 @@ import java.util.Date
import javax.servlet.http.HttpServletRequest
-import scala.Some
-import scala.xml.{NodeSeq, Node}
import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet
+import scala.Some
+import scala.xml.{NodeSeq, Node}
import spark.scheduler.Stage
+import spark.storage.StorageLevel
import spark.ui.UIUtils._
import spark.ui.Page._
-import spark.storage.StorageLevel
-import spark.scheduler.cluster.Schedulable
-import spark.scheduler.cluster.SchedulingMode
-import spark.scheduler.cluster.SchedulingMode.SchedulingMode
/** Page showing list of all ongoing and recently finished stages and pools*/
private[spark] class IndexPage(parent: JobProgressUI) {
def listener = parent.listener
- def stageTable: StageTable = parent.stageTable
-
- def poolTable: PoolTable = parent.poolTable
-
def render(request: HttpServletRequest): Seq[Node] = {
val activeStages = listener.activeStages.toSeq
val completedStages = listener.completedStages.reverse.toSeq
val failedStages = listener.failedStages.reverse.toSeq
- stageTable.setStagePoolInfo(parent.stagePoolInfo)
- poolTable.setPoolSource(parent.stagePagePoolSource)
+ val activeStagesTable = new StageTable(activeStages, parent)
+ val completedStagesTable = new StageTable(completedStages, parent)
+ val failedStagesTable = new StageTable(failedStages, parent)
- val activeStageNodeSeq = stageTable.toNodeSeq(activeStages)
- val completedStageNodeSeq = stageTable.toNodeSeq(completedStages)
- val failedStageNodeSeq = stageTable.toNodeSeq(failedStages)
+ val poolTable = new PoolTable(parent.stagePagePoolSource, listener)
val content = <div class="row">
<div class="span12">
@@ -48,9 +40,9 @@ private[spark] class IndexPage(parent: JobProgressUI) {
</div>
</div> ++
<h3>Pools </h3> ++ poolTable.toNodeSeq ++
- <h3>Active Stages : {activeStages.size}</h3> ++ activeStageNodeSeq++
- <h3>Completed Stages : {completedStages.size}</h3> ++ completedStageNodeSeq++
- <h3>Failed Stages : {failedStages.size}</h3> ++ failedStageNodeSeq
+ <h3>Active Stages : {activeStages.size}</h3> ++ activeStagesTable.toNodeSeq++
+ <h3>Completed Stages : {completedStages.size}</h3> ++ completedStagesTable.toNodeSeq++
+ <h3>Failed Stages : {failedStages.size}</h3> ++ failedStagesTable.toNodeSeq
headerSparkPage(content, parent.sc, "Spark Stages/Pools", Jobs)
}
diff --git a/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala
index 1244f9538b..d4767bea22 100644
--- a/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala
@@ -9,53 +9,13 @@ import spark.scheduler.cluster.TaskInfo
import spark.executor.TaskMetrics
import collection.mutable
-private[spark] class FairJobProgressListener(val sparkContext: SparkContext)
- extends JobProgressListener(sparkContext) {
-
- val FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.cluster.fair.pool"
- val DEFAULT_POOL_NAME = "default"
-
- override val stageToPool = HashMap[Stage, String]()
- override val poolToActiveStages = HashMap[String, HashSet[Stage]]()
-
- override def onStageCompleted(stageCompleted: StageCompleted) = {
- super.onStageCompleted(stageCompleted)
- val stage = stageCompleted.stageInfo.stage
- poolToActiveStages(stageToPool(stage)) -= stage
- }
-
- override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) = {
- super.onStageSubmitted(stageSubmitted)
- val stage = stageSubmitted.stage
- var poolName = DEFAULT_POOL_NAME
- if (stageSubmitted.properties != null) {
- poolName = stageSubmitted.properties.getProperty(FAIR_SCHEDULER_PROPERTIES, DEFAULT_POOL_NAME)
- }
- stageToPool(stage) = poolName
- val stages = poolToActiveStages.getOrElseUpdate(poolName, new HashSet[Stage]())
- stages += stage
- }
-
- override def onJobEnd(jobEnd: SparkListenerJobEnd) {
- super.onJobEnd(jobEnd)
- jobEnd match {
- case end: SparkListenerJobEnd =>
- end.jobResult match {
- case JobFailed(ex, Some(stage)) =>
- poolToActiveStages(stageToPool(stage)) -= stage
- case _ =>
- }
- case _ =>
- }
- }
-}
-
private[spark] class JobProgressListener(val sc: SparkContext) extends SparkListener {
// How many stages to remember
val RETAINED_STAGES = System.getProperty("spark.ui.retained_stages", "1000").toInt
+ val DEFAULT_POOL_NAME = "default"
- def stageToPool: HashMap[Stage, String] = null
- def poolToActiveStages: HashMap[String, HashSet[Stage]] =null
+ val stageToPool = new HashMap[Stage, String]()
+ val poolToActiveStages = new HashMap[String, HashSet[Stage]]()
val activeStages = HashSet[Stage]()
val completedStages = ListBuffer[Stage]()
@@ -70,6 +30,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
override def onStageCompleted(stageCompleted: StageCompleted) = {
val stage = stageCompleted.stageInfo.stage
+ poolToActiveStages(stageToPool(stage)) -= stage
activeStages -= stage
completedStages += stage
trimIfNecessary(completedStages)
@@ -86,8 +47,18 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
}
}
- override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) =
- activeStages += stageSubmitted.stage
+ /** Under FIFO, all stages are placed in the "default" pool, though the pool has no meaning in that mode */
+ override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) = {
+ val stage = stageSubmitted.stage
+ activeStages += stage
+ var poolName = DEFAULT_POOL_NAME
+ if (stageSubmitted.properties != null) {
+ poolName = stageSubmitted.properties.getProperty("spark.scheduler.cluster.fair.pool", DEFAULT_POOL_NAME)
+ }
+ stageToPool(stage) = poolName
+ val stages = poolToActiveStages.getOrElseUpdate(poolName, new HashSet[Stage]())
+ stages += stage
+ }
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
val sid = taskEnd.task.stageId
@@ -112,6 +83,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
end.jobResult match {
case JobFailed(ex, Some(stage)) =>
activeStages -= stage
+ poolToActiveStages(stageToPool(stage)) -= stage
failedStages += stage
trimIfNecessary(failedStages)
case _ =>
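With the Fair-specific subclass folded in, the single listener now keeps stageToPool and poolToActiveStages up to date in every mode, relying on getOrElseUpdate to create a pool's stage set lazily on first submission. A standalone sketch of that bookkeeping idiom, using Int stage ids so it runs on its own:

import scala.collection.mutable.{HashMap, HashSet}

val poolToActiveStages = new HashMap[String, HashSet[Int]]()

def stageSubmitted(stageId: Int, poolName: String) {
  // Create the pool's set on first use, then record the stage as active.
  poolToActiveStages.getOrElseUpdate(poolName, new HashSet[Int]()) += stageId
}

def stageFinished(stageId: Int, poolName: String) {
  // Mirrors onStageCompleted and the job-failure path above.
  poolToActiveStages(poolName) -= stageId
}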
diff --git a/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala b/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala
index e610252242..5703b146df 100644
--- a/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala
+++ b/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala
@@ -29,26 +29,21 @@ private[spark] class JobProgressUI(val sc: SparkContext) {
private val stagePage = new StagePage(this)
private val poolPage = new PoolPage(this)
- var stageTable: StageTable = null
var stagePoolInfo: StagePoolInfo = null
- var poolTable: PoolTable = null
var stagePagePoolSource: PoolSource = null
def start() {
+ _listener = Some(new JobProgressListener(sc))
sc.getSchedulingMode match {
case SchedulingMode.FIFO =>
- _listener = Some(new JobProgressListener(sc))
stagePoolInfo = new FIFOStagePoolInfo()
stagePagePoolSource = new FIFOSource()
case SchedulingMode.FAIR =>
- _listener = Some(new FairJobProgressListener(sc))
stagePoolInfo = new FairStagePoolInfo(listener)
stagePagePoolSource = new FairSource(sc)
}
sc.addSparkListener(listener)
- stageTable = new StageTable(dateFmt, formatDuration, listener)
- poolTable = new PoolTable(listener)
}
def formatDuration(ms: Long) = Utils.msDurationToString(ms)
diff --git a/core/src/main/scala/spark/ui/jobs/PoolPage.scala b/core/src/main/scala/spark/ui/jobs/PoolPage.scala
index 00703887c3..37d4f8fa6b 100644
--- a/core/src/main/scala/spark/ui/jobs/PoolPage.scala
+++ b/core/src/main/scala/spark/ui/jobs/PoolPage.scala
@@ -9,30 +9,23 @@ import spark.scheduler.Stage
import spark.ui.UIUtils._
import spark.ui.Page._
-/** Page showing specific pool details*/
+/** Page showing specific pool details */
private[spark] class PoolPage(parent: JobProgressUI) {
def listener = parent.listener
- def stageTable: StageTable = parent.stageTable
-
- def poolTable: PoolTable = parent.poolTable
-
def render(request: HttpServletRequest): Seq[Node] = {
val poolName = request.getParameter("poolname")
val poolToActiveStages = listener.poolToActiveStages
val activeStages = poolToActiveStages.getOrElseUpdate(poolName, new HashSet[Stage]).toSeq
- val stageToPool = listener.stageToPool
val poolDetailPoolSource = new PoolDetailSource(parent.sc, poolName)
- poolTable.setPoolSource(poolDetailPoolSource)
+ val poolTable = new PoolTable(poolDetailPoolSource, listener)
- stageTable.setStagePoolInfo(parent.stagePoolInfo)
+ val activeStagesTable = new StageTable(activeStages, parent)
- val activeStageNodeSeq = stageTable.toNodeSeq(activeStages)
+ val content = <h3>Pool </h3> ++ poolTable.toNodeSeq() ++
+ <h3>Active Stages : {activeStages.size}</h3> ++ activeStagesTable.toNodeSeq()
- val content = <h3>Pool </h3> ++ poolTable.toNodeSeq ++
- <h3>Active Stages : {activeStages.size}</h3> ++ activeStageNodeSeq
-
headerSparkPage(content, parent.sc, "Spark Pool Details", Jobs)
}
}
diff --git a/core/src/main/scala/spark/ui/jobs/PoolTable.scala b/core/src/main/scala/spark/ui/jobs/PoolTable.scala
index bb8be4b26e..8788ed8bc1 100644
--- a/core/src/main/scala/spark/ui/jobs/PoolTable.scala
+++ b/core/src/main/scala/spark/ui/jobs/PoolTable.scala
@@ -27,7 +27,7 @@ private[spark] trait PoolSource {
/*
* Pool source for FIFO scheduler algorithm on Index page
*/
-private[spark] class FIFOSource() extends PoolSource{
+private[spark] class FIFOSource() extends PoolSource {
def getPools: Seq[Schedulable] = {
Seq[Schedulable]()
}
@@ -36,16 +36,16 @@ private[spark] class FIFOSource() extends PoolSource{
/*
* Pool source for Fair scheduler algorithm on Index page
*/
-private[spark] class FairSource(sc: SparkContext) extends PoolSource{
+private[spark] class FairSource(sc: SparkContext) extends PoolSource {
def getPools: Seq[Schedulable] = {
- sc.getPoolsInfo.toSeq
+ sc.getPools.toSeq
}
}
/*
* specific pool info for pool detail page
*/
-private[spark] class PoolDetailSource(sc: SparkContext, poolName: String) extends PoolSource{
+private[spark] class PoolDetailSource(sc: SparkContext, poolName: String) extends PoolSource {
def getPools: Seq[Schedulable] = {
val pools = HashSet[Schedulable]()
pools += sc.getPoolNameToPool(poolName)
@@ -54,21 +54,18 @@ private[spark] class PoolDetailSource(sc: SparkContext, poolName: String) extend
}
/** Table showing list of pools */
-private[spark] class PoolTable(listener: JobProgressListener) {
+private[spark] class PoolTable(poolSource: PoolSource, listener: JobProgressListener) {
- var poolSource: PoolSource = null
var poolToActiveStages: HashMap[String, HashSet[Stage]] = listener.poolToActiveStages
- def toNodeSeq: Seq[Node] = {
+ def toNodeSeq(): Seq[Node] = {
poolTable(poolRow, poolSource.getPools)
}
- def setPoolSource(poolSource: PoolSource) {
- this.poolSource = poolSource
- }
-
- //pool tables
- def poolTable(makeRow: (Schedulable, HashMap[String, HashSet[Stage]]) => Seq[Node], rows: Seq[Schedulable]): Seq[Node] = {
+ // pool tables
+ def poolTable(makeRow: (Schedulable, HashMap[String, HashSet[Stage]]) => Seq[Node],
+ rows: Seq[Schedulable]
+ ): Seq[Node] = {
<table class="table table-bordered table-striped table-condensed sortable">
<thead>
<th>Pool Name</th>
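PoolTable now receives its PoolSource at construction instead of through a mutable setter, so each page render builds a fresh table over a fixed source. A hedged sketch of an additional PoolSource against the same trait, not part of this patch (the class name and filtering rule are illustrative, and it assumes Schedulable exposes a name field as Pool does):

// Shows only pools that currently have at least one active stage.
private[spark] class ActivePoolSource(sc: SparkContext, listener: JobProgressListener)
  extends PoolSource {
  def getPools: Seq[Schedulable] = {
    sc.getPools.filter { pool =>
      listener.poolToActiveStages.get(pool.name).exists(_.nonEmpty)
    }.toSeq
  }
}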
diff --git a/core/src/main/scala/spark/ui/jobs/StageTable.scala b/core/src/main/scala/spark/ui/jobs/StageTable.scala
index 83e566c55b..82fb0bd5cc 100644
--- a/core/src/main/scala/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/spark/ui/jobs/StageTable.scala
@@ -44,18 +44,18 @@ private[spark] class FairStagePoolInfo(listener: JobProgressListener) extends St
}
/** Page showing list of all ongoing and recently finished stages */
-private[spark] class StageTable(val dateFmt: SimpleDateFormat, val formatDuration: Long => String, val listener: JobProgressListener) {
+private[spark] class StageTable(
+ val stages: Seq[Stage],
+ val parent: JobProgressUI) {
- var stagePoolInfo: StagePoolInfo = null
+ val listener = parent.listener
+ val dateFmt = parent.dateFmt
+ var stagePoolInfo = parent.stagePoolInfo
- def toNodeSeq(stages: Seq[Stage]): Seq[Node] = {
+ def toNodeSeq(): Seq[Node] = {
stageTable(stageRow, stages)
}
- def setStagePoolInfo(stagePoolInfo: StagePoolInfo) {
- this.stagePoolInfo = stagePoolInfo
- }
-
/** Special table which merges two header cells. */
def stageTable[T](makeRow: T => Seq[Node], rows: Seq[T]): Seq[Node] = {
<table class="table table-bordered table-striped table-condensed sortable">
@@ -77,7 +77,7 @@ private[spark] class StageTable(val dateFmt: SimpleDateFormat, val formatDuratio
def getElapsedTime(submitted: Option[Long], completed: Long): String = {
submitted match {
- case Some(t) => formatDuration(completed - t)
+ case Some(t) => parent.formatDuration(completed - t)
case _ => "Unknown"
}
}