path: root/core
author     Patrick Wendell <pwendell@gmail.com>    2013-08-01 17:13:28 -0700
committer  Patrick Wendell <pwendell@gmail.com>    2013-08-01 17:13:28 -0700
commit     6d7afd7ced7207467395f586a683d7f803f72a55 (patch)
tree       211e89e45192f307a69ea74c2a191c12b55c6238 /core
parent     e466a55a6b803a5295e6dcc106a4abef917e7058 (diff)
parent     87fd321a5a12ccea9d5593a4c43aaadfa44855bd (diff)
Merge pull request #768 from pwendell/pr-695
Minor clean-up of fair scheduler UI
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/SparkContext.scala              15
-rw-r--r--  core/src/main/scala/spark/ui/UIWorkloadGenerator.scala      4
-rw-r--r--  core/src/main/scala/spark/ui/jobs/IndexPage.scala          27
-rw-r--r--  core/src/main/scala/spark/ui/jobs/JobProgressUI.scala      13
-rw-r--r--  core/src/main/scala/spark/ui/jobs/PoolPage.scala            7
-rw-r--r--  core/src/main/scala/spark/ui/jobs/PoolTable.scala          52
-rw-r--r--  core/src/main/scala/spark/ui/jobs/StageTable.scala         51
7 files changed, 38 insertions, 131 deletions
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 0d1f9fa8d4..97e1aaf49e 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -579,23 +579,26 @@ class SparkContext(
/**
* Return pools for fair scheduler
- * TODO(xiajunluan):now, we have not taken nested pools into account
+ * TODO(xiajunluan): We should take nested pools into account
*/
- def getPools: ArrayBuffer[Schedulable] = {
+ def getAllPools: ArrayBuffer[Schedulable] = {
taskScheduler.rootPool.schedulableQueue
}
/**
+ * Return the pool associated with the given name, if one exists
+ */
+ def getPoolForName(pool: String): Option[Schedulable] = {
+ taskScheduler.rootPool.schedulableNameToSchedulable.get(pool)
+ }
+
+ /**
* Return current scheduling mode
*/
def getSchedulingMode: SchedulingMode.SchedulingMode = {
taskScheduler.schedulingMode
}
- def getPoolNameToPool: HashMap[String, Schedulable] = {
- taskScheduler.rootPool.schedulableNameToSchedulable
- }
-
/**
* Clear the job's list of files added by `addFile` so that they do not get downloaded to
* any new nodes.
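
The hunk above replaces getPools and getPoolNameToPool with the public accessors getAllPools and getPoolForName. A minimal usage sketch of the new accessors, assuming an existing SparkContext `sc` running under the fair scheduler; the pool name "production" is a hypothetical example, and the sketch assumes Schedulable exposes the pool's name:

    // Sketch only: not part of this commit.
    import spark.scheduler.cluster.SchedulingMode

    if (sc.getSchedulingMode == SchedulingMode.FAIR) {
      // Enumerate every top-level fair scheduler pool
      for (pool <- sc.getAllPools) {
        println("Pool: " + pool.name)
      }
      // Look up one pool by name; getPoolForName returns Option[Schedulable]
      sc.getPoolForName("production") match {
        case Some(pool) => println("Found pool: " + pool.name)
        case None       => println("Pool not defined")
      }
    }
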
diff --git a/core/src/main/scala/spark/ui/UIWorkloadGenerator.scala b/core/src/main/scala/spark/ui/UIWorkloadGenerator.scala
index 4fbb503e5c..3ac35085eb 100644
--- a/core/src/main/scala/spark/ui/UIWorkloadGenerator.scala
+++ b/core/src/main/scala/spark/ui/UIWorkloadGenerator.scala
@@ -30,9 +30,7 @@ import spark.scheduler.cluster.SchedulingMode.SchedulingMode
*/
private[spark] object UIWorkloadGenerator {
val NUM_PARTITIONS = 100
- val INTER_JOB_WAIT_MS = 500
-
-
+ val INTER_JOB_WAIT_MS = 5000
def main(args: Array[String]) {
if (args.length < 2) {
diff --git a/core/src/main/scala/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/spark/ui/jobs/IndexPage.scala
index 4ad787565d..b0d057afa1 100644
--- a/core/src/main/scala/spark/ui/jobs/IndexPage.scala
+++ b/core/src/main/scala/spark/ui/jobs/IndexPage.scala
@@ -17,16 +17,11 @@
package spark.ui.jobs
-import java.util.Date
-
import javax.servlet.http.HttpServletRequest
-import scala.Some
import scala.xml.{NodeSeq, Node}
-import spark.scheduler.cluster.TaskInfo
-import spark.scheduler.Stage
-import spark.storage.StorageLevel
+import spark.scheduler.cluster.SchedulingMode
import spark.ui.Page._
import spark.ui.UIUtils._
import spark.Utils
@@ -50,7 +45,7 @@ private[spark] class IndexPage(parent: JobProgressUI) {
val completedStagesTable = new StageTable(completedStages, parent)
val failedStagesTable = new StageTable(failedStages, parent)
- val poolTable = new PoolTable(parent.stagePagePoolSource, listener)
+ val poolTable = new PoolTable(listener.sc.getAllPools, listener)
val summary: NodeSeq =
<div>
<ul class="unstyled">
@@ -79,13 +74,17 @@ private[spark] class IndexPage(parent: JobProgressUI) {
</div>
val content = summary ++
- <h3>Pools </h3> ++ poolTable.toNodeSeq ++
- <h3>Active Stages : {activeStages.size}</h3> ++
- activeStagesTable.toNodeSeq++
- <h3>Completed Stages : {completedStages.size}</h3> ++
- completedStagesTable.toNodeSeq++
- <h3>Failed Stages : {failedStages.size}</h3> ++
- failedStagesTable.toNodeSeq
+ {if (listener.sc.getSchedulingMode == SchedulingMode.FAIR) {
+ <h3>Pools</h3> ++ poolTable.toNodeSeq
+ } else {
+ Seq()
+ }} ++
+ <h3>Active Stages : {activeStages.size}</h3> ++
+ activeStagesTable.toNodeSeq++
+ <h3>Completed Stages : {completedStages.size}</h3> ++
+ completedStagesTable.toNodeSeq++
+ <h3>Failed Stages : {failedStages.size}</h3> ++
+ failedStagesTable.toNodeSeq
headerSparkPage(content, parent.sc, "Spark Stages", Jobs)
}
diff --git a/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala b/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala
index 3832c5d33c..c83f102ff3 100644
--- a/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala
+++ b/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala
@@ -41,25 +41,12 @@ private[spark] class JobProgressUI(val sc: SparkContext) {
def listener = _listener.get
val dateFmt = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
-
private val indexPage = new IndexPage(this)
private val stagePage = new StagePage(this)
private val poolPage = new PoolPage(this)
- var stagePoolInfo: StagePoolInfo = null
- var stagePagePoolSource: PoolSource = null
-
def start() {
_listener = Some(new JobProgressListener(sc))
- sc.getSchedulingMode match {
- case SchedulingMode.FIFO =>
- stagePoolInfo = new FIFOStagePoolInfo()
- stagePagePoolSource = new FIFOSource()
- case SchedulingMode.FAIR =>
- stagePoolInfo = new FairStagePoolInfo(listener)
- stagePagePoolSource = new FairSource(sc)
- }
-
sc.addSparkListener(listener)
}
diff --git a/core/src/main/scala/spark/ui/jobs/PoolPage.scala b/core/src/main/scala/spark/ui/jobs/PoolPage.scala
index 37d4f8fa6b..ee5a6a6a48 100644
--- a/core/src/main/scala/spark/ui/jobs/PoolPage.scala
+++ b/core/src/main/scala/spark/ui/jobs/PoolPage.scala
@@ -17,12 +17,11 @@ private[spark] class PoolPage(parent: JobProgressUI) {
val poolName = request.getParameter("poolname")
val poolToActiveStages = listener.poolToActiveStages
val activeStages = poolToActiveStages.getOrElseUpdate(poolName, new HashSet[Stage]).toSeq
-
- val poolDetailPoolSource = new PoolDetailSource(parent.sc, poolName)
- val poolTable = new PoolTable(poolDetailPoolSource, listener)
-
val activeStagesTable = new StageTable(activeStages, parent)
+ val pool = listener.sc.getPoolForName(poolName).get
+ val poolTable = new PoolTable(Seq(pool), listener)
+
val content = <h3>Pool </h3> ++ poolTable.toNodeSeq() ++
<h3>Active Stages : {activeStages.size}</h3> ++ activeStagesTable.toNodeSeq()
diff --git a/core/src/main/scala/spark/ui/jobs/PoolTable.scala b/core/src/main/scala/spark/ui/jobs/PoolTable.scala
index 8788ed8bc1..9cfe0d68f0 100644
--- a/core/src/main/scala/spark/ui/jobs/PoolTable.scala
+++ b/core/src/main/scala/spark/ui/jobs/PoolTable.scala
@@ -1,65 +1,19 @@
package spark.ui.jobs
-import java.util.Date
-
-import javax.servlet.http.HttpServletRequest
-
-import scala.Some
-import scala.xml.{NodeSeq, Node}
+import scala.xml.Node
import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet
-import spark.SparkContext
import spark.scheduler.Stage
-import spark.ui.UIUtils._
-import spark.ui.Page._
-import spark.storage.StorageLevel
import spark.scheduler.cluster.Schedulable
-/*
- * Interface for get pools seq showing on Index or pool detail page
- */
-
-private[spark] trait PoolSource {
- def getPools: Seq[Schedulable]
-}
-
-/*
- * Pool source for FIFO scheduler algorithm on Index page
- */
-private[spark] class FIFOSource() extends PoolSource {
- def getPools: Seq[Schedulable] = {
- Seq[Schedulable]()
- }
-}
-
-/*
- * Pool source for Fair scheduler algorithm on Index page
- */
-private[spark] class FairSource(sc: SparkContext) extends PoolSource {
- def getPools: Seq[Schedulable] = {
- sc.getPools.toSeq
- }
-}
-
-/*
- * specific pool info for pool detail page
- */
-private[spark] class PoolDetailSource(sc: SparkContext, poolName: String) extends PoolSource {
- def getPools: Seq[Schedulable] = {
- val pools = HashSet[Schedulable]()
- pools += sc.getPoolNameToPool(poolName)
- pools.toSeq
- }
-}
-
/** Table showing list of pools */
-private[spark] class PoolTable(poolSource: PoolSource, listener: JobProgressListener) {
+private[spark] class PoolTable(pools: Seq[Schedulable], listener: JobProgressListener) {
var poolToActiveStages: HashMap[String, HashSet[Stage]] = listener.poolToActiveStages
def toNodeSeq(): Seq[Node] = {
- poolTable(poolRow, poolSource.getPools)
+ poolTable(poolRow, pools)
}
// pool tables
diff --git a/core/src/main/scala/spark/ui/jobs/StageTable.scala b/core/src/main/scala/spark/ui/jobs/StageTable.scala
index e18b70f0b9..3257f4e360 100644
--- a/core/src/main/scala/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/spark/ui/jobs/StageTable.scala
@@ -10,51 +10,20 @@ import scala.xml.{NodeSeq, Node}
import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet
-import spark.scheduler.cluster.TaskInfo
+import spark.scheduler.cluster.{SchedulingMode, TaskInfo}
import spark.scheduler.Stage
import spark.ui.UIUtils._
import spark.ui.Page._
import spark.Utils
import spark.storage.StorageLevel
-/*
- * Interface to get stage's pool name
- */
-private[spark] trait StagePoolInfo {
- def getStagePoolName(s: Stage): String
-
- def hasHref: Boolean
-}
-
-/*
- * For FIFO scheduler algorithm, just show "N/A" and its link status is false
- */
-private[spark] class FIFOStagePoolInfo extends StagePoolInfo {
- def getStagePoolName(s: Stage): String = "N/A"
-
- def hasHref: Boolean = false
-}
-
-/*
- * For Fair scheduler algorithm, show its pool name and pool detail link status is true
- */
-private[spark] class FairStagePoolInfo(listener: JobProgressListener) extends StagePoolInfo {
- def getStagePoolName(s: Stage): String = {
- listener.stageToPool(s)
- }
-
- def hasHref: Boolean = true
-}
-
/** Page showing list of all ongoing and recently finished stages */
-private[spark] class StageTable(
- val stages: Seq[Stage],
- val parent: JobProgressUI) {
+private[spark] class StageTable(val stages: Seq[Stage], val parent: JobProgressUI) {
val listener = parent.listener
val dateFmt = parent.dateFmt
- var stagePoolInfo = parent.stagePoolInfo
-
+ val isFairScheduler = listener.sc.getSchedulingMode == SchedulingMode.FAIR
+
def toNodeSeq(): Seq[Node] = {
stageTable(stageRow, stages)
}
@@ -64,7 +33,7 @@ private[spark] class StageTable(
<table class="table table-bordered table-striped table-condensed sortable">
<thead>
<th>Stage Id</th>
- <th>Pool Name</th>
+ {if (isFairScheduler) {<th>Pool Name</th>} else {}}
<th>Origin</th>
<th>Submitted</th>
<td>Duration</td>
@@ -116,15 +85,13 @@ private[spark] class StageTable(
val completedTasks = listener.stageToTasksComplete.getOrElse(s.id, 0)
val totalTasks = s.numPartitions
- val poolName = stagePoolInfo.getStagePoolName(s)
+ val poolName = listener.stageToPool.get(s)
<tr>
<td>{s.id}</td>
- <td>{if (stagePoolInfo.hasHref) {
- <a href={"/stages/pool?poolname=%s".format(poolName)}>{poolName}</a>
- } else {
- {poolName}
- }}</td>
+ {if (isFairScheduler) {
+ <td><a href={"/stages/pool?poolname=%s".format(poolName.get)}>{poolName.get}</a></td>}
+ }
<td><a href={"/stages/stage?id=%s".format(s.id)}>{s.name}</a></td>
<td>{submissionTime}</td>
<td>{getElapsedTime(s.submissionTime,