diff options
author | Patrick Wendell <pwendell@gmail.com> | 2013-08-01 17:13:28 -0700 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2013-08-01 17:13:28 -0700 |
commit | 6d7afd7ced7207467395f586a683d7f803f72a55 (patch) | |
tree | 211e89e45192f307a69ea74c2a191c12b55c6238 /core | |
parent | e466a55a6b803a5295e6dcc106a4abef917e7058 (diff) | |
parent | 87fd321a5a12ccea9d5593a4c43aaadfa44855bd (diff) | |
download | spark-6d7afd7ced7207467395f586a683d7f803f72a55.tar.gz spark-6d7afd7ced7207467395f586a683d7f803f72a55.tar.bz2 spark-6d7afd7ced7207467395f586a683d7f803f72a55.zip |
Merge pull request #768 from pwendell/pr-695
Minor clean-up of fair scheduler UI
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/spark/SparkContext.scala | 15 | ||||
-rw-r--r-- | core/src/main/scala/spark/ui/UIWorkloadGenerator.scala | 4 | ||||
-rw-r--r-- | core/src/main/scala/spark/ui/jobs/IndexPage.scala | 27 | ||||
-rw-r--r-- | core/src/main/scala/spark/ui/jobs/JobProgressUI.scala | 13 | ||||
-rw-r--r-- | core/src/main/scala/spark/ui/jobs/PoolPage.scala | 7 | ||||
-rw-r--r-- | core/src/main/scala/spark/ui/jobs/PoolTable.scala | 52 | ||||
-rw-r--r-- | core/src/main/scala/spark/ui/jobs/StageTable.scala | 51 |
7 files changed, 38 insertions, 131 deletions
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index 0d1f9fa8d4..97e1aaf49e 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -579,23 +579,26 @@ class SparkContext( /** * Return pools for fair scheduler - * TODO(xiajunluan):now, we have not taken nested pools into account + * TODO(xiajunluan): We should take nested pools into account */ - def getPools: ArrayBuffer[Schedulable] = { + def getAllPools: ArrayBuffer[Schedulable] = { taskScheduler.rootPool.schedulableQueue } /** + * Return the pool associated with the given name, if one exists + */ + def getPoolForName(pool: String): Option[Schedulable] = { + taskScheduler.rootPool.schedulableNameToSchedulable.get(pool) + } + + /** * Return current scheduling mode */ def getSchedulingMode: SchedulingMode.SchedulingMode = { taskScheduler.schedulingMode } - def getPoolNameToPool: HashMap[String, Schedulable] = { - taskScheduler.rootPool.schedulableNameToSchedulable - } - /** * Clear the job's list of files added by `addFile` so that they do not get downloaded to * any new nodes. diff --git a/core/src/main/scala/spark/ui/UIWorkloadGenerator.scala b/core/src/main/scala/spark/ui/UIWorkloadGenerator.scala index 4fbb503e5c..3ac35085eb 100644 --- a/core/src/main/scala/spark/ui/UIWorkloadGenerator.scala +++ b/core/src/main/scala/spark/ui/UIWorkloadGenerator.scala @@ -30,9 +30,7 @@ import spark.scheduler.cluster.SchedulingMode.SchedulingMode */ private[spark] object UIWorkloadGenerator { val NUM_PARTITIONS = 100 - val INTER_JOB_WAIT_MS = 500 - - + val INTER_JOB_WAIT_MS = 5000 def main(args: Array[String]) { if (args.length < 2) { diff --git a/core/src/main/scala/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/spark/ui/jobs/IndexPage.scala index 4ad787565d..b0d057afa1 100644 --- a/core/src/main/scala/spark/ui/jobs/IndexPage.scala +++ b/core/src/main/scala/spark/ui/jobs/IndexPage.scala @@ -17,16 +17,11 @@ package spark.ui.jobs -import java.util.Date - import javax.servlet.http.HttpServletRequest -import scala.Some import scala.xml.{NodeSeq, Node} -import spark.scheduler.cluster.TaskInfo -import spark.scheduler.Stage -import spark.storage.StorageLevel +import spark.scheduler.cluster.SchedulingMode import spark.ui.Page._ import spark.ui.UIUtils._ import spark.Utils @@ -50,7 +45,7 @@ private[spark] class IndexPage(parent: JobProgressUI) { val completedStagesTable = new StageTable(completedStages, parent) val failedStagesTable = new StageTable(failedStages, parent) - val poolTable = new PoolTable(parent.stagePagePoolSource, listener) + val poolTable = new PoolTable(listener.sc.getAllPools, listener) val summary: NodeSeq = <div> <ul class="unstyled"> @@ -79,13 +74,17 @@ private[spark] class IndexPage(parent: JobProgressUI) { </div> val content = summary ++ - <h3>Pools </h3> ++ poolTable.toNodeSeq ++ - <h3>Active Stages : {activeStages.size}</h3> ++ - activeStagesTable.toNodeSeq++ - <h3>Completed Stages : {completedStages.size}</h3> ++ - completedStagesTable.toNodeSeq++ - <h3>Failed Stages : {failedStages.size}</h3> ++ - failedStagesTable.toNodeSeq + {if (listener.sc.getSchedulingMode == SchedulingMode.FAIR) { + <h3>Pools</h3> ++ poolTable.toNodeSeq + } else { + Seq() + }} ++ + <h3>Active Stages : {activeStages.size}</h3> ++ + activeStagesTable.toNodeSeq++ + <h3>Completed Stages : {completedStages.size}</h3> ++ + completedStagesTable.toNodeSeq++ + <h3>Failed Stages : {failedStages.size}</h3> ++ + failedStagesTable.toNodeSeq headerSparkPage(content, parent.sc, "Spark Stages", Jobs) } diff --git a/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala b/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala index 3832c5d33c..c83f102ff3 100644 --- a/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala +++ b/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala @@ -41,25 +41,12 @@ private[spark] class JobProgressUI(val sc: SparkContext) { def listener = _listener.get val dateFmt = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss") - private val indexPage = new IndexPage(this) private val stagePage = new StagePage(this) private val poolPage = new PoolPage(this) - var stagePoolInfo: StagePoolInfo = null - var stagePagePoolSource: PoolSource = null - def start() { _listener = Some(new JobProgressListener(sc)) - sc.getSchedulingMode match { - case SchedulingMode.FIFO => - stagePoolInfo = new FIFOStagePoolInfo() - stagePagePoolSource = new FIFOSource() - case SchedulingMode.FAIR => - stagePoolInfo = new FairStagePoolInfo(listener) - stagePagePoolSource = new FairSource(sc) - } - sc.addSparkListener(listener) } diff --git a/core/src/main/scala/spark/ui/jobs/PoolPage.scala b/core/src/main/scala/spark/ui/jobs/PoolPage.scala index 37d4f8fa6b..ee5a6a6a48 100644 --- a/core/src/main/scala/spark/ui/jobs/PoolPage.scala +++ b/core/src/main/scala/spark/ui/jobs/PoolPage.scala @@ -17,12 +17,11 @@ private[spark] class PoolPage(parent: JobProgressUI) { val poolName = request.getParameter("poolname") val poolToActiveStages = listener.poolToActiveStages val activeStages = poolToActiveStages.getOrElseUpdate(poolName, new HashSet[Stage]).toSeq - - val poolDetailPoolSource = new PoolDetailSource(parent.sc, poolName) - val poolTable = new PoolTable(poolDetailPoolSource, listener) - val activeStagesTable = new StageTable(activeStages, parent) + val pool = listener.sc.getPoolForName(poolName).get + val poolTable = new PoolTable(Seq(pool), listener) + val content = <h3>Pool </h3> ++ poolTable.toNodeSeq() ++ <h3>Active Stages : {activeStages.size}</h3> ++ activeStagesTable.toNodeSeq() diff --git a/core/src/main/scala/spark/ui/jobs/PoolTable.scala b/core/src/main/scala/spark/ui/jobs/PoolTable.scala index 8788ed8bc1..9cfe0d68f0 100644 --- a/core/src/main/scala/spark/ui/jobs/PoolTable.scala +++ b/core/src/main/scala/spark/ui/jobs/PoolTable.scala @@ -1,65 +1,19 @@ package spark.ui.jobs -import java.util.Date - -import javax.servlet.http.HttpServletRequest - -import scala.Some -import scala.xml.{NodeSeq, Node} +import scala.xml.Node import scala.collection.mutable.HashMap import scala.collection.mutable.HashSet -import spark.SparkContext import spark.scheduler.Stage -import spark.ui.UIUtils._ -import spark.ui.Page._ -import spark.storage.StorageLevel import spark.scheduler.cluster.Schedulable -/* - * Interface for get pools seq showing on Index or pool detail page - */ - -private[spark] trait PoolSource { - def getPools: Seq[Schedulable] -} - -/* - * Pool source for FIFO scheduler algorithm on Index page - */ -private[spark] class FIFOSource() extends PoolSource { - def getPools: Seq[Schedulable] = { - Seq[Schedulable]() - } -} - -/* - * Pool source for Fair scheduler algorithm on Index page - */ -private[spark] class FairSource(sc: SparkContext) extends PoolSource { - def getPools: Seq[Schedulable] = { - sc.getPools.toSeq - } -} - -/* - * specific pool info for pool detail page - */ -private[spark] class PoolDetailSource(sc: SparkContext, poolName: String) extends PoolSource { - def getPools: Seq[Schedulable] = { - val pools = HashSet[Schedulable]() - pools += sc.getPoolNameToPool(poolName) - pools.toSeq - } -} - /** Table showing list of pools */ -private[spark] class PoolTable(poolSource: PoolSource, listener: JobProgressListener) { +private[spark] class PoolTable(pools: Seq[Schedulable], listener: JobProgressListener) { var poolToActiveStages: HashMap[String, HashSet[Stage]] = listener.poolToActiveStages def toNodeSeq(): Seq[Node] = { - poolTable(poolRow, poolSource.getPools) + poolTable(poolRow, pools) } // pool tables diff --git a/core/src/main/scala/spark/ui/jobs/StageTable.scala b/core/src/main/scala/spark/ui/jobs/StageTable.scala index e18b70f0b9..3257f4e360 100644 --- a/core/src/main/scala/spark/ui/jobs/StageTable.scala +++ b/core/src/main/scala/spark/ui/jobs/StageTable.scala @@ -10,51 +10,20 @@ import scala.xml.{NodeSeq, Node} import scala.collection.mutable.HashMap import scala.collection.mutable.HashSet -import spark.scheduler.cluster.TaskInfo +import spark.scheduler.cluster.{SchedulingMode, TaskInfo} import spark.scheduler.Stage import spark.ui.UIUtils._ import spark.ui.Page._ import spark.Utils import spark.storage.StorageLevel -/* - * Interface to get stage's pool name - */ -private[spark] trait StagePoolInfo { - def getStagePoolName(s: Stage): String - - def hasHref: Boolean -} - -/* - * For FIFO scheduler algorithm, just show "N/A" and its link status is false - */ -private[spark] class FIFOStagePoolInfo extends StagePoolInfo { - def getStagePoolName(s: Stage): String = "N/A" - - def hasHref: Boolean = false -} - -/* - * For Fair scheduler algorithm, show its pool name and pool detail link status is true - */ -private[spark] class FairStagePoolInfo(listener: JobProgressListener) extends StagePoolInfo { - def getStagePoolName(s: Stage): String = { - listener.stageToPool(s) - } - - def hasHref: Boolean = true -} - /** Page showing list of all ongoing and recently finished stages */ -private[spark] class StageTable( - val stages: Seq[Stage], - val parent: JobProgressUI) { +private[spark] class StageTable(val stages: Seq[Stage], val parent: JobProgressUI) { val listener = parent.listener val dateFmt = parent.dateFmt - var stagePoolInfo = parent.stagePoolInfo - + val isFairScheduler = listener.sc.getSchedulingMode == SchedulingMode.FAIR + def toNodeSeq(): Seq[Node] = { stageTable(stageRow, stages) } @@ -64,7 +33,7 @@ private[spark] class StageTable( <table class="table table-bordered table-striped table-condensed sortable"> <thead> <th>Stage Id</th> - <th>Pool Name</th> + {if (isFairScheduler) {<th>Pool Name</th>} else {}} <th>Origin</th> <th>Submitted</th> <td>Duration</td> @@ -116,15 +85,13 @@ private[spark] class StageTable( val completedTasks = listener.stageToTasksComplete.getOrElse(s.id, 0) val totalTasks = s.numPartitions - val poolName = stagePoolInfo.getStagePoolName(s) + val poolName = listener.stageToPool.get(s) <tr> <td>{s.id}</td> - <td>{if (stagePoolInfo.hasHref) { - <a href={"/stages/pool?poolname=%s".format(poolName)}>{poolName}</a> - } else { - {poolName} - }}</td> + {if (isFairScheduler) { + <td><a href={"/stages/pool?poolname=%s".format(poolName.get)}>{poolName.get}</a></td>} + } <td><a href={"/stages/stage?id=%s".format(s.id)}>{s.name}</a></td> <td>{submissionTime}</td> <td>{getElapsedTime(s.submissionTime, |