Diffstat (limited to 'core')
 core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala          (new) | 27
 core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala            (new) | 73
 core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala                      |  7
 core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala            | 38
 core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala (new) | 89
5 files changed, 234 insertions, 0 deletions
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
new file mode 100644
index 0000000000..f2ee12081c
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.jobs
+
+private[spark] class ExecutorSummary() {
+  var duration : Long = 0
+  var totalTasks : Int = 0
+  var failedTasks : Int = 0
+  var succeedTasks : Int = 0
+  var shuffleRead : Long = 0
+  var shuffleWrite : Long = 0
+}
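The new ExecutorSummary is a plain mutable accumulator, one instance per executor ID. A minimal standalone sketch of the intended update pattern (the event data and the demo object name are hypothetical, for illustration only):

import scala.collection.mutable.HashMap

// Mirrors the fields of the ExecutorSummary class added above.
class ExecutorSummary {
  var duration: Long = 0
  var totalTasks: Int = 0
  var failedTasks: Int = 0
  var succeedTasks: Int = 0
  var shuffleRead: Long = 0
  var shuffleWrite: Long = 0
}

object ExecutorSummaryDemo extends App {
  val executorIdToSummary = HashMap[String, ExecutorSummary]()

  // Hypothetical task-end events: (executor ID, duration in ms, succeeded?).
  val events = Seq(("exe-1", 120L, true), ("exe-1", 300L, false), ("exe-2", 80L, true))

  for ((execId, ms, ok) <- events) {
    // Create the per-executor accumulator on first use, then bump its counters.
    val s = executorIdToSummary.getOrElseUpdate(execId, new ExecutorSummary)
    s.totalTasks += 1
    s.duration += ms
    if (ok) s.succeedTasks += 1 else s.failedTasks += 1
  }

  executorIdToSummary.toSeq.sortBy(_._1).foreach { case (id, s) =>
    // prints: exe-1 tasks=2 failed=1 duration=420ms, then exe-2 tasks=1 failed=0 duration=80ms
    println(s"$id tasks=${s.totalTasks} failed=${s.failedTasks} duration=${s.duration}ms")
  }
}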
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
new file mode 100644
index 0000000000..c6823cd823
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.jobs
+
+
+import scala.xml.Node
+
+import org.apache.spark.scheduler.SchedulingMode
+
+
+/** Page showing executor summary */
+private[spark] class ExecutorTable(val parent: JobProgressUI) {
+
+  val listener = parent.listener
+  val dateFmt = parent.dateFmt
+  val isFairScheduler = listener.sc.getSchedulingMode == SchedulingMode.FAIR
+
+  def toNodeSeq(): Seq[Node] = {
+    listener.synchronized {
+      executorTable()
+    }
+  }
+
+  /** Special table which merges two header cells. */
+  private def executorTable[T](): Seq[Node] = {
+    <table class="table table-bordered table-striped table-condensed sortable">
+      <thead>
+        <th>Executor ID</th>
+        <th>Duration</th>
+        <th>#Tasks</th>
+        <th>#Failed Tasks</th>
+        <th>#Succeed Tasks</th>
+        <th>Shuffle Read</th>
+        <th>Shuffle Write</th>
+      </thead>
+      <tbody>
+        {createExecutorTable()}
+      </tbody>
+    </table>
+  }
+
+  private def createExecutorTable() : Seq[Node] = {
+    val executorIdToSummary = listener.executorIdToSummary
+    executorIdToSummary.toSeq.sortBy(_._1).map{
+      case (k,v) => {
+        <tr>
+          <td>{k}</td>
+          <td>{v.duration} ms</td>
+          <td>{v.totalTasks}</td>
+          <td>{v.failedTasks}</td>
+          <td>{v.succeedTasks}</td>
+          <td>{v.shuffleRead}</td>
+          <td>{v.shuffleWrite}</td>
+        </tr>
+      }
+    }
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
index ca5a28625b..653a84b60f 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
@@ -45,6 +45,7 @@ private[spark] class IndexPage(parent: JobProgressUI) {
     val activeStagesTable = new StageTable(activeStages.sortBy(_.submissionTime).reverse, parent)
     val completedStagesTable = new StageTable(completedStages.sortBy(_.submissionTime).reverse, parent)
     val failedStagesTable = new StageTable(failedStages.sortBy(_.submissionTime).reverse, parent)
+    val executorTable = new ExecutorTable(parent)
 
     val pools = listener.sc.getAllPools
     val poolTable = new PoolTable(pools, listener)
@@ -57,6 +58,10 @@ private[spark] class IndexPage(parent: JobProgressUI) {
               </li>
               <li><strong>Scheduling Mode:</strong> {parent.sc.getSchedulingMode}</li>
               <li>
+                <a href="#executors"><strong>Executor Summary:</strong></a>
+                {listener.executorIdToSummary.size}
+              </li>
+              <li>
                 <a href="#active"><strong>Active Stages:</strong></a>
                 {activeStages.size}
               </li>
@@ -77,6 +82,8 @@ private[spark] class IndexPage(parent: JobProgressUI) {
           } else {
             Seq()
           }} ++
+          <h4 id="executors">Executor Summary</h4> ++
+          executorTable.toNodeSeq++
          <h4 id="active">Active Stages ({activeStages.size})</h4> ++
          activeStagesTable.toNodeSeq++
          <h4 id="completed">Completed Stages ({completedStages.size})</h4> ++
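Both UI changes lean on Scala's XML literals: each <tr> expression is a scala.xml.Node, so mapping over the sorted summaries yields the Seq[Node] that IndexPage splices into the page with ++. A small self-contained sketch of the same row-building pattern (the table data and object name are hypothetical):

import scala.xml.Node

object ExecutorRowDemo extends App {
  // Hypothetical per-executor shuffle-read totals.
  val summaries = Map("exe-2" -> 2000L, "exe-1" -> 1000L)

  // Sort by executor ID and turn each entry into a row node,
  // the same shape createExecutorTable() produces.
  val rows: Seq[Node] = summaries.toSeq.sortBy(_._1).map { case (id, read) =>
    <tr><td>{id}</td><td>{read}</td></tr>
  }

  // Node sequences concatenate with ++, which is how IndexPage places
  // executorTable.toNodeSeq between the section headings.
  val table = <table><tbody>{rows}</tbody></table>
  println(table) // rows come out in exe-1, exe-2 order
}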
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 6b854740d6..2635478592 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -57,6 +57,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
   val stageIdToTasksFailed = HashMap[Int, Int]()
   val stageIdToTaskInfos =
     HashMap[Int, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]]()
+  val executorIdToSummary = HashMap[String, ExecutorSummary]()
 
   override def onJobStart(jobStart: SparkListenerJobStart) {}
 
@@ -114,6 +115,9 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
       sid, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
     taskList += ((taskStart.taskInfo, None, None))
     stageIdToTaskInfos(sid) = taskList
+    val executorSummary = executorIdToSummary.getOrElseUpdate(key = taskStart.taskInfo.executorId,
+      op = new ExecutorSummary())
+    executorSummary.totalTasks += 1
   }
 
   override def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult)
@@ -123,9 +127,43 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
   }
 
   override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
+    // update executor summary
+    val executorSummary = executorIdToSummary.get(taskEnd.taskInfo.executorId)
+    executorSummary match {
+      case Some(x) => {
+        // first update failed-task, succeed-task
+        taskEnd.reason match {
+          case e: ExceptionFailure =>
+            x.failedTasks += 1
+          case _ =>
+            x.succeedTasks += 1
+        }
+
+        // update duration
+        x.duration += taskEnd.taskInfo.duration
+
+        // update shuffle read/write
+        val shuffleRead = taskEnd.taskMetrics.shuffleReadMetrics
+        shuffleRead match {
+          case Some(s) =>
+            x.shuffleRead += s.remoteBytesRead
+          case _ => {}
+        }
+        val shuffleWrite = taskEnd.taskMetrics.shuffleWriteMetrics
+        shuffleWrite match {
+          case Some(s) => {
+            x.shuffleWrite += s.shuffleBytesWritten
+          }
+          case _ => {}
+        }
+      }
+      case _ => {}
+    }
+
     val sid = taskEnd.task.stageId
     val tasksActive = stageIdToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
     tasksActive -= taskEnd.taskInfo
+
     val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
       taskEnd.reason match {
         case e: ExceptionFailure =>
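Note the asymmetry the listener establishes: onTaskStart creates a summary on first use via getOrElseUpdate, while onTaskEnd looks the executor up with get and silently drops events for executors it has never seen (the case _ => {} branch), which is exactly what the new test exercises. A minimal sketch of that lookup pattern (the map contents and object name are hypothetical):

import scala.collection.mutable.HashMap

object ListenerLookupDemo extends App {
  val executorIdToSummary = HashMap[String, Int]() // Int stands in for ExecutorSummary

  // onTaskStart path: create the entry on first use, then count the task.
  def taskStarted(execId: String): Unit = {
    executorIdToSummary.getOrElseUpdate(execId, 0)
    executorIdToSummary(execId) += 1
  }

  // onTaskEnd path: update only executors we have already seen.
  def taskEnded(execId: String): Unit = executorIdToSummary.get(execId) match {
    case Some(_) => println(s"$execId known, metrics updated")
    case None    => () // unknown executor: event dropped, as in the diff
  }

  taskStarted("exe-1")
  taskEnded("exe-1")       // prints: exe-1 known, metrics updated
  taskEnded("exe-unknown") // silently ignored
}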
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
new file mode 100644
index 0000000000..90a58978c7
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.jobs
+
+import org.scalatest.FunSuite
+import org.apache.spark.scheduler._
+import org.apache.spark.SparkContext
+import org.apache.spark.Success
+import org.apache.spark.scheduler.SparkListenerTaskStart
+import org.apache.spark.executor.{ShuffleReadMetrics, TaskMetrics}
+
+class JobProgressListenerSuite extends FunSuite {
+  test("test executor id to summary") {
+    val sc = new SparkContext("local", "joblogger")
+    val listener = new JobProgressListener(sc)
+    val taskMetrics = new TaskMetrics()
+    val shuffleReadMetrics = new ShuffleReadMetrics()
+
+    // nothing in it
+    assert(listener.executorIdToSummary.size == 0)
+
+    // launched a task, should get an item in map
+    listener.onTaskStart(new SparkListenerTaskStart(
+      new ShuffleMapTask(0, null, null, 0, null),
+      new TaskInfo(1234L, 0, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL)))
+    assert(listener.executorIdToSummary.size == 1)
+
+    // finish this task, should get updated shuffleRead
+    shuffleReadMetrics.remoteBytesRead = 1000
+    taskMetrics.shuffleReadMetrics = Some(shuffleReadMetrics)
+    var taskInfo = new TaskInfo(1234L, 0, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL)
+    taskInfo.finishTime = 1
+    listener.onTaskEnd(new SparkListenerTaskEnd(
+      new ShuffleMapTask(0, null, null, 0, null), Success, taskInfo, taskMetrics))
+    assert(listener.executorIdToSummary.getOrElse("exe-1", fail()).shuffleRead == 1000)
+
+    // finish a task with unknown executor-id, nothing should happen
+    taskInfo = new TaskInfo(1234L, 0, 1000L, "exe-unknown", "host1", TaskLocality.NODE_LOCAL)
+    taskInfo.finishTime = 1
+    listener.onTaskEnd(new SparkListenerTaskEnd(
+      new ShuffleMapTask(0, null, null, 0, null), Success, taskInfo, taskMetrics))
+    assert(listener.executorIdToSummary.size == 1)
+
+    // launched a task
+    listener.onTaskStart(new SparkListenerTaskStart(
+      new ShuffleMapTask(0, null, null, 0, null),
+      new TaskInfo(1235L, 0, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL)))
+    assert(listener.executorIdToSummary.size == 1)
+
+    // finish this task, should get updated duration
+    shuffleReadMetrics.remoteBytesRead = 1000
+    taskMetrics.shuffleReadMetrics = Some(shuffleReadMetrics)
+    taskInfo = new TaskInfo(1235L, 0, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL)
+    taskInfo.finishTime = 1
+    listener.onTaskEnd(new SparkListenerTaskEnd(
+      new ShuffleMapTask(0, null, null, 0, null), Success, taskInfo, taskMetrics))
+    assert(listener.executorIdToSummary.getOrElse("exe-1", fail()).shuffleRead == 2000)
+
+    // launched a task in another exec
+    listener.onTaskStart(new SparkListenerTaskStart(
+      new ShuffleMapTask(0, null, null, 0, null),
+      new TaskInfo(1236L, 0, 0L, "exe-2", "host1", TaskLocality.NODE_LOCAL)))
+    assert(listener.executorIdToSummary.size == 2)
+
+    // finish this task, should get updated duration
+    shuffleReadMetrics.remoteBytesRead = 1000
+    taskMetrics.shuffleReadMetrics = Some(shuffleReadMetrics)
+    taskInfo = new TaskInfo(1236L, 0, 0L, "exe-2", "host1", TaskLocality.NODE_LOCAL)
+    taskInfo.finishTime = 1
+    listener.onTaskEnd(new SparkListenerTaskEnd(
+      new ShuffleMapTask(0, null, null, 0, null), Success, taskInfo, taskMetrics))
+    assert(listener.executorIdToSummary.getOrElse("exe-2", fail()).shuffleRead == 1000)
+  }
+}
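The suite asserts on the shuffle-read path only. A natural follow-on, sketched here rather than taken from the commit (whether shuffleWriteMetrics and shuffleBytesWritten are assignable vars in this version is an assumption, mirrored from the read-side setters the test already uses), would cover shuffle writes the same way:

// Hypothetical extension of the test above, reusing its listener and "exe-1" setup.
import org.apache.spark.executor.{ShuffleWriteMetrics, TaskMetrics}

val writeMetrics = new ShuffleWriteMetrics()
writeMetrics.shuffleBytesWritten = 500 // field name taken from the onTaskEnd diff
val metricsWithWrite = new TaskMetrics()
metricsWithWrite.shuffleWriteMetrics = Some(writeMetrics)
// ...fire onTaskEnd with metricsWithWrite for an "exe-1" task, then:
// assert(listener.executorIdToSummary.getOrElse("exe-1", fail()).shuffleWrite == 500)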