diff options
Diffstat (limited to 'core/src/main/scala/org/apache')
5 files changed, 95 insertions, 20 deletions
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala index fe372116f1..3adf5b1109 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala @@ -55,11 +55,13 @@ class ExecutorSummary private[spark]( val rddBlocks: Int, val memoryUsed: Long, val diskUsed: Long, + val maxTasks: Int, val activeTasks: Int, val failedTasks: Int, val completedTasks: Int, val totalTasks: Int, val totalDuration: Long, + val totalGCTime: Long, val totalInputBytes: Long, val totalShuffleRead: Long, val totalShuffleWrite: Long, diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index eb53aa8e23..cf45414c4f 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -195,7 +195,7 @@ private[spark] object SparkUI { val environmentListener = new EnvironmentListener val storageStatusListener = new StorageStatusListener - val executorsListener = new ExecutorsListener(storageStatusListener) + val executorsListener = new ExecutorsListener(storageStatusListener, conf) val storageListener = new StorageListener(storageStatusListener) val operationGraphListener = new RDDOperationGraphListener(conf) diff --git a/core/src/main/scala/org/apache/spark/ui/ToolTips.scala b/core/src/main/scala/org/apache/spark/ui/ToolTips.scala index cb122eaed8..2d2d80be4a 100644 --- a/core/src/main/scala/org/apache/spark/ui/ToolTips.scala +++ b/core/src/main/scala/org/apache/spark/ui/ToolTips.scala @@ -87,4 +87,7 @@ private[spark] object ToolTips { multiple operations (e.g. two map() functions) if they can be pipelined. Some operations also create multiple RDDs internally. Cached RDDs are shown in green. """ + + val TASK_TIME = + "Shaded red when garbage collection (GC) time is over 10% of task time" } diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala index 440dfa2679..e36b96b3e6 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala @@ -50,6 +50,8 @@ private[ui] class ExecutorsPage( threadDumpEnabled: Boolean) extends WebUIPage("") { private val listener = parent.listener + // When GCTimePercent is edited change ToolTips.TASK_TIME to match + private val GCTimePercent = 0.1 def render(request: HttpServletRequest): Seq[Node] = { val (storageStatusList, execInfo) = listener.synchronized { @@ -77,7 +79,7 @@ private[ui] class ExecutorsPage( <th>Failed Tasks</th> <th>Complete Tasks</th> <th>Total Tasks</th> - <th>Task Time</th> + <th data-toggle="tooltip" title={ToolTips.TASK_TIME}>Task Time (GC Time)</th> <th><span data-toggle="tooltip" title={ToolTips.INPUT}>Input</span></th> <th><span data-toggle="tooltip" title={ToolTips.SHUFFLE_READ}>Shuffle Read</span></th> <th> @@ -129,13 +131,8 @@ private[ui] class ExecutorsPage( <td sorttable_customkey={diskUsed.toString}> {Utils.bytesToString(diskUsed)} </td> - <td>{info.activeTasks}</td> - <td>{info.failedTasks}</td> - <td>{info.completedTasks}</td> - <td>{info.totalTasks}</td> - <td sorttable_customkey={info.totalDuration.toString}> - {Utils.msDurationToString(info.totalDuration)} - </td> + {taskData(info.maxTasks, info.activeTasks, info.failedTasks, info.completedTasks, + info.totalTasks, info.totalDuration, info.totalGCTime)} <td sorttable_customkey={info.totalInputBytes.toString}> {Utils.bytesToString(info.totalInputBytes)} </td> @@ -177,7 +174,6 @@ private[ui] class ExecutorsPage( val maximumMemory = execInfo.map(_.maxMemory).sum val memoryUsed = execInfo.map(_.memoryUsed).sum val diskUsed = execInfo.map(_.diskUsed).sum - val totalDuration = execInfo.map(_.totalDuration).sum val totalInputBytes = execInfo.map(_.totalInputBytes).sum val totalShuffleRead = execInfo.map(_.totalShuffleRead).sum val totalShuffleWrite = execInfo.map(_.totalShuffleWrite).sum @@ -192,13 +188,13 @@ private[ui] class ExecutorsPage( <td sorttable_customkey={diskUsed.toString}> {Utils.bytesToString(diskUsed)} </td> - <td>{execInfo.map(_.activeTasks).sum}</td> - <td>{execInfo.map(_.failedTasks).sum}</td> - <td>{execInfo.map(_.completedTasks).sum}</td> - <td>{execInfo.map(_.totalTasks).sum}</td> - <td sorttable_customkey={totalDuration.toString}> - {Utils.msDurationToString(totalDuration)} - </td> + {taskData(execInfo.map(_.maxTasks).sum, + execInfo.map(_.activeTasks).sum, + execInfo.map(_.failedTasks).sum, + execInfo.map(_.completedTasks).sum, + execInfo.map(_.totalTasks).sum, + execInfo.map(_.totalDuration).sum, + execInfo.map(_.totalGCTime).sum)} <td sorttable_customkey={totalInputBytes.toString}> {Utils.bytesToString(totalInputBytes)} </td> @@ -219,7 +215,7 @@ private[ui] class ExecutorsPage( <th>Failed Tasks</th> <th>Complete Tasks</th> <th>Total Tasks</th> - <th>Task Time</th> + <th data-toggle="tooltip" title={ToolTips.TASK_TIME}>Task Time (GC Time)</th> <th><span data-toggle="tooltip" title={ToolTips.INPUT}>Input</span></th> <th><span data-toggle="tooltip" title={ToolTips.SHUFFLE_READ}>Shuffle Read</span></th> <th> @@ -233,6 +229,70 @@ private[ui] class ExecutorsPage( </tbody> </table> } + + private def taskData( + maxTasks: Int, + activeTasks: Int, + failedTasks: Int, + completedTasks: Int, + totalTasks: Int, + totalDuration: Long, + totalGCTime: Long): + Seq[Node] = { + // Determine Color Opacity from 0.5-1 + // activeTasks range from 0 to maxTasks + val activeTasksAlpha = + if (maxTasks > 0) { + (activeTasks.toDouble / maxTasks) * 0.5 + 0.5 + } else { + 1 + } + // failedTasks range max at 10% failure, alpha max = 1 + val failedTasksAlpha = + if (totalTasks > 0) { + math.min(10 * failedTasks.toDouble / totalTasks, 1) * 0.5 + 0.5 + } else { + 1 + } + // totalDuration range from 0 to 50% GC time, alpha max = 1 + val totalDurationAlpha = + if (totalDuration > 0) { + math.min(totalGCTime.toDouble / totalDuration + 0.5, 1) + } else { + 1 + } + + val tableData = + <td style={ + if (activeTasks > 0) { + "background:hsla(240, 100%, 50%, " + activeTasksAlpha + ");color:white" + } else { + "" + } + }>{activeTasks}</td> + <td style={ + if (failedTasks > 0) { + "background:hsla(0, 100%, 50%, " + failedTasksAlpha + ");color:white" + } else { + "" + } + }>{failedTasks}</td> + <td>{completedTasks}</td> + <td>{totalTasks}</td> + <td sorttable_customkey={totalDuration.toString} style={ + // Red if GC time over GCTimePercent of total time + if (totalGCTime > GCTimePercent * totalDuration) { + "background:hsla(0, 100%, 50%, " + totalDurationAlpha + ");color:white" + } else { + "" + } + }> + {Utils.msDurationToString(totalDuration)} + ({Utils.msDurationToString(totalGCTime)}) + </td>; + + tableData + } } private[spark] object ExecutorsPage { @@ -245,11 +305,13 @@ private[spark] object ExecutorsPage { val memUsed = status.memUsed val maxMem = status.maxMem val diskUsed = status.diskUsed + val maxTasks = listener.executorToTasksMax.getOrElse(execId, 0) val activeTasks = listener.executorToTasksActive.getOrElse(execId, 0) val failedTasks = listener.executorToTasksFailed.getOrElse(execId, 0) val completedTasks = listener.executorToTasksComplete.getOrElse(execId, 0) val totalTasks = activeTasks + failedTasks + completedTasks val totalDuration = listener.executorToDuration.getOrElse(execId, 0L) + val totalGCTime = listener.executorToJvmGCTime.getOrElse(execId, 0L) val totalInputBytes = listener.executorToInputBytes.getOrElse(execId, 0L) val totalShuffleRead = listener.executorToShuffleRead.getOrElse(execId, 0L) val totalShuffleWrite = listener.executorToShuffleWrite.getOrElse(execId, 0L) @@ -261,11 +323,13 @@ private[spark] object ExecutorsPage { rddBlocks, memUsed, diskUsed, + maxTasks, activeTasks, failedTasks, completedTasks, totalTasks, totalDuration, + totalGCTime, totalInputBytes, totalShuffleRead, totalShuffleWrite, diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala index 160d7a4dff..a9e926b158 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala @@ -19,7 +19,7 @@ package org.apache.spark.ui.exec import scala.collection.mutable.HashMap -import org.apache.spark.{ExceptionFailure, Resubmitted, SparkContext} +import org.apache.spark.{ExceptionFailure, Resubmitted, SparkConf, SparkContext} import org.apache.spark.annotation.DeveloperApi import org.apache.spark.scheduler._ import org.apache.spark.storage.{StorageStatus, StorageStatusListener} @@ -43,11 +43,14 @@ private[ui] class ExecutorsTab(parent: SparkUI) extends SparkUITab(parent, "exec * A SparkListener that prepares information to be displayed on the ExecutorsTab */ @DeveloperApi -class ExecutorsListener(storageStatusListener: StorageStatusListener) extends SparkListener { +class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: SparkConf) + extends SparkListener { + val executorToTasksMax = HashMap[String, Int]() val executorToTasksActive = HashMap[String, Int]() val executorToTasksComplete = HashMap[String, Int]() val executorToTasksFailed = HashMap[String, Int]() val executorToDuration = HashMap[String, Long]() + val executorToJvmGCTime = HashMap[String, Long]() val executorToInputBytes = HashMap[String, Long]() val executorToInputRecords = HashMap[String, Long]() val executorToOutputBytes = HashMap[String, Long]() @@ -62,6 +65,8 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) extends Sp override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = synchronized { val eid = executorAdded.executorId executorToLogUrls(eid) = executorAdded.executorInfo.logUrlMap + executorToTasksMax(eid) = + executorAdded.executorInfo.totalCores / conf.getInt("spark.task.cpus", 1) executorIdToData(eid) = ExecutorUIData(executorAdded.time) } @@ -131,6 +136,7 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) extends Sp executorToShuffleWrite(eid) = executorToShuffleWrite.getOrElse(eid, 0L) + shuffleWrite.bytesWritten } + executorToJvmGCTime(eid) = executorToJvmGCTime.getOrElse(eid, 0L) + metrics.jvmGCTime } } } |