author    wangda.tan <wheeleast@gmail.com>    2013-12-09 00:02:59 +0800
committer wangda.tan <wheeleast@gmail.com>    2013-12-09 00:02:59 +0800
commit    48e4f2ad141492d7dee579a1b7fb1ec49fefa2ae (patch)
tree      818eaa14b7ea1272d84659ed4ec4346a49eac65b /core
parent    740922f25d5f81617fbe02c7bcd1610d6426bbef (diff)
SPARK-968: In the stage UI, add an overview section that shows task stats grouped by executor id
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala          27
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala            73
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala                 7
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala      38
-rw-r--r--  core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala 89
5 files changed, 234 insertions(+), 0 deletions(-)
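
At a glance, the patch keeps a mutable map from executor id to a running ExecutorSummary, bumps the task count on every task start, and folds duration, success/failure, and shuffle metrics in on every task end. Below is a minimal standalone sketch of that bookkeeping pattern; TaskResult and Demo are illustrative stand-ins, not Spark classes, and the real update sites are the listener callbacks in the diff that follows:

    import scala.collection.mutable.HashMap

    // Mirrors the new ExecutorSummary below: one mutable record per executor.
    class ExecutorSummary {
      var duration: Long = 0
      var totalTasks: Int = 0
      var failedTasks: Int = 0
      var succeedTasks: Int = 0
      var shuffleRead: Long = 0
      var shuffleWrite: Long = 0
    }

    // Illustrative stand-in for the data carried by SparkListenerTaskEnd.
    case class TaskResult(executorId: String, succeeded: Boolean,
                          durationMs: Long, shuffleReadBytes: Long)

    object Demo extends App {
      val executorIdToSummary = HashMap[String, ExecutorSummary]()

      // On task start: create the summary lazily and count the launch.
      def onTaskStart(executorId: String): Unit = {
        val s = executorIdToSummary.getOrElseUpdate(executorId, new ExecutorSummary())
        s.totalTasks += 1
      }

      // On task end: fold the finished task's stats into its executor's summary;
      // unknown executor ids are ignored, as in the listener below.
      def onTaskEnd(r: TaskResult): Unit =
        executorIdToSummary.get(r.executorId).foreach { s =>
          if (r.succeeded) s.succeedTasks += 1 else s.failedTasks += 1
          s.duration += r.durationMs
          s.shuffleRead += r.shuffleReadBytes
        }

      onTaskStart("exe-1")
      onTaskEnd(TaskResult("exe-1", succeeded = true, durationMs = 42, shuffleReadBytes = 1000))
      println(executorIdToSummary("exe-1").shuffleRead) // prints 1000
    }
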
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
new file mode 100644
index 0000000000..f2ee12081c
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.jobs
+
+private[spark] class ExecutorSummary {
+ var duration: Long = 0
+ var totalTasks: Int = 0
+ var failedTasks: Int = 0
+ var succeedTasks: Int = 0
+ var shuffleRead: Long = 0
+ var shuffleWrite: Long = 0
+}
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
new file mode 100644
index 0000000000..c6823cd823
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.jobs
+
+
+import scala.xml.Node
+
+import org.apache.spark.scheduler.SchedulingMode
+
+
+/** Page showing executor summary */
+private[spark] class ExecutorTable(val parent: JobProgressUI) {
+
+ val listener = parent.listener
+ val dateFmt = parent.dateFmt
+ val isFairScheduler = listener.sc.getSchedulingMode == SchedulingMode.FAIR
+
+ def toNodeSeq(): Seq[Node] = {
+ listener.synchronized {
+ executorTable()
+ }
+ }
+
+ /** Table summarizing task statistics grouped by executor. */
+ private def executorTable(): Seq[Node] = {
+ <table class="table table-bordered table-striped table-condensed sortable">
+ <thead>
+ <th>Executor ID</th>
+ <th>Duration</th>
+ <th>#Tasks</th>
+ <th>#Failed Tasks</th>
+ <th>#Succeeded Tasks</th>
+ <th>Shuffle Read</th>
+ <th>Shuffle Write</th>
+ </thead>
+ <tbody>
+ {createExecutorTable()}
+ </tbody>
+ </table>
+ }
+
+ private def createExecutorTable(): Seq[Node] = {
+ val executorIdToSummary = listener.executorIdToSummary
+ executorIdToSummary.toSeq.sortBy(_._1).map {
+ case (k, v) => {
+ <tr>
+ <td>{k}</td>
+ <td>{v.duration} ms</td>
+ <td>{v.totalTasks}</td>
+ <td>{v.failedTasks}</td>
+ <td>{v.succeedTasks}</td>
+ <td>{v.shuffleRead}</td>
+ <td>{v.shuffleWrite}</td>
+ </tr>
+ }
+ }
+ }
+}
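One presentational note: the table above prints shuffleRead and shuffleWrite as raw byte counts. Spark's UI elsewhere renders sizes via org.apache.spark.util.Utils.bytesToString; the helper below is a hedged, illustrative sketch of that kind of formatting, not part of this patch:

    // Illustrative human-readable size formatter; Spark's own
    // org.apache.spark.util.Utils.bytesToString fills this role in the UI.
    def bytesToString(size: Long): String = {
      val KB = 1L << 10; val MB = 1L << 20; val GB = 1L << 30
      if (size >= GB) f"${size.toDouble / GB}%.1f GB"
      else if (size >= MB) f"${size.toDouble / MB}%.1f MB"
      else if (size >= KB) f"${size.toDouble / KB}%.1f KB"
      else s"$size B"
    }

    // Usage in the row builder would then look like:
    //   <td>{bytesToString(v.shuffleRead)}</td>
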
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
index ca5a28625b..653a84b60f 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
@@ -45,6 +45,7 @@ private[spark] class IndexPage(parent: JobProgressUI) {
val activeStagesTable = new StageTable(activeStages.sortBy(_.submissionTime).reverse, parent)
val completedStagesTable = new StageTable(completedStages.sortBy(_.submissionTime).reverse, parent)
val failedStagesTable = new StageTable(failedStages.sortBy(_.submissionTime).reverse, parent)
+ val executorTable = new ExecutorTable(parent)
val pools = listener.sc.getAllPools
val poolTable = new PoolTable(pools, listener)
@@ -57,6 +58,10 @@ private[spark] class IndexPage(parent: JobProgressUI) {
</li>
<li><strong>Scheduling Mode:</strong> {parent.sc.getSchedulingMode}</li>
<li>
+ <a href="#executors"><strong>Executor Summary:</strong></a>
+ {listener.executorIdToSummary.size}
+ </li>
+ <li>
<a href="#active"><strong>Active Stages:</strong></a>
{activeStages.size}
</li>
@@ -77,6 +82,8 @@ private[spark] class IndexPage(parent: JobProgressUI) {
} else {
Seq()
}} ++
+ <h4 id="executor">Executor Summary</h4> ++
+ executorTable.toNodeSeq++
<h4 id="active">Active Stages ({activeStages.size})</h4> ++
activeStagesTable.toNodeSeq++
<h4 id="completed">Completed Stages ({completedStages.size})</h4> ++
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 6b854740d6..2635478592 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -57,6 +57,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
val stageIdToTasksFailed = HashMap[Int, Int]()
val stageIdToTaskInfos =
HashMap[Int, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]]()
+ val executorIdToSummary = HashMap[String, ExecutorSummary]()
override def onJobStart(jobStart: SparkListenerJobStart) {}
@@ -114,6 +115,9 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
sid, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
taskList += ((taskStart.taskInfo, None, None))
stageIdToTaskInfos(sid) = taskList
+ val executorSummary = executorIdToSummary.getOrElseUpdate(taskStart.taskInfo.executorId,
+ new ExecutorSummary())
+ executorSummary.totalTasks += 1
}
override def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult)
@@ -123,9 +127,43 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
}
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
+ // update executor summary
+ val executorSummary = executorIdToSummary.get(taskEnd.taskInfo.executorId)
+ executorSummary match {
+ case Some(x) => {
+ // update the failed and succeeded task counts
+ taskEnd.reason match {
+ case e: ExceptionFailure =>
+ x.failedTasks += 1
+ case _ =>
+ x.succeedTasks += 1
+ }
+
+ // update duration
+ x.duration += taskEnd.taskInfo.duration
+
+ // update shuffle read/write
+ val shuffleRead = taskEnd.taskMetrics.shuffleReadMetrics
+ shuffleRead match {
+ case Some(s) =>
+ x.shuffleRead += s.remoteBytesRead
+ case _ => {}
+ }
+ val shuffleWrite = taskEnd.taskMetrics.shuffleWriteMetrics
+ shuffleWrite match {
+ case Some(s) => {
+ x.shuffleWrite += s.shuffleBytesWritten
+ }
+ case _ => {}
+ }
+ }
+ case _ => {}
+ }
+
val sid = taskEnd.task.stageId
val tasksActive = stageIdToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
tasksActive -= taskEnd.taskInfo
+
val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
taskEnd.reason match {
case e: ExceptionFailure =>
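A side note on the Option handling in onTaskEnd above: the explicit matches with empty default cases are behaviorally equivalent to foreach on the Option, which some may find more idiomatic. A sketch of that alternative, using the same names as the patch:

    // Equivalent to the shuffleRead/shuffleWrite matches above:
    // Option.foreach runs the body only when the metrics are present.
    taskEnd.taskMetrics.shuffleReadMetrics.foreach(s => x.shuffleRead += s.remoteBytesRead)
    taskEnd.taskMetrics.shuffleWriteMetrics.foreach(s => x.shuffleWrite += s.shuffleBytesWritten)
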
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
new file mode 100644
index 0000000000..90a58978c7
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.jobs
+
+import org.scalatest.FunSuite
+import org.apache.spark.scheduler._
+import org.apache.spark.SparkContext
+import org.apache.spark.Success
+import org.apache.spark.scheduler.SparkListenerTaskStart
+import org.apache.spark.executor.{ShuffleReadMetrics, TaskMetrics}
+
+class JobProgressListenerSuite extends FunSuite {
+ test("test executor id to summary") {
+ val sc = new SparkContext("local", "joblogger")
+ val listener = new JobProgressListener(sc)
+ val taskMetrics = new TaskMetrics()
+ val shuffleReadMetrics = new ShuffleReadMetrics()
+
+ // the map starts empty
+ assert(listener.executorIdToSummary.size == 0)
+
+ // launch a task; the map should gain an entry for its executor
+ listener.onTaskStart(new SparkListenerTaskStart(
+ new ShuffleMapTask(0, null, null, 0, null),
+ new TaskInfo(1234L, 0, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL)))
+ assert(listener.executorIdToSummary.size == 1)
+
+ // finish the task; shuffleRead should be updated
+ shuffleReadMetrics.remoteBytesRead = 1000
+ taskMetrics.shuffleReadMetrics = Some(shuffleReadMetrics)
+ var taskInfo = new TaskInfo(1234L, 0, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL)
+ taskInfo.finishTime = 1
+ listener.onTaskEnd(new SparkListenerTaskEnd(
+ new ShuffleMapTask(0, null, null, 0, null), Success, taskInfo, taskMetrics))
+ assert(listener.executorIdToSummary.getOrElse("exe-1", fail()).shuffleRead == 1000)
+
+ // finish a task with an unknown executor id; nothing should change
+ taskInfo = new TaskInfo(1234L, 0, 1000L, "exe-unknown", "host1", TaskLocality.NODE_LOCAL)
+ taskInfo.finishTime = 1
+ listener.onTaskEnd(new SparkListenerTaskEnd(
+ new ShuffleMapTask(0, null, null, 0, null), Success, taskInfo, taskMetrics))
+ assert(listener.executorIdToSummary.size == 1)
+
+ // launch a second task on the same executor; the map size stays at 1
+ listener.onTaskStart(new SparkListenerTaskStart(
+ new ShuffleMapTask(0, null, null, 0, null),
+ new TaskInfo(1235L, 0, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL)))
+ assert(listener.executorIdToSummary.size == 1)
+
+ // finish the task; exe-1's shuffleRead should accumulate to 2000
+ shuffleReadMetrics.remoteBytesRead = 1000
+ taskMetrics.shuffleReadMetrics = Some(shuffleReadMetrics)
+ taskInfo = new TaskInfo(1235L, 0, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL)
+ taskInfo.finishTime = 1
+ listener.onTaskEnd(new SparkListenerTaskEnd(
+ new ShuffleMapTask(0, null, null, 0, null), Success, taskInfo, taskMetrics))
+ assert(listener.executorIdToSummary.getOrElse("exe-1", fail()).shuffleRead == 2000)
+
+ // launch a task on another executor
+ listener.onTaskStart(new SparkListenerTaskStart(
+ new ShuffleMapTask(0, null, null, 0, null),
+ new TaskInfo(1236L, 0, 0L, "exe-2", "host1", TaskLocality.NODE_LOCAL)))
+ assert(listener.executorIdToSummary.size == 2)
+
+ // finish the task; exe-2's shuffleRead should be 1000
+ shuffleReadMetrics.remoteBytesRead = 1000
+ taskMetrics.shuffleReadMetrics = Some(shuffleReadMetrics)
+ taskInfo = new TaskInfo(1236L, 0, 0L, "exe-2", "host1", TaskLocality.NODE_LOCAL)
+ taskInfo.finishTime = 1
+ listener.onTaskEnd(new SparkListenerTaskEnd(
+ new ShuffleMapTask(0, null, null, 0, null), Success, taskInfo, taskMetrics))
+ assert(listener.executorIdToSummary.getOrElse("exe-2", fail()).shuffleRead == 1000)
+ }
+}
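
Beyond shuffleRead, the same map tracks task counts, so the scenario above could also be asserted on. A hedged extension of the test, not part of the patch, following directly from the events it fires:

    // After the events above: exe-1 started two tasks and both succeeded.
    assert(listener.executorIdToSummary("exe-1").totalTasks == 2)
    assert(listener.executorIdToSummary("exe-1").succeedTasks == 2)
    assert(listener.executorIdToSummary("exe-1").failedTasks == 0)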