aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorAndrew xia <junluan.xia@intel.com>2013-07-12 14:25:18 +0800
committerAndrew xia <junluan.xia@intel.com>2013-07-12 14:25:18 +0800
commit2080e250060975a876a388eb785e7f2b3cf2c0cd (patch)
tree8bed51a088f8babbc0b2b79844a297b841e3615b /core
parent744da8eefda3ae66f3471a12cc02b29cf5441dbc (diff)
downloadspark-2080e250060975a876a388eb785e7f2b3cf2c0cd.tar.gz
spark-2080e250060975a876a388eb785e7f2b3cf2c0cd.tar.bz2
spark-2080e250060975a876a388eb785e7f2b3cf2c0cd.zip
Enhance job ui in spark ui system with adding pool information
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/spark/SparkContext.scala17
-rw-r--r--core/src/main/scala/spark/scheduler/DAGScheduler.scala4
-rw-r--r--core/src/main/scala/spark/scheduler/JobLogger.scala2
-rw-r--r--core/src/main/scala/spark/scheduler/SparkListener.scala2
-rw-r--r--core/src/main/scala/spark/scheduler/TaskScheduler.scala7
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala11
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala4
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/Schedulable.scala6
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/SchedulingMode.scala4
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala1
-rw-r--r--core/src/main/scala/spark/scheduler/local/LocalScheduler.scala10
-rw-r--r--core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala1
-rw-r--r--core/src/main/scala/spark/ui/jobs/IndexPage.scala121
-rw-r--r--core/src/main/scala/spark/ui/jobs/JobProgressListener.scala140
-rw-r--r--core/src/main/scala/spark/ui/jobs/JobProgressUI.scala112
-rw-r--r--core/src/main/scala/spark/ui/jobs/PoolPage.scala38
-rw-r--r--core/src/main/scala/spark/ui/jobs/PoolTable.scala98
-rw-r--r--core/src/main/scala/spark/ui/jobs/StageTable.scala143
-rw-r--r--core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala6
-rw-r--r--core/src/test/scala/spark/scheduler/JobLoggerSuite.scala2
20 files changed, 530 insertions, 199 deletions
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 43e6af2351..b5225d5681 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -10,6 +10,7 @@ import scala.collection.JavaConversions._
import scala.collection.Map
import scala.collection.generic.Growable
import scala.collection.mutable.HashMap
+import scala.collection.mutable.ArrayBuffer
import scala.collection.JavaConversions._
import scala.util.DynamicVariable
import scala.collection.mutable.{ConcurrentMap, HashMap}
@@ -43,13 +44,14 @@ import org.apache.mesos.MesosNativeLibrary
import spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
import spark.partial.{ApproximateEvaluator, PartialResult}
import spark.rdd.{CheckpointRDD, HadoopRDD, NewHadoopRDD, UnionRDD, ParallelCollectionRDD}
-import spark.scheduler.{DAGScheduler, ResultTask, ShuffleMapTask, SparkListener, SplitInfo, Stage, StageInfo, TaskScheduler}
-import spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend, ClusterScheduler}
+import spark.scheduler.{DAGScheduler, ResultTask, ShuffleMapTask, SparkListener, SplitInfo, Stage, StageInfo, TaskScheduler, ActiveJob}
+import spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend, ClusterScheduler, Schedulable}
import spark.scheduler.local.LocalScheduler
import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
import spark.storage.{StorageStatus, StorageUtils, RDDInfo}
import spark.util.{MetadataCleaner, TimeStampedHashMap}
import ui.{SparkUI}
+import spark.scheduler.cluster.SchedulingMode.SchedulingMode
/**
* Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -540,6 +542,17 @@ class SparkContext(
env.blockManager.master.getStorageStatus
}
+ def getPoolsInfo: ArrayBuffer[Schedulable] = {
+ taskScheduler.rootPool.schedulableQueue
+ }
+
+ def getSchedulingMode: SchedulingMode = {
+ taskScheduler.schedulingMode
+ }
+
+ def getPoolNameToPool: HashMap[String, Schedulable] = {
+ taskScheduler.rootPool.schedulableNameToSchedulable
+ }
/**
* Clear the job's list of files added by `addFile` so that they do not get downloaded to
* any new nodes.
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index 3d3b9ea011..c865743e37 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -472,11 +472,11 @@ class DAGScheduler(
}
}
if (tasks.size > 0) {
- sparkListeners.foreach(_.onStageSubmitted(SparkListenerStageSubmitted(stage, tasks.size)))
+ val properties = idToActiveJob(stage.priority).properties
+ sparkListeners.foreach(_.onStageSubmitted(SparkListenerStageSubmitted(stage, tasks.size, properties)))
logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
myPending ++= tasks
logDebug("New pending tasks: " + myPending)
- val properties = idToActiveJob(stage.priority).properties
taskSched.submitTasks(
new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.priority, properties))
if (!stage.submissionTime.isDefined) {
diff --git a/core/src/main/scala/spark/scheduler/JobLogger.scala b/core/src/main/scala/spark/scheduler/JobLogger.scala
index 6a9d52f356..8e5540873f 100644
--- a/core/src/main/scala/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/spark/scheduler/JobLogger.scala
@@ -45,7 +45,7 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
event match {
case SparkListenerJobStart(job, properties) =>
processJobStartEvent(job, properties)
- case SparkListenerStageSubmitted(stage, taskSize) =>
+ case SparkListenerStageSubmitted(stage, taskSize, properties) =>
processStageSubmittedEvent(stage, taskSize)
case StageCompleted(stageInfo) =>
processStageCompletedEvent(stageInfo)
diff --git a/core/src/main/scala/spark/scheduler/SparkListener.scala b/core/src/main/scala/spark/scheduler/SparkListener.scala
index 8de3aa91a4..94fdad9b98 100644
--- a/core/src/main/scala/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/spark/scheduler/SparkListener.scala
@@ -8,7 +8,7 @@ import spark.executor.TaskMetrics
sealed trait SparkListenerEvents
-case class SparkListenerStageSubmitted(stage: Stage, taskSize: Int) extends SparkListenerEvents
+case class SparkListenerStageSubmitted(stage: Stage, taskSize: Int, properties: Properties = null) extends SparkListenerEvents
case class StageCompleted(val stageInfo: StageInfo) extends SparkListenerEvents
diff --git a/core/src/main/scala/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/spark/scheduler/TaskScheduler.scala
index 7787b54762..5cdf846032 100644
--- a/core/src/main/scala/spark/scheduler/TaskScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/TaskScheduler.scala
@@ -1,5 +1,7 @@
package spark.scheduler
+import spark.scheduler.cluster.Pool
+import spark.scheduler.cluster.SchedulingMode.SchedulingMode
/**
* Low-level task scheduler interface, implemented by both ClusterScheduler and LocalScheduler.
* These schedulers get sets of tasks submitted to them from the DAGScheduler for each stage,
@@ -8,6 +10,11 @@ package spark.scheduler
* the TaskSchedulerListener interface.
*/
private[spark] trait TaskScheduler {
+
+ def rootPool: Pool
+
+ def schedulingMode: SchedulingMode
+
def start(): Unit
// Invoked after system has successfully initialized (typically in spark context).
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
index 3a0c29b27f..1b23fd6cef 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
@@ -12,6 +12,7 @@ import spark.scheduler._
import java.nio.ByteBuffer
import java.util.concurrent.atomic.AtomicLong
import java.util.{TimerTask, Timer}
+import spark.scheduler.cluster.SchedulingMode.SchedulingMode
/**
* The main TaskScheduler implementation, for running tasks on a cluster. Clients should first call
@@ -97,6 +98,8 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
var schedulableBuilder: SchedulableBuilder = null
var rootPool: Pool = null
+ //default scheduler is FIFO
+ val schedulingMode: SchedulingMode = SchedulingMode.withName(System.getProperty("spark.cluster.schedulingmode", "FIFO"))
override def setListener(listener: TaskSchedulerListener) {
this.listener = listener
@@ -104,15 +107,13 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
def initialize(context: SchedulerBackend) {
backend = context
- //default scheduler is FIFO
- val schedulingMode = System.getProperty("spark.cluster.schedulingmode", "FIFO")
//temporarily set rootPool name to empty
- rootPool = new Pool("", SchedulingMode.withName(schedulingMode), 0, 0)
+ rootPool = new Pool("", schedulingMode, 0, 0)
schedulableBuilder = {
schedulingMode match {
- case "FIFO" =>
+ case SchedulingMode.FIFO =>
new FIFOSchedulableBuilder(rootPool)
- case "FAIR" =>
+ case SchedulingMode.FAIR =>
new FairSchedulableBuilder(rootPool)
}
}
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
index fe6420a522..7a6a6b7826 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
@@ -90,8 +90,8 @@ private[spark] class ClusterTaskSetManager(
var priority = taskSet.priority
var stageId = taskSet.stageId
var name = "TaskSet_"+taskSet.stageId.toString
- var parent:Schedulable = null
-
+ var parent: Schedulable = null
+ var schedulableQueue :ArrayBuffer[Schedulable] = null
// Last time when we launched a preferred task (for delay scheduling)
var lastPreferredLaunchTime = System.currentTimeMillis
diff --git a/core/src/main/scala/spark/scheduler/cluster/Schedulable.scala b/core/src/main/scala/spark/scheduler/cluster/Schedulable.scala
index 2dd9c0564f..2e4f14c11f 100644
--- a/core/src/main/scala/spark/scheduler/cluster/Schedulable.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/Schedulable.scala
@@ -1,13 +1,17 @@
package spark.scheduler.cluster
-import scala.collection.mutable.ArrayBuffer
+import spark.scheduler.cluster.SchedulingMode.SchedulingMode
+import scala.collection.mutable.ArrayBuffer
/**
* An interface for schedulable entities.
* there are two type of Schedulable entities(Pools and TaskSetManagers)
*/
private[spark] trait Schedulable {
var parent: Schedulable
+ //childrens
+ def schedulableQueue: ArrayBuffer[Schedulable]
+ def schedulingMode: SchedulingMode
def weight: Int
def minShare: Int
def runningTasks: Int
diff --git a/core/src/main/scala/spark/scheduler/cluster/SchedulingMode.scala b/core/src/main/scala/spark/scheduler/cluster/SchedulingMode.scala
index 6e0c6793e0..c5c7ee3b22 100644
--- a/core/src/main/scala/spark/scheduler/cluster/SchedulingMode.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/SchedulingMode.scala
@@ -1,7 +1,7 @@
package spark.scheduler.cluster
-object SchedulingMode extends Enumeration("FAIR","FIFO"){
+object SchedulingMode extends Enumeration("FAIR", "FIFO", "NONE"){
type SchedulingMode = Value
- val FAIR,FIFO = Value
+ val FAIR,FIFO,NONE = Value
}
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
index b4dd75d90f..472e01b227 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
@@ -6,6 +6,7 @@ import spark.TaskState.TaskState
import java.nio.ByteBuffer
private[spark] trait TaskSetManager extends Schedulable {
+ def schedulingMode = SchedulingMode.NONE
def taskSet: TaskSet
def slaveOffer(execId: String, hostPort: String, availableCpus: Double,
overrideLocality: TaskLocality.TaskLocality = null): Option[TaskDescription]
diff --git a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
index b000e328e6..19a48895e3 100644
--- a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
@@ -12,6 +12,7 @@ import spark.TaskState.TaskState
import spark.executor.ExecutorURLClassLoader
import spark.scheduler._
import spark.scheduler.cluster._
+import spark.scheduler.cluster.SchedulingMode.SchedulingMode
import akka.actor._
/**
@@ -63,6 +64,7 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
var schedulableBuilder: SchedulableBuilder = null
var rootPool: Pool = null
+ val schedulingMode: SchedulingMode = SchedulingMode.withName(System.getProperty("spark.cluster.schedulingmode", "FIFO"))
val activeTaskSets = new HashMap[String, TaskSetManager]
val taskIdToTaskSetId = new HashMap[Long, String]
val taskSetTaskIds = new HashMap[String, HashSet[Long]]
@@ -70,15 +72,13 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
var localActor: ActorRef = null
override def start() {
- //default scheduler is FIFO
- val schedulingMode = System.getProperty("spark.cluster.schedulingmode", "FIFO")
//temporarily set rootPool name to empty
- rootPool = new Pool("", SchedulingMode.withName(schedulingMode), 0, 0)
+ rootPool = new Pool("", schedulingMode, 0, 0)
schedulableBuilder = {
schedulingMode match {
- case "FIFO" =>
+ case SchedulingMode.FIFO =>
new FIFOSchedulableBuilder(rootPool)
- case "FAIR" =>
+ case SchedulingMode.FAIR =>
new FairSchedulableBuilder(rootPool)
}
}
diff --git a/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala b/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala
index f12fec41d5..8954f40ea9 100644
--- a/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala
@@ -14,6 +14,7 @@ import spark.scheduler.cluster._
private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: TaskSet) extends TaskSetManager with Logging {
var parent: Schedulable = null
+ var schedulableQueue :ArrayBuffer[Schedulable] = null
var weight: Int = 1
var minShare: Int = 0
var runningTasks: Int = 0
diff --git a/core/src/main/scala/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/spark/ui/jobs/IndexPage.scala
index 1e675ab2cb..e765cecb01 100644
--- a/core/src/main/scala/spark/ui/jobs/IndexPage.scala
+++ b/core/src/main/scala/spark/ui/jobs/IndexPage.scala
@@ -6,107 +6,52 @@ import javax.servlet.http.HttpServletRequest
import scala.Some
import scala.xml.{NodeSeq, Node}
+import scala.collection.mutable.HashMap
+import scala.collection.mutable.HashSet
import spark.scheduler.Stage
import spark.ui.UIUtils._
import spark.ui.Page._
import spark.storage.StorageLevel
+import spark.scheduler.cluster.Schedulable
+import spark.scheduler.cluster.SchedulingMode
+import spark.scheduler.cluster.SchedulingMode.SchedulingMode
-/** Page showing list of all ongoing and recently finished stages */
+/** Page showing list of all ongoing and recently finished stages and pools*/
private[spark] class IndexPage(parent: JobProgressUI) {
def listener = parent.listener
- val dateFmt = parent.dateFmt
+
+ def stageTable: StageTable = parent.stageTable
+
+ def poolTable: PoolTable = parent.poolTable
def render(request: HttpServletRequest): Seq[Node] = {
val activeStages = listener.activeStages.toSeq
val completedStages = listener.completedStages.reverse.toSeq
val failedStages = listener.failedStages.reverse.toSeq
- /** Special table which merges two header cells. */
- def stageTable[T](makeRow: T => Seq[Node], rows: Seq[T]): Seq[Node] = {
- <table class="table table-bordered table-striped table-condensed sortable">
- <thead>
- <th>Stage Id</th>
- <th>Origin</th>
- <th>Submitted</th>
- <td>Duration</td>
- <td colspan="2">Tasks: Complete/Total</td>
- <td>Shuffle Activity</td>
- <td>Stored RDD</td>
- </thead>
- <tbody>
- {rows.map(r => makeRow(r))}
- </tbody>
- </table>
- }
-
- val activeStageTable: NodeSeq = stageTable(stageRow, activeStages)
- val completedStageTable = stageTable(stageRow, completedStages)
- val failedStageTable: NodeSeq = stageTable(stageRow, failedStages)
-
- val content = <h2>Active Stages</h2> ++ activeStageTable ++
- <h2>Completed Stages</h2> ++ completedStageTable ++
- <h2>Failed Stages</h2> ++ failedStageTable
-
- headerSparkPage(content, parent.sc, "Spark Stages", Jobs)
- }
-
- def getElapsedTime(submitted: Option[Long], completed: Long): String = {
- submitted match {
- case Some(t) => parent.formatDuration(completed - t)
- case _ => "Unknown"
- }
- }
-
- def makeProgressBar(completed: Int, total: Int): Seq[Node] = {
- val width=130
- val height=15
- val completeWidth = (completed.toDouble / total) * width
-
- <svg width={width.toString} height={height.toString}>
- <rect width={width.toString} height={height.toString}
- fill="white" stroke="rgb(51,51,51)" stroke-width="1" />
- <rect width={completeWidth.toString} height={height.toString}
- fill="rgb(0,136,204)" stroke="black" stroke-width="1" />
- </svg>
- }
-
-
- def stageRow(s: Stage): Seq[Node] = {
- val submissionTime = s.submissionTime match {
- case Some(t) => dateFmt.format(new Date(t))
- case None => "Unknown"
- }
- val (read, write) = (listener.hasShuffleRead(s.id), listener.hasShuffleWrite(s.id))
- val shuffleInfo = (read, write) match {
- case (true, true) => "Read/Write"
- case (true, false) => "Read"
- case (false, true) => "Write"
- case _ => ""
- }
- val completedTasks = listener.stageToTasksComplete.getOrElse(s.id, 0)
- val totalTasks = s.numPartitions
-
- <tr>
- <td>{s.id}</td>
- <td><a href={"/stages/stage?id=%s".format(s.id)}>{s.origin}</a></td>
- <td>{submissionTime}</td>
- <td>{getElapsedTime(s.submissionTime,
- s.completionTime.getOrElse(System.currentTimeMillis()))}</td>
- <td class="progress-cell">{makeProgressBar(completedTasks, totalTasks)}</td>
- <td style="border-left: 0; text-align: center;">{completedTasks} / {totalTasks}
- {listener.stageToTasksFailed.getOrElse(s.id, 0) match {
- case f if f > 0 => "(%s failed)".format(f)
- case _ =>
- }}
- </td>
- <td>{shuffleInfo}</td>
- <td>{if (s.rdd.getStorageLevel != StorageLevel.NONE) {
- <a href={"/storage/rdd?id=%s".format(s.rdd.id)}>
- {Option(s.rdd.name).getOrElse(s.rdd.id)}
- </a>
- }}
- </td>
- </tr>
+ stageTable.setStagePoolInfo(parent.stagePoolInfo)
+ poolTable.setPoolSource(parent.stagePagePoolSource)
+
+ val activeStageNodeSeq = stageTable.toNodeSeq(activeStages)
+ val completedStageNodeSeq = stageTable.toNodeSeq(completedStages)
+ val failedStageNodeSeq = stageTable.toNodeSeq(failedStages)
+
+ val content = <div class="row">
+ <div class="span12">
+ <ul class="unstyled">
+ <li><strong>Active Stages Number:</strong> {activeStages.size} </li>
+ <li><strong>Completed Stages Number:</strong> {completedStages.size} </li>
+ <li><strong>Failed Stages Number:</strong> {failedStages.size} </li>
+ <li><strong>Scheduling Mode:</strong> {parent.sc.getSchedulingMode}</li>
+ </ul>
+ </div>
+ </div> ++
+ <h3>Pools </h3> ++ poolTable.toNodeSeq ++
+ <h3>Active Stages : {activeStages.size}</h3> ++ activeStageNodeSeq++
+ <h3>Completed Stages : {completedStages.size}</h3> ++ completedStageNodeSeq++
+ <h3>Failed Stages : {failedStages.size}</h3> ++ failedStageNodeSeq
+
+ headerSparkPage(content, parent.sc, "Spark Stages/Pools", Jobs)
}
}
diff --git a/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala
new file mode 100644
index 0000000000..1244f9538b
--- /dev/null
+++ b/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala
@@ -0,0 +1,140 @@
+package spark.ui.jobs
+
+import scala.Seq
+import scala.collection.mutable.{HashSet, ListBuffer, HashMap, ArrayBuffer}
+
+import spark.{ExceptionFailure, SparkContext, Success, Utils}
+import spark.scheduler._
+import spark.scheduler.cluster.TaskInfo
+import spark.executor.TaskMetrics
+import collection.mutable
+
+private[spark] class FairJobProgressListener(val sparkContext: SparkContext)
+ extends JobProgressListener(sparkContext) {
+
+ val FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.cluster.fair.pool"
+ val DEFAULT_POOL_NAME = "default"
+
+ override val stageToPool = HashMap[Stage, String]()
+ override val poolToActiveStages = HashMap[String, HashSet[Stage]]()
+
+ override def onStageCompleted(stageCompleted: StageCompleted) = {
+ super.onStageCompleted(stageCompleted)
+ val stage = stageCompleted.stageInfo.stage
+ poolToActiveStages(stageToPool(stage)) -= stage
+ }
+
+ override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) = {
+ super.onStageSubmitted(stageSubmitted)
+ val stage = stageSubmitted.stage
+ var poolName = DEFAULT_POOL_NAME
+ if (stageSubmitted.properties != null) {
+ poolName = stageSubmitted.properties.getProperty(FAIR_SCHEDULER_PROPERTIES, DEFAULT_POOL_NAME)
+ }
+ stageToPool(stage) = poolName
+ val stages = poolToActiveStages.getOrElseUpdate(poolName, new HashSet[Stage]())
+ stages += stage
+ }
+
+ override def onJobEnd(jobEnd: SparkListenerJobEnd) {
+ super.onJobEnd(jobEnd)
+ jobEnd match {
+ case end: SparkListenerJobEnd =>
+ end.jobResult match {
+ case JobFailed(ex, Some(stage)) =>
+ poolToActiveStages(stageToPool(stage)) -= stage
+ case _ =>
+ }
+ case _ =>
+ }
+ }
+}
+
+private[spark] class JobProgressListener(val sc: SparkContext) extends SparkListener {
+ // How many stages to remember
+ val RETAINED_STAGES = System.getProperty("spark.ui.retained_stages", "1000").toInt
+
+ def stageToPool: HashMap[Stage, String] = null
+ def poolToActiveStages: HashMap[String, HashSet[Stage]] =null
+
+ val activeStages = HashSet[Stage]()
+ val completedStages = ListBuffer[Stage]()
+ val failedStages = ListBuffer[Stage]()
+
+ val stageToTasksComplete = HashMap[Int, Int]()
+ val stageToTasksFailed = HashMap[Int, Int]()
+ val stageToTaskInfos =
+ HashMap[Int, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]]()
+
+ override def onJobStart(jobStart: SparkListenerJobStart) {}
+
+ override def onStageCompleted(stageCompleted: StageCompleted) = {
+ val stage = stageCompleted.stageInfo.stage
+ activeStages -= stage
+ completedStages += stage
+ trimIfNecessary(completedStages)
+ }
+
+ /** If stages is too large, remove and garbage collect old stages */
+ def trimIfNecessary(stages: ListBuffer[Stage]) {
+ if (stages.size > RETAINED_STAGES) {
+ val toRemove = RETAINED_STAGES / 10
+ stages.takeRight(toRemove).foreach( s => {
+ stageToTaskInfos.remove(s.id)
+ })
+ stages.trimEnd(toRemove)
+ }
+ }
+
+ override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) =
+ activeStages += stageSubmitted.stage
+
+ override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
+ val sid = taskEnd.task.stageId
+ val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
+ taskEnd.reason match {
+ case e: ExceptionFailure =>
+ stageToTasksFailed(sid) = stageToTasksFailed.getOrElse(sid, 0) + 1
+ (Some(e), e.metrics)
+ case _ =>
+ stageToTasksComplete(sid) = stageToTasksComplete.getOrElse(sid, 0) + 1
+ (None, Some(taskEnd.taskMetrics))
+ }
+ val taskList = stageToTaskInfos.getOrElse(
+ sid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
+ taskList += ((taskEnd.taskInfo, metrics, failureInfo))
+ stageToTaskInfos(sid) = taskList
+ }
+
+ override def onJobEnd(jobEnd: SparkListenerJobEnd) {
+ jobEnd match {
+ case end: SparkListenerJobEnd =>
+ end.jobResult match {
+ case JobFailed(ex, Some(stage)) =>
+ activeStages -= stage
+ failedStages += stage
+ trimIfNecessary(failedStages)
+ case _ =>
+ }
+ case _ =>
+ }
+ }
+
+ /** Is this stage's input from a shuffle read. */
+ def hasShuffleRead(stageID: Int): Boolean = {
+ // This is written in a slightly complicated way to avoid having to scan all tasks
+ for (s <- stageToTaskInfos.get(stageID).getOrElse(Seq())) {
+ if (s._2 != null) return s._2.flatMap(m => m.shuffleReadMetrics).isDefined
+ }
+ return false // No tasks have finished for this stage
+ }
+
+ /** Is this stage's output to a shuffle write. */
+ def hasShuffleWrite(stageID: Int): Boolean = {
+ // This is written in a slightly complicated way to avoid having to scan all tasks
+ for (s <- stageToTaskInfos.get(stageID).getOrElse(Seq())) {
+ if (s._2 != null) return s._2.flatMap(m => m.shuffleWriteMetrics).isDefined
+ }
+ return false // No tasks have finished for this stage
+ }
+}
diff --git a/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala b/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala
index 84730cc091..e610252242 100644
--- a/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala
+++ b/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala
@@ -14,9 +14,9 @@ import scala.collection.mutable.{HashSet, ListBuffer, HashMap, ArrayBuffer}
import spark.ui.JettyUtils._
import spark.{ExceptionFailure, SparkContext, Success, Utils}
import spark.scheduler._
-import spark.scheduler.cluster.TaskInfo
-import spark.executor.TaskMetrics
import collection.mutable
+import spark.scheduler.cluster.SchedulingMode
+import spark.scheduler.cluster.SchedulingMode.SchedulingMode
/** Web UI showing progress status of all jobs in the given SparkContext. */
private[spark] class JobProgressUI(val sc: SparkContext) {
@@ -24,104 +24,38 @@ private[spark] class JobProgressUI(val sc: SparkContext) {
def listener = _listener.get
val dateFmt = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
+
private val indexPage = new IndexPage(this)
private val stagePage = new StagePage(this)
+ private val poolPage = new PoolPage(this)
+
+ var stageTable: StageTable = null
+ var stagePoolInfo: StagePoolInfo = null
+ var poolTable: PoolTable = null
+ var stagePagePoolSource: PoolSource = null
def start() {
- _listener = Some(new JobProgressListener)
+ sc.getSchedulingMode match {
+ case SchedulingMode.FIFO =>
+ _listener = Some(new JobProgressListener(sc))
+ stagePoolInfo = new FIFOStagePoolInfo()
+ stagePagePoolSource = new FIFOSource()
+ case SchedulingMode.FAIR =>
+ _listener = Some(new FairJobProgressListener(sc))
+ stagePoolInfo = new FairStagePoolInfo(listener)
+ stagePagePoolSource = new FairSource(sc)
+ }
+
sc.addSparkListener(listener)
+ stageTable = new StageTable(dateFmt, formatDuration, listener)
+ poolTable = new PoolTable(listener)
}
def formatDuration(ms: Long) = Utils.msDurationToString(ms)
def getHandlers = Seq[(String, Handler)](
("/stages/stage", (request: HttpServletRequest) => stagePage.render(request)),
+ ("/stages/pool", (request: HttpServletRequest) => poolPage.render(request)),
("/stages", (request: HttpServletRequest) => indexPage.render(request))
)
}
-
-private[spark] class JobProgressListener extends SparkListener {
- // How many stages to remember
- val RETAINED_STAGES = System.getProperty("spark.ui.retained_stages", "1000").toInt
-
- val activeStages = HashSet[Stage]()
- val completedStages = ListBuffer[Stage]()
- val failedStages = ListBuffer[Stage]()
-
- val stageToTasksComplete = HashMap[Int, Int]()
- val stageToTasksFailed = HashMap[Int, Int]()
- val stageToTaskInfos =
- HashMap[Int, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]]()
-
- override def onJobStart(jobStart: SparkListenerJobStart) {}
-
- override def onStageCompleted(stageCompleted: StageCompleted) = {
- val stage = stageCompleted.stageInfo.stage
- activeStages -= stage
- completedStages += stage
- trimIfNecessary(completedStages)
- }
-
- /** If stages is too large, remove and garbage collect old stages */
- def trimIfNecessary(stages: ListBuffer[Stage]) {
- if (stages.size > RETAINED_STAGES) {
- val toRemove = RETAINED_STAGES / 10
- stages.takeRight(toRemove).foreach( s => {
- stageToTaskInfos.remove(s.id)
- })
- stages.trimEnd(toRemove)
- }
- }
-
- override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) =
- activeStages += stageSubmitted.stage
-
- override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
- val sid = taskEnd.task.stageId
- val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
- taskEnd.reason match {
- case e: ExceptionFailure =>
- stageToTasksFailed(sid) = stageToTasksFailed.getOrElse(sid, 0) + 1
- (Some(e), e.metrics)
- case _ =>
- stageToTasksComplete(sid) = stageToTasksComplete.getOrElse(sid, 0) + 1
- (None, Some(taskEnd.taskMetrics))
- }
- val taskList = stageToTaskInfos.getOrElse(
- sid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
- taskList += ((taskEnd.taskInfo, metrics, failureInfo))
- stageToTaskInfos(sid) = taskList
- }
-
- override def onJobEnd(jobEnd: SparkListenerJobEnd) {
- jobEnd match {
- case end: SparkListenerJobEnd =>
- end.jobResult match {
- case JobFailed(ex, Some(stage)) =>
- activeStages -= stage
- failedStages += stage
- trimIfNecessary(failedStages)
- case _ =>
- }
- case _ =>
- }
- }
-
- /** Is this stage's input from a shuffle read. */
- def hasShuffleRead(stageID: Int): Boolean = {
- // This is written in a slightly complicated way to avoid having to scan all tasks
- for (s <- stageToTaskInfos.get(stageID).getOrElse(Seq())) {
- if (s._2 != null) return s._2.flatMap(m => m.shuffleReadMetrics).isDefined
- }
- return false // No tasks have finished for this stage
- }
-
- /** Is this stage's output to a shuffle write. */
- def hasShuffleWrite(stageID: Int): Boolean = {
- // This is written in a slightly complicated way to avoid having to scan all tasks
- for (s <- stageToTaskInfos.get(stageID).getOrElse(Seq())) {
- if (s._2 != null) return s._2.flatMap(m => m.shuffleWriteMetrics).isDefined
- }
- return false // No tasks have finished for this stage
- }
-} \ No newline at end of file
diff --git a/core/src/main/scala/spark/ui/jobs/PoolPage.scala b/core/src/main/scala/spark/ui/jobs/PoolPage.scala
new file mode 100644
index 0000000000..00703887c3
--- /dev/null
+++ b/core/src/main/scala/spark/ui/jobs/PoolPage.scala
@@ -0,0 +1,38 @@
+package spark.ui.jobs
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.{NodeSeq, Node}
+import scala.collection.mutable.HashSet
+
+import spark.scheduler.Stage
+import spark.ui.UIUtils._
+import spark.ui.Page._
+
+/** Page showing specific pool details*/
+private[spark] class PoolPage(parent: JobProgressUI) {
+ def listener = parent.listener
+
+ def stageTable: StageTable = parent.stageTable
+
+ def poolTable: PoolTable = parent.poolTable
+
+ def render(request: HttpServletRequest): Seq[Node] = {
+ val poolName = request.getParameter("poolname")
+ val poolToActiveStages = listener.poolToActiveStages
+ val activeStages = poolToActiveStages.getOrElseUpdate(poolName, new HashSet[Stage]).toSeq
+ val stageToPool = listener.stageToPool
+
+ val poolDetailPoolSource = new PoolDetailSource(parent.sc, poolName)
+ poolTable.setPoolSource(poolDetailPoolSource)
+
+ stageTable.setStagePoolInfo(parent.stagePoolInfo)
+
+ val activeStageNodeSeq = stageTable.toNodeSeq(activeStages)
+
+ val content = <h3>Pool </h3> ++ poolTable.toNodeSeq ++
+ <h3>Active Stages : {activeStages.size}</h3> ++ activeStageNodeSeq
+
+ headerSparkPage(content, parent.sc, "Spark Pool Details", Jobs)
+ }
+}
diff --git a/core/src/main/scala/spark/ui/jobs/PoolTable.scala b/core/src/main/scala/spark/ui/jobs/PoolTable.scala
new file mode 100644
index 0000000000..bb8be4b26e
--- /dev/null
+++ b/core/src/main/scala/spark/ui/jobs/PoolTable.scala
@@ -0,0 +1,98 @@
+package spark.ui.jobs
+
+import java.util.Date
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.Some
+import scala.xml.{NodeSeq, Node}
+import scala.collection.mutable.HashMap
+import scala.collection.mutable.HashSet
+
+import spark.SparkContext
+import spark.scheduler.Stage
+import spark.ui.UIUtils._
+import spark.ui.Page._
+import spark.storage.StorageLevel
+import spark.scheduler.cluster.Schedulable
+
+/*
+ * Interface for get pools seq showing on Index or pool detail page
+ */
+
+private[spark] trait PoolSource {
+ def getPools: Seq[Schedulable]
+}
+
+/*
+ * Pool source for FIFO scheduler algorithm on Index page
+ */
+private[spark] class FIFOSource() extends PoolSource{
+ def getPools: Seq[Schedulable] = {
+ Seq[Schedulable]()
+ }
+}
+
+/*
+ * Pool source for Fair scheduler algorithm on Index page
+ */
+private[spark] class FairSource(sc: SparkContext) extends PoolSource{
+ def getPools: Seq[Schedulable] = {
+ sc.getPoolsInfo.toSeq
+ }
+}
+
+/*
+ * specific pool info for pool detail page
+ */
+private[spark] class PoolDetailSource(sc: SparkContext, poolName: String) extends PoolSource{
+ def getPools: Seq[Schedulable] = {
+ val pools = HashSet[Schedulable]()
+ pools += sc.getPoolNameToPool(poolName)
+ pools.toSeq
+ }
+}
+
+/** Table showing list of pools */
+private[spark] class PoolTable(listener: JobProgressListener) {
+
+ var poolSource: PoolSource = null
+ var poolToActiveStages: HashMap[String, HashSet[Stage]] = listener.poolToActiveStages
+
+ def toNodeSeq: Seq[Node] = {
+ poolTable(poolRow, poolSource.getPools)
+ }
+
+ def setPoolSource(poolSource: PoolSource) {
+ this.poolSource = poolSource
+ }
+
+ //pool tables
+ def poolTable(makeRow: (Schedulable, HashMap[String, HashSet[Stage]]) => Seq[Node], rows: Seq[Schedulable]): Seq[Node] = {
+ <table class="table table-bordered table-striped table-condensed sortable">
+ <thead>
+ <th>Pool Name</th>
+ <th>Minimum Share</th>
+ <th>Pool Weight</th>
+ <td>Active Stages</td>
+ <td>Running Tasks</td>
+ <td>SchedulingMode</td>
+ </thead>
+ <tbody>
+ {rows.map(r => makeRow(r, poolToActiveStages))}
+ </tbody>
+ </table>
+ }
+
+ def poolRow(p: Schedulable, poolToActiveStages: HashMap[String, HashSet[Stage]]): Seq[Node] = {
+ <tr>
+ <td><a href={"/stages/pool?poolname=%s".format(p.name)}>{p.name}</a></td>
+ <td>{p.minShare}</td>
+ <td>{p.weight}</td>
+ <td>{poolToActiveStages.getOrElseUpdate(p.name, new HashSet[Stage]()).size}</td>
+ <td>{p.runningTasks}</td>
+ <td>{p.schedulingMode}</td>
+ </tr>
+ }
+}
+
diff --git a/core/src/main/scala/spark/ui/jobs/StageTable.scala b/core/src/main/scala/spark/ui/jobs/StageTable.scala
new file mode 100644
index 0000000000..83e566c55b
--- /dev/null
+++ b/core/src/main/scala/spark/ui/jobs/StageTable.scala
@@ -0,0 +1,143 @@
+package spark.ui.jobs
+
+import java.util.Date
+import java.text.SimpleDateFormat
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.Some
+import scala.xml.{NodeSeq, Node}
+import scala.collection.mutable.HashMap
+
+import spark.scheduler.Stage
+import spark.ui.UIUtils._
+import spark.ui.Page._
+import spark.storage.StorageLevel
+
+/*
+ * Interface to get stage's pool name
+ */
+private[spark] trait StagePoolInfo {
+ def getStagePoolName(s: Stage): String
+
+ def hasHerf: Boolean
+}
+
+/*
+ * For FIFO scheduler algorithm, just show "N/A" and its link status is false
+ */
+private[spark] class FIFOStagePoolInfo extends StagePoolInfo {
+ def getStagePoolName(s: Stage): String = "N/A"
+
+ def hasHerf: Boolean = false
+}
+
+/*
+ * For Fair scheduler algorithm, show its pool name and pool detail link status is true
+ */
+private[spark] class FairStagePoolInfo(listener: JobProgressListener) extends StagePoolInfo {
+ def getStagePoolName(s: Stage): String = {
+ listener.stageToPool(s)
+ }
+
+ def hasHerf: Boolean = true
+}
+
+/** Page showing list of all ongoing and recently finished stages */
+private[spark] class StageTable(val dateFmt: SimpleDateFormat, val formatDuration: Long => String, val listener: JobProgressListener) {
+
+ var stagePoolInfo: StagePoolInfo = null
+
+ def toNodeSeq(stages: Seq[Stage]): Seq[Node] = {
+ stageTable(stageRow, stages)
+ }
+
+ def setStagePoolInfo(stagePoolInfo: StagePoolInfo) {
+ this.stagePoolInfo = stagePoolInfo
+ }
+
+ /** Special table which merges two header cells. */
+ def stageTable[T](makeRow: T => Seq[Node], rows: Seq[T]): Seq[Node] = {
+ <table class="table table-bordered table-striped table-condensed sortable">
+ <thead>
+ <th>Stage Id</th>
+ <th>Pool Name</th>
+ <th>Origin</th>
+ <th>Submitted</th>
+ <td>Duration</td>
+ <td colspan="2">Tasks: Complete/Total</td>
+ <td>Shuffle Activity</td>
+ <td>Stored RDD</td>
+ </thead>
+ <tbody>
+ {rows.map(r => makeRow(r))}
+ </tbody>
+ </table>
+ }
+
+ def getElapsedTime(submitted: Option[Long], completed: Long): String = {
+ submitted match {
+ case Some(t) => formatDuration(completed - t)
+ case _ => "Unknown"
+ }
+ }
+
+ def makeProgressBar(completed: Int, total: Int): Seq[Node] = {
+ val width=130
+ val height=15
+ val completeWidth = (completed.toDouble / total) * width
+
+ <svg width={width.toString} height={height.toString}>
+ <rect width={width.toString} height={height.toString}
+ fill="white" stroke="rgb(51,51,51)" stroke-width="1" />
+ <rect width={completeWidth.toString} height={height.toString}
+ fill="rgb(0,136,204)" stroke="black" stroke-width="1" />
+ </svg>
+ }
+
+
+ def stageRow(s: Stage): Seq[Node] = {
+ val submissionTime = s.submissionTime match {
+ case Some(t) => dateFmt.format(new Date(t))
+ case None => "Unknown"
+ }
+ val (read, write) = (listener.hasShuffleRead(s.id), listener.hasShuffleWrite(s.id))
+ val shuffleInfo = (read, write) match {
+ case (true, true) => "Read/Write"
+ case (true, false) => "Read"
+ case (false, true) => "Write"
+ case _ => ""
+ }
+ val completedTasks = listener.stageToTasksComplete.getOrElse(s.id, 0)
+ val totalTasks = s.numPartitions
+
+ val poolName = stagePoolInfo.getStagePoolName(s)
+
+ <tr>
+ <td>{s.id}</td>
+ <td>{if (stagePoolInfo.hasHerf) {
+ <a href={"/stages/pool?poolname=%s".format(poolName)}>{poolName}</a>
+ } else {
+ {poolName}
+ }}</td>
+ <td><a href={"/stages/stage?id=%s".format(s.id)}>{s.origin}</a></td>
+ <td>{submissionTime}</td>
+ <td>{getElapsedTime(s.submissionTime,
+ s.completionTime.getOrElse(System.currentTimeMillis()))}</td>
+ <td class="progress-cell">{makeProgressBar(completedTasks, totalTasks)}</td>
+ <td style="border-left: 0; text-align: center;">{completedTasks} / {totalTasks}
+ {listener.stageToTasksFailed.getOrElse(s.id, 0) match {
+ case f if f > 0 => "(%s failed)".format(f)
+ case _ =>
+ }}
+ </td>
+ <td>{shuffleInfo}</td>
+ <td>{if (s.rdd.getStorageLevel != StorageLevel.NONE) {
+ <a href={"/storage/rdd?id=%s".format(s.rdd.id)}>
+ {Option(s.rdd.name).getOrElse(s.rdd.id)}
+ </a>
+ }}
+ </td>
+ </tr>
+ }
+}
diff --git a/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala
index 30e6fef950..da72bfbf89 100644
--- a/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala
@@ -22,6 +22,10 @@ import spark.TaskEndReason
import spark.{FetchFailed, Success}
+import spark.scheduler.cluster.Pool
+import spark.scheduler.cluster.SchedulingMode
+import spark.scheduler.cluster.SchedulingMode.SchedulingMode
+
/**
* Tests for DAGScheduler. These tests directly call the event processing functions in DAGScheduler
* rather than spawning an event loop thread as happens in the real code. They use EasyMock
@@ -39,6 +43,8 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
/** Set of TaskSets the DAGScheduler has requested executed. */
val taskSets = scala.collection.mutable.Buffer[TaskSet]()
val taskScheduler = new TaskScheduler() {
+ override def rootPool: Pool = null
+ override def schedulingMode: SchedulingMode = SchedulingMode.NONE
override def start() = {}
override def stop() = {}
override def submitTasks(taskSet: TaskSet) = {
diff --git a/core/src/test/scala/spark/scheduler/JobLoggerSuite.scala b/core/src/test/scala/spark/scheduler/JobLoggerSuite.scala
index 699901f1a1..328e7e7529 100644
--- a/core/src/test/scala/spark/scheduler/JobLoggerSuite.scala
+++ b/core/src/test/scala/spark/scheduler/JobLoggerSuite.scala
@@ -40,7 +40,7 @@ class JobLoggerSuite extends FunSuite with LocalSparkContext with ShouldMatchers
val shuffleMapStage = new Stage(1, parentRdd, Some(shuffleDep), Nil, jobID)
val rootStage = new Stage(0, rootRdd, None, List(shuffleMapStage), jobID)
- joblogger.onStageSubmitted(SparkListenerStageSubmitted(rootStage, 4))
+ joblogger.onStageSubmitted(SparkListenerStageSubmitted(rootStage, 4, null))
joblogger.getRddNameTest(parentRdd) should be (parentRdd.getClass.getName)
parentRdd.setName("MyRDD")
joblogger.getRddNameTest(parentRdd) should be ("MyRDD")