about summary refs log tree commit diff
path: root/core/src/main/scala/org/apache
diff options
context:
space:
mode:
author	wangda.tan <wheeleast@gmail.com>	2013-12-22 21:43:15 +0800
committer	wangda.tan <wheeleast@gmail.com>	2013-12-22 21:43:15 +0800
commit	c979eecdf6a11462595aba9d5b8fc942682cf85d (patch)
tree	0f912165ffe99098f189155cb0ab44e3f1e1b13b /core/src/main/scala/org/apache
parent	59e53fa21caa202a57093c74ada128fca2be5bac (diff)
download	spark-c979eecdf6a11462595aba9d5b8fc942682cf85d.tar.gz
spark-c979eecdf6a11462595aba9d5b8fc942682cf85d.tar.bz2
spark-c979eecdf6a11462595aba9d5b8fc942682cf85d.zip
added changes according to comments from rxin
Diffstat (limited to 'core/src/main/scala/org/apache')
-rw-r--r--	core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala	24
-rw-r--r--	core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala	5
-rw-r--r--	core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala	4
-rw-r--r--	core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala	4
-rw-r--r--	core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala	25
-rw-r--r--	core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala	4
-rw-r--r--	core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala	2
7 files changed, 24 insertions, 44 deletions
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
index f62ae37466..a31a7e1d58 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
@@ -56,7 +56,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize)).fold(0L)(_+_)
val execHead = Seq("Executor ID", "Address", "RDD blocks", "Memory used", "Disk used",
- "Active tasks", "Failed tasks", "Complete tasks", "Total tasks", "Duration", "Shuffle Read",
+ "Active tasks", "Failed tasks", "Complete tasks", "Total tasks", "Task Time", "Shuffle Read",
"Shuffle Write")
def execRow(kv: Seq[String]) = {
@@ -169,21 +169,13 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
// update shuffle read/write
if (null != taskEnd.taskMetrics) {
- val shuffleRead = taskEnd.taskMetrics.shuffleReadMetrics
- shuffleRead match {
- case Some(s) =>
- val newShuffleRead = executorToShuffleRead.getOrElse(eid, 0L) + s.remoteBytesRead
- executorToShuffleRead.put(eid, newShuffleRead)
- case _ => {}
- }
- val shuffleWrite = taskEnd.taskMetrics.shuffleWriteMetrics
- shuffleWrite match {
- case Some(s) => {
- val newShuffleWrite = executorToShuffleWrite.getOrElse(eid, 0L) + s.shuffleBytesWritten
- executorToShuffleWrite.put(eid, newShuffleWrite)
- }
- case _ => {}
- }
+ taskEnd.taskMetrics.shuffleReadMetrics.foreach(shuffleRead =>
+ executorToShuffleRead.put(eid, executorToShuffleRead.getOrElse(eid, 0L) +
+ shuffleRead.remoteBytesRead))
+
+ taskEnd.taskMetrics.shuffleWriteMetrics.foreach(shuffleWrite =>
+ executorToShuffleWrite.put(eid, executorToShuffleWrite.getOrElse(eid, 0L) +
+ shuffleWrite.shuffleBytesWritten))
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
index 75c0dd2c7f..3c53e88380 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
@@ -17,8 +17,9 @@
package org.apache.spark.ui.jobs
-private[spark] class ExecutorSummary() {
- var duration : Long = 0
+/** class for reporting aggregated metrics for each executors in stageUI */
+private[spark] class ExecutorSummary {
+ var taskTime : Long = 0
var failedTasks : Int = 0
var succeededTasks : Int = 0
var shuffleRead : Long = 0
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
index 763d5a344b..0e9dd4a8c7 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
@@ -40,7 +40,7 @@ private[spark] class ExecutorTable(val parent: JobProgressUI, val stageId: Int)
<table class="table table-bordered table-striped table-condensed sortable">
<thead>
<th>Executor ID</th>
- <th>Duration</th>
+ <th>Task Time</th>
<th>Total Tasks</th>
<th>Failed Tasks</th>
<th>Succeeded Tasks</th>
@@ -61,7 +61,7 @@ private[spark] class ExecutorTable(val parent: JobProgressUI, val stageId: Int)
case (k,v) => {
<tr>
<td>{k}</td>
- <td>{parent.formatDuration(v.duration)}</td>
+ <td>{parent.formatDuration(v.taskTime)}</td>
<td>{v.failedTasks + v.succeededTasks}</td>
<td>{v.failedTasks}</td>
<td>{v.succeededTasks}</td>
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
index 854afb665a..ca5a28625b 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
@@ -57,10 +57,6 @@ private[spark] class IndexPage(parent: JobProgressUI) {
</li>
<li><strong>Scheduling Mode:</strong> {parent.sc.getSchedulingMode}</li>
<li>
- <a href="#executors"><strong>Executor Summary:</strong></a>
- {listener.stageIdToExecutorSummaries.size}
- </li>
- <li>
<a href="#active"><strong>Active Stages:</strong></a>
{activeStages.size}
</li>
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 64ce715993..07a42f0503 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -144,23 +144,14 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
}
// update duration
- y.duration += taskEnd.taskInfo.duration
-
- // update shuffle read/write
- if (null != taskEnd.taskMetrics) {
- val shuffleRead = taskEnd.taskMetrics.shuffleReadMetrics
- shuffleRead match {
- case Some(s) =>
- y.shuffleRead += s.remoteBytesRead
- case _ => {}
- }
- val shuffleWrite = taskEnd.taskMetrics.shuffleWriteMetrics
- shuffleWrite match {
- case Some(s) => {
- y.shuffleWrite += s.shuffleBytesWritten
- }
- case _ => {}
- }
+ y.taskTime += taskEnd.taskInfo.duration
+
+ taskEnd.taskMetrics.shuffleReadMetrics.foreach { shuffleRead =>
+ y.shuffleRead += shuffleRead.remoteBytesRead
+ }
+
+ taskEnd.taskMetrics.shuffleWriteMetrics.foreach { shuffleWrite =>
+ y.shuffleWrite += shuffleWrite.shuffleBytesWritten
}
}
case _ => {}
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index c077613b1d..d8a6c9e2dc 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -66,7 +66,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
<div>
<ul class="unstyled">
<li>
- <strong>Total duration across all tasks: </strong>
+ <strong>Total task time across all tasks: </strong>
{parent.formatDuration(listener.stageIdToTime.getOrElse(stageId, 0L) + activeTime)}
</li>
{if (hasShuffleRead)
@@ -163,9 +163,9 @@ private[spark] class StagePage(parent: JobProgressUI) {
val executorTable = new ExecutorTable(parent, stageId)
val content =
summary ++
- <h4>Summary Metrics for Executors</h4> ++ executorTable.toNodeSeq() ++
<h4>Summary Metrics for {numCompleted} Completed Tasks</h4> ++
<div>{summaryTable.getOrElse("No tasks have reported metrics yet.")}</div> ++
+ <h4>Aggregated Metrics by Executors</h4> ++ executorTable.toNodeSeq() ++
<h4>Tasks</h4> ++ taskTable
headerSparkPage(content, parent.sc, "Details for Stage %d".format(stageId), Stages)
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index 9ad6de3c6d..463d85dfd5 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -48,7 +48,7 @@ private[spark] class StageTable(val stages: Seq[StageInfo], val parent: JobProgr
{if (isFairScheduler) {<th>Pool Name</th>} else {}}
<th>Description</th>
<th>Submitted</th>
- <th>Duration</th>
+ <th>Task Time</th>
<th>Tasks: Succeeded/Total</th>
<th>Shuffle Read</th>
<th>Shuffle Write</th>