author     Reynold Xin <rxin@apache.org>  2013-12-23 10:38:20 -0800
committer  Reynold Xin <rxin@apache.org>  2013-12-23 10:38:20 -0800
commit     11107c9de524a7397ec99333f5d7b00886217781 (patch)
tree       cec009c2aa6c13dc753efda3100c620ab8037dc4 /core/src/main/scala/org/apache
parent     0bc57c576792ba800eca0ec196c92a4d29cb3953 (diff)
parent     2f689ba97b437092bf52063cface12aa9ee09bf3 (diff)
Merge pull request #244 from leftnoteasy/master
Added SPARK-968 implementation for review
Diffstat (limited to 'core/src/main/scala/org/apache')
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala         | 31
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala     | 27
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala       | 90
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala | 34
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala           |  5
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala          |  2
6 files changed, 184 insertions(+), 5 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
index e596690bc3..a31a7e1d58 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
@@ -56,7 +56,8 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize)).fold(0L)(_+_)
val execHead = Seq("Executor ID", "Address", "RDD blocks", "Memory used", "Disk used",
- "Active tasks", "Failed tasks", "Complete tasks", "Total tasks")
+ "Active tasks", "Failed tasks", "Complete tasks", "Total tasks", "Task Time", "Shuffle Read",
+ "Shuffle Write")
def execRow(kv: Seq[String]) = {
<tr>
@@ -73,6 +74,9 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
<td>{kv(7)}</td>
<td>{kv(8)}</td>
<td>{kv(9)}</td>
+ <td>{Utils.msDurationToString(kv(10).toLong)}</td>
+ <td>{Utils.bytesToString(kv(11).toLong)}</td>
+ <td>{Utils.bytesToString(kv(12).toLong)}</td>
</tr>
}
@@ -111,6 +115,9 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
val failedTasks = listener.executorToTasksFailed.getOrElse(execId, 0)
val completedTasks = listener.executorToTasksComplete.getOrElse(execId, 0)
val totalTasks = activeTasks + failedTasks + completedTasks
+ val totalDuration = listener.executorToDuration.getOrElse(execId, 0L)
+ val totalShuffleRead = listener.executorToShuffleRead.getOrElse(execId, 0L)
+ val totalShuffleWrite = listener.executorToShuffleWrite.getOrElse(execId, 0L)
Seq(
execId,
@@ -122,7 +129,10 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
activeTasks.toString,
failedTasks.toString,
completedTasks.toString,
- totalTasks.toString
+ totalTasks.toString,
+ totalDuration.toString,
+ totalShuffleRead.toString,
+ totalShuffleWrite.toString
)
}
@@ -130,6 +140,9 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
val executorToTasksActive = HashMap[String, HashSet[TaskInfo]]()
val executorToTasksComplete = HashMap[String, Int]()
val executorToTasksFailed = HashMap[String, Int]()
+ val executorToDuration = HashMap[String, Long]()
+ val executorToShuffleRead = HashMap[String, Long]()
+ val executorToShuffleWrite = HashMap[String, Long]()
override def onTaskStart(taskStart: SparkListenerTaskStart) {
val eid = taskStart.taskInfo.executorId
@@ -140,6 +153,9 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
val eid = taskEnd.taskInfo.executorId
val activeTasks = executorToTasksActive.getOrElseUpdate(eid, new HashSet[TaskInfo]())
+ val newDuration = executorToDuration.getOrElse(eid, 0L) + taskEnd.taskInfo.duration
+ executorToDuration.put(eid, newDuration)
+
activeTasks -= taskEnd.taskInfo
val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
taskEnd.reason match {
@@ -150,6 +166,17 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
executorToTasksComplete(eid) = executorToTasksComplete.getOrElse(eid, 0) + 1
(None, Option(taskEnd.taskMetrics))
}
+
+ // update shuffle read/write
+ if (null != taskEnd.taskMetrics) {
+ taskEnd.taskMetrics.shuffleReadMetrics.foreach(shuffleRead =>
+ executorToShuffleRead.put(eid, executorToShuffleRead.getOrElse(eid, 0L) +
+ shuffleRead.remoteBytesRead))
+
+ taskEnd.taskMetrics.shuffleWriteMetrics.foreach(shuffleWrite =>
+ executorToShuffleWrite.put(eid, executorToShuffleWrite.getOrElse(eid, 0L) +
+ shuffleWrite.shuffleBytesWritten))
+ }
}
}
}
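
The listener hunks above all follow one pattern: keep a per-executor accumulator map, and on each task-end event fold that task's metrics into its executor's running totals. Below is a minimal, self-contained sketch of that pattern; TaskEvent and Metrics are hypothetical stand-ins for Spark's SparkListenerTaskEnd and TaskMetrics, not the real API.

import scala.collection.mutable.HashMap

// Hypothetical stand-ins for SparkListenerTaskEnd / TaskMetrics.
case class Metrics(remoteBytesRead: Long, shuffleBytesWritten: Long)
case class TaskEvent(executorId: String, duration: Long, metrics: Option[Metrics])

class ExecutorAggregator {
  val executorToDuration = HashMap[String, Long]()
  val executorToShuffleRead = HashMap[String, Long]()
  val executorToShuffleWrite = HashMap[String, Long]()

  def onTaskEnd(ev: TaskEvent): Unit = {
    // Fold this task's duration into the running total for its executor.
    executorToDuration(ev.executorId) =
      executorToDuration.getOrElse(ev.executorId, 0L) + ev.duration
    // Shuffle metrics are optional: a task that did no shuffle reports none.
    ev.metrics.foreach { m =>
      executorToShuffleRead(ev.executorId) =
        executorToShuffleRead.getOrElse(ev.executorId, 0L) + m.remoteBytesRead
      executorToShuffleWrite(ev.executorId) =
        executorToShuffleWrite.getOrElse(ev.executorId, 0L) + m.shuffleBytesWritten
    }
  }
}
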
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
new file mode 100644
index 0000000000..3c53e88380
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.jobs
+
+/** Class reporting aggregated metrics for each executor in the stage UI. */
+private[spark] class ExecutorSummary {
+ var taskTime: Long = 0
+ var failedTasks: Int = 0
+ var succeededTasks: Int = 0
+ var shuffleRead: Long = 0
+ var shuffleWrite: Long = 0
+}
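
ExecutorSummary is a plain mutable accumulator; JobProgressListener (further down) keys it first by stage ID and then by executor ID. A hedged sketch of that two-level getOrElseUpdate pattern, re-declaring the class so the snippet stands alone:

import scala.collection.mutable.HashMap

class ExecutorSummary {
  var taskTime: Long = 0
  var failedTasks: Int = 0
  var succeededTasks: Int = 0
  var shuffleRead: Long = 0
  var shuffleWrite: Long = 0
}

object SummaryDemo extends App {
  // stageId -> (executorId -> summary), as in JobProgressListener below.
  val stageIdToExecutorSummaries = HashMap[Int, HashMap[String, ExecutorSummary]]()

  // getOrElseUpdate inserts the missing inner map / summary on first sight of a key.
  val perStage = stageIdToExecutorSummaries.getOrElseUpdate(1, HashMap[String, ExecutorSummary]())
  val summary = perStage.getOrElseUpdate("exec-0", new ExecutorSummary)
  summary.succeededTasks += 1
  summary.taskTime += 1234L
  println(s"exec-0: ${summary.succeededTasks} succeeded, ${summary.taskTime} ms")
}
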
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
new file mode 100644
index 0000000000..0dd876480a
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.jobs
+
+import scala.xml.Node
+
+import org.apache.spark.scheduler.SchedulingMode
+import org.apache.spark.util.Utils
+import scala.collection.mutable
+
+/** Table showing per-executor summary for a stage */
+private[spark] class ExecutorTable(val parent: JobProgressUI, val stageId: Int) {
+
+ val listener = parent.listener
+ val dateFmt = parent.dateFmt
+ val isFairScheduler = listener.sc.getSchedulingMode == SchedulingMode.FAIR
+
+ def toNodeSeq(): Seq[Node] = {
+ listener.synchronized {
+ executorTable()
+ }
+ }
+
+ /** Table of aggregated per-executor metrics for this stage. */
+ private def executorTable(): Seq[Node] = {
+ <table class="table table-bordered table-striped table-condensed sortable">
+ <thead>
+ <th>Executor ID</th>
+ <th>Address</th>
+ <th>Task Time</th>
+ <th>Total Tasks</th>
+ <th>Failed Tasks</th>
+ <th>Succeeded Tasks</th>
+ <th>Shuffle Read</th>
+ <th>Shuffle Write</th>
+ </thead>
+ <tbody>
+ {createExecutorTable()}
+ </tbody>
+ </table>
+ }
+
+ private def createExecutorTable(): Seq[Node] = {
+ // make an executor-id -> address map
+ val executorIdToAddress = mutable.HashMap[String, String]()
+ val storageStatusList = parent.sc.getExecutorStorageStatus
+ for (statusId <- 0 until storageStatusList.size) {
+ val blockManagerId = parent.sc.getExecutorStorageStatus(statusId).blockManagerId
+ val address = blockManagerId.hostPort
+ val executorId = blockManagerId.executorId
+ executorIdToAddress.put(executorId, address)
+ }
+
+ val executorIdToSummary = listener.stageIdToExecutorSummaries.get(stageId)
+ executorIdToSummary match {
+ case Some(x) => {
+ x.toSeq.sortBy(_._1).map{
+ case (k,v) => {
+ <tr>
+ <td>{k}</td>
+ <td>{executorIdToAddress.getOrElse(k, "CANNOT FIND ADDRESS")}</td>
+ <td>{parent.formatDuration(v.taskTime)}</td>
+ <td>{v.failedTasks + v.succeededTasks}</td>
+ <td>{v.failedTasks}</td>
+ <td>{v.succeededTasks}</td>
+ <td>{Utils.bytesToString(v.shuffleRead)}</td>
+ <td>{Utils.bytesToString(v.shuffleWrite)}</td>
+ </tr>
+ }
+ }
+ }
+ case _ => { Seq[Node]() }
+ }
+ }
+}
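
The rendering itself is ordinary Scala XML: sort the (executorId, summary) pairs by executor ID and map each to a <tr>. A stripped-down sketch of that row-building pattern, with a hypothetical Row case class standing in for ExecutorSummary:

import scala.collection.mutable
import scala.xml.Node

object RowSketch {
  // Hypothetical stand-in for ExecutorSummary, just for this sketch.
  case class Row(taskTime: Long, failedTasks: Int, succeededTasks: Int)

  def executorRows(summaries: mutable.HashMap[String, Row]): Seq[Node] =
    summaries.toSeq.sortBy(_._1).map { case (execId, s) =>
      <tr>
        <td>{execId}</td>
        <td>{s.taskTime}</td>
        <td>{s.failedTasks + s.succeededTasks}</td>
        <td>{s.failedTasks}</td>
        <td>{s.succeededTasks}</td>
      </tr>
    }
}
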
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 6b854740d6..07a42f0503 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -57,6 +57,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
val stageIdToTasksFailed = HashMap[Int, Int]()
val stageIdToTaskInfos =
HashMap[Int, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]]()
+ val stageIdToExecutorSummaries = HashMap[Int, HashMap[String, ExecutorSummary]]()
override def onJobStart(jobStart: SparkListenerJobStart) {}
@@ -124,8 +125,41 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
val sid = taskEnd.task.stageId
+
+ // create executor summary map if necessary
+ val executorSummaryMap = stageIdToExecutorSummaries.getOrElseUpdate(key = sid,
+ op = new HashMap[String, ExecutorSummary]())
+ executorSummaryMap.getOrElseUpdate(key = taskEnd.taskInfo.executorId,
+ op = new ExecutorSummary())
+
+ val executorSummary = executorSummaryMap.get(taskEnd.taskInfo.executorId)
+ executorSummary match {
+ case Some(y) => {
+ // first update failed-task, succeed-task
+ taskEnd.reason match {
+ case Success =>
+ y.succeededTasks += 1
+ case _ =>
+ y.failedTasks += 1
+ }
+
+ // update duration
+ y.taskTime += taskEnd.taskInfo.duration
+
+ taskEnd.taskMetrics.shuffleReadMetrics.foreach { shuffleRead =>
+ y.shuffleRead += shuffleRead.remoteBytesRead
+ }
+
+ taskEnd.taskMetrics.shuffleWriteMetrics.foreach { shuffleWrite =>
+ y.shuffleWrite += shuffleWrite.shuffleBytesWritten
+ }
+ }
+ case _ => {}
+ }
+
val tasksActive = stageIdToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
tasksActive -= taskEnd.taskInfo
+
val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
taskEnd.reason match {
case e: ExceptionFailure =>
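
The success/failure bookkeeping in this hunk hinges on one match over taskEnd.reason: Success increments succeededTasks, and every other TaskEndReason counts as a failure. A compact sketch with hypothetical stand-ins for Spark's TaskEndReason hierarchy:

// Hypothetical stand-ins for Spark's TaskEndReason hierarchy.
sealed trait TaskEndReason
case object Success extends TaskEndReason
case class ExceptionFailure(description: String) extends TaskEndReason

class Counts { var succeededTasks = 0; var failedTasks = 0 }

object ReasonSketch extends App {
  def record(reason: TaskEndReason, c: Counts): Unit = reason match {
    case Success => c.succeededTasks += 1
    case _       => c.failedTasks += 1 // anything non-Success counts as failed
  }

  val c = new Counts
  record(Success, c)
  record(ExceptionFailure("boom"), c)
  println(s"succeeded=${c.succeededTasks} failed=${c.failedTasks}")
}
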
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 996e1b4d1a..8dcfeacb60 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -66,7 +66,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
<div>
<ul class="unstyled">
<li>
- <strong>Total duration across all tasks: </strong>
+ <strong>Total task time across all tasks: </strong>
{parent.formatDuration(listener.stageIdToTime.getOrElse(stageId, 0L) + activeTime)}
</li>
{if (hasShuffleRead)
@@ -166,11 +166,12 @@ private[spark] class StagePage(parent: JobProgressUI) {
def quantileRow(data: Seq[String]): Seq[Node] = <tr> {data.map(d => <td>{d}</td>)} </tr>
Some(listingTable(quantileHeaders, quantileRow, listings, fixedWidth = true))
}
-
+ val executorTable = new ExecutorTable(parent, stageId)
val content =
summary ++
<h4>Summary Metrics for {numCompleted} Completed Tasks</h4> ++
<div>{summaryTable.getOrElse("No tasks have reported metrics yet.")}</div> ++
+ <h4>Aggregated Metrics by Executors</h4> ++ executorTable.toNodeSeq() ++
<h4>Tasks</h4> ++ taskTable
headerSparkPage(content, parent.sc, "Details for Stage %d".format(stageId), Stages)
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index 9ad6de3c6d..463d85dfd5 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -48,7 +48,7 @@ private[spark] class StageTable(val stages: Seq[StageInfo], val parent: JobProgr
{if (isFairScheduler) {<th>Pool Name</th>} else {}}
<th>Description</th>
<th>Submitted</th>
- <th>Duration</th>
+ <th>Task Time</th>
<th>Tasks: Succeeded/Total</th>
<th>Shuffle Read</th>
<th>Shuffle Write</th>