aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorcenyuhai <cenyuhai@didichuxing.com>2016-09-15 09:58:53 +0100
committerSean Owen <sowen@cloudera.com>2016-09-15 09:58:53 +0100
commitad79fc0a8407a950a03869f2f8cdc3ed0bf13875 (patch)
treee2236983333729323ea39f2e6d93fa379ab15e69
parent647ee05e5815bde361662a9286ac602c44b4d4e6 (diff)
downloadspark-ad79fc0a8407a950a03869f2f8cdc3ed0bf13875.tar.gz
spark-ad79fc0a8407a950a03869f2f8cdc3ed0bf13875.tar.bz2
spark-ad79fc0a8407a950a03869f2f8cdc3ed0bf13875.zip
[SPARK-17406][WEB UI] limit timeline executor events
## What changes were proposed in this pull request? The job page will be too slow to open when there are thousands of executor events(added or removed). I found that in ExecutorsTab file, executorIdToData will not remove elements, it will increase all the time.Before this pr, it looks like [timeline1.png](https://issues.apache.org/jira/secure/attachment/12827112/timeline1.png). After this pr, it looks like [timeline2.png](https://issues.apache.org/jira/secure/attachment/12827113/timeline2.png)(we can set how many executor events will be displayed) Author: cenyuhai <cenyuhai@didichuxing.com> Closes #14969 from cenyuhai/SPARK-17406.
-rw-r--r--core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala41
-rw-r--r--core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala112
-rw-r--r--core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala66
-rw-r--r--core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala67
-rw-r--r--core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala5
-rw-r--r--project/MimaExcludes.scala12
8 files changed, 162 insertions, 148 deletions
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
index 982e8915a8..7953d77fd7 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
@@ -17,14 +17,12 @@
package org.apache.spark.ui.exec
-import java.net.URLEncoder
import javax.servlet.http.HttpServletRequest
import scala.xml.Node
import org.apache.spark.status.api.v1.ExecutorSummary
-import org.apache.spark.ui.{ToolTips, UIUtils, WebUIPage}
-import org.apache.spark.util.Utils
+import org.apache.spark.ui.{UIUtils, WebUIPage}
// This isn't even used anymore -- but we need to keep it b/c of a MiMa false positive
private[ui] case class ExecutorSummaryInfo(
@@ -83,18 +81,7 @@ private[spark] object ExecutorsPage {
val memUsed = status.memUsed
val maxMem = status.maxMem
val diskUsed = status.diskUsed
- val totalCores = listener.executorToTotalCores.getOrElse(execId, 0)
- val maxTasks = listener.executorToTasksMax.getOrElse(execId, 0)
- val activeTasks = listener.executorToTasksActive.getOrElse(execId, 0)
- val failedTasks = listener.executorToTasksFailed.getOrElse(execId, 0)
- val completedTasks = listener.executorToTasksComplete.getOrElse(execId, 0)
- val totalTasks = activeTasks + failedTasks + completedTasks
- val totalDuration = listener.executorToDuration.getOrElse(execId, 0L)
- val totalGCTime = listener.executorToJvmGCTime.getOrElse(execId, 0L)
- val totalInputBytes = listener.executorToInputBytes.getOrElse(execId, 0L)
- val totalShuffleRead = listener.executorToShuffleRead.getOrElse(execId, 0L)
- val totalShuffleWrite = listener.executorToShuffleWrite.getOrElse(execId, 0L)
- val executorLogs = listener.executorToLogUrls.getOrElse(execId, Map.empty)
+ val taskSummary = listener.executorToTaskSummary.getOrElse(execId, ExecutorTaskSummary(execId))
new ExecutorSummary(
execId,
@@ -103,19 +90,19 @@ private[spark] object ExecutorsPage {
rddBlocks,
memUsed,
diskUsed,
- totalCores,
- maxTasks,
- activeTasks,
- failedTasks,
- completedTasks,
- totalTasks,
- totalDuration,
- totalGCTime,
- totalInputBytes,
- totalShuffleRead,
- totalShuffleWrite,
+ taskSummary.totalCores,
+ taskSummary.tasksMax,
+ taskSummary.tasksActive,
+ taskSummary.tasksFailed,
+ taskSummary.tasksComplete,
+ taskSummary.tasksActive + taskSummary.tasksFailed + taskSummary.tasksComplete,
+ taskSummary.duration,
+ taskSummary.jvmGCTime,
+ taskSummary.inputBytes,
+ taskSummary.shuffleRead,
+ taskSummary.shuffleWrite,
maxMem,
- executorLogs
+ taskSummary.executorLogs
)
}
}
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
index 676f445751..678571fd4f 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
@@ -17,14 +17,13 @@
package org.apache.spark.ui.exec
-import scala.collection.mutable.HashMap
+import scala.collection.mutable.{LinkedHashMap, ListBuffer}
import org.apache.spark.{ExceptionFailure, Resubmitted, SparkConf, SparkContext}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.scheduler._
import org.apache.spark.storage.{StorageStatus, StorageStatusListener}
import org.apache.spark.ui.{SparkUI, SparkUITab}
-import org.apache.spark.ui.jobs.UIData.ExecutorUIData
private[ui] class ExecutorsTab(parent: SparkUI) extends SparkUITab(parent, "executors") {
val listener = parent.executorsListener
@@ -38,6 +37,25 @@ private[ui] class ExecutorsTab(parent: SparkUI) extends SparkUITab(parent, "exec
}
}
+private[ui] case class ExecutorTaskSummary(
+ var executorId: String,
+ var totalCores: Int = 0,
+ var tasksMax: Int = 0,
+ var tasksActive: Int = 0,
+ var tasksFailed: Int = 0,
+ var tasksComplete: Int = 0,
+ var duration: Long = 0L,
+ var jvmGCTime: Long = 0L,
+ var inputBytes: Long = 0L,
+ var inputRecords: Long = 0L,
+ var outputBytes: Long = 0L,
+ var outputRecords: Long = 0L,
+ var shuffleRead: Long = 0L,
+ var shuffleWrite: Long = 0L,
+ var executorLogs: Map[String, String] = Map.empty,
+ var isAlive: Boolean = true
+)
+
/**
* :: DeveloperApi ::
* A SparkListener that prepares information to be displayed on the ExecutorsTab
@@ -45,21 +63,11 @@ private[ui] class ExecutorsTab(parent: SparkUI) extends SparkUITab(parent, "exec
@DeveloperApi
class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: SparkConf)
extends SparkListener {
- val executorToTotalCores = HashMap[String, Int]()
- val executorToTasksMax = HashMap[String, Int]()
- val executorToTasksActive = HashMap[String, Int]()
- val executorToTasksComplete = HashMap[String, Int]()
- val executorToTasksFailed = HashMap[String, Int]()
- val executorToDuration = HashMap[String, Long]()
- val executorToJvmGCTime = HashMap[String, Long]()
- val executorToInputBytes = HashMap[String, Long]()
- val executorToInputRecords = HashMap[String, Long]()
- val executorToOutputBytes = HashMap[String, Long]()
- val executorToOutputRecords = HashMap[String, Long]()
- val executorToShuffleRead = HashMap[String, Long]()
- val executorToShuffleWrite = HashMap[String, Long]()
- val executorToLogUrls = HashMap[String, Map[String, String]]()
- val executorIdToData = HashMap[String, ExecutorUIData]()
+ var executorToTaskSummary = LinkedHashMap[String, ExecutorTaskSummary]()
+ var executorEvents = new ListBuffer[SparkListenerEvent]()
+
+ private val maxTimelineExecutors = conf.getInt("spark.ui.timeline.executors.maximum", 1000)
+ private val retainedDeadExecutors = conf.getInt("spark.ui.retainedDeadExecutors", 100)
def activeStorageStatusList: Seq[StorageStatus] = storageStatusListener.storageStatusList
@@ -67,18 +75,29 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar
override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = synchronized {
val eid = executorAdded.executorId
- executorToLogUrls(eid) = executorAdded.executorInfo.logUrlMap
- executorToTotalCores(eid) = executorAdded.executorInfo.totalCores
- executorToTasksMax(eid) = executorToTotalCores(eid) / conf.getInt("spark.task.cpus", 1)
- executorIdToData(eid) = new ExecutorUIData(executorAdded.time)
+ val taskSummary = executorToTaskSummary.getOrElseUpdate(eid, ExecutorTaskSummary(eid))
+ taskSummary.executorLogs = executorAdded.executorInfo.logUrlMap
+ taskSummary.totalCores = executorAdded.executorInfo.totalCores
+ taskSummary.tasksMax = taskSummary.totalCores / conf.getInt("spark.task.cpus", 1)
+ executorEvents += executorAdded
+ if (executorEvents.size > maxTimelineExecutors) {
+ executorEvents.remove(0)
+ }
+
+ val deadExecutors = executorToTaskSummary.filter(e => !e._2.isAlive)
+ if (deadExecutors.size > retainedDeadExecutors) {
+ val head = deadExecutors.head
+ executorToTaskSummary.remove(head._1)
+ }
}
override def onExecutorRemoved(
executorRemoved: SparkListenerExecutorRemoved): Unit = synchronized {
- val eid = executorRemoved.executorId
- val uiData = executorIdToData(eid)
- uiData.finishTime = Some(executorRemoved.time)
- uiData.finishReason = Some(executorRemoved.reason)
+ executorEvents += executorRemoved
+ if (executorEvents.size > maxTimelineExecutors) {
+ executorEvents.remove(0)
+ }
+ executorToTaskSummary.get(executorRemoved.executorId).foreach(e => e.isAlive = false)
}
override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = {
@@ -87,19 +106,25 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar
s.blockManagerId.executorId == SparkContext.LEGACY_DRIVER_IDENTIFIER ||
s.blockManagerId.executorId == SparkContext.DRIVER_IDENTIFIER
}
- storageStatus.foreach { s => executorToLogUrls(s.blockManagerId.executorId) = logs.toMap }
+ storageStatus.foreach { s =>
+ val eid = s.blockManagerId.executorId
+ val taskSummary = executorToTaskSummary.getOrElseUpdate(eid, ExecutorTaskSummary(eid))
+ taskSummary.executorLogs = logs.toMap
+ }
}
}
override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = synchronized {
val eid = taskStart.taskInfo.executorId
- executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 0) + 1
+ val taskSummary = executorToTaskSummary.getOrElseUpdate(eid, ExecutorTaskSummary(eid))
+ taskSummary.tasksActive += 1
}
override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
val info = taskEnd.taskInfo
if (info != null) {
val eid = info.executorId
+ val taskSummary = executorToTaskSummary.getOrElseUpdate(eid, ExecutorTaskSummary(eid))
taskEnd.reason match {
case Resubmitted =>
// Note: For resubmitted tasks, we continue to use the metrics that belong to the
@@ -108,31 +133,26 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar
// metrics added by each attempt, but this is much more complicated.
return
case e: ExceptionFailure =>
- executorToTasksFailed(eid) = executorToTasksFailed.getOrElse(eid, 0) + 1
+ taskSummary.tasksFailed += 1
case _ =>
- executorToTasksComplete(eid) = executorToTasksComplete.getOrElse(eid, 0) + 1
+ taskSummary.tasksComplete += 1
}
-
- executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 1) - 1
- executorToDuration(eid) = executorToDuration.getOrElse(eid, 0L) + info.duration
+ if (taskSummary.tasksActive >= 1) {
+ taskSummary.tasksActive -= 1
+ }
+ taskSummary.duration += info.duration
// Update shuffle read/write
val metrics = taskEnd.taskMetrics
if (metrics != null) {
- executorToInputBytes(eid) =
- executorToInputBytes.getOrElse(eid, 0L) + metrics.inputMetrics.bytesRead
- executorToInputRecords(eid) =
- executorToInputRecords.getOrElse(eid, 0L) + metrics.inputMetrics.recordsRead
- executorToOutputBytes(eid) =
- executorToOutputBytes.getOrElse(eid, 0L) + metrics.outputMetrics.bytesWritten
- executorToOutputRecords(eid) =
- executorToOutputRecords.getOrElse(eid, 0L) + metrics.outputMetrics.recordsWritten
-
- executorToShuffleRead(eid) =
- executorToShuffleRead.getOrElse(eid, 0L) + metrics.shuffleReadMetrics.remoteBytesRead
- executorToShuffleWrite(eid) =
- executorToShuffleWrite.getOrElse(eid, 0L) + metrics.shuffleWriteMetrics.bytesWritten
- executorToJvmGCTime(eid) = executorToJvmGCTime.getOrElse(eid, 0L) + metrics.jvmGCTime
+ taskSummary.inputBytes += metrics.inputMetrics.bytesRead
+ taskSummary.inputRecords += metrics.inputMetrics.recordsRead
+ taskSummary.outputBytes += metrics.outputMetrics.bytesWritten
+ taskSummary.outputRecords += metrics.outputMetrics.recordsWritten
+
+ taskSummary.shuffleRead += metrics.shuffleReadMetrics.remoteBytesRead
+ taskSummary.shuffleWrite += metrics.shuffleWriteMetrics.bytesWritten
+ taskSummary.jvmGCTime += metrics.jvmGCTime
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
index e5363ce8ca..c04964ec66 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
@@ -28,9 +28,9 @@ import scala.xml._
import org.apache.commons.lang3.StringEscapeUtils
import org.apache.spark.JobExecutionStatus
-import org.apache.spark.scheduler.StageInfo
+import org.apache.spark.scheduler._
import org.apache.spark.ui._
-import org.apache.spark.ui.jobs.UIData.{ExecutorUIData, JobUIData, StageUIData}
+import org.apache.spark.ui.jobs.UIData.{JobUIData, StageUIData}
import org.apache.spark.util.Utils
/** Page showing list of all ongoing and recently finished jobs */
@@ -123,55 +123,55 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
}
}
- private def makeExecutorEvent(executorUIDatas: HashMap[String, ExecutorUIData]): Seq[String] = {
+ private def makeExecutorEvent(executorUIDatas: Seq[SparkListenerEvent]):
+ Seq[String] = {
val events = ListBuffer[String]()
executorUIDatas.foreach {
- case (executorId, event) =>
+ case a: SparkListenerExecutorAdded =>
val addedEvent =
s"""
|{
| 'className': 'executor added',
| 'group': 'executors',
- | 'start': new Date(${event.startTime}),
+ | 'start': new Date(${a.time}),
| 'content': '<div class="executor-event-content"' +
| 'data-toggle="tooltip" data-placement="bottom"' +
- | 'data-title="Executor ${executorId}<br>' +
- | 'Added at ${UIUtils.formatDate(new Date(event.startTime))}"' +
- | 'data-html="true">Executor ${executorId} added</div>'
+ | 'data-title="Executor ${a.executorId}<br>' +
+ | 'Added at ${UIUtils.formatDate(new Date(a.time))}"' +
+ | 'data-html="true">Executor ${a.executorId} added</div>'
|}
""".stripMargin
events += addedEvent
+ case e: SparkListenerExecutorRemoved =>
+ val removedEvent =
+ s"""
+ |{
+ | 'className': 'executor removed',
+ | 'group': 'executors',
+ | 'start': new Date(${e.time}),
+ | 'content': '<div class="executor-event-content"' +
+ | 'data-toggle="tooltip" data-placement="bottom"' +
+ | 'data-title="Executor ${e.executorId}<br>' +
+ | 'Removed at ${UIUtils.formatDate(new Date(e.time))}' +
+ | '${
+ if (e.reason != null) {
+ s"""<br>Reason: ${e.reason.replace("\n", " ")}"""
+ } else {
+ ""
+ }
+ }"' +
+ | 'data-html="true">Executor ${e.executorId} removed</div>'
+ |}
+ """.stripMargin
+ events += removedEvent
- if (event.finishTime.isDefined) {
- val removedEvent =
- s"""
- |{
- | 'className': 'executor removed',
- | 'group': 'executors',
- | 'start': new Date(${event.finishTime.get}),
- | 'content': '<div class="executor-event-content"' +
- | 'data-toggle="tooltip" data-placement="bottom"' +
- | 'data-title="Executor ${executorId}<br>' +
- | 'Removed at ${UIUtils.formatDate(new Date(event.finishTime.get))}' +
- | '${
- if (event.finishReason.isDefined) {
- s"""<br>Reason: ${event.finishReason.get.replace("\n", " ")}"""
- } else {
- ""
- }
- }"' +
- | 'data-html="true">Executor ${executorId} removed</div>'
- |}
- """.stripMargin
- events += removedEvent
- }
}
events.toSeq
}
private def makeTimeline(
jobs: Seq[JobUIData],
- executors: HashMap[String, ExecutorUIData],
+ executors: Seq[SparkListenerEvent],
startTime: Long): Seq[Node] = {
val jobEventJsonAsStrSeq = makeJobEvent(jobs)
@@ -353,7 +353,7 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
var content = summary
val executorListener = parent.executorListener
content ++= makeTimeline(activeJobs ++ completedJobs ++ failedJobs,
- executorListener.executorIdToData, startTime)
+ executorListener.executorEvents, startTime)
if (shouldShowActiveJobs) {
content ++= <h4 id="active">Active Jobs ({activeJobs.size})</h4> ++
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
index 133c3b1b9a..9fb3f35fd9 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
@@ -118,7 +118,8 @@ private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: Stage
<div style="float: left">{k}</div>
<div style="float: right">
{
- val logs = parent.executorsListener.executorToLogUrls.getOrElse(k, Map.empty)
+ val logs = parent.executorsListener.executorToTaskSummary.get(k)
+ .map(_.executorLogs).getOrElse(Map.empty)
logs.map {
case (logName, logUrl) => <div><a href={logUrl}>{logName}</a></div>
}
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
index 0ec42d68d3..2f7f8976a8 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
@@ -20,15 +20,14 @@ package org.apache.spark.ui.jobs
import java.util.Date
import javax.servlet.http.HttpServletRequest
-import scala.collection.mutable.{Buffer, HashMap, ListBuffer}
+import scala.collection.mutable.{Buffer, ListBuffer}
import scala.xml.{Node, NodeSeq, Unparsed, Utility}
import org.apache.commons.lang3.StringEscapeUtils
import org.apache.spark.JobExecutionStatus
-import org.apache.spark.scheduler.StageInfo
+import org.apache.spark.scheduler._
import org.apache.spark.ui.{ToolTips, UIUtils, WebUIPage}
-import org.apache.spark.ui.jobs.UIData.ExecutorUIData
/** Page showing statistics and stage list for a given job */
private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") {
@@ -93,55 +92,55 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") {
}
}
- def makeExecutorEvent(executorUIDatas: HashMap[String, ExecutorUIData]): Seq[String] = {
+ def makeExecutorEvent(executorUIDatas: Seq[SparkListenerEvent]): Seq[String] = {
val events = ListBuffer[String]()
executorUIDatas.foreach {
- case (executorId, event) =>
+ case a: SparkListenerExecutorAdded =>
val addedEvent =
s"""
|{
| 'className': 'executor added',
| 'group': 'executors',
- | 'start': new Date(${event.startTime}),
+ | 'start': new Date(${a.time}),
| 'content': '<div class="executor-event-content"' +
| 'data-toggle="tooltip" data-placement="bottom"' +
- | 'data-title="Executor ${executorId}<br>' +
- | 'Added at ${UIUtils.formatDate(new Date(event.startTime))}"' +
- | 'data-html="true">Executor ${executorId} added</div>'
+ | 'data-title="Executor ${a.executorId}<br>' +
+ | 'Added at ${UIUtils.formatDate(new Date(a.time))}"' +
+ | 'data-html="true">Executor ${a.executorId} added</div>'
|}
""".stripMargin
events += addedEvent
- if (event.finishTime.isDefined) {
- val removedEvent =
- s"""
- |{
- | 'className': 'executor removed',
- | 'group': 'executors',
- | 'start': new Date(${event.finishTime.get}),
- | 'content': '<div class="executor-event-content"' +
- | 'data-toggle="tooltip" data-placement="bottom"' +
- | 'data-title="Executor ${executorId}<br>' +
- | 'Removed at ${UIUtils.formatDate(new Date(event.finishTime.get))}' +
- | '${
- if (event.finishReason.isDefined) {
- s"""<br>Reason: ${event.finishReason.get.replace("\n", " ")}"""
- } else {
- ""
- }
- }"' +
- | 'data-html="true">Executor ${executorId} removed</div>'
- |}
- """.stripMargin
- events += removedEvent
- }
+ case e: SparkListenerExecutorRemoved =>
+ val removedEvent =
+ s"""
+ |{
+ | 'className': 'executor removed',
+ | 'group': 'executors',
+ | 'start': new Date(${e.time}),
+ | 'content': '<div class="executor-event-content"' +
+ | 'data-toggle="tooltip" data-placement="bottom"' +
+ | 'data-title="Executor ${e.executorId}<br>' +
+ | 'Removed at ${UIUtils.formatDate(new Date(e.time))}' +
+ | '${
+ if (e.reason != null) {
+ s"""<br>Reason: ${e.reason.replace("\n", " ")}"""
+ } else {
+ ""
+ }
+ }"' +
+ | 'data-html="true">Executor ${e.executorId} removed</div>'
+ |}
+ """.stripMargin
+ events += removedEvent
+
}
events.toSeq
}
private def makeTimeline(
stages: Seq[StageInfo],
- executors: HashMap[String, ExecutorUIData],
+ executors: Seq[SparkListenerEvent],
appStartTime: Long): Seq[Node] = {
val stageEventJsonAsStrSeq = makeStageEvent(stages)
@@ -319,7 +318,7 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") {
val operationGraphListener = parent.operationGraphListener
content ++= makeTimeline(activeStages ++ completedStages ++ failedStages,
- executorListener.executorIdToData, appStartTime)
+ executorListener.executorEvents, appStartTime)
content ++= UIUtils.showDagVizForJob(
jobId, operationGraphListener.getOperationGraphForJob(jobId))
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index de787f2577..c322ae0972 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -1017,8 +1017,8 @@ private[ui] class TaskDataSource(
None
}
- val logs = executorsListener.executorToLogUrls.getOrElse(info.executorId, Map.empty)
-
+ val logs = executorsListener.executorToTaskSummary.get(info.executorId)
+ .map(_.executorLogs).getOrElse(Map.empty)
new TaskTableRowData(
info.index,
info.taskId,
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
index 74bca9931a..c729f03b3c 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
@@ -177,11 +177,6 @@ private[spark] object UIData {
}
}
- class ExecutorUIData(
- val startTime: Long,
- var finishTime: Option[Long] = None,
- var finishReason: Option[String] = None)
-
case class TaskMetricsUIData(
executorDeserializeTime: Long,
executorRunTime: Long,
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index fbd78aeb20..37fff2efa4 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -426,6 +426,18 @@ object MimaExcludes {
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.storage.StorageStatusListener.this"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.streaming.scheduler.BatchInfo.streamIdToNumRecords"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.storageStatusList"),
+ ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.executorIdToData"),
+ ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.executorToTasksActive"),
+ ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.executorToTasksComplete"),
+ ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.executorToInputRecords"),
+ ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.executorToShuffleRead"),
+ ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.executorToTasksFailed"),
+ ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.executorToShuffleWrite"),
+ ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.executorToDuration"),
+ ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.executorToInputBytes"),
+ ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.executorToLogUrls"),
+ ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.executorToOutputBytes"),
+ ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.executorToOutputRecords"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.this"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.storage.StorageListener.storageStatusList"),
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ExceptionFailure.apply"),