-rw-r--r--  core/src/main/scala/org/apache/spark/status/api/v1/api.scala                              2
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/SparkUI.scala                                     2
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/ToolTips.scala                                    3
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala                         98
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala                          10
-rw-r--r--  core/src/test/resources/HistoryServerExpectations/executor_list_json_expectation.json     2
-rw-r--r--  project/MimaExcludes.scala                                                                6
7 files changed, 103 insertions, 20 deletions
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
index fe372116f1..3adf5b1109 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -55,11 +55,13 @@ class ExecutorSummary private[spark](
val rddBlocks: Int,
val memoryUsed: Long,
val diskUsed: Long,
+ val maxTasks: Int,
val activeTasks: Int,
val failedTasks: Int,
val completedTasks: Int,
val totalTasks: Int,
val totalDuration: Long,
+ val totalGCTime: Long,
val totalInputBytes: Long,
val totalShuffleRead: Long,
val totalShuffleWrite: Long,
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index eb53aa8e23..cf45414c4f 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -195,7 +195,7 @@ private[spark] object SparkUI {
val environmentListener = new EnvironmentListener
val storageStatusListener = new StorageStatusListener
- val executorsListener = new ExecutorsListener(storageStatusListener)
+ val executorsListener = new ExecutorsListener(storageStatusListener, conf)
val storageListener = new StorageListener(storageStatusListener)
val operationGraphListener = new RDDOperationGraphListener(conf)
diff --git a/core/src/main/scala/org/apache/spark/ui/ToolTips.scala b/core/src/main/scala/org/apache/spark/ui/ToolTips.scala
index cb122eaed8..2d2d80be4a 100644
--- a/core/src/main/scala/org/apache/spark/ui/ToolTips.scala
+++ b/core/src/main/scala/org/apache/spark/ui/ToolTips.scala
@@ -87,4 +87,7 @@ private[spark] object ToolTips {
multiple operations (e.g. two map() functions) if they can be pipelined. Some operations
also create multiple RDDs internally. Cached RDDs are shown in green.
"""
+
+ val TASK_TIME =
+ "Shaded red when garbage collection (GC) time is over 10% of task time"
}
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
index 440dfa2679..e36b96b3e6 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
@@ -50,6 +50,8 @@ private[ui] class ExecutorsPage(
threadDumpEnabled: Boolean)
extends WebUIPage("") {
private val listener = parent.listener
+ // When GCTimePercent is edited, change ToolTips.TASK_TIME to match
+ private val GCTimePercent = 0.1
def render(request: HttpServletRequest): Seq[Node] = {
val (storageStatusList, execInfo) = listener.synchronized {
@@ -77,7 +79,7 @@ private[ui] class ExecutorsPage(
<th>Failed Tasks</th>
<th>Complete Tasks</th>
<th>Total Tasks</th>
- <th>Task Time</th>
+ <th data-toggle="tooltip" title={ToolTips.TASK_TIME}>Task Time (GC Time)</th>
<th><span data-toggle="tooltip" title={ToolTips.INPUT}>Input</span></th>
<th><span data-toggle="tooltip" title={ToolTips.SHUFFLE_READ}>Shuffle Read</span></th>
<th>
@@ -129,13 +131,8 @@ private[ui] class ExecutorsPage(
<td sorttable_customkey={diskUsed.toString}>
{Utils.bytesToString(diskUsed)}
</td>
- <td>{info.activeTasks}</td>
- <td>{info.failedTasks}</td>
- <td>{info.completedTasks}</td>
- <td>{info.totalTasks}</td>
- <td sorttable_customkey={info.totalDuration.toString}>
- {Utils.msDurationToString(info.totalDuration)}
- </td>
+ {taskData(info.maxTasks, info.activeTasks, info.failedTasks, info.completedTasks,
+ info.totalTasks, info.totalDuration, info.totalGCTime)}
<td sorttable_customkey={info.totalInputBytes.toString}>
{Utils.bytesToString(info.totalInputBytes)}
</td>
@@ -177,7 +174,6 @@ private[ui] class ExecutorsPage(
val maximumMemory = execInfo.map(_.maxMemory).sum
val memoryUsed = execInfo.map(_.memoryUsed).sum
val diskUsed = execInfo.map(_.diskUsed).sum
- val totalDuration = execInfo.map(_.totalDuration).sum
val totalInputBytes = execInfo.map(_.totalInputBytes).sum
val totalShuffleRead = execInfo.map(_.totalShuffleRead).sum
val totalShuffleWrite = execInfo.map(_.totalShuffleWrite).sum
@@ -192,13 +188,13 @@ private[ui] class ExecutorsPage(
<td sorttable_customkey={diskUsed.toString}>
{Utils.bytesToString(diskUsed)}
</td>
- <td>{execInfo.map(_.activeTasks).sum}</td>
- <td>{execInfo.map(_.failedTasks).sum}</td>
- <td>{execInfo.map(_.completedTasks).sum}</td>
- <td>{execInfo.map(_.totalTasks).sum}</td>
- <td sorttable_customkey={totalDuration.toString}>
- {Utils.msDurationToString(totalDuration)}
- </td>
+ {taskData(execInfo.map(_.maxTasks).sum,
+ execInfo.map(_.activeTasks).sum,
+ execInfo.map(_.failedTasks).sum,
+ execInfo.map(_.completedTasks).sum,
+ execInfo.map(_.totalTasks).sum,
+ execInfo.map(_.totalDuration).sum,
+ execInfo.map(_.totalGCTime).sum)}
<td sorttable_customkey={totalInputBytes.toString}>
{Utils.bytesToString(totalInputBytes)}
</td>
@@ -219,7 +215,7 @@ private[ui] class ExecutorsPage(
<th>Failed Tasks</th>
<th>Complete Tasks</th>
<th>Total Tasks</th>
- <th>Task Time</th>
+ <th data-toggle="tooltip" title={ToolTips.TASK_TIME}>Task Time (GC Time)</th>
<th><span data-toggle="tooltip" title={ToolTips.INPUT}>Input</span></th>
<th><span data-toggle="tooltip" title={ToolTips.SHUFFLE_READ}>Shuffle Read</span></th>
<th>
@@ -233,6 +229,70 @@ private[ui] class ExecutorsPage(
</tbody>
</table>
}
+
+ private def taskData(
+ maxTasks: Int,
+ activeTasks: Int,
+ failedTasks: Int,
+ completedTasks: Int,
+ totalTasks: Int,
+ totalDuration: Long,
+ totalGCTime: Long):
+ Seq[Node] = {
+ // Determine color opacity, ranging from 0.5 to 1
+ // activeTasks ranges from 0 to maxTasks
+ val activeTasksAlpha =
+ if (maxTasks > 0) {
+ (activeTasks.toDouble / maxTasks) * 0.5 + 0.5
+ } else {
+ 1
+ }
+ // alpha ranges from 0.5 up to 1 at a 10% failure rate
+ val failedTasksAlpha =
+ if (totalTasks > 0) {
+ math.min(10 * failedTasks.toDouble / totalTasks, 1) * 0.5 + 0.5
+ } else {
+ 1
+ }
+ // alpha ranges from 0.5 at 0% GC time up to 1 at 50% GC time
+ val totalDurationAlpha =
+ if (totalDuration > 0) {
+ math.min(totalGCTime.toDouble / totalDuration + 0.5, 1)
+ } else {
+ 1
+ }
+
+ val tableData =
+ <td style={
+ if (activeTasks > 0) {
+ "background:hsla(240, 100%, 50%, " + activeTasksAlpha + ");color:white"
+ } else {
+ ""
+ }
+ }>{activeTasks}</td>
+ <td style={
+ if (failedTasks > 0) {
+ "background:hsla(0, 100%, 50%, " + failedTasksAlpha + ");color:white"
+ } else {
+ ""
+ }
+ }>{failedTasks}</td>
+ <td>{completedTasks}</td>
+ <td>{totalTasks}</td>
+ <td sorttable_customkey={totalDuration.toString} style={
+ // Shade red if GC time exceeds GCTimePercent of total task time
+ if (totalGCTime > GCTimePercent * totalDuration) {
+ "background:hsla(0, 100%, 50%, " + totalDurationAlpha + ");color:white"
+ } else {
+ ""
+ }
+ }>
+ {Utils.msDurationToString(totalDuration)}
+ ({Utils.msDurationToString(totalGCTime)})
+ </td>;
+
+ tableData
+ }
}
private[spark] object ExecutorsPage {
@@ -245,11 +305,13 @@ private[spark] object ExecutorsPage {
val memUsed = status.memUsed
val maxMem = status.maxMem
val diskUsed = status.diskUsed
+ val maxTasks = listener.executorToTasksMax.getOrElse(execId, 0)
val activeTasks = listener.executorToTasksActive.getOrElse(execId, 0)
val failedTasks = listener.executorToTasksFailed.getOrElse(execId, 0)
val completedTasks = listener.executorToTasksComplete.getOrElse(execId, 0)
val totalTasks = activeTasks + failedTasks + completedTasks
val totalDuration = listener.executorToDuration.getOrElse(execId, 0L)
+ val totalGCTime = listener.executorToJvmGCTime.getOrElse(execId, 0L)
val totalInputBytes = listener.executorToInputBytes.getOrElse(execId, 0L)
val totalShuffleRead = listener.executorToShuffleRead.getOrElse(execId, 0L)
val totalShuffleWrite = listener.executorToShuffleWrite.getOrElse(execId, 0L)
@@ -261,11 +323,13 @@ private[spark] object ExecutorsPage {
rddBlocks,
memUsed,
diskUsed,
+ maxTasks,
activeTasks,
failedTasks,
completedTasks,
totalTasks,
totalDuration,
+ totalGCTime,
totalInputBytes,
totalShuffleRead,
totalShuffleWrite,
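To make the shading rules above concrete, here is a standalone sketch of the same arithmetic, using the task-time figures from the JSON expectation file in this patch (8820 ms of task time, 352 ms of GC):

object ShadingMathDemo {
  // Must stay in sync with ToolTips.TASK_TIME ("over 10% of task time")
  val GCTimePercent = 0.1

  def main(args: Array[String]): Unit = {
    val (totalDuration, totalGCTime) = (8820L, 352L)
    // 352 / 8820 is roughly 4%, under the 10% threshold, so no red shading
    val shaded = totalGCTime > GCTimePercent * totalDuration
    // Opacity scales from 0.5 (no GC) up to 1.0 at 50% GC time
    val alpha = math.min(totalGCTime.toDouble / totalDuration + 0.5, 1)
    println(s"shaded=$shaded alpha=$alpha") // shaded=false alpha=0.539...
  }
}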
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
index 160d7a4dff..a9e926b158 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
@@ -19,7 +19,7 @@ package org.apache.spark.ui.exec
import scala.collection.mutable.HashMap
-import org.apache.spark.{ExceptionFailure, Resubmitted, SparkContext}
+import org.apache.spark.{ExceptionFailure, Resubmitted, SparkConf, SparkContext}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.scheduler._
import org.apache.spark.storage.{StorageStatus, StorageStatusListener}
@@ -43,11 +43,14 @@ private[ui] class ExecutorsTab(parent: SparkUI) extends SparkUITab(parent, "exec
* A SparkListener that prepares information to be displayed on the ExecutorsTab
*/
@DeveloperApi
-class ExecutorsListener(storageStatusListener: StorageStatusListener) extends SparkListener {
+class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: SparkConf)
+ extends SparkListener {
+ val executorToTasksMax = HashMap[String, Int]()
val executorToTasksActive = HashMap[String, Int]()
val executorToTasksComplete = HashMap[String, Int]()
val executorToTasksFailed = HashMap[String, Int]()
val executorToDuration = HashMap[String, Long]()
+ val executorToJvmGCTime = HashMap[String, Long]()
val executorToInputBytes = HashMap[String, Long]()
val executorToInputRecords = HashMap[String, Long]()
val executorToOutputBytes = HashMap[String, Long]()
@@ -62,6 +65,8 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) extends Sp
override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = synchronized {
val eid = executorAdded.executorId
executorToLogUrls(eid) = executorAdded.executorInfo.logUrlMap
+ executorToTasksMax(eid) =
+ executorAdded.executorInfo.totalCores / conf.getInt("spark.task.cpus", 1)
executorIdToData(eid) = ExecutorUIData(executorAdded.time)
}
@@ -131,6 +136,7 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) extends Sp
executorToShuffleWrite(eid) =
executorToShuffleWrite.getOrElse(eid, 0L) + shuffleWrite.bytesWritten
}
+ executorToJvmGCTime(eid) = executorToJvmGCTime.getOrElse(eid, 0L) + metrics.jvmGCTime
}
}
}
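A worked example of the maxTasks derivation above, assuming an executor that registers with 8 cores while spark.task.cpus is set to 2:

import org.apache.spark.SparkConf

object MaxTasksDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(false).set("spark.task.cpus", "2")
    val totalCores = 8 // from SparkListenerExecutorAdded.executorInfo.totalCores
    val maxTasks = totalCores / conf.getInt("spark.task.cpus", 1)
    println(maxTasks) // 4 task slots: each task claims 2 of the 8 cores
  }
}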
diff --git a/core/src/test/resources/HistoryServerExpectations/executor_list_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/executor_list_json_expectation.json
index cb622e1472..94f8aeac55 100644
--- a/core/src/test/resources/HistoryServerExpectations/executor_list_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/executor_list_json_expectation.json
@@ -4,11 +4,13 @@
"rddBlocks" : 8,
"memoryUsed" : 28000128,
"diskUsed" : 0,
+ "maxTasks" : 0,
"activeTasks" : 0,
"failedTasks" : 1,
"completedTasks" : 31,
"totalTasks" : 32,
"totalDuration" : 8820,
+ "totalGCTime" : 352,
"totalInputBytes" : 28000288,
"totalShuffleRead" : 0,
"totalShuffleWrite" : 13180,
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index c65fae482c..501456b043 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -127,6 +127,9 @@ object MimaExcludes {
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.storage.ExternalBlockStore$"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.storage.ExternalBlockManager"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.storage.ExternalBlockStore")
+ ) ++ Seq(
+ // SPARK-12149 Added new fields to ExecutorSummary
+ ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.status.api.v1.ExecutorSummary.this")
) ++
// SPARK-12665 Remove deprecated and unused classes
Seq(
@@ -301,6 +304,9 @@ object MimaExcludes {
// SPARK-3580 Add getNumPartitions method to JavaRDD
ProblemFilters.exclude[MissingMethodProblem](
"org.apache.spark.api.java.JavaRDDLike.getNumPartitions")
+ ) ++ Seq(
+ // SPARK-12149 Added new fields to ExecutorSummary
+ ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.status.api.v1.ExecutorSummary.this")
) ++
// SPARK-11314: YARN backend moved to yarn sub-module and MiMA complains even though it's a
// private class.
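The MiMa excludes are needed because ExecutorSummary gains constructor parameters: the old constructor signature disappears from the bytecode, which MiMa reports as a MissingMethodProblem on ExecutorSummary.this. A simplified before/after sketch (field lists abbreviated):

// Before this patch (abbreviated):
class ExecutorSummaryBefore(val activeTasks: Int, val totalDuration: Long)

// After this patch (abbreviated): callers compiled against the old
// constructor would fail at link time, hence the exclude above.
class ExecutorSummaryAfter(
    val maxTasks: Int,      // new
    val activeTasks: Int,
    val totalDuration: Long,
    val totalGCTime: Long)  // new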