From de2ad52855aee3c60bbc4642afb180d6fe62173b Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Thu, 14 Apr 2016 10:12:29 -0700 Subject: [SPARK-14625] TaskUIData and ExecutorUIData shouldn't be case classes ## What changes were proposed in this pull request? I was trying to understand the accumulator and metrics update source code and these two classes don't really need to be case classes. It would also be more consistent with other UI classes if they are not case classes. This is part of my bigger effort to simplify accumulators and task metrics. ## How was this patch tested? This is a straightforward refactoring without behavior change. Author: Reynold Xin Closes #12386 from rxin/SPARK-14625. --- .../spark/status/api/v1/AllStagesResource.scala | 4 +- .../org/apache/spark/ui/exec/ExecutorsTab.scala | 2 +- .../apache/spark/ui/jobs/JobProgressListener.scala | 8 +- .../scala/org/apache/spark/ui/jobs/StagePage.scala | 85 +++++++++++----------- .../scala/org/apache/spark/ui/jobs/UIData.scala | 6 +- 5 files changed, 53 insertions(+), 52 deletions(-) (limited to 'core/src/main/scala/org/apache') diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala index 9c92a50150..f8d6e9fbbb 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala @@ -147,7 +147,7 @@ private[v1] object AllStagesResource { speculative = uiData.taskInfo.speculative, accumulatorUpdates = uiData.taskInfo.accumulables.map { convertAccumulableInfo }, errorMessage = uiData.errorMessage, - taskMetrics = uiData.taskMetrics.map { convertUiTaskMetrics } + taskMetrics = uiData.metrics.map { convertUiTaskMetrics } ) } @@ -155,7 +155,7 @@ private[v1] object AllStagesResource { allTaskData: Iterable[TaskUIData], quantiles: Array[Double]): TaskMetricDistributions = { - val rawMetrics = allTaskData.flatMap{_.taskMetrics}.toSeq + val rawMetrics = allTaskData.flatMap{_.metrics}.toSeq def metricQuantiles(f: InternalTaskMetrics => Double): IndexedSeq[Double] = Distribution(rawMetrics.map { d => f(d) }).get.getQuantiles(quantiles) diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala index 788f35ec77..3fd0efd3a1 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala @@ -70,7 +70,7 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar executorToLogUrls(eid) = executorAdded.executorInfo.logUrlMap executorToTotalCores(eid) = executorAdded.executorInfo.totalCores executorToTasksMax(eid) = executorToTotalCores(eid) / conf.getInt("spark.task.cpus", 1) - executorIdToData(eid) = ExecutorUIData(executorAdded.time) + executorIdToData(eid) = new ExecutorUIData(executorAdded.time) } override def onExecutorRemoved( diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index ed3ab66e3b..13f5f84d06 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -396,13 +396,13 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { None } taskMetrics.foreach { m => - val oldMetrics = stageData.taskData.get(info.taskId).flatMap(_.taskMetrics) + val oldMetrics = stageData.taskData.get(info.taskId).flatMap(_.metrics) updateAggregateMetrics(stageData, info.executorId, m, oldMetrics) } val taskData = stageData.taskData.getOrElseUpdate(info.taskId, new TaskUIData(info)) taskData.taskInfo = info - taskData.taskMetrics = taskMetrics + taskData.metrics = taskMetrics taskData.errorMessage = errorMessage for ( @@ -506,9 +506,9 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { val metrics = TaskMetrics.fromAccumulatorUpdates(accumUpdates) taskData.foreach { t => if (!t.taskInfo.finished) { - updateAggregateMetrics(stageData, executorMetricsUpdate.execId, metrics, t.taskMetrics) + updateAggregateMetrics(stageData, executorMetricsUpdate.execId, metrics, t.metrics) // Overwrite task metrics - t.taskMetrics = Some(metrics) + t.metrics = Some(metrics) } } } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 689ab7dd5e..8a44bbd9fc 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -330,7 +330,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { else taskTable.dataSource.slicedTaskIds // Excludes tasks which failed and have incomplete metrics - val validTasks = tasks.filter(t => t.taskInfo.status == "SUCCESS" && t.taskMetrics.isDefined) + val validTasks = tasks.filter(t => t.taskInfo.status == "SUCCESS" && t.metrics.isDefined) val summaryTable: Option[Seq[Node]] = if (validTasks.size == 0) { @@ -348,8 +348,8 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { getDistributionQuantiles(data).map(d => {Utils.bytesToString(d.toLong)}) } - val deserializationTimes = validTasks.map { case TaskUIData(_, metrics, _) => - metrics.get.executorDeserializeTime.toDouble + val deserializationTimes = validTasks.map { taskUIData: TaskUIData => + taskUIData.metrics.get.executorDeserializeTime.toDouble } val deserializationQuantiles = @@ -359,13 +359,13 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { +: getFormattedTimeQuantiles(deserializationTimes) - val serviceTimes = validTasks.map { case TaskUIData(_, metrics, _) => - metrics.get.executorRunTime.toDouble + val serviceTimes = validTasks.map { taskUIData: TaskUIData => + taskUIData.metrics.get.executorRunTime.toDouble } val serviceQuantiles = Duration +: getFormattedTimeQuantiles(serviceTimes) - val gcTimes = validTasks.map { case TaskUIData(_, metrics, _) => - metrics.get.jvmGCTime.toDouble + val gcTimes = validTasks.map { taskUIData: TaskUIData => + taskUIData.metrics.get.jvmGCTime.toDouble } val gcQuantiles = @@ -374,8 +374,8 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { +: getFormattedTimeQuantiles(gcTimes) - val serializationTimes = validTasks.map { case TaskUIData(_, metrics, _) => - metrics.get.resultSerializationTime.toDouble + val serializationTimes = validTasks.map { taskUIData: TaskUIData => + taskUIData.metrics.get.resultSerializationTime.toDouble } val serializationQuantiles = @@ -385,8 +385,8 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { +: getFormattedTimeQuantiles(serializationTimes) - val gettingResultTimes = validTasks.map { case TaskUIData(info, _, _) => - getGettingResultTime(info, currentTime).toDouble + val gettingResultTimes = validTasks.map { taskUIData: TaskUIData => + getGettingResultTime(taskUIData.taskInfo, currentTime).toDouble } val gettingResultQuantiles = @@ -397,8 +397,8 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { +: getFormattedTimeQuantiles(gettingResultTimes) - val peakExecutionMemory = validTasks.map { case TaskUIData(_, metrics, _) => - metrics.get.peakExecutionMemory.toDouble + val peakExecutionMemory = validTasks.map { taskUIData: TaskUIData => + taskUIData.metrics.get.peakExecutionMemory.toDouble } val peakExecutionMemoryQuantiles = { @@ -412,8 +412,8 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { // The scheduler delay includes the network delay to send the task to the worker // machine and to send back the result (but not the time to fetch the task result, // if it needed to be fetched from the block manager on the worker). - val schedulerDelays = validTasks.map { case TaskUIData(info, metrics, _) => - getSchedulerDelay(info, metrics.get, currentTime).toDouble + val schedulerDelays = validTasks.map { taskUIData: TaskUIData => + getSchedulerDelay(taskUIData.taskInfo, taskUIData.metrics.get, currentTime).toDouble } val schedulerDelayTitle = Scheduler Delay @@ -427,30 +427,30 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { ) } - val inputSizes = validTasks.map { case TaskUIData(_, metrics, _) => - metrics.get.inputMetrics.map(_.bytesRead).getOrElse(0L).toDouble + val inputSizes = validTasks.map { taskUIData: TaskUIData => + taskUIData.metrics.get.inputMetrics.map(_.bytesRead).getOrElse(0L).toDouble } - val inputRecords = validTasks.map { case TaskUIData(_, metrics, _) => - metrics.get.inputMetrics.map(_.recordsRead).getOrElse(0L).toDouble + val inputRecords = validTasks.map { taskUIData: TaskUIData => + taskUIData.metrics.get.inputMetrics.map(_.recordsRead).getOrElse(0L).toDouble } val inputQuantiles = Input Size / Records +: getFormattedSizeQuantilesWithRecords(inputSizes, inputRecords) - val outputSizes = validTasks.map { case TaskUIData(_, metrics, _) => - metrics.get.outputMetrics.map(_.bytesWritten).getOrElse(0L).toDouble + val outputSizes = validTasks.map { taskUIData: TaskUIData => + taskUIData.metrics.get.outputMetrics.map(_.bytesWritten).getOrElse(0L).toDouble } - val outputRecords = validTasks.map { case TaskUIData(_, metrics, _) => - metrics.get.outputMetrics.map(_.recordsWritten).getOrElse(0L).toDouble + val outputRecords = validTasks.map { taskUIData: TaskUIData => + taskUIData.metrics.get.outputMetrics.map(_.recordsWritten).getOrElse(0L).toDouble } val outputQuantiles = Output Size / Records +: getFormattedSizeQuantilesWithRecords(outputSizes, outputRecords) - val shuffleReadBlockedTimes = validTasks.map { case TaskUIData(_, metrics, _) => - metrics.get.shuffleReadMetrics.map(_.fetchWaitTime).getOrElse(0L).toDouble + val shuffleReadBlockedTimes = validTasks.map { taskUIData: TaskUIData => + taskUIData.metrics.get.shuffleReadMetrics.map(_.fetchWaitTime).getOrElse(0L).toDouble } val shuffleReadBlockedQuantiles = @@ -461,11 +461,11 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { +: getFormattedTimeQuantiles(shuffleReadBlockedTimes) - val shuffleReadTotalSizes = validTasks.map { case TaskUIData(_, metrics, _) => - metrics.get.shuffleReadMetrics.map(_.totalBytesRead).getOrElse(0L).toDouble + val shuffleReadTotalSizes = validTasks.map { taskUIData: TaskUIData => + taskUIData.metrics.get.shuffleReadMetrics.map(_.totalBytesRead).getOrElse(0L).toDouble } - val shuffleReadTotalRecords = validTasks.map { case TaskUIData(_, metrics, _) => - metrics.get.shuffleReadMetrics.map(_.recordsRead).getOrElse(0L).toDouble + val shuffleReadTotalRecords = validTasks.map { taskUIData: TaskUIData => + taskUIData.metrics.get.shuffleReadMetrics.map(_.recordsRead).getOrElse(0L).toDouble } val shuffleReadTotalQuantiles = @@ -476,8 +476,8 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { +: getFormattedSizeQuantilesWithRecords(shuffleReadTotalSizes, shuffleReadTotalRecords) - val shuffleReadRemoteSizes = validTasks.map { case TaskUIData(_, metrics, _) => - metrics.get.shuffleReadMetrics.map(_.remoteBytesRead).getOrElse(0L).toDouble + val shuffleReadRemoteSizes = validTasks.map { taskUIData: TaskUIData => + taskUIData.metrics.get.shuffleReadMetrics.map(_.remoteBytesRead).getOrElse(0L).toDouble } val shuffleReadRemoteQuantiles = @@ -488,25 +488,25 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { +: getFormattedSizeQuantiles(shuffleReadRemoteSizes) - val shuffleWriteSizes = validTasks.map { case TaskUIData(_, metrics, _) => - metrics.get.shuffleWriteMetrics.map(_.bytesWritten).getOrElse(0L).toDouble + val shuffleWriteSizes = validTasks.map { taskUIData: TaskUIData => + taskUIData.metrics.get.shuffleWriteMetrics.map(_.bytesWritten).getOrElse(0L).toDouble } - val shuffleWriteRecords = validTasks.map { case TaskUIData(_, metrics, _) => - metrics.get.shuffleWriteMetrics.map(_.recordsWritten).getOrElse(0L).toDouble + val shuffleWriteRecords = validTasks.map { taskUIData: TaskUIData => + taskUIData.metrics.get.shuffleWriteMetrics.map(_.recordsWritten).getOrElse(0L).toDouble } val shuffleWriteQuantiles = Shuffle Write Size / Records +: getFormattedSizeQuantilesWithRecords(shuffleWriteSizes, shuffleWriteRecords) - val memoryBytesSpilledSizes = validTasks.map { case TaskUIData(_, metrics, _) => - metrics.get.memoryBytesSpilled.toDouble + val memoryBytesSpilledSizes = validTasks.map { taskUIData: TaskUIData => + taskUIData.metrics.get.memoryBytesSpilled.toDouble } val memoryBytesSpilledQuantiles = Shuffle spill (memory) +: getFormattedSizeQuantiles(memoryBytesSpilledSizes) - val diskBytesSpilledSizes = validTasks.map { case TaskUIData(_, metrics, _) => - metrics.get.diskBytesSpilled.toDouble + val diskBytesSpilledSizes = validTasks.map { taskUIData: TaskUIData => + taskUIData.metrics.get.diskBytesSpilled.toDouble } val diskBytesSpilledQuantiles = Shuffle spill (disk) +: getFormattedSizeQuantiles(diskBytesSpilledSizes) @@ -601,7 +601,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { def toProportion(time: Long) = time.toDouble / totalExecutionTime * 100 - val metricsOpt = taskUIData.taskMetrics + val metricsOpt = taskUIData.metrics val shuffleReadTime = metricsOpt.flatMap(_.shuffleReadMetrics.map(_.fetchWaitTime)).getOrElse(0L) val shuffleReadTimeProportion = toProportion(shuffleReadTime) @@ -868,7 +868,8 @@ private[ui] class TaskDataSource( def slicedTaskIds: Set[Long] = _slicedTaskIds private def taskRow(taskData: TaskUIData): TaskTableRowData = { - val TaskUIData(info, metrics, errorMessage) = taskData + val info = taskData.taskInfo + val metrics = taskData.metrics val duration = if (info.status == "RUNNING") info.timeRunning(currentTime) else metrics.map(_.executorRunTime).getOrElse(1L) val formatDuration = if (info.status == "RUNNING") UIUtils.formatDuration(duration) @@ -1014,7 +1015,7 @@ private[ui] class TaskDataSource( shuffleRead, shuffleWrite, bytesSpilled, - errorMessage.getOrElse("")) + taskData.errorMessage.getOrElse("")) } /** diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala index 78165d7b74..b454ef1b20 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala @@ -105,12 +105,12 @@ private[spark] object UIData { /** * These are kept mutable and reused throughout a task's lifetime to avoid excessive reallocation. */ - case class TaskUIData( + class TaskUIData( var taskInfo: TaskInfo, - var taskMetrics: Option[TaskMetrics] = None, + var metrics: Option[TaskMetrics] = None, var errorMessage: Option[String] = None) - case class ExecutorUIData( + class ExecutorUIData( val startTime: Long, var finishTime: Option[Long] = None, var finishReason: Option[String] = None) -- cgit v1.2.3