diff options
author | Reynold Xin <rxin@apache.org> | 2013-12-23 10:38:20 -0800 |
---|---|---|
committer | Reynold Xin <rxin@apache.org> | 2013-12-23 10:38:20 -0800 |
commit | 11107c9de524a7397ec99333f5d7b00886217781 (patch) | |
tree | cec009c2aa6c13dc753efda3100c620ab8037dc4 /core/src/main/scala/org | |
parent | 0bc57c576792ba800eca0ec196c92a4d29cb3953 (diff) | |
parent | 2f689ba97b437092bf52063cface12aa9ee09bf3 (diff) | |
download | spark-11107c9de524a7397ec99333f5d7b00886217781.tar.gz spark-11107c9de524a7397ec99333f5d7b00886217781.tar.bz2 spark-11107c9de524a7397ec99333f5d7b00886217781.zip |
Merge pull request #244 from leftnoteasy/master
Added SPARK-968 implementation for review
Added SPARK-968 implementation for review
Diffstat (limited to 'core/src/main/scala/org')
6 files changed, 184 insertions, 5 deletions
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala index e596690bc3..a31a7e1d58 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala @@ -56,7 +56,8 @@ private[spark] class ExecutorsUI(val sc: SparkContext) { val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize)).fold(0L)(_+_) val execHead = Seq("Executor ID", "Address", "RDD blocks", "Memory used", "Disk used", - "Active tasks", "Failed tasks", "Complete tasks", "Total tasks") + "Active tasks", "Failed tasks", "Complete tasks", "Total tasks", "Task Time", "Shuffle Read", + "Shuffle Write") def execRow(kv: Seq[String]) = { <tr> @@ -73,6 +74,9 @@ private[spark] class ExecutorsUI(val sc: SparkContext) { <td>{kv(7)}</td> <td>{kv(8)}</td> <td>{kv(9)}</td> + <td>{Utils.msDurationToString(kv(10).toLong)}</td> + <td>{Utils.bytesToString(kv(11).toLong)}</td> + <td>{Utils.bytesToString(kv(12).toLong)}</td> </tr> } @@ -111,6 +115,9 @@ private[spark] class ExecutorsUI(val sc: SparkContext) { val failedTasks = listener.executorToTasksFailed.getOrElse(execId, 0) val completedTasks = listener.executorToTasksComplete.getOrElse(execId, 0) val totalTasks = activeTasks + failedTasks + completedTasks + val totalDuration = listener.executorToDuration.getOrElse(execId, 0) + val totalShuffleRead = listener.executorToShuffleRead.getOrElse(execId, 0) + val totalShuffleWrite = listener.executorToShuffleWrite.getOrElse(execId, 0) Seq( execId, @@ -122,7 +129,10 @@ private[spark] class ExecutorsUI(val sc: SparkContext) { activeTasks.toString, failedTasks.toString, completedTasks.toString, - totalTasks.toString + totalTasks.toString, + totalDuration.toString, + totalShuffleRead.toString, + totalShuffleWrite.toString ) } @@ -130,6 +140,9 @@ private[spark] class ExecutorsUI(val sc: SparkContext) { val executorToTasksActive = HashMap[String, HashSet[TaskInfo]]() val executorToTasksComplete = HashMap[String, Int]() val executorToTasksFailed = HashMap[String, Int]() + val executorToDuration = HashMap[String, Long]() + val executorToShuffleRead = HashMap[String, Long]() + val executorToShuffleWrite = HashMap[String, Long]() override def onTaskStart(taskStart: SparkListenerTaskStart) { val eid = taskStart.taskInfo.executorId @@ -140,6 +153,9 @@ private[spark] class ExecutorsUI(val sc: SparkContext) { override def onTaskEnd(taskEnd: SparkListenerTaskEnd) { val eid = taskEnd.taskInfo.executorId val activeTasks = executorToTasksActive.getOrElseUpdate(eid, new HashSet[TaskInfo]()) + val newDuration = executorToDuration.getOrElse(eid, 0L) + taskEnd.taskInfo.duration + executorToDuration.put(eid, newDuration) + activeTasks -= taskEnd.taskInfo val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) = taskEnd.reason match { @@ -150,6 +166,17 @@ private[spark] class ExecutorsUI(val sc: SparkContext) { executorToTasksComplete(eid) = executorToTasksComplete.getOrElse(eid, 0) + 1 (None, Option(taskEnd.taskMetrics)) } + + // update shuffle read/write + if (null != taskEnd.taskMetrics) { + taskEnd.taskMetrics.shuffleReadMetrics.foreach(shuffleRead => + executorToShuffleRead.put(eid, executorToShuffleRead.getOrElse(eid, 0L) + + shuffleRead.remoteBytesRead)) + + taskEnd.taskMetrics.shuffleWriteMetrics.foreach(shuffleWrite => + executorToShuffleWrite.put(eid, executorToShuffleWrite.getOrElse(eid, 0L) + + shuffleWrite.shuffleBytesWritten)) + } } } } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala new file mode 100644 index 0000000000..3c53e88380 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ui.jobs + +/** class for reporting aggregated metrics for each executors in stageUI */ +private[spark] class ExecutorSummary { + var taskTime : Long = 0 + var failedTasks : Int = 0 + var succeededTasks : Int = 0 + var shuffleRead : Long = 0 + var shuffleWrite : Long = 0 +} diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala new file mode 100644 index 0000000000..0dd876480a --- /dev/null +++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ui.jobs + +import scala.xml.Node + +import org.apache.spark.scheduler.SchedulingMode +import org.apache.spark.util.Utils +import scala.collection.mutable + +/** Page showing executor summary */ +private[spark] class ExecutorTable(val parent: JobProgressUI, val stageId: Int) { + + val listener = parent.listener + val dateFmt = parent.dateFmt + val isFairScheduler = listener.sc.getSchedulingMode == SchedulingMode.FAIR + + def toNodeSeq(): Seq[Node] = { + listener.synchronized { + executorTable() + } + } + + /** Special table which merges two header cells. */ + private def executorTable[T](): Seq[Node] = { + <table class="table table-bordered table-striped table-condensed sortable"> + <thead> + <th>Executor ID</th> + <th>Address</th> + <th>Task Time</th> + <th>Total Tasks</th> + <th>Failed Tasks</th> + <th>Succeeded Tasks</th> + <th>Shuffle Read</th> + <th>Shuffle Write</th> + </thead> + <tbody> + {createExecutorTable()} + </tbody> + </table> + } + + private def createExecutorTable() : Seq[Node] = { + // make a executor-id -> address map + val executorIdToAddress = mutable.HashMap[String, String]() + val storageStatusList = parent.sc.getExecutorStorageStatus + for (statusId <- 0 until storageStatusList.size) { + val blockManagerId = parent.sc.getExecutorStorageStatus(statusId).blockManagerId + val address = blockManagerId.hostPort + val executorId = blockManagerId.executorId + executorIdToAddress.put(executorId, address) + } + + val executorIdToSummary = listener.stageIdToExecutorSummaries.get(stageId) + executorIdToSummary match { + case Some(x) => { + x.toSeq.sortBy(_._1).map{ + case (k,v) => { + <tr> + <td>{k}</td> + <td>{executorIdToAddress.getOrElse(k, "CANNOT FIND ADDRESS")}</td> + <td>{parent.formatDuration(v.taskTime)}</td> + <td>{v.failedTasks + v.succeededTasks}</td> + <td>{v.failedTasks}</td> + <td>{v.succeededTasks}</td> + <td>{Utils.bytesToString(v.shuffleRead)}</td> + <td>{Utils.bytesToString(v.shuffleWrite)}</td> + </tr> + } + } + } + case _ => { Seq[Node]() } + } + } +} diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index 6b854740d6..07a42f0503 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -57,6 +57,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList val stageIdToTasksFailed = HashMap[Int, Int]() val stageIdToTaskInfos = HashMap[Int, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]]() + val stageIdToExecutorSummaries = HashMap[Int, HashMap[String, ExecutorSummary]]() override def onJobStart(jobStart: SparkListenerJobStart) {} @@ -124,8 +125,41 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized { val sid = taskEnd.task.stageId + + // create executor summary map if necessary + val executorSummaryMap = stageIdToExecutorSummaries.getOrElseUpdate(key = sid, + op = new HashMap[String, ExecutorSummary]()) + executorSummaryMap.getOrElseUpdate(key = taskEnd.taskInfo.executorId, + op = new ExecutorSummary()) + + val executorSummary = executorSummaryMap.get(taskEnd.taskInfo.executorId) + executorSummary match { + case Some(y) => { + // first update failed-task, succeed-task + taskEnd.reason match { + case Success => + y.succeededTasks += 1 + case _ => + y.failedTasks += 1 + } + + // update duration + y.taskTime += taskEnd.taskInfo.duration + + taskEnd.taskMetrics.shuffleReadMetrics.foreach { shuffleRead => + y.shuffleRead += shuffleRead.remoteBytesRead + } + + taskEnd.taskMetrics.shuffleWriteMetrics.foreach { shuffleWrite => + y.shuffleWrite += shuffleWrite.shuffleBytesWritten + } + } + case _ => {} + } + val tasksActive = stageIdToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]()) tasksActive -= taskEnd.taskInfo + val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) = taskEnd.reason match { case e: ExceptionFailure => diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 996e1b4d1a..8dcfeacb60 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -66,7 +66,7 @@ private[spark] class StagePage(parent: JobProgressUI) { <div> <ul class="unstyled"> <li> - <strong>Total duration across all tasks: </strong> + <strong>Total task time across all tasks: </strong> {parent.formatDuration(listener.stageIdToTime.getOrElse(stageId, 0L) + activeTime)} </li> {if (hasShuffleRead) @@ -166,11 +166,12 @@ private[spark] class StagePage(parent: JobProgressUI) { def quantileRow(data: Seq[String]): Seq[Node] = <tr> {data.map(d => <td>{d}</td>)} </tr> Some(listingTable(quantileHeaders, quantileRow, listings, fixedWidth = true)) } - + val executorTable = new ExecutorTable(parent, stageId) val content = summary ++ <h4>Summary Metrics for {numCompleted} Completed Tasks</h4> ++ <div>{summaryTable.getOrElse("No tasks have reported metrics yet.")}</div> ++ + <h4>Aggregated Metrics by Executors</h4> ++ executorTable.toNodeSeq() ++ <h4>Tasks</h4> ++ taskTable headerSparkPage(content, parent.sc, "Details for Stage %d".format(stageId), Stages) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala index 9ad6de3c6d..463d85dfd5 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala @@ -48,7 +48,7 @@ private[spark] class StageTable(val stages: Seq[StageInfo], val parent: JobProgr {if (isFairScheduler) {<th>Pool Name</th>} else {}} <th>Description</th> <th>Submitted</th> - <th>Duration</th> + <th>Task Time</th> <th>Tasks: Succeeded/Total</th> <th>Shuffle Read</th> <th>Shuffle Write</th> |