aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala')
-rw-r--r--core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala54
1 files changed, 41 insertions, 13 deletions
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
index 42e9be6e19..a31a7e1d58 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
@@ -56,7 +56,8 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize)).fold(0L)(_+_)
val execHead = Seq("Executor ID", "Address", "RDD blocks", "Memory used", "Disk used",
- "Active tasks", "Failed tasks", "Complete tasks", "Total tasks")
+ "Active tasks", "Failed tasks", "Complete tasks", "Total tasks", "Task Time", "Shuffle Read",
+ "Shuffle Write")
def execRow(kv: Seq[String]) = {
<tr>
@@ -73,10 +74,13 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
<td>{kv(7)}</td>
<td>{kv(8)}</td>
<td>{kv(9)}</td>
+ <td>{Utils.msDurationToString(kv(10).toLong)}</td>
+ <td>{Utils.bytesToString(kv(11).toLong)}</td>
+ <td>{Utils.bytesToString(kv(12).toLong)}</td>
</tr>
}
- val execInfo = for (b <- 0 until storageStatusList.size) yield getExecInfo(b)
+ val execInfo = for (statusId <- 0 until storageStatusList.size) yield getExecInfo(statusId)
val execTable = UIUtils.listingTable(execHead, execRow, execInfo)
val content =
@@ -99,17 +103,21 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
UIUtils.headerSparkPage(content, sc, "Executors (" + execInfo.size + ")", Executors)
}
- def getExecInfo(a: Int): Seq[String] = {
- val execId = sc.getExecutorStorageStatus(a).blockManagerId.executorId
- val hostPort = sc.getExecutorStorageStatus(a).blockManagerId.hostPort
- val rddBlocks = sc.getExecutorStorageStatus(a).blocks.size.toString
- val memUsed = sc.getExecutorStorageStatus(a).memUsed().toString
- val maxMem = sc.getExecutorStorageStatus(a).maxMem.toString
- val diskUsed = sc.getExecutorStorageStatus(a).diskUsed().toString
- val activeTasks = listener.executorToTasksActive.get(a.toString).map(l => l.size).getOrElse(0)
- val failedTasks = listener.executorToTasksFailed.getOrElse(a.toString, 0)
- val completedTasks = listener.executorToTasksComplete.getOrElse(a.toString, 0)
+ def getExecInfo(statusId: Int): Seq[String] = {
+ val status = sc.getExecutorStorageStatus(statusId)
+ val execId = status.blockManagerId.executorId
+ val hostPort = status.blockManagerId.hostPort
+ val rddBlocks = status.blocks.size.toString
+ val memUsed = status.memUsed().toString
+ val maxMem = status.maxMem.toString
+ val diskUsed = status.diskUsed().toString
+ val activeTasks = listener.executorToTasksActive.getOrElse(execId, HashSet.empty[Long]).size
+ val failedTasks = listener.executorToTasksFailed.getOrElse(execId, 0)
+ val completedTasks = listener.executorToTasksComplete.getOrElse(execId, 0)
val totalTasks = activeTasks + failedTasks + completedTasks
+ val totalDuration = listener.executorToDuration.getOrElse(execId, 0)
+ val totalShuffleRead = listener.executorToShuffleRead.getOrElse(execId, 0)
+ val totalShuffleWrite = listener.executorToShuffleWrite.getOrElse(execId, 0)
Seq(
execId,
@@ -121,7 +129,10 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
activeTasks.toString,
failedTasks.toString,
completedTasks.toString,
- totalTasks.toString
+ totalTasks.toString,
+ totalDuration.toString,
+ totalShuffleRead.toString,
+ totalShuffleWrite.toString
)
}
@@ -129,6 +140,9 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
val executorToTasksActive = HashMap[String, HashSet[TaskInfo]]()
val executorToTasksComplete = HashMap[String, Int]()
val executorToTasksFailed = HashMap[String, Int]()
+ val executorToDuration = HashMap[String, Long]()
+ val executorToShuffleRead = HashMap[String, Long]()
+ val executorToShuffleWrite = HashMap[String, Long]()
override def onTaskStart(taskStart: SparkListenerTaskStart) {
val eid = taskStart.taskInfo.executorId
@@ -139,6 +153,9 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
val eid = taskEnd.taskInfo.executorId
val activeTasks = executorToTasksActive.getOrElseUpdate(eid, new HashSet[TaskInfo]())
+ val newDuration = executorToDuration.getOrElse(eid, 0L) + taskEnd.taskInfo.duration
+ executorToDuration.put(eid, newDuration)
+
activeTasks -= taskEnd.taskInfo
val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
taskEnd.reason match {
@@ -149,6 +166,17 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
executorToTasksComplete(eid) = executorToTasksComplete.getOrElse(eid, 0) + 1
(None, Option(taskEnd.taskMetrics))
}
+
+ // update shuffle read/write
+ if (null != taskEnd.taskMetrics) {
+ taskEnd.taskMetrics.shuffleReadMetrics.foreach(shuffleRead =>
+ executorToShuffleRead.put(eid, executorToShuffleRead.getOrElse(eid, 0L) +
+ shuffleRead.remoteBytesRead))
+
+ taskEnd.taskMetrics.shuffleWriteMetrics.foreach(shuffleWrite =>
+ executorToShuffleWrite.put(eid, executorToShuffleWrite.getOrElse(eid, 0L) +
+ shuffleWrite.shuffleBytesWritten))
+ }
}
}
}