diff options
Diffstat (limited to 'core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala')
-rw-r--r-- | core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala | 18 |
1 files changed, 11 insertions, 7 deletions
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index cfaf121895..08107a3f62 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -64,7 +64,7 @@ private[spark] class StagePage(parent: JobProgressUI) { listener.stageIdToTasksActive(stageId).foreach(activeTime += _.timeRunning(now)) val finishedTasks = listener.stageIdToTaskInfos(stageId).filter(_._1.finished) - + // scalastyle:off val summary = <div> <ul class="unstyled"> @@ -96,7 +96,7 @@ private[spark] class StagePage(parent: JobProgressUI) { } </ul> </div> - + // scalastyle:on val taskHeaders: Seq[String] = Seq("Task Index", "Task ID", "Status", "Locality Level", "Executor", "Launch Time") ++ Seq("Duration", "GC Time", "Result Ser Time") ++ @@ -105,7 +105,8 @@ private[spark] class StagePage(parent: JobProgressUI) { {if (hasBytesSpilled) Seq("Shuffle Spill (Memory)", "Shuffle Spill (Disk)") else Nil} ++ Seq("Errors") - val taskTable = listingTable(taskHeaders, taskRow(hasShuffleRead, hasShuffleWrite, hasBytesSpilled), tasks) + val taskTable = listingTable( + taskHeaders, taskRow(hasShuffleRead, hasShuffleWrite, hasBytesSpilled), tasks) // Excludes tasks which failed and have incomplete metrics val validTasks = tasks.filter(t => t._1.status == "SUCCESS" && (t._2.isDefined)) @@ -117,8 +118,9 @@ private[spark] class StagePage(parent: JobProgressUI) { else { val serializationTimes = validTasks.map{case (info, metrics, exception) => metrics.get.resultSerializationTime.toDouble} - val serializationQuantiles = "Result serialization time" +: Distribution(serializationTimes).get.getQuantiles().map( - ms => parent.formatDuration(ms.toLong)) + val serializationQuantiles = + "Result serialization time" +: Distribution(serializationTimes). + get.getQuantiles().map(ms => parent.formatDuration(ms.toLong)) val serviceTimes = validTasks.map{case (info, metrics, exception) => metrics.get.executorRunTime.toDouble} @@ -225,7 +227,8 @@ private[spark] class StagePage(parent: JobProgressUI) { val shuffleReadSortable = maybeShuffleRead.map(_.toString).getOrElse("") val shuffleReadReadable = maybeShuffleRead.map{Utils.bytesToString(_)}.getOrElse("") - val maybeShuffleWrite = metrics.flatMap{m => m.shuffleWriteMetrics}.map{s => s.shuffleBytesWritten} + val maybeShuffleWrite = + metrics.flatMap{m => m.shuffleWriteMetrics}.map{s => s.shuffleBytesWritten} val shuffleWriteSortable = maybeShuffleWrite.map(_.toString).getOrElse("") val shuffleWriteReadable = maybeShuffleWrite.map{Utils.bytesToString(_)}.getOrElse("") @@ -236,7 +239,8 @@ private[spark] class StagePage(parent: JobProgressUI) { val maybeMemoryBytesSpilled = metrics.map{m => m.memoryBytesSpilled} val memoryBytesSpilledSortable = maybeMemoryBytesSpilled.map(_.toString).getOrElse("") - val memoryBytesSpilledReadable = maybeMemoryBytesSpilled.map{Utils.bytesToString(_)}.getOrElse("") + val memoryBytesSpilledReadable = maybeMemoryBytesSpilled.map{Utils.bytesToString(_)} + .getOrElse("") val maybeDiskBytesSpilled = metrics.map{m => m.diskBytesSpilled} val diskBytesSpilledSortable = maybeDiskBytesSpilled.map(_.toString).getOrElse("") |