author    Reynold Xin <rxin@databricks.com>  2016-04-14 10:12:29 -0700
committer Reynold Xin <rxin@databricks.com>  2016-04-14 10:12:29 -0700
commit    de2ad52855aee3c60bbc4642afb180d6fe62173b (patch)
tree      20a0b06e641396339262e30f010ea112f59b9838
parent    0d22092cd9c8876a7f226add578ff1c025012fe9 (diff)
[SPARK-14625] TaskUIData and ExecutorUIData shouldn't be case classes
## What changes were proposed in this pull request?

I was trying to understand the accumulator and metrics update source code, and these two classes don't really need to be case classes. It would also be more consistent with other UI classes if they are not case classes. This is part of my bigger effort to simplify accumulators and task metrics.

## How was this patch tested?

This is a straightforward refactoring without behavior change.

Author: Reynold Xin <rxin@databricks.com>

Closes #12386 from rxin/SPARK-14625.
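For context, a minimal sketch (not part of the patch; `MutableTask` and its fields are hypothetical) of the pitfall that makes `case class` a poor fit here: the compiler derives `equals`, `hashCode`, and `copy` from the constructor arguments, so a case class whose fields are `var`s changes its hash mid-lifetime and can get lost in hash-based collections.

```scala
// Hypothetical example: a case class with mutable `var` fields.
// equals/hashCode are still derived from the constructor arguments,
// so mutating a field silently changes the object's hash.
case class MutableTask(var info: String, var metrics: Option[Long] = None)

object CaseClassPitfall extends App {
  val task = MutableTask("task-1")
  val seen = scala.collection.mutable.HashSet(task)
  task.metrics = Some(42L)       // mutation changes the derived hashCode
  println(seen.contains(task))   // prints "false": the task is now lost in the set
}
```

A plain `class` makes no such derived-equality promise, which matches how these mutable, reused UI objects are actually treated.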
-rw-r--r-- core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala  |  4
-rw-r--r-- core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala             |  2
-rw-r--r-- core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala      |  8
-rw-r--r-- core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala                | 85
-rw-r--r-- core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala                   |  6
-rw-r--r-- core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala | 10
6 files changed, 58 insertions(+), 57 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
index 9c92a50150..f8d6e9fbbb 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
@@ -147,7 +147,7 @@ private[v1] object AllStagesResource {
speculative = uiData.taskInfo.speculative,
accumulatorUpdates = uiData.taskInfo.accumulables.map { convertAccumulableInfo },
errorMessage = uiData.errorMessage,
- taskMetrics = uiData.taskMetrics.map { convertUiTaskMetrics }
+ taskMetrics = uiData.metrics.map { convertUiTaskMetrics }
)
}
@@ -155,7 +155,7 @@ private[v1] object AllStagesResource {
allTaskData: Iterable[TaskUIData],
quantiles: Array[Double]): TaskMetricDistributions = {
- val rawMetrics = allTaskData.flatMap{_.taskMetrics}.toSeq
+ val rawMetrics = allTaskData.flatMap{_.metrics}.toSeq
def metricQuantiles(f: InternalTaskMetrics => Double): IndexedSeq[Double] =
Distribution(rawMetrics.map { d => f(d) }).get.getQuantiles(quantiles)
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
index 788f35ec77..3fd0efd3a1 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
@@ -70,7 +70,7 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar
executorToLogUrls(eid) = executorAdded.executorInfo.logUrlMap
executorToTotalCores(eid) = executorAdded.executorInfo.totalCores
executorToTasksMax(eid) = executorToTotalCores(eid) / conf.getInt("spark.task.cpus", 1)
- executorIdToData(eid) = ExecutorUIData(executorAdded.time)
+ executorIdToData(eid) = new ExecutorUIData(executorAdded.time)
}
override def onExecutorRemoved(
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index ed3ab66e3b..13f5f84d06 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -396,13 +396,13 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
None
}
taskMetrics.foreach { m =>
- val oldMetrics = stageData.taskData.get(info.taskId).flatMap(_.taskMetrics)
+ val oldMetrics = stageData.taskData.get(info.taskId).flatMap(_.metrics)
updateAggregateMetrics(stageData, info.executorId, m, oldMetrics)
}
val taskData = stageData.taskData.getOrElseUpdate(info.taskId, new TaskUIData(info))
taskData.taskInfo = info
- taskData.taskMetrics = taskMetrics
+ taskData.metrics = taskMetrics
taskData.errorMessage = errorMessage
for (
@@ -506,9 +506,9 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
val metrics = TaskMetrics.fromAccumulatorUpdates(accumUpdates)
taskData.foreach { t =>
if (!t.taskInfo.finished) {
- updateAggregateMetrics(stageData, executorMetricsUpdate.execId, metrics, t.taskMetrics)
+ updateAggregateMetrics(stageData, executorMetricsUpdate.execId, metrics, t.metrics)
// Overwrite task metrics
- t.taskMetrics = Some(metrics)
+ t.metrics = Some(metrics)
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 689ab7dd5e..8a44bbd9fc 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -330,7 +330,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
else taskTable.dataSource.slicedTaskIds
// Excludes tasks which failed and have incomplete metrics
- val validTasks = tasks.filter(t => t.taskInfo.status == "SUCCESS" && t.taskMetrics.isDefined)
+ val validTasks = tasks.filter(t => t.taskInfo.status == "SUCCESS" && t.metrics.isDefined)
val summaryTable: Option[Seq[Node]] =
if (validTasks.size == 0) {
@@ -348,8 +348,8 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
getDistributionQuantiles(data).map(d => <td>{Utils.bytesToString(d.toLong)}</td>)
}
- val deserializationTimes = validTasks.map { case TaskUIData(_, metrics, _) =>
- metrics.get.executorDeserializeTime.toDouble
+ val deserializationTimes = validTasks.map { taskUIData: TaskUIData =>
+ taskUIData.metrics.get.executorDeserializeTime.toDouble
}
val deserializationQuantiles =
<td>
@@ -359,13 +359,13 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
</span>
</td> +: getFormattedTimeQuantiles(deserializationTimes)
- val serviceTimes = validTasks.map { case TaskUIData(_, metrics, _) =>
- metrics.get.executorRunTime.toDouble
+ val serviceTimes = validTasks.map { taskUIData: TaskUIData =>
+ taskUIData.metrics.get.executorRunTime.toDouble
}
val serviceQuantiles = <td>Duration</td> +: getFormattedTimeQuantiles(serviceTimes)
- val gcTimes = validTasks.map { case TaskUIData(_, metrics, _) =>
- metrics.get.jvmGCTime.toDouble
+ val gcTimes = validTasks.map { taskUIData: TaskUIData =>
+ taskUIData.metrics.get.jvmGCTime.toDouble
}
val gcQuantiles =
<td>
@@ -374,8 +374,8 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
</span>
</td> +: getFormattedTimeQuantiles(gcTimes)
- val serializationTimes = validTasks.map { case TaskUIData(_, metrics, _) =>
- metrics.get.resultSerializationTime.toDouble
+ val serializationTimes = validTasks.map { taskUIData: TaskUIData =>
+ taskUIData.metrics.get.resultSerializationTime.toDouble
}
val serializationQuantiles =
<td>
@@ -385,8 +385,8 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
</span>
</td> +: getFormattedTimeQuantiles(serializationTimes)
- val gettingResultTimes = validTasks.map { case TaskUIData(info, _, _) =>
- getGettingResultTime(info, currentTime).toDouble
+ val gettingResultTimes = validTasks.map { taskUIData: TaskUIData =>
+ getGettingResultTime(taskUIData.taskInfo, currentTime).toDouble
}
val gettingResultQuantiles =
<td>
@@ -397,8 +397,8 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
</td> +:
getFormattedTimeQuantiles(gettingResultTimes)
- val peakExecutionMemory = validTasks.map { case TaskUIData(_, metrics, _) =>
- metrics.get.peakExecutionMemory.toDouble
+ val peakExecutionMemory = validTasks.map { taskUIData: TaskUIData =>
+ taskUIData.metrics.get.peakExecutionMemory.toDouble
}
val peakExecutionMemoryQuantiles = {
<td>
@@ -412,8 +412,8 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
// The scheduler delay includes the network delay to send the task to the worker
// machine and to send back the result (but not the time to fetch the task result,
// if it needed to be fetched from the block manager on the worker).
- val schedulerDelays = validTasks.map { case TaskUIData(info, metrics, _) =>
- getSchedulerDelay(info, metrics.get, currentTime).toDouble
+ val schedulerDelays = validTasks.map { taskUIData: TaskUIData =>
+ getSchedulerDelay(taskUIData.taskInfo, taskUIData.metrics.get, currentTime).toDouble
}
val schedulerDelayTitle = <td><span data-toggle="tooltip"
title={ToolTips.SCHEDULER_DELAY} data-placement="right">Scheduler Delay</span></td>
@@ -427,30 +427,30 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
)
}
- val inputSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
- metrics.get.inputMetrics.map(_.bytesRead).getOrElse(0L).toDouble
+ val inputSizes = validTasks.map { taskUIData: TaskUIData =>
+ taskUIData.metrics.get.inputMetrics.map(_.bytesRead).getOrElse(0L).toDouble
}
- val inputRecords = validTasks.map { case TaskUIData(_, metrics, _) =>
- metrics.get.inputMetrics.map(_.recordsRead).getOrElse(0L).toDouble
+ val inputRecords = validTasks.map { taskUIData: TaskUIData =>
+ taskUIData.metrics.get.inputMetrics.map(_.recordsRead).getOrElse(0L).toDouble
}
val inputQuantiles = <td>Input Size / Records</td> +:
getFormattedSizeQuantilesWithRecords(inputSizes, inputRecords)
- val outputSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
- metrics.get.outputMetrics.map(_.bytesWritten).getOrElse(0L).toDouble
+ val outputSizes = validTasks.map { taskUIData: TaskUIData =>
+ taskUIData.metrics.get.outputMetrics.map(_.bytesWritten).getOrElse(0L).toDouble
}
- val outputRecords = validTasks.map { case TaskUIData(_, metrics, _) =>
- metrics.get.outputMetrics.map(_.recordsWritten).getOrElse(0L).toDouble
+ val outputRecords = validTasks.map { taskUIData: TaskUIData =>
+ taskUIData.metrics.get.outputMetrics.map(_.recordsWritten).getOrElse(0L).toDouble
}
val outputQuantiles = <td>Output Size / Records</td> +:
getFormattedSizeQuantilesWithRecords(outputSizes, outputRecords)
- val shuffleReadBlockedTimes = validTasks.map { case TaskUIData(_, metrics, _) =>
- metrics.get.shuffleReadMetrics.map(_.fetchWaitTime).getOrElse(0L).toDouble
+ val shuffleReadBlockedTimes = validTasks.map { taskUIData: TaskUIData =>
+ taskUIData.metrics.get.shuffleReadMetrics.map(_.fetchWaitTime).getOrElse(0L).toDouble
}
val shuffleReadBlockedQuantiles =
<td>
@@ -461,11 +461,11 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
</td> +:
getFormattedTimeQuantiles(shuffleReadBlockedTimes)
- val shuffleReadTotalSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
- metrics.get.shuffleReadMetrics.map(_.totalBytesRead).getOrElse(0L).toDouble
+ val shuffleReadTotalSizes = validTasks.map { taskUIData: TaskUIData =>
+ taskUIData.metrics.get.shuffleReadMetrics.map(_.totalBytesRead).getOrElse(0L).toDouble
}
- val shuffleReadTotalRecords = validTasks.map { case TaskUIData(_, metrics, _) =>
- metrics.get.shuffleReadMetrics.map(_.recordsRead).getOrElse(0L).toDouble
+ val shuffleReadTotalRecords = validTasks.map { taskUIData: TaskUIData =>
+ taskUIData.metrics.get.shuffleReadMetrics.map(_.recordsRead).getOrElse(0L).toDouble
}
val shuffleReadTotalQuantiles =
<td>
@@ -476,8 +476,8 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
</td> +:
getFormattedSizeQuantilesWithRecords(shuffleReadTotalSizes, shuffleReadTotalRecords)
- val shuffleReadRemoteSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
- metrics.get.shuffleReadMetrics.map(_.remoteBytesRead).getOrElse(0L).toDouble
+ val shuffleReadRemoteSizes = validTasks.map { taskUIData: TaskUIData =>
+ taskUIData.metrics.get.shuffleReadMetrics.map(_.remoteBytesRead).getOrElse(0L).toDouble
}
val shuffleReadRemoteQuantiles =
<td>
@@ -488,25 +488,25 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
</td> +:
getFormattedSizeQuantiles(shuffleReadRemoteSizes)
- val shuffleWriteSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
- metrics.get.shuffleWriteMetrics.map(_.bytesWritten).getOrElse(0L).toDouble
+ val shuffleWriteSizes = validTasks.map { taskUIData: TaskUIData =>
+ taskUIData.metrics.get.shuffleWriteMetrics.map(_.bytesWritten).getOrElse(0L).toDouble
}
- val shuffleWriteRecords = validTasks.map { case TaskUIData(_, metrics, _) =>
- metrics.get.shuffleWriteMetrics.map(_.recordsWritten).getOrElse(0L).toDouble
+ val shuffleWriteRecords = validTasks.map { taskUIData: TaskUIData =>
+ taskUIData.metrics.get.shuffleWriteMetrics.map(_.recordsWritten).getOrElse(0L).toDouble
}
val shuffleWriteQuantiles = <td>Shuffle Write Size / Records</td> +:
getFormattedSizeQuantilesWithRecords(shuffleWriteSizes, shuffleWriteRecords)
- val memoryBytesSpilledSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
- metrics.get.memoryBytesSpilled.toDouble
+ val memoryBytesSpilledSizes = validTasks.map { taskUIData: TaskUIData =>
+ taskUIData.metrics.get.memoryBytesSpilled.toDouble
}
val memoryBytesSpilledQuantiles = <td>Shuffle spill (memory)</td> +:
getFormattedSizeQuantiles(memoryBytesSpilledSizes)
- val diskBytesSpilledSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
- metrics.get.diskBytesSpilled.toDouble
+ val diskBytesSpilledSizes = validTasks.map { taskUIData: TaskUIData =>
+ taskUIData.metrics.get.diskBytesSpilled.toDouble
}
val diskBytesSpilledQuantiles = <td>Shuffle spill (disk)</td> +:
getFormattedSizeQuantiles(diskBytesSpilledSizes)
@@ -601,7 +601,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
def toProportion(time: Long) = time.toDouble / totalExecutionTime * 100
- val metricsOpt = taskUIData.taskMetrics
+ val metricsOpt = taskUIData.metrics
val shuffleReadTime =
metricsOpt.flatMap(_.shuffleReadMetrics.map(_.fetchWaitTime)).getOrElse(0L)
val shuffleReadTimeProportion = toProportion(shuffleReadTime)
@@ -868,7 +868,8 @@ private[ui] class TaskDataSource(
def slicedTaskIds: Set[Long] = _slicedTaskIds
private def taskRow(taskData: TaskUIData): TaskTableRowData = {
- val TaskUIData(info, metrics, errorMessage) = taskData
+ val info = taskData.taskInfo
+ val metrics = taskData.metrics
val duration = if (info.status == "RUNNING") info.timeRunning(currentTime)
else metrics.map(_.executorRunTime).getOrElse(1L)
val formatDuration = if (info.status == "RUNNING") UIUtils.formatDuration(duration)
@@ -1014,7 +1015,7 @@ private[ui] class TaskDataSource(
shuffleRead,
shuffleWrite,
bytesSpilled,
- errorMessage.getOrElse(""))
+ taskData.errorMessage.getOrElse(""))
}
/**
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
index 78165d7b74..b454ef1b20 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
@@ -105,12 +105,12 @@ private[spark] object UIData {
/**
* These are kept mutable and reused throughout a task's lifetime to avoid excessive reallocation.
*/
- case class TaskUIData(
+ class TaskUIData(
var taskInfo: TaskInfo,
- var taskMetrics: Option[TaskMetrics] = None,
+ var metrics: Option[TaskMetrics] = None,
var errorMessage: Option[String] = None)
- case class ExecutorUIData(
+ class ExecutorUIData(
val startTime: Long,
var finishTime: Option[Long] = None,
var finishReason: Option[String] = None)
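
The call-site consequence of the change above, as a minimal standalone sketch (types simplified: `String`/`Long` stand in for `TaskInfo`/`TaskMetrics`): a plain `class` has no compiler-generated `unapply`, so the positional `case TaskUIData(_, metrics, _)` destructuring used throughout StagePage.scala stops compiling and is replaced with name-based field access, as the hunks above show.

```scala
// Simplified stand-in for the refactored TaskUIData: a plain class with
// mutable fields and no synthesized extractor.
class TaskUIData(
    var taskInfo: String,                 // really a TaskInfo
    var metrics: Option[Long] = None,     // really Option[TaskMetrics]
    var errorMessage: Option[String] = None)

object FieldAccess extends App {
  val data = new TaskUIData("task-1", Some(100L))
  // val TaskUIData(info, m, _) = data    // no longer compiles: no unapply
  val runTime = data.metrics.get          // explicit, name-based access instead
  println(s"${data.taskInfo} ran for ${runTime}ms")
}
```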
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index 9876bded33..7d4c0863bc 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -322,11 +322,11 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with
assert(stage1Data.inputBytes == 207)
assert(stage0Data.outputBytes == 116)
assert(stage1Data.outputBytes == 208)
- assert(stage0Data.taskData.get(1234L).get.taskMetrics.get.shuffleReadMetrics.get
+ assert(stage0Data.taskData.get(1234L).get.metrics.get.shuffleReadMetrics.get
.totalBlocksFetched == 2)
- assert(stage0Data.taskData.get(1235L).get.taskMetrics.get.shuffleReadMetrics.get
+ assert(stage0Data.taskData.get(1235L).get.metrics.get.shuffleReadMetrics.get
.totalBlocksFetched == 102)
- assert(stage1Data.taskData.get(1236L).get.taskMetrics.get.shuffleReadMetrics.get
+ assert(stage1Data.taskData.get(1236L).get.metrics.get.shuffleReadMetrics.get
.totalBlocksFetched == 202)
// task that was included in a heartbeat
@@ -355,9 +355,9 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with
assert(stage1Data.inputBytes == 614)
assert(stage0Data.outputBytes == 416)
assert(stage1Data.outputBytes == 616)
- assert(stage0Data.taskData.get(1234L).get.taskMetrics.get.shuffleReadMetrics.get
+ assert(stage0Data.taskData.get(1234L).get.metrics.get.shuffleReadMetrics.get
.totalBlocksFetched == 302)
- assert(stage1Data.taskData.get(1237L).get.taskMetrics.get.shuffleReadMetrics.get
+ assert(stage1Data.taskData.get(1237L).get.metrics.get.shuffleReadMetrics.get
.totalBlocksFetched == 402)
}
}