about summary refs log tree commit diff
path: root/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala')
-rw-r--r-- core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala | 48
1 file changed, 20 insertions(+), 28 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
index d012ba4dbb..73861ae674 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
@@ -20,17 +20,13 @@ package org.apache.spark.ui.jobs
import scala.collection.mutable
import scala.xml.Node
-import org.apache.spark.scheduler.SchedulingMode
import org.apache.spark.util.Utils
/** Page showing executor summary */
-private[spark] class ExecutorTable(val parent: JobProgressUI, val stageId: Int) {
+private[ui] class ExecutorTable(stageId: Int, parent: JobProgressUI) {
+ private lazy val listener = parent.listener
- val listener = parent.listener
- val dateFmt = parent.dateFmt
- val isFairScheduler = listener.sc.getSchedulingMode == SchedulingMode.FAIR
-
- def toNodeSeq(): Seq[Node] = {
+ def toNodeSeq: Seq[Node] = {
listener.synchronized {
executorTable()
}
@@ -58,11 +54,9 @@ private[spark] class ExecutorTable(val parent: JobProgressUI, val stageId: Int)
}
private def createExecutorTable() : Seq[Node] = {
- // make a executor-id -> address map
+ // Make an executor-id -> address map
val executorIdToAddress = mutable.HashMap[String, String]()
- val storageStatusList = parent.sc.getExecutorStorageStatus
- for (statusId <- 0 until storageStatusList.size) {
- val blockManagerId = parent.sc.getExecutorStorageStatus(statusId).blockManagerId
+ listener.blockManagerIds.foreach { blockManagerId =>
val address = blockManagerId.hostPort
val executorId = blockManagerId.executorId
executorIdToAddress.put(executorId, address)
@@ -70,25 +64,23 @@ private[spark] class ExecutorTable(val parent: JobProgressUI, val stageId: Int)
val executorIdToSummary = listener.stageIdToExecutorSummaries.get(stageId)
executorIdToSummary match {
- case Some(x) => {
- x.toSeq.sortBy(_._1).map{
- case (k,v) => {
- <tr>
- <td>{k}</td>
- <td>{executorIdToAddress.getOrElse(k, "CANNOT FIND ADDRESS")}</td>
- <td>{parent.formatDuration(v.taskTime)}</td>
- <td>{v.failedTasks + v.succeededTasks}</td>
- <td>{v.failedTasks}</td>
- <td>{v.succeededTasks}</td>
- <td>{Utils.bytesToString(v.shuffleRead)}</td>
- <td>{Utils.bytesToString(v.shuffleWrite)}</td>
- <td>{Utils.bytesToString(v.memoryBytesSpilled)}</td>
- <td>{Utils.bytesToString(v.diskBytesSpilled)}</td>
- </tr>
- }
+ case Some(x) =>
+ x.toSeq.sortBy(_._1).map { case (k, v) => {
+ <tr>
+ <td>{k}</td>
+ <td>{executorIdToAddress.getOrElse(k, "CANNOT FIND ADDRESS")}</td>
+ <td>{parent.formatDuration(v.taskTime)}</td>
+ <td>{v.failedTasks + v.succeededTasks}</td>
+ <td>{v.failedTasks}</td>
+ <td>{v.succeededTasks}</td>
+ <td>{Utils.bytesToString(v.shuffleRead)}</td>
+ <td>{Utils.bytesToString(v.shuffleWrite)}</td>
+ <td>{Utils.bytesToString(v.memoryBytesSpilled)}</td>
+ <td>{Utils.bytesToString(v.diskBytesSpilled)}</td>
+ </tr>
}
}
- case _ => { Seq[Node]() }
+ case _ => Seq[Node]()
}
}
}