aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala8
-rw-r--r--core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala85
-rw-r--r--core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala6
-rw-r--r--core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala10
6 files changed, 58 insertions, 57 deletions
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
index 9c92a50150..f8d6e9fbbb 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
@@ -147,7 +147,7 @@ private[v1] object AllStagesResource {
speculative = uiData.taskInfo.speculative,
accumulatorUpdates = uiData.taskInfo.accumulables.map { convertAccumulableInfo },
errorMessage = uiData.errorMessage,
- taskMetrics = uiData.taskMetrics.map { convertUiTaskMetrics }
+ taskMetrics = uiData.metrics.map { convertUiTaskMetrics }
)
}
@@ -155,7 +155,7 @@ private[v1] object AllStagesResource {
allTaskData: Iterable[TaskUIData],
quantiles: Array[Double]): TaskMetricDistributions = {
- val rawMetrics = allTaskData.flatMap{_.taskMetrics}.toSeq
+ val rawMetrics = allTaskData.flatMap{_.metrics}.toSeq
def metricQuantiles(f: InternalTaskMetrics => Double): IndexedSeq[Double] =
Distribution(rawMetrics.map { d => f(d) }).get.getQuantiles(quantiles)
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
index 788f35ec77..3fd0efd3a1 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
@@ -70,7 +70,7 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar
executorToLogUrls(eid) = executorAdded.executorInfo.logUrlMap
executorToTotalCores(eid) = executorAdded.executorInfo.totalCores
executorToTasksMax(eid) = executorToTotalCores(eid) / conf.getInt("spark.task.cpus", 1)
- executorIdToData(eid) = ExecutorUIData(executorAdded.time)
+ executorIdToData(eid) = new ExecutorUIData(executorAdded.time)
}
override def onExecutorRemoved(
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index ed3ab66e3b..13f5f84d06 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -396,13 +396,13 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
None
}
taskMetrics.foreach { m =>
- val oldMetrics = stageData.taskData.get(info.taskId).flatMap(_.taskMetrics)
+ val oldMetrics = stageData.taskData.get(info.taskId).flatMap(_.metrics)
updateAggregateMetrics(stageData, info.executorId, m, oldMetrics)
}
val taskData = stageData.taskData.getOrElseUpdate(info.taskId, new TaskUIData(info))
taskData.taskInfo = info
- taskData.taskMetrics = taskMetrics
+ taskData.metrics = taskMetrics
taskData.errorMessage = errorMessage
for (
@@ -506,9 +506,9 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
val metrics = TaskMetrics.fromAccumulatorUpdates(accumUpdates)
taskData.foreach { t =>
if (!t.taskInfo.finished) {
- updateAggregateMetrics(stageData, executorMetricsUpdate.execId, metrics, t.taskMetrics)
+ updateAggregateMetrics(stageData, executorMetricsUpdate.execId, metrics, t.metrics)
// Overwrite task metrics
- t.taskMetrics = Some(metrics)
+ t.metrics = Some(metrics)
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 689ab7dd5e..8a44bbd9fc 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -330,7 +330,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
else taskTable.dataSource.slicedTaskIds
// Excludes tasks which failed and have incomplete metrics
- val validTasks = tasks.filter(t => t.taskInfo.status == "SUCCESS" && t.taskMetrics.isDefined)
+ val validTasks = tasks.filter(t => t.taskInfo.status == "SUCCESS" && t.metrics.isDefined)
val summaryTable: Option[Seq[Node]] =
if (validTasks.size == 0) {
@@ -348,8 +348,8 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
getDistributionQuantiles(data).map(d => <td>{Utils.bytesToString(d.toLong)}</td>)
}
- val deserializationTimes = validTasks.map { case TaskUIData(_, metrics, _) =>
- metrics.get.executorDeserializeTime.toDouble
+ val deserializationTimes = validTasks.map { taskUIData: TaskUIData =>
+ taskUIData.metrics.get.executorDeserializeTime.toDouble
}
val deserializationQuantiles =
<td>
@@ -359,13 +359,13 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
</span>
</td> +: getFormattedTimeQuantiles(deserializationTimes)
- val serviceTimes = validTasks.map { case TaskUIData(_, metrics, _) =>
- metrics.get.executorRunTime.toDouble
+ val serviceTimes = validTasks.map { taskUIData: TaskUIData =>
+ taskUIData.metrics.get.executorRunTime.toDouble
}
val serviceQuantiles = <td>Duration</td> +: getFormattedTimeQuantiles(serviceTimes)
- val gcTimes = validTasks.map { case TaskUIData(_, metrics, _) =>
- metrics.get.jvmGCTime.toDouble
+ val gcTimes = validTasks.map { taskUIData: TaskUIData =>
+ taskUIData.metrics.get.jvmGCTime.toDouble
}
val gcQuantiles =
<td>
@@ -374,8 +374,8 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
</span>
</td> +: getFormattedTimeQuantiles(gcTimes)
- val serializationTimes = validTasks.map { case TaskUIData(_, metrics, _) =>
- metrics.get.resultSerializationTime.toDouble
+ val serializationTimes = validTasks.map { taskUIData: TaskUIData =>
+ taskUIData.metrics.get.resultSerializationTime.toDouble
}
val serializationQuantiles =
<td>
@@ -385,8 +385,8 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
</span>
</td> +: getFormattedTimeQuantiles(serializationTimes)
- val gettingResultTimes = validTasks.map { case TaskUIData(info, _, _) =>
- getGettingResultTime(info, currentTime).toDouble
+ val gettingResultTimes = validTasks.map { taskUIData: TaskUIData =>
+ getGettingResultTime(taskUIData.taskInfo, currentTime).toDouble
}
val gettingResultQuantiles =
<td>
@@ -397,8 +397,8 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
</td> +:
getFormattedTimeQuantiles(gettingResultTimes)
- val peakExecutionMemory = validTasks.map { case TaskUIData(_, metrics, _) =>
- metrics.get.peakExecutionMemory.toDouble
+ val peakExecutionMemory = validTasks.map { taskUIData: TaskUIData =>
+ taskUIData.metrics.get.peakExecutionMemory.toDouble
}
val peakExecutionMemoryQuantiles = {
<td>
@@ -412,8 +412,8 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
// The scheduler delay includes the network delay to send the task to the worker
// machine and to send back the result (but not the time to fetch the task result,
// if it needed to be fetched from the block manager on the worker).
- val schedulerDelays = validTasks.map { case TaskUIData(info, metrics, _) =>
- getSchedulerDelay(info, metrics.get, currentTime).toDouble
+ val schedulerDelays = validTasks.map { taskUIData: TaskUIData =>
+ getSchedulerDelay(taskUIData.taskInfo, taskUIData.metrics.get, currentTime).toDouble
}
val schedulerDelayTitle = <td><span data-toggle="tooltip"
title={ToolTips.SCHEDULER_DELAY} data-placement="right">Scheduler Delay</span></td>
@@ -427,30 +427,30 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
)
}
- val inputSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
- metrics.get.inputMetrics.map(_.bytesRead).getOrElse(0L).toDouble
+ val inputSizes = validTasks.map { taskUIData: TaskUIData =>
+ taskUIData.metrics.get.inputMetrics.map(_.bytesRead).getOrElse(0L).toDouble
}
- val inputRecords = validTasks.map { case TaskUIData(_, metrics, _) =>
- metrics.get.inputMetrics.map(_.recordsRead).getOrElse(0L).toDouble
+ val inputRecords = validTasks.map { taskUIData: TaskUIData =>
+ taskUIData.metrics.get.inputMetrics.map(_.recordsRead).getOrElse(0L).toDouble
}
val inputQuantiles = <td>Input Size / Records</td> +:
getFormattedSizeQuantilesWithRecords(inputSizes, inputRecords)
- val outputSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
- metrics.get.outputMetrics.map(_.bytesWritten).getOrElse(0L).toDouble
+ val outputSizes = validTasks.map { taskUIData: TaskUIData =>
+ taskUIData.metrics.get.outputMetrics.map(_.bytesWritten).getOrElse(0L).toDouble
}
- val outputRecords = validTasks.map { case TaskUIData(_, metrics, _) =>
- metrics.get.outputMetrics.map(_.recordsWritten).getOrElse(0L).toDouble
+ val outputRecords = validTasks.map { taskUIData: TaskUIData =>
+ taskUIData.metrics.get.outputMetrics.map(_.recordsWritten).getOrElse(0L).toDouble
}
val outputQuantiles = <td>Output Size / Records</td> +:
getFormattedSizeQuantilesWithRecords(outputSizes, outputRecords)
- val shuffleReadBlockedTimes = validTasks.map { case TaskUIData(_, metrics, _) =>
- metrics.get.shuffleReadMetrics.map(_.fetchWaitTime).getOrElse(0L).toDouble
+ val shuffleReadBlockedTimes = validTasks.map { taskUIData: TaskUIData =>
+ taskUIData.metrics.get.shuffleReadMetrics.map(_.fetchWaitTime).getOrElse(0L).toDouble
}
val shuffleReadBlockedQuantiles =
<td>
@@ -461,11 +461,11 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
</td> +:
getFormattedTimeQuantiles(shuffleReadBlockedTimes)
- val shuffleReadTotalSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
- metrics.get.shuffleReadMetrics.map(_.totalBytesRead).getOrElse(0L).toDouble
+ val shuffleReadTotalSizes = validTasks.map { taskUIData: TaskUIData =>
+ taskUIData.metrics.get.shuffleReadMetrics.map(_.totalBytesRead).getOrElse(0L).toDouble
}
- val shuffleReadTotalRecords = validTasks.map { case TaskUIData(_, metrics, _) =>
- metrics.get.shuffleReadMetrics.map(_.recordsRead).getOrElse(0L).toDouble
+ val shuffleReadTotalRecords = validTasks.map { taskUIData: TaskUIData =>
+ taskUIData.metrics.get.shuffleReadMetrics.map(_.recordsRead).getOrElse(0L).toDouble
}
val shuffleReadTotalQuantiles =
<td>
@@ -476,8 +476,8 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
</td> +:
getFormattedSizeQuantilesWithRecords(shuffleReadTotalSizes, shuffleReadTotalRecords)
- val shuffleReadRemoteSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
- metrics.get.shuffleReadMetrics.map(_.remoteBytesRead).getOrElse(0L).toDouble
+ val shuffleReadRemoteSizes = validTasks.map { taskUIData: TaskUIData =>
+ taskUIData.metrics.get.shuffleReadMetrics.map(_.remoteBytesRead).getOrElse(0L).toDouble
}
val shuffleReadRemoteQuantiles =
<td>
@@ -488,25 +488,25 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
</td> +:
getFormattedSizeQuantiles(shuffleReadRemoteSizes)
- val shuffleWriteSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
- metrics.get.shuffleWriteMetrics.map(_.bytesWritten).getOrElse(0L).toDouble
+ val shuffleWriteSizes = validTasks.map { taskUIData: TaskUIData =>
+ taskUIData.metrics.get.shuffleWriteMetrics.map(_.bytesWritten).getOrElse(0L).toDouble
}
- val shuffleWriteRecords = validTasks.map { case TaskUIData(_, metrics, _) =>
- metrics.get.shuffleWriteMetrics.map(_.recordsWritten).getOrElse(0L).toDouble
+ val shuffleWriteRecords = validTasks.map { taskUIData: TaskUIData =>
+ taskUIData.metrics.get.shuffleWriteMetrics.map(_.recordsWritten).getOrElse(0L).toDouble
}
val shuffleWriteQuantiles = <td>Shuffle Write Size / Records</td> +:
getFormattedSizeQuantilesWithRecords(shuffleWriteSizes, shuffleWriteRecords)
- val memoryBytesSpilledSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
- metrics.get.memoryBytesSpilled.toDouble
+ val memoryBytesSpilledSizes = validTasks.map { taskUIData: TaskUIData =>
+ taskUIData.metrics.get.memoryBytesSpilled.toDouble
}
val memoryBytesSpilledQuantiles = <td>Shuffle spill (memory)</td> +:
getFormattedSizeQuantiles(memoryBytesSpilledSizes)
- val diskBytesSpilledSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
- metrics.get.diskBytesSpilled.toDouble
+ val diskBytesSpilledSizes = validTasks.map { taskUIData: TaskUIData =>
+ taskUIData.metrics.get.diskBytesSpilled.toDouble
}
val diskBytesSpilledQuantiles = <td>Shuffle spill (disk)</td> +:
getFormattedSizeQuantiles(diskBytesSpilledSizes)
@@ -601,7 +601,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
def toProportion(time: Long) = time.toDouble / totalExecutionTime * 100
- val metricsOpt = taskUIData.taskMetrics
+ val metricsOpt = taskUIData.metrics
val shuffleReadTime =
metricsOpt.flatMap(_.shuffleReadMetrics.map(_.fetchWaitTime)).getOrElse(0L)
val shuffleReadTimeProportion = toProportion(shuffleReadTime)
@@ -868,7 +868,8 @@ private[ui] class TaskDataSource(
def slicedTaskIds: Set[Long] = _slicedTaskIds
private def taskRow(taskData: TaskUIData): TaskTableRowData = {
- val TaskUIData(info, metrics, errorMessage) = taskData
+ val info = taskData.taskInfo
+ val metrics = taskData.metrics
val duration = if (info.status == "RUNNING") info.timeRunning(currentTime)
else metrics.map(_.executorRunTime).getOrElse(1L)
val formatDuration = if (info.status == "RUNNING") UIUtils.formatDuration(duration)
@@ -1014,7 +1015,7 @@ private[ui] class TaskDataSource(
shuffleRead,
shuffleWrite,
bytesSpilled,
- errorMessage.getOrElse(""))
+ taskData.errorMessage.getOrElse(""))
}
/**
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
index 78165d7b74..b454ef1b20 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
@@ -105,12 +105,12 @@ private[spark] object UIData {
/**
* These are kept mutable and reused throughout a task's lifetime to avoid excessive reallocation.
*/
- case class TaskUIData(
+ class TaskUIData(
var taskInfo: TaskInfo,
- var taskMetrics: Option[TaskMetrics] = None,
+ var metrics: Option[TaskMetrics] = None,
var errorMessage: Option[String] = None)
- case class ExecutorUIData(
+ class ExecutorUIData(
val startTime: Long,
var finishTime: Option[Long] = None,
var finishReason: Option[String] = None)
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index 9876bded33..7d4c0863bc 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -322,11 +322,11 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with
assert(stage1Data.inputBytes == 207)
assert(stage0Data.outputBytes == 116)
assert(stage1Data.outputBytes == 208)
- assert(stage0Data.taskData.get(1234L).get.taskMetrics.get.shuffleReadMetrics.get
+ assert(stage0Data.taskData.get(1234L).get.metrics.get.shuffleReadMetrics.get
.totalBlocksFetched == 2)
- assert(stage0Data.taskData.get(1235L).get.taskMetrics.get.shuffleReadMetrics.get
+ assert(stage0Data.taskData.get(1235L).get.metrics.get.shuffleReadMetrics.get
.totalBlocksFetched == 102)
- assert(stage1Data.taskData.get(1236L).get.taskMetrics.get.shuffleReadMetrics.get
+ assert(stage1Data.taskData.get(1236L).get.metrics.get.shuffleReadMetrics.get
.totalBlocksFetched == 202)
// task that was included in a heartbeat
@@ -355,9 +355,9 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with
assert(stage1Data.inputBytes == 614)
assert(stage0Data.outputBytes == 416)
assert(stage1Data.outputBytes == 616)
- assert(stage0Data.taskData.get(1234L).get.taskMetrics.get.shuffleReadMetrics.get
+ assert(stage0Data.taskData.get(1234L).get.metrics.get.shuffleReadMetrics.get
.totalBlocksFetched == 302)
- assert(stage1Data.taskData.get(1237L).get.taskMetrics.get.shuffleReadMetrics.get
+ assert(stage1Data.taskData.get(1237L).get.metrics.get.shuffleReadMetrics.get
.totalBlocksFetched == 402)
}
}