Diffstat (limited to 'core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala')
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala | 112
1 file changed, 66 insertions(+), 46 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
index 676f445751..678571fd4f 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
@@ -17,14 +17,13 @@
package org.apache.spark.ui.exec
-import scala.collection.mutable.HashMap
+import scala.collection.mutable.{LinkedHashMap, ListBuffer}
import org.apache.spark.{ExceptionFailure, Resubmitted, SparkConf, SparkContext}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.scheduler._
import org.apache.spark.storage.{StorageStatus, StorageStatusListener}
import org.apache.spark.ui.{SparkUI, SparkUITab}
-import org.apache.spark.ui.jobs.UIData.ExecutorUIData
private[ui] class ExecutorsTab(parent: SparkUI) extends SparkUITab(parent, "executors") {
val listener = parent.executorsListener
@@ -38,6 +37,25 @@ private[ui] class ExecutorsTab(parent: SparkUI) extends SparkUITab(parent, "exec
}
}
+private[ui] case class ExecutorTaskSummary(
+ var executorId: String,
+ var totalCores: Int = 0,
+ var tasksMax: Int = 0,
+ var tasksActive: Int = 0,
+ var tasksFailed: Int = 0,
+ var tasksComplete: Int = 0,
+ var duration: Long = 0L,
+ var jvmGCTime: Long = 0L,
+ var inputBytes: Long = 0L,
+ var inputRecords: Long = 0L,
+ var outputBytes: Long = 0L,
+ var outputRecords: Long = 0L,
+ var shuffleRead: Long = 0L,
+ var shuffleWrite: Long = 0L,
+ var executorLogs: Map[String, String] = Map.empty,
+ var isAlive: Boolean = true
+)
+
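
[Editor's note] The lines added above collapse what were fifteen independent per-metric HashMaps (visible in the next hunk) into a single mutable record per executor. A minimal sketch of how the consolidated record is used, assuming the ExecutorTaskSummary definition from this patch; it is illustrative only, not part of the change:

    // Illustrative sketch: one getOrElseUpdate on a single map replaces
    // fifteen independent map lookups keyed by executor id.
    import scala.collection.mutable.LinkedHashMap

    val summaries = LinkedHashMap[String, ExecutorTaskSummary]()
    val s = summaries.getOrElseUpdate("exec-1", ExecutorTaskSummary("exec-1"))
    s.tasksActive += 1      // was: executorToTasksActive(eid) = getOrElse(eid, 0) + 1
    s.inputBytes += 1024L   // was: executorToInputBytes(eid) = getOrElse(eid, 0L) + n
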
/**
* :: DeveloperApi ::
* A SparkListener that prepares information to be displayed on the ExecutorsTab
@@ -45,21 +63,11 @@ private[ui] class ExecutorsTab(parent: SparkUI) extends SparkUITab(parent, "exec
@DeveloperApi
class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: SparkConf)
extends SparkListener {
- val executorToTotalCores = HashMap[String, Int]()
- val executorToTasksMax = HashMap[String, Int]()
- val executorToTasksActive = HashMap[String, Int]()
- val executorToTasksComplete = HashMap[String, Int]()
- val executorToTasksFailed = HashMap[String, Int]()
- val executorToDuration = HashMap[String, Long]()
- val executorToJvmGCTime = HashMap[String, Long]()
- val executorToInputBytes = HashMap[String, Long]()
- val executorToInputRecords = HashMap[String, Long]()
- val executorToOutputBytes = HashMap[String, Long]()
- val executorToOutputRecords = HashMap[String, Long]()
- val executorToShuffleRead = HashMap[String, Long]()
- val executorToShuffleWrite = HashMap[String, Long]()
- val executorToLogUrls = HashMap[String, Map[String, String]]()
- val executorIdToData = HashMap[String, ExecutorUIData]()
+ var executorToTaskSummary = LinkedHashMap[String, ExecutorTaskSummary]()
+ var executorEvents = new ListBuffer[SparkListenerEvent]()
+
+ private val maxTimelineExecutors = conf.getInt("spark.ui.timeline.executors.maximum", 1000)
+ private val retainedDeadExecutors = conf.getInt("spark.ui.retainedDeadExecutors", 100)
def activeStorageStatusList: Seq[StorageStatus] = storageStatusListener.storageStatusList
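
[Editor's note] The two configuration keys read above bound the listener's memory use: one caps the timeline event buffer, the other caps how many dead executors stay in the summary map. A hedged sketch of the append-and-cap pattern the handlers below apply to executorEvents (the helper name appendCapped is hypothetical, not in the patch):

    import scala.collection.mutable.ListBuffer
    import org.apache.spark.scheduler.SparkListenerEvent

    // Hypothetical helper mirroring onExecutorAdded/onExecutorRemoved:
    // append the new event, then drop the oldest once over the limit.
    def appendCapped(events: ListBuffer[SparkListenerEvent],
                     event: SparkListenerEvent,
                     max: Int): Unit = {
      events += event
      if (events.size > max) {
        events.remove(0)
      }
    }
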
@@ -67,18 +75,29 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar
override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = synchronized {
val eid = executorAdded.executorId
- executorToLogUrls(eid) = executorAdded.executorInfo.logUrlMap
- executorToTotalCores(eid) = executorAdded.executorInfo.totalCores
- executorToTasksMax(eid) = executorToTotalCores(eid) / conf.getInt("spark.task.cpus", 1)
- executorIdToData(eid) = new ExecutorUIData(executorAdded.time)
+ val taskSummary = executorToTaskSummary.getOrElseUpdate(eid, ExecutorTaskSummary(eid))
+ taskSummary.executorLogs = executorAdded.executorInfo.logUrlMap
+ taskSummary.totalCores = executorAdded.executorInfo.totalCores
+ taskSummary.tasksMax = taskSummary.totalCores / conf.getInt("spark.task.cpus", 1)
+ executorEvents += executorAdded
+ if (executorEvents.size > maxTimelineExecutors) {
+ executorEvents.remove(0)
+ }
+
+ val deadExecutors = executorToTaskSummary.filter(e => !e._2.isAlive)
+ if (deadExecutors.size > retainedDeadExecutors) {
+ val head = deadExecutors.head
+ executorToTaskSummary.remove(head._1)
+ }
}
override def onExecutorRemoved(
executorRemoved: SparkListenerExecutorRemoved): Unit = synchronized {
- val eid = executorRemoved.executorId
- val uiData = executorIdToData(eid)
- uiData.finishTime = Some(executorRemoved.time)
- uiData.finishReason = Some(executorRemoved.reason)
+ executorEvents += executorRemoved
+ if (executorEvents.size > maxTimelineExecutors) {
+ executorEvents.remove(0)
+ }
+ executorToTaskSummary.get(executorRemoved.executorId).foreach(e => e.isAlive = false)
}
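
[Editor's note] onExecutorAdded above also trims the summary map: LinkedHashMap iterates in insertion order, so the head of the dead-executor view is the oldest retained entry, and one such entry is evicted per added executor once the retained limit is exceeded. A standalone sketch of that eviction, assuming the ExecutorTaskSummary definition above (the helper name trimOldestDead is hypothetical):

    import scala.collection.mutable.LinkedHashMap

    // Hypothetical standalone version of the eviction in onExecutorAdded.
    // Insertion order is preserved, so dead.head is the oldest dead executor.
    def trimOldestDead(summaries: LinkedHashMap[String, ExecutorTaskSummary],
                       retainedDeadExecutors: Int): Unit = {
      val dead = summaries.filter { case (_, s) => !s.isAlive }
      if (dead.size > retainedDeadExecutors) {
        summaries.remove(dead.head._1)
      }
    }
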
override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = {
@@ -87,19 +106,25 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar
s.blockManagerId.executorId == SparkContext.LEGACY_DRIVER_IDENTIFIER ||
s.blockManagerId.executorId == SparkContext.DRIVER_IDENTIFIER
}
- storageStatus.foreach { s => executorToLogUrls(s.blockManagerId.executorId) = logs.toMap }
+ storageStatus.foreach { s =>
+ val eid = s.blockManagerId.executorId
+ val taskSummary = executorToTaskSummary.getOrElseUpdate(eid, ExecutorTaskSummary(eid))
+ taskSummary.executorLogs = logs.toMap
+ }
}
}
override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = synchronized {
val eid = taskStart.taskInfo.executorId
- executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 0) + 1
+ val taskSummary = executorToTaskSummary.getOrElseUpdate(eid, ExecutorTaskSummary(eid))
+ taskSummary.tasksActive += 1
}
override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
val info = taskEnd.taskInfo
if (info != null) {
val eid = info.executorId
+ val taskSummary = executorToTaskSummary.getOrElseUpdate(eid, ExecutorTaskSummary(eid))
taskEnd.reason match {
case Resubmitted =>
// Note: For resubmitted tasks, we continue to use the metrics that belong to the
@@ -108,31 +133,26 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar
// metrics added by each attempt, but this is much more complicated.
return
case e: ExceptionFailure =>
- executorToTasksFailed(eid) = executorToTasksFailed.getOrElse(eid, 0) + 1
+ taskSummary.tasksFailed += 1
case _ =>
- executorToTasksComplete(eid) = executorToTasksComplete.getOrElse(eid, 0) + 1
+ taskSummary.tasksComplete += 1
}
-
- executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 1) - 1
- executorToDuration(eid) = executorToDuration.getOrElse(eid, 0L) + info.duration
+ if (taskSummary.tasksActive >= 1) {
+ taskSummary.tasksActive -= 1
+ }
+ taskSummary.duration += info.duration
// Update shuffle read/write
val metrics = taskEnd.taskMetrics
if (metrics != null) {
- executorToInputBytes(eid) =
- executorToInputBytes.getOrElse(eid, 0L) + metrics.inputMetrics.bytesRead
- executorToInputRecords(eid) =
- executorToInputRecords.getOrElse(eid, 0L) + metrics.inputMetrics.recordsRead
- executorToOutputBytes(eid) =
- executorToOutputBytes.getOrElse(eid, 0L) + metrics.outputMetrics.bytesWritten
- executorToOutputRecords(eid) =
- executorToOutputRecords.getOrElse(eid, 0L) + metrics.outputMetrics.recordsWritten
-
- executorToShuffleRead(eid) =
- executorToShuffleRead.getOrElse(eid, 0L) + metrics.shuffleReadMetrics.remoteBytesRead
- executorToShuffleWrite(eid) =
- executorToShuffleWrite.getOrElse(eid, 0L) + metrics.shuffleWriteMetrics.bytesWritten
- executorToJvmGCTime(eid) = executorToJvmGCTime.getOrElse(eid, 0L) + metrics.jvmGCTime
+ taskSummary.inputBytes += metrics.inputMetrics.bytesRead
+ taskSummary.inputRecords += metrics.inputMetrics.recordsRead
+ taskSummary.outputBytes += metrics.outputMetrics.bytesWritten
+ taskSummary.outputRecords += metrics.outputMetrics.recordsWritten
+
+ taskSummary.shuffleRead += metrics.shuffleReadMetrics.remoteBytesRead
+ taskSummary.shuffleWrite += metrics.shuffleWriteMetrics.bytesWritten
+ taskSummary.jvmGCTime += metrics.jvmGCTime
}
}
}
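
[Editor's note] The retention behaviour introduced by this patch is driven by the two settings read in the listener's constructor, "spark.ui.timeline.executors.maximum" (default 1000) and "spark.ui.retainedDeadExecutors" (default 100). A hypothetical tuning example; the values shown are illustrative only:

    import org.apache.spark.SparkConf

    // Tighten both retention caps below their defaults.
    val conf = new SparkConf()
      .set("spark.ui.timeline.executors.maximum", "500")
      .set("spark.ui.retainedDeadExecutors", "50")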