author     Reynold Xin <rxin@apache.org>    2014-07-17 18:58:48 -0700
committer  Reynold Xin <rxin@apache.org>    2014-07-17 18:58:48 -0700
commit     72e9021eaf26f31a82120505f8b764b18fbe8d48 (patch)
tree       7fd67414ac728bc5b10217812aadab275f82c1a9 /core
parent     935fe65ff6559a0e3b481e7508fa14337b23020b (diff)
[SPARK-2299] Consolidate various stageIdTo* hash maps in JobProgressListener
This should reduce memory usage for the web UI as well as slightly increase its speed in draining the UI event queue. @andrewor14

Author: Reynold Xin <rxin@apache.org>

Closes #1262 from rxin/ui-consolidate-hashtables and squashes the following commits:

1ac3f97 [Reynold Xin] Oops. Properly handle description.
f5736ad [Reynold Xin] Code review comments.
b8828dc [Reynold Xin] Merge branch 'master' into ui-consolidate-hashtables
7a7b6c4 [Reynold Xin] Revert css change.
f959bb8 [Reynold Xin] [SPARK-2299] Consolidate various stageIdTo* hash maps in JobProgressListener to speed it up.
63256f5 [Reynold Xin] [SPARK-2320] Reduce <pre> block font size.
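For context, the change replaces roughly a dozen parallel stageIdTo* hash maps with a single stageIdToData map keyed by stage ID, whose values carry all per-stage counters. Below is a minimal Scala sketch of that pattern; the field names come from the StageUIData class added in this patch, but the listener wiring (ConsolidationSketch, recordTaskEnd, removeStage) is simplified and hypothetical, not the full JobProgressListener implementation.

import scala.collection.mutable.HashMap

// Per-stage UI state kept in one object instead of one HashMap per metric.
class StageUIData {
  var numActiveTasks: Int = 0
  var numCompleteTasks: Int = 0
  var numFailedTasks: Int = 0
  var executorRunTime: Long = 0L
  var inputBytes: Long = 0L
  var shuffleReadBytes: Long = 0L
  var shuffleWriteBytes: Long = 0L
  var memoryBytesSpilled: Long = 0L
  var diskBytesSpilled: Long = 0L
  var schedulingPool: String = ""
  var description: Option[String] = None
}

// Hypothetical wrapper showing how a single map simplifies lookup, update, and trimming.
object ConsolidationSketch {
  // One map: one lookup and one removal per stage, instead of ~13 parallel maps.
  val stageIdToData = new HashMap[Int, StageUIData]

  // On task end, fetch (or lazily create) the stage's record and update it in place.
  def recordTaskEnd(stageId: Int, runTime: Long, succeeded: Boolean): Unit = {
    val stageData = stageIdToData.getOrElseUpdate(stageId, new StageUIData)
    stageData.numActiveTasks -= 1
    if (succeeded) stageData.numCompleteTasks += 1 else stageData.numFailedTasks += 1
    stageData.executorRunTime += runTime
  }

  // Trimming retained stages now touches a single structure.
  def removeStage(stageId: Int): Unit = stageIdToData.remove(stageId)
}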
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala          |  36
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala            |  29
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala      | 156
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala                |  37
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala               |  73
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala                   |  62
-rw-r--r--  core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala |  36
7 files changed, 205 insertions, 224 deletions
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
deleted file mode 100644
index c4a8996c0b..0000000000
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.ui.jobs
-
-import org.apache.spark.annotation.DeveloperApi
-
-/**
- * :: DeveloperApi ::
- * Class for reporting aggregated metrics for each executor in stage UI.
- */
-@DeveloperApi
-class ExecutorSummary {
- var taskTime : Long = 0
- var failedTasks : Int = 0
- var succeededTasks : Int = 0
- var inputBytes: Long = 0
- var shuffleRead : Long = 0
- var shuffleWrite : Long = 0
- var memoryBytesSpilled : Long = 0
- var diskBytesSpilled : Long = 0
-}
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
index 52020954ea..0cc51c8737 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
@@ -21,6 +21,7 @@ import scala.collection.mutable
import scala.xml.Node
import org.apache.spark.ui.{ToolTips, UIUtils}
+import org.apache.spark.ui.jobs.UIData.StageUIData
import org.apache.spark.util.Utils
/** Page showing executor summary */
@@ -64,11 +65,9 @@ private[ui] class ExecutorTable(stageId: Int, parent: JobProgressTab) {
executorIdToAddress.put(executorId, address)
}
- val executorIdToSummary = listener.stageIdToExecutorSummaries.get(stageId)
- executorIdToSummary match {
- case Some(x) =>
- x.toSeq.sortBy(_._1).map { case (k, v) => {
- // scalastyle:off
+ listener.stageIdToData.get(stageId) match {
+ case Some(stageData: StageUIData) =>
+ stageData.executorSummary.toSeq.sortBy(_._1).map { case (k, v) =>
<tr>
<td>{k}</td>
<td>{executorIdToAddress.getOrElse(k, "CANNOT FIND ADDRESS")}</td>
@@ -76,16 +75,20 @@ private[ui] class ExecutorTable(stageId: Int, parent: JobProgressTab) {
<td>{v.failedTasks + v.succeededTasks}</td>
<td>{v.failedTasks}</td>
<td>{v.succeededTasks}</td>
- <td sorttable_customekey={v.inputBytes.toString}>{Utils.bytesToString(v.inputBytes)}</td>
- <td sorttable_customekey={v.shuffleRead.toString}>{Utils.bytesToString(v.shuffleRead)}</td>
- <td sorttable_customekey={v.shuffleWrite.toString}>{Utils.bytesToString(v.shuffleWrite)}</td>
- <td sorttable_customekey={v.memoryBytesSpilled.toString} >{Utils.bytesToString(v.memoryBytesSpilled)}</td>
- <td sorttable_customekey={v.diskBytesSpilled.toString} >{Utils.bytesToString(v.diskBytesSpilled)}</td>
+ <td sorttable_customekey={v.inputBytes.toString}>
+ {Utils.bytesToString(v.inputBytes)}</td>
+ <td sorttable_customekey={v.shuffleRead.toString}>
+ {Utils.bytesToString(v.shuffleRead)}</td>
+ <td sorttable_customekey={v.shuffleWrite.toString}>
+ {Utils.bytesToString(v.shuffleWrite)}</td>
+ <td sorttable_customekey={v.memoryBytesSpilled.toString}>
+ {Utils.bytesToString(v.memoryBytesSpilled)}</td>
+ <td sorttable_customekey={v.diskBytesSpilled.toString}>
+ {Utils.bytesToString(v.diskBytesSpilled)}</td>
</tr>
- // scalastyle:on
}
- }
- case _ => Seq[Node]()
+ case None =>
+ Seq.empty[Node]
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 2286a7f952..efb527b4f0 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -25,6 +25,7 @@ import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.storage.BlockManagerId
+import org.apache.spark.ui.jobs.UIData._
/**
* :: DeveloperApi ::
@@ -35,7 +36,7 @@ import org.apache.spark.storage.BlockManagerId
* updating the internal data structures concurrently.
*/
@DeveloperApi
-class JobProgressListener(conf: SparkConf) extends SparkListener {
+class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
import JobProgressListener._
@@ -46,20 +47,8 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
val completedStages = ListBuffer[StageInfo]()
val failedStages = ListBuffer[StageInfo]()
- // TODO: Should probably consolidate all following into a single hash map.
- val stageIdToTime = HashMap[Int, Long]()
- val stageIdToInputBytes = HashMap[Int, Long]()
- val stageIdToShuffleRead = HashMap[Int, Long]()
- val stageIdToShuffleWrite = HashMap[Int, Long]()
- val stageIdToMemoryBytesSpilled = HashMap[Int, Long]()
- val stageIdToDiskBytesSpilled = HashMap[Int, Long]()
- val stageIdToTasksActive = HashMap[Int, HashMap[Long, TaskInfo]]()
- val stageIdToTasksComplete = HashMap[Int, Int]()
- val stageIdToTasksFailed = HashMap[Int, Int]()
- val stageIdToTaskData = HashMap[Int, HashMap[Long, TaskUIData]]()
- val stageIdToExecutorSummaries = HashMap[Int, HashMap[String, ExecutorSummary]]()
- val stageIdToPool = HashMap[Int, String]()
- val stageIdToDescription = HashMap[Int, String]()
+ val stageIdToData = new HashMap[Int, StageUIData]
+
val poolToActiveStages = HashMap[String, HashMap[Int, StageInfo]]()
val executorIdToBlockManagerId = HashMap[String, BlockManagerId]()
@@ -71,8 +60,12 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) = synchronized {
val stage = stageCompleted.stageInfo
val stageId = stage.stageId
- // Remove by stageId, rather than by StageInfo, in case the StageInfo is from storage
- poolToActiveStages(stageIdToPool(stageId)).remove(stageId)
+ val stageData = stageIdToData.getOrElseUpdate(stageId, {
+ logWarning("Stage completed for unknown stage " + stageId)
+ new StageUIData
+ })
+
+ poolToActiveStages.get(stageData.schedulingPool).foreach(_.remove(stageId))
activeStages.remove(stageId)
if (stage.failureReason.isEmpty) {
completedStages += stage
@@ -87,21 +80,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
private def trimIfNecessary(stages: ListBuffer[StageInfo]) = synchronized {
if (stages.size > retainedStages) {
val toRemove = math.max(retainedStages / 10, 1)
- stages.take(toRemove).foreach { s =>
- stageIdToTime.remove(s.stageId)
- stageIdToInputBytes.remove(s.stageId)
- stageIdToShuffleRead.remove(s.stageId)
- stageIdToShuffleWrite.remove(s.stageId)
- stageIdToMemoryBytesSpilled.remove(s.stageId)
- stageIdToDiskBytesSpilled.remove(s.stageId)
- stageIdToTasksActive.remove(s.stageId)
- stageIdToTasksComplete.remove(s.stageId)
- stageIdToTasksFailed.remove(s.stageId)
- stageIdToTaskData.remove(s.stageId)
- stageIdToExecutorSummaries.remove(s.stageId)
- stageIdToPool.remove(s.stageId)
- stageIdToDescription.remove(s.stageId)
- }
+ stages.take(toRemove).foreach { s => stageIdToData.remove(s.stageId) }
stages.trimStart(toRemove)
}
}
@@ -114,26 +93,27 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
val poolName = Option(stageSubmitted.properties).map {
p => p.getProperty("spark.scheduler.pool", DEFAULT_POOL_NAME)
}.getOrElse(DEFAULT_POOL_NAME)
- stageIdToPool(stage.stageId) = poolName
- val description = Option(stageSubmitted.properties).flatMap {
+ val stageData = stageIdToData.getOrElseUpdate(stage.stageId, new StageUIData)
+ stageData.schedulingPool = poolName
+
+ stageData.description = Option(stageSubmitted.properties).flatMap {
p => Option(p.getProperty(SparkContext.SPARK_JOB_DESCRIPTION))
}
- description.map(d => stageIdToDescription(stage.stageId) = d)
val stages = poolToActiveStages.getOrElseUpdate(poolName, new HashMap[Int, StageInfo]())
stages(stage.stageId) = stage
}
override def onTaskStart(taskStart: SparkListenerTaskStart) = synchronized {
- val sid = taskStart.stageId
val taskInfo = taskStart.taskInfo
if (taskInfo != null) {
- val tasksActive = stageIdToTasksActive.getOrElseUpdate(sid, new HashMap[Long, TaskInfo]())
- tasksActive(taskInfo.taskId) = taskInfo
- val taskMap = stageIdToTaskData.getOrElse(sid, HashMap[Long, TaskUIData]())
- taskMap(taskInfo.taskId) = new TaskUIData(taskInfo)
- stageIdToTaskData(sid) = taskMap
+ val stageData = stageIdToData.getOrElseUpdate(taskStart.stageId, {
+ logWarning("Task start for unknown stage " + taskStart.stageId)
+ new StageUIData
+ })
+ stageData.numActiveTasks += 1
+ stageData.taskData.put(taskInfo.taskId, new TaskUIData(taskInfo))
}
}
@@ -143,88 +123,76 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
}
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
- val sid = taskEnd.stageId
val info = taskEnd.taskInfo
-
if (info != null) {
+ val stageData = stageIdToData.getOrElseUpdate(taskEnd.stageId, {
+ logWarning("Task end for unknown stage " + taskEnd.stageId)
+ new StageUIData
+ })
+
// create executor summary map if necessary
- val executorSummaryMap = stageIdToExecutorSummaries.getOrElseUpdate(key = sid,
- op = new HashMap[String, ExecutorSummary]())
+ val executorSummaryMap = stageData.executorSummary
executorSummaryMap.getOrElseUpdate(key = info.executorId, op = new ExecutorSummary)
- val executorSummary = executorSummaryMap.get(info.executorId)
- executorSummary match {
- case Some(y) => {
- // first update failed-task, succeed-task
- taskEnd.reason match {
- case Success =>
- y.succeededTasks += 1
- case _ =>
- y.failedTasks += 1
- }
-
- // update duration
- y.taskTime += info.duration
-
- val metrics = taskEnd.taskMetrics
- if (metrics != null) {
- metrics.inputMetrics.foreach { y.inputBytes += _.bytesRead }
- metrics.shuffleReadMetrics.foreach { y.shuffleRead += _.remoteBytesRead }
- metrics.shuffleWriteMetrics.foreach { y.shuffleWrite += _.shuffleBytesWritten }
- y.memoryBytesSpilled += metrics.memoryBytesSpilled
- y.diskBytesSpilled += metrics.diskBytesSpilled
- }
+ executorSummaryMap.get(info.executorId).foreach { y =>
+ // first update failed-task, succeed-task
+ taskEnd.reason match {
+ case Success =>
+ y.succeededTasks += 1
+ case _ =>
+ y.failedTasks += 1
+ }
+
+ // update duration
+ y.taskTime += info.duration
+
+ val metrics = taskEnd.taskMetrics
+ if (metrics != null) {
+ metrics.inputMetrics.foreach { y.inputBytes += _.bytesRead }
+ metrics.shuffleReadMetrics.foreach { y.shuffleRead += _.remoteBytesRead }
+ metrics.shuffleWriteMetrics.foreach { y.shuffleWrite += _.shuffleBytesWritten }
+ y.memoryBytesSpilled += metrics.memoryBytesSpilled
+ y.diskBytesSpilled += metrics.diskBytesSpilled
}
- case _ => {}
}
- val tasksActive = stageIdToTasksActive.getOrElseUpdate(sid, new HashMap[Long, TaskInfo]())
- // Remove by taskId, rather than by TaskInfo, in case the TaskInfo is from storage
- tasksActive.remove(info.taskId)
+ stageData.numActiveTasks -= 1
val (errorMessage, metrics): (Option[String], Option[TaskMetrics]) =
taskEnd.reason match {
case org.apache.spark.Success =>
- stageIdToTasksComplete(sid) = stageIdToTasksComplete.getOrElse(sid, 0) + 1
+ stageData.numCompleteTasks += 1
(None, Option(taskEnd.taskMetrics))
case e: ExceptionFailure => // Handle ExceptionFailure because we might have metrics
- stageIdToTasksFailed(sid) = stageIdToTasksFailed.getOrElse(sid, 0) + 1
+ stageData.numFailedTasks += 1
(Some(e.toErrorString), e.metrics)
case e: TaskFailedReason => // All other failure cases
- stageIdToTasksFailed(sid) = stageIdToTasksFailed.getOrElse(sid, 0) + 1
+ stageData.numFailedTasks += 1
(Some(e.toErrorString), None)
}
- stageIdToTime.getOrElseUpdate(sid, 0L)
- val time = metrics.map(_.executorRunTime).getOrElse(0L)
- stageIdToTime(sid) += time
- stageIdToInputBytes.getOrElseUpdate(sid, 0L)
+ val taskRunTime = metrics.map(_.executorRunTime).getOrElse(0L)
+ stageData.executorRunTime += taskRunTime
val inputBytes = metrics.flatMap(_.inputMetrics).map(_.bytesRead).getOrElse(0L)
- stageIdToInputBytes(sid) += inputBytes
+ stageData.inputBytes += inputBytes
- stageIdToShuffleRead.getOrElseUpdate(sid, 0L)
val shuffleRead = metrics.flatMap(_.shuffleReadMetrics).map(_.remoteBytesRead).getOrElse(0L)
- stageIdToShuffleRead(sid) += shuffleRead
+ stageData.shuffleReadBytes += shuffleRead
- stageIdToShuffleWrite.getOrElseUpdate(sid, 0L)
val shuffleWrite =
metrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleBytesWritten).getOrElse(0L)
- stageIdToShuffleWrite(sid) += shuffleWrite
+ stageData.shuffleWriteBytes += shuffleWrite
- stageIdToMemoryBytesSpilled.getOrElseUpdate(sid, 0L)
val memoryBytesSpilled = metrics.map(_.memoryBytesSpilled).getOrElse(0L)
- stageIdToMemoryBytesSpilled(sid) += memoryBytesSpilled
+ stageData.memoryBytesSpilled += memoryBytesSpilled
- stageIdToDiskBytesSpilled.getOrElseUpdate(sid, 0L)
val diskBytesSpilled = metrics.map(_.diskBytesSpilled).getOrElse(0L)
- stageIdToDiskBytesSpilled(sid) += diskBytesSpilled
+ stageData.diskBytesSpilled += diskBytesSpilled
- val taskMap = stageIdToTaskData.getOrElse(sid, HashMap[Long, TaskUIData]())
- taskMap(info.taskId) = new TaskUIData(info, metrics, errorMessage)
- stageIdToTaskData(sid) = taskMap
+ stageData.taskData(info.taskId) = new TaskUIData(info, metrics, errorMessage)
}
- }
+ } // end of onTaskEnd
override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate) {
synchronized {
@@ -252,12 +220,6 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
}
-@DeveloperApi
-case class TaskUIData(
- taskInfo: TaskInfo,
- taskMetrics: Option[TaskMetrics] = None,
- errorMessage: Option[String] = None)
-
private object JobProgressListener {
val DEFAULT_POOL_NAME = "default"
val DEFAULT_RETAINED_STAGES = 1000
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 8c3821bd7c..cab26b9e2f 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -23,6 +23,7 @@ import javax.servlet.http.HttpServletRequest
import scala.xml.Node
import org.apache.spark.ui.{ToolTips, WebUIPage, UIUtils}
+import org.apache.spark.ui.jobs.UIData._
import org.apache.spark.util.{Utils, Distribution}
/** Page showing statistics and task list for a given stage */
@@ -34,8 +35,9 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
def render(request: HttpServletRequest): Seq[Node] = {
listener.synchronized {
val stageId = request.getParameter("id").toInt
+ val stageDataOption = listener.stageIdToData.get(stageId)
- if (!listener.stageIdToTaskData.contains(stageId)) {
+ if (stageDataOption.isEmpty || stageDataOption.get.taskData.isEmpty) {
val content =
<div>
<h4>Summary Metrics</h4> No tasks have started yet
@@ -45,23 +47,14 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
"Details for Stage %s".format(stageId), parent.headerTabs, parent)
}
- val tasks = listener.stageIdToTaskData(stageId).values.toSeq.sortBy(_.taskInfo.launchTime)
+ val stageData = stageDataOption.get
+ val tasks = stageData.taskData.values.toSeq.sortBy(_.taskInfo.launchTime)
val numCompleted = tasks.count(_.taskInfo.finished)
- val inputBytes = listener.stageIdToInputBytes.getOrElse(stageId, 0L)
- val hasInput = inputBytes > 0
- val shuffleReadBytes = listener.stageIdToShuffleRead.getOrElse(stageId, 0L)
- val hasShuffleRead = shuffleReadBytes > 0
- val shuffleWriteBytes = listener.stageIdToShuffleWrite.getOrElse(stageId, 0L)
- val hasShuffleWrite = shuffleWriteBytes > 0
- val memoryBytesSpilled = listener.stageIdToMemoryBytesSpilled.getOrElse(stageId, 0L)
- val diskBytesSpilled = listener.stageIdToDiskBytesSpilled.getOrElse(stageId, 0L)
- val hasBytesSpilled = memoryBytesSpilled > 0 && diskBytesSpilled > 0
-
- var activeTime = 0L
- val now = System.currentTimeMillis
- val tasksActive = listener.stageIdToTasksActive(stageId).values
- tasksActive.foreach(activeTime += _.timeRunning(now))
+ val hasInput = stageData.inputBytes > 0
+ val hasShuffleRead = stageData.shuffleReadBytes > 0
+ val hasShuffleWrite = stageData.shuffleWriteBytes > 0
+ val hasBytesSpilled = stageData.memoryBytesSpilled > 0 && stageData.diskBytesSpilled > 0
// scalastyle:off
val summary =
@@ -69,34 +62,34 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
<ul class="unstyled">
<li>
<strong>Total task time across all tasks: </strong>
- {UIUtils.formatDuration(listener.stageIdToTime.getOrElse(stageId, 0L) + activeTime)}
+ {UIUtils.formatDuration(stageData.executorRunTime)}
</li>
{if (hasInput)
<li>
<strong>Input: </strong>
- {Utils.bytesToString(inputBytes)}
+ {Utils.bytesToString(stageData.inputBytes)}
</li>
}
{if (hasShuffleRead)
<li>
<strong>Shuffle read: </strong>
- {Utils.bytesToString(shuffleReadBytes)}
+ {Utils.bytesToString(stageData.shuffleReadBytes)}
</li>
}
{if (hasShuffleWrite)
<li>
<strong>Shuffle write: </strong>
- {Utils.bytesToString(shuffleWriteBytes)}
+ {Utils.bytesToString(stageData.shuffleWriteBytes)}
</li>
}
{if (hasBytesSpilled)
<li>
<strong>Shuffle spill (memory): </strong>
- {Utils.bytesToString(memoryBytesSpilled)}
+ {Utils.bytesToString(stageData.memoryBytesSpilled)}
</li>
<li>
<strong>Shuffle spill (disk): </strong>
- {Utils.bytesToString(diskBytesSpilled)}
+ {Utils.bytesToString(stageData.diskBytesSpilled)}
</li>
}
</ul>
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index fd8d0b5cdd..5f45c0ced5 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -17,12 +17,11 @@
package org.apache.spark.ui.jobs
-import java.util.Date
-
-import scala.collection.mutable.HashMap
import scala.xml.Node
-import org.apache.spark.scheduler.{StageInfo, TaskInfo}
+import java.util.Date
+
+import org.apache.spark.scheduler.StageInfo
import org.apache.spark.ui.{ToolTips, UIUtils}
import org.apache.spark.util.Utils
@@ -71,14 +70,14 @@ private[ui] class StageTableBase(
</table>
}
- private def makeProgressBar(started: Int, completed: Int, failed: String, total: Int): Seq[Node] =
+ private def makeProgressBar(started: Int, completed: Int, failed: Int, total: Int): Seq[Node] =
{
val completeWidth = "width: %s%%".format((completed.toDouble/total)*100)
val startWidth = "width: %s%%".format((started.toDouble/total)*100)
<div class="progress">
<span style="text-align:center; position:absolute; width:100%; left:0;">
- {completed}/{total} {failed}
+ {completed}/{total} { if (failed > 0) s"($failed failed)" else "" }
</span>
<div class="bar bar-completed" style={completeWidth}></div>
<div class="bar bar-running" style={startWidth}></div>
@@ -108,13 +107,23 @@ private[ui] class StageTableBase(
<pre class="stage-details collapsed">{s.details}</pre>
}
- listener.stageIdToDescription.get(s.stageId)
- .map(d => <div><em>{d}</em></div><div>{nameLink} {killLink}</div>)
- .getOrElse(<div>{nameLink} {killLink} {details}</div>)
+ val stageDataOption = listener.stageIdToData.get(s.stageId)
+ // Too many nested map/flatMaps with options are just annoying to read. Do this imperatively.
+ if (stageDataOption.isDefined && stageDataOption.get.description.isDefined) {
+ val desc = stageDataOption.get.description
+ <div><em>{desc}</em></div><div>{nameLink} {killLink}</div>
+ } else {
+ <div>{killLink} {nameLink} {details}</div>
+ }
}
protected def stageRow(s: StageInfo): Seq[Node] = {
- val poolName = listener.stageIdToPool.get(s.stageId)
+ val stageDataOption = listener.stageIdToData.get(s.stageId)
+ if (stageDataOption.isEmpty) {
+ return <td>{s.stageId}</td><td>No data available for this stage</td>
+ }
+
+ val stageData = stageDataOption.get
val submissionTime = s.submissionTime match {
case Some(t) => UIUtils.formatDate(new Date(t))
case None => "Unknown"
@@ -124,35 +133,20 @@ private[ui] class StageTableBase(
if (finishTime > t) finishTime - t else System.currentTimeMillis - t
}
val formattedDuration = duration.map(d => UIUtils.formatDuration(d)).getOrElse("Unknown")
- val startedTasks =
- listener.stageIdToTasksActive.getOrElse(s.stageId, HashMap[Long, TaskInfo]()).size
- val completedTasks = listener.stageIdToTasksComplete.getOrElse(s.stageId, 0)
- val failedTasks = listener.stageIdToTasksFailed.getOrElse(s.stageId, 0) match {
- case f if f > 0 => "(%s failed)".format(f)
- case _ => ""
- }
- val totalTasks = s.numTasks
- val inputSortable = listener.stageIdToInputBytes.getOrElse(s.stageId, 0L)
- val inputRead = inputSortable match {
- case 0 => ""
- case b => Utils.bytesToString(b)
- }
- val shuffleReadSortable = listener.stageIdToShuffleRead.getOrElse(s.stageId, 0L)
- val shuffleRead = shuffleReadSortable match {
- case 0 => ""
- case b => Utils.bytesToString(b)
- }
- val shuffleWriteSortable = listener.stageIdToShuffleWrite.getOrElse(s.stageId, 0L)
- val shuffleWrite = shuffleWriteSortable match {
- case 0 => ""
- case b => Utils.bytesToString(b)
- }
+
+ val inputRead = stageData.inputBytes
+ val inputReadWithUnit = if (inputRead > 0) Utils.bytesToString(inputRead) else ""
+ val shuffleRead = stageData.shuffleReadBytes
+ val shuffleReadWithUnit = if (shuffleRead > 0) Utils.bytesToString(shuffleRead) else ""
+ val shuffleWrite = stageData.shuffleWriteBytes
+ val shuffleWriteWithUnit = if (shuffleWrite > 0) Utils.bytesToString(shuffleWrite) else ""
+
<td>{s.stageId}</td> ++
{if (isFairScheduler) {
<td>
<a href={"%s/stages/pool?poolname=%s"
- .format(UIUtils.prependBaseUri(basePath), poolName.get)}>
- {poolName.get}
+ .format(UIUtils.prependBaseUri(basePath), stageData.schedulingPool)}>
+ {stageData.schedulingPool}
</a>
</td>
} else {
@@ -162,11 +156,12 @@ private[ui] class StageTableBase(
<td valign="middle">{submissionTime}</td>
<td sorttable_customkey={duration.getOrElse(-1).toString}>{formattedDuration}</td>
<td class="progress-cell">
- {makeProgressBar(startedTasks, completedTasks, failedTasks, totalTasks)}
+ {makeProgressBar(stageData.numActiveTasks, stageData.numCompleteTasks,
+ stageData.numFailedTasks, s.numTasks)}
</td>
- <td sorttable_customekey={inputSortable.toString}>{inputRead}</td>
- <td sorttable_customekey={shuffleReadSortable.toString}>{shuffleRead}</td>
- <td sorttable_customekey={shuffleWriteSortable.toString}>{shuffleWrite}</td>
+ <td sorttable_customekey={inputRead.toString}>{inputReadWithUnit}</td>
+ <td sorttable_customekey={shuffleRead.toString}>{shuffleReadWithUnit}</td>
+ <td sorttable_customekey={shuffleWrite.toString}>{shuffleWriteWithUnit}</td>
}
/** Render an HTML row that represents a stage */
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
new file mode 100644
index 0000000000..be11a11695
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.jobs
+
+import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.scheduler.TaskInfo
+
+import scala.collection.mutable.HashMap
+
+private[jobs] object UIData {
+
+ class ExecutorSummary {
+ var taskTime : Long = 0
+ var failedTasks : Int = 0
+ var succeededTasks : Int = 0
+ var inputBytes : Long = 0
+ var shuffleRead : Long = 0
+ var shuffleWrite : Long = 0
+ var memoryBytesSpilled : Long = 0
+ var diskBytesSpilled : Long = 0
+ }
+
+ class StageUIData {
+ var numActiveTasks: Int = _
+ var numCompleteTasks: Int = _
+ var numFailedTasks: Int = _
+
+ var executorRunTime: Long = _
+
+ var inputBytes: Long = _
+ var shuffleReadBytes: Long = _
+ var shuffleWriteBytes: Long = _
+ var memoryBytesSpilled: Long = _
+ var diskBytesSpilled: Long = _
+
+ var schedulingPool: String = ""
+ var description: Option[String] = None
+
+ var taskData = new HashMap[Long, TaskUIData]
+ var executorSummary = new HashMap[String, ExecutorSummary]
+ }
+
+ case class TaskUIData(
+ taskInfo: TaskInfo,
+ taskMetrics: Option[TaskMetrics] = None,
+ errorMessage: Option[String] = None)
+}
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index fa43b66c6c..a855662480 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -47,11 +47,11 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
}
listener.completedStages.size should be (5)
- listener.completedStages.filter(_.stageId == 50).size should be (1)
- listener.completedStages.filter(_.stageId == 49).size should be (1)
- listener.completedStages.filter(_.stageId == 48).size should be (1)
- listener.completedStages.filter(_.stageId == 47).size should be (1)
- listener.completedStages.filter(_.stageId == 46).size should be (1)
+ listener.completedStages.count(_.stageId == 50) should be (1)
+ listener.completedStages.count(_.stageId == 49) should be (1)
+ listener.completedStages.count(_.stageId == 48) should be (1)
+ listener.completedStages.count(_.stageId == 47) should be (1)
+ listener.completedStages.count(_.stageId == 46) should be (1)
}
test("test executor id to summary") {
@@ -59,9 +59,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
val listener = new JobProgressListener(conf)
val taskMetrics = new TaskMetrics()
val shuffleReadMetrics = new ShuffleReadMetrics()
-
- // nothing in it
- assert(listener.stageIdToExecutorSummaries.size == 0)
+ assert(listener.stageIdToData.size === 0)
// finish this task, should get updated shuffleRead
shuffleReadMetrics.remoteBytesRead = 1000
@@ -71,8 +69,8 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
var task = new ShuffleMapTask(0, null, null, 0, null)
val taskType = Utils.getFormattedClassName(task)
listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics))
- assert(listener.stageIdToExecutorSummaries.getOrElse(0, fail()).getOrElse("exe-1", fail())
- .shuffleRead == 1000)
+ assert(listener.stageIdToData.getOrElse(0, fail()).executorSummary.getOrElse("exe-1", fail())
+ .shuffleRead === 1000)
// finish a task with unknown executor-id, nothing should happen
taskInfo =
@@ -80,7 +78,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
taskInfo.finishTime = 1
task = new ShuffleMapTask(0, null, null, 0, null)
listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics))
- assert(listener.stageIdToExecutorSummaries.size == 1)
+ assert(listener.stageIdToData.size === 1)
// finish this task, should get updated duration
shuffleReadMetrics.remoteBytesRead = 1000
@@ -89,8 +87,8 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
taskInfo.finishTime = 1
task = new ShuffleMapTask(0, null, null, 0, null)
listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics))
- assert(listener.stageIdToExecutorSummaries.getOrElse(0, fail()).getOrElse("exe-1", fail())
- .shuffleRead == 2000)
+ assert(listener.stageIdToData.getOrElse(0, fail()).executorSummary.getOrElse("exe-1", fail())
+ .shuffleRead === 2000)
// finish this task, should get updated duration
shuffleReadMetrics.remoteBytesRead = 1000
@@ -99,8 +97,8 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
taskInfo.finishTime = 1
task = new ShuffleMapTask(0, null, null, 0, null)
listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics))
- assert(listener.stageIdToExecutorSummaries.getOrElse(0, fail()).getOrElse("exe-2", fail())
- .shuffleRead == 1000)
+ assert(listener.stageIdToData.getOrElse(0, fail()).executorSummary.getOrElse("exe-2", fail())
+ .shuffleRead === 1000)
}
test("test task success vs failure counting for different task end reasons") {
@@ -121,13 +119,17 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
TaskKilled,
ExecutorLostFailure,
UnknownReason)
+ var failCount = 0
for (reason <- taskFailedReasons) {
listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, reason, taskInfo, metrics))
- assert(listener.stageIdToTasksComplete.get(task.stageId) === None)
+ failCount += 1
+ assert(listener.stageIdToData(task.stageId).numCompleteTasks === 0)
+ assert(listener.stageIdToData(task.stageId).numFailedTasks === failCount)
}
// Make sure we count success as success.
listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, metrics))
- assert(listener.stageIdToTasksComplete.get(task.stageId) === Some(1))
+ assert(listener.stageIdToData(task.stageId).numCompleteTasks === 1)
+ assert(listener.stageIdToData(task.stageId).numFailedTasks === failCount)
}
}