aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala')
-rw-r--r--core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala85
1 files changed, 43 insertions, 42 deletions
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 689ab7dd5e..8a44bbd9fc 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -330,7 +330,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
else taskTable.dataSource.slicedTaskIds
// Excludes tasks which failed and have incomplete metrics
- val validTasks = tasks.filter(t => t.taskInfo.status == "SUCCESS" && t.taskMetrics.isDefined)
+ val validTasks = tasks.filter(t => t.taskInfo.status == "SUCCESS" && t.metrics.isDefined)
val summaryTable: Option[Seq[Node]] =
if (validTasks.size == 0) {
@@ -348,8 +348,8 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
getDistributionQuantiles(data).map(d => <td>{Utils.bytesToString(d.toLong)}</td>)
}
- val deserializationTimes = validTasks.map { case TaskUIData(_, metrics, _) =>
- metrics.get.executorDeserializeTime.toDouble
+ val deserializationTimes = validTasks.map { taskUIData: TaskUIData =>
+ taskUIData.metrics.get.executorDeserializeTime.toDouble
}
val deserializationQuantiles =
<td>
@@ -359,13 +359,13 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
</span>
</td> +: getFormattedTimeQuantiles(deserializationTimes)
- val serviceTimes = validTasks.map { case TaskUIData(_, metrics, _) =>
- metrics.get.executorRunTime.toDouble
+ val serviceTimes = validTasks.map { taskUIData: TaskUIData =>
+ taskUIData.metrics.get.executorRunTime.toDouble
}
val serviceQuantiles = <td>Duration</td> +: getFormattedTimeQuantiles(serviceTimes)
- val gcTimes = validTasks.map { case TaskUIData(_, metrics, _) =>
- metrics.get.jvmGCTime.toDouble
+ val gcTimes = validTasks.map { taskUIData: TaskUIData =>
+ taskUIData.metrics.get.jvmGCTime.toDouble
}
val gcQuantiles =
<td>
@@ -374,8 +374,8 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
</span>
</td> +: getFormattedTimeQuantiles(gcTimes)
- val serializationTimes = validTasks.map { case TaskUIData(_, metrics, _) =>
- metrics.get.resultSerializationTime.toDouble
+ val serializationTimes = validTasks.map { taskUIData: TaskUIData =>
+ taskUIData.metrics.get.resultSerializationTime.toDouble
}
val serializationQuantiles =
<td>
@@ -385,8 +385,8 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
</span>
</td> +: getFormattedTimeQuantiles(serializationTimes)
- val gettingResultTimes = validTasks.map { case TaskUIData(info, _, _) =>
- getGettingResultTime(info, currentTime).toDouble
+ val gettingResultTimes = validTasks.map { taskUIData: TaskUIData =>
+ getGettingResultTime(taskUIData.taskInfo, currentTime).toDouble
}
val gettingResultQuantiles =
<td>
@@ -397,8 +397,8 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
</td> +:
getFormattedTimeQuantiles(gettingResultTimes)
- val peakExecutionMemory = validTasks.map { case TaskUIData(_, metrics, _) =>
- metrics.get.peakExecutionMemory.toDouble
+ val peakExecutionMemory = validTasks.map { taskUIData: TaskUIData =>
+ taskUIData.metrics.get.peakExecutionMemory.toDouble
}
val peakExecutionMemoryQuantiles = {
<td>
@@ -412,8 +412,8 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
// The scheduler delay includes the network delay to send the task to the worker
// machine and to send back the result (but not the time to fetch the task result,
// if it needed to be fetched from the block manager on the worker).
- val schedulerDelays = validTasks.map { case TaskUIData(info, metrics, _) =>
- getSchedulerDelay(info, metrics.get, currentTime).toDouble
+ val schedulerDelays = validTasks.map { taskUIData: TaskUIData =>
+ getSchedulerDelay(taskUIData.taskInfo, taskUIData.metrics.get, currentTime).toDouble
}
val schedulerDelayTitle = <td><span data-toggle="tooltip"
title={ToolTips.SCHEDULER_DELAY} data-placement="right">Scheduler Delay</span></td>
@@ -427,30 +427,30 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
)
}
- val inputSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
- metrics.get.inputMetrics.map(_.bytesRead).getOrElse(0L).toDouble
+ val inputSizes = validTasks.map { taskUIData: TaskUIData =>
+ taskUIData.metrics.get.inputMetrics.map(_.bytesRead).getOrElse(0L).toDouble
}
- val inputRecords = validTasks.map { case TaskUIData(_, metrics, _) =>
- metrics.get.inputMetrics.map(_.recordsRead).getOrElse(0L).toDouble
+ val inputRecords = validTasks.map { taskUIData: TaskUIData =>
+ taskUIData.metrics.get.inputMetrics.map(_.recordsRead).getOrElse(0L).toDouble
}
val inputQuantiles = <td>Input Size / Records</td> +:
getFormattedSizeQuantilesWithRecords(inputSizes, inputRecords)
- val outputSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
- metrics.get.outputMetrics.map(_.bytesWritten).getOrElse(0L).toDouble
+ val outputSizes = validTasks.map { taskUIData: TaskUIData =>
+ taskUIData.metrics.get.outputMetrics.map(_.bytesWritten).getOrElse(0L).toDouble
}
- val outputRecords = validTasks.map { case TaskUIData(_, metrics, _) =>
- metrics.get.outputMetrics.map(_.recordsWritten).getOrElse(0L).toDouble
+ val outputRecords = validTasks.map { taskUIData: TaskUIData =>
+ taskUIData.metrics.get.outputMetrics.map(_.recordsWritten).getOrElse(0L).toDouble
}
val outputQuantiles = <td>Output Size / Records</td> +:
getFormattedSizeQuantilesWithRecords(outputSizes, outputRecords)
- val shuffleReadBlockedTimes = validTasks.map { case TaskUIData(_, metrics, _) =>
- metrics.get.shuffleReadMetrics.map(_.fetchWaitTime).getOrElse(0L).toDouble
+ val shuffleReadBlockedTimes = validTasks.map { taskUIData: TaskUIData =>
+ taskUIData.metrics.get.shuffleReadMetrics.map(_.fetchWaitTime).getOrElse(0L).toDouble
}
val shuffleReadBlockedQuantiles =
<td>
@@ -461,11 +461,11 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
</td> +:
getFormattedTimeQuantiles(shuffleReadBlockedTimes)
- val shuffleReadTotalSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
- metrics.get.shuffleReadMetrics.map(_.totalBytesRead).getOrElse(0L).toDouble
+ val shuffleReadTotalSizes = validTasks.map { taskUIData: TaskUIData =>
+ taskUIData.metrics.get.shuffleReadMetrics.map(_.totalBytesRead).getOrElse(0L).toDouble
}
- val shuffleReadTotalRecords = validTasks.map { case TaskUIData(_, metrics, _) =>
- metrics.get.shuffleReadMetrics.map(_.recordsRead).getOrElse(0L).toDouble
+ val shuffleReadTotalRecords = validTasks.map { taskUIData: TaskUIData =>
+ taskUIData.metrics.get.shuffleReadMetrics.map(_.recordsRead).getOrElse(0L).toDouble
}
val shuffleReadTotalQuantiles =
<td>
@@ -476,8 +476,8 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
</td> +:
getFormattedSizeQuantilesWithRecords(shuffleReadTotalSizes, shuffleReadTotalRecords)
- val shuffleReadRemoteSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
- metrics.get.shuffleReadMetrics.map(_.remoteBytesRead).getOrElse(0L).toDouble
+ val shuffleReadRemoteSizes = validTasks.map { taskUIData: TaskUIData =>
+ taskUIData.metrics.get.shuffleReadMetrics.map(_.remoteBytesRead).getOrElse(0L).toDouble
}
val shuffleReadRemoteQuantiles =
<td>
@@ -488,25 +488,25 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
</td> +:
getFormattedSizeQuantiles(shuffleReadRemoteSizes)
- val shuffleWriteSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
- metrics.get.shuffleWriteMetrics.map(_.bytesWritten).getOrElse(0L).toDouble
+ val shuffleWriteSizes = validTasks.map { taskUIData: TaskUIData =>
+ taskUIData.metrics.get.shuffleWriteMetrics.map(_.bytesWritten).getOrElse(0L).toDouble
}
- val shuffleWriteRecords = validTasks.map { case TaskUIData(_, metrics, _) =>
- metrics.get.shuffleWriteMetrics.map(_.recordsWritten).getOrElse(0L).toDouble
+ val shuffleWriteRecords = validTasks.map { taskUIData: TaskUIData =>
+ taskUIData.metrics.get.shuffleWriteMetrics.map(_.recordsWritten).getOrElse(0L).toDouble
}
val shuffleWriteQuantiles = <td>Shuffle Write Size / Records</td> +:
getFormattedSizeQuantilesWithRecords(shuffleWriteSizes, shuffleWriteRecords)
- val memoryBytesSpilledSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
- metrics.get.memoryBytesSpilled.toDouble
+ val memoryBytesSpilledSizes = validTasks.map { taskUIData: TaskUIData =>
+ taskUIData.metrics.get.memoryBytesSpilled.toDouble
}
val memoryBytesSpilledQuantiles = <td>Shuffle spill (memory)</td> +:
getFormattedSizeQuantiles(memoryBytesSpilledSizes)
- val diskBytesSpilledSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
- metrics.get.diskBytesSpilled.toDouble
+ val diskBytesSpilledSizes = validTasks.map { taskUIData: TaskUIData =>
+ taskUIData.metrics.get.diskBytesSpilled.toDouble
}
val diskBytesSpilledQuantiles = <td>Shuffle spill (disk)</td> +:
getFormattedSizeQuantiles(diskBytesSpilledSizes)
@@ -601,7 +601,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
def toProportion(time: Long) = time.toDouble / totalExecutionTime * 100
- val metricsOpt = taskUIData.taskMetrics
+ val metricsOpt = taskUIData.metrics
val shuffleReadTime =
metricsOpt.flatMap(_.shuffleReadMetrics.map(_.fetchWaitTime)).getOrElse(0L)
val shuffleReadTimeProportion = toProportion(shuffleReadTime)
@@ -868,7 +868,8 @@ private[ui] class TaskDataSource(
def slicedTaskIds: Set[Long] = _slicedTaskIds
private def taskRow(taskData: TaskUIData): TaskTableRowData = {
- val TaskUIData(info, metrics, errorMessage) = taskData
+ val info = taskData.taskInfo
+ val metrics = taskData.metrics
val duration = if (info.status == "RUNNING") info.timeRunning(currentTime)
else metrics.map(_.executorRunTime).getOrElse(1L)
val formatDuration = if (info.status == "RUNNING") UIUtils.formatDuration(duration)
@@ -1014,7 +1015,7 @@ private[ui] class TaskDataSource(
shuffleRead,
shuffleWrite,
bytesSpilled,
- errorMessage.getOrElse(""))
+ taskData.errorMessage.getOrElse(""))
}
/**