From 362ba4b6f8e8fc2355368742c5adced7573fec00 Mon Sep 17 00:00:00 2001 From: Alex Bozarth Date: Sat, 8 Oct 2016 11:24:00 +0100 Subject: [SPARK-17793][WEB UI] Sorting on the description on the Job or Stage page doesn’t always work MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## What changes were proposed in this pull request? Added secondary sorting on stage name for the description column. This provide a clearer behavior in the common case where the Description column only comprises of Stage names instead of the option description value. ## How was this patch tested? manual testing and dev/run-tests Screenshots of sorting on both description and stage name as well as an example of both: ![screen shot 2016-10-04 at 1 09 39 pm](https://cloud.githubusercontent.com/assets/13952758/19135523/067b042e-8b1a-11e6-912e-e6371d006d21.png) ![screen shot 2016-10-04 at 1 09 51 pm](https://cloud.githubusercontent.com/assets/13952758/19135526/06960936-8b1a-11e6-85e9-8aaf694c5f7b.png) ![screen shot 2016-10-05 at 1 14 45 pm](https://cloud.githubusercontent.com/assets/13952758/19135525/069547da-8b1a-11e6-8692-6524c75c4c07.png) ![screen shot 2016-10-05 at 1 14 51 pm](https://cloud.githubusercontent.com/assets/13952758/19135524/0694b4d2-8b1a-11e6-92dc-c8aa514e4f62.png) ![screen shot 2016-10-05 at 4 42 52 pm](https://cloud.githubusercontent.com/assets/13952758/19135618/e232eafe-8b1a-11e6-88b3-ff0bbb26b7f8.png) Author: Alex Bozarth Closes #15366 from ajbozarth/spark17793. --- .../org/apache/spark/ui/jobs/AllJobsPage.scala | 25 +--- .../scala/org/apache/spark/ui/jobs/StagePage.scala | 134 ++++----------------- .../org/apache/spark/ui/jobs/StageTable.scala | 51 ++------ .../org/apache/spark/ui/storage/RDDPage.scala | 27 +---- 4 files changed, 49 insertions(+), 188 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala index 19bb41a141..f6713097b9 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala @@ -457,23 +457,11 @@ private[ui] class JobDataSource( * Return Ordering according to sortColumn and desc */ private def ordering(sortColumn: String, desc: Boolean): Ordering[JobTableRowData] = { - val ordering = sortColumn match { - case "Job Id" | "Job Id (Job Group)" => new Ordering[JobTableRowData] { - override def compare(x: JobTableRowData, y: JobTableRowData): Int = - Ordering.Int.compare(x.jobData.jobId, y.jobData.jobId) - } - case "Description" => new Ordering[JobTableRowData] { - override def compare(x: JobTableRowData, y: JobTableRowData): Int = - Ordering.String.compare(x.lastStageDescription, y.lastStageDescription) - } - case "Submitted" => new Ordering[JobTableRowData] { - override def compare(x: JobTableRowData, y: JobTableRowData): Int = - Ordering.Long.compare(x.submissionTime, y.submissionTime) - } - case "Duration" => new Ordering[JobTableRowData] { - override def compare(x: JobTableRowData, y: JobTableRowData): Int = - Ordering.Long.compare(x.duration, y.duration) - } + val ordering: Ordering[JobTableRowData] = sortColumn match { + case "Job Id" | "Job Id (Job Group)" => Ordering.by(_.jobData.jobId) + case "Description" => Ordering.by(x => (x.lastStageDescription, x.lastStageName)) + case "Submitted" => Ordering.by(_.submissionTime) + case "Duration" => Ordering.by(_.duration) case "Stages: Succeeded/Total" | "Tasks (for all stages): Succeeded/Total" => throw new IllegalArgumentException(s"Unsortable column: $sortColumn") case unknownColumn => throw new IllegalArgumentException(s"Unknown column: $unknownColumn") @@ -501,8 +489,7 @@ private[ui] class JobPagedTable( sortColumn: String, desc: Boolean ) extends PagedTable[JobTableRowData] { - val parameterPath = UIUtils.prependBaseUri(basePath) + s"/$subPath/?" + - parameterOtherTable.mkString("&") + val parameterPath = basePath + s"/$subPath/?" + parameterOtherTable.mkString("&") override def tableId: String = jobTag + "-table" diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index c322ae0972..8c7cefe200 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -1050,89 +1050,38 @@ private[ui] class TaskDataSource( * Return Ordering according to sortColumn and desc */ private def ordering(sortColumn: String, desc: Boolean): Ordering[TaskTableRowData] = { - val ordering = sortColumn match { - case "Index" => new Ordering[TaskTableRowData] { - override def compare(x: TaskTableRowData, y: TaskTableRowData): Int = - Ordering.Int.compare(x.index, y.index) - } - case "ID" => new Ordering[TaskTableRowData] { - override def compare(x: TaskTableRowData, y: TaskTableRowData): Int = - Ordering.Long.compare(x.taskId, y.taskId) - } - case "Attempt" => new Ordering[TaskTableRowData] { - override def compare(x: TaskTableRowData, y: TaskTableRowData): Int = - Ordering.Int.compare(x.attempt, y.attempt) - } - case "Status" => new Ordering[TaskTableRowData] { - override def compare(x: TaskTableRowData, y: TaskTableRowData): Int = - Ordering.String.compare(x.status, y.status) - } - case "Locality Level" => new Ordering[TaskTableRowData] { - override def compare(x: TaskTableRowData, y: TaskTableRowData): Int = - Ordering.String.compare(x.taskLocality, y.taskLocality) - } - case "Executor ID / Host" => new Ordering[TaskTableRowData] { - override def compare(x: TaskTableRowData, y: TaskTableRowData): Int = - Ordering.String.compare(x.executorIdAndHost, y.executorIdAndHost) - } - case "Launch Time" => new Ordering[TaskTableRowData] { - override def compare(x: TaskTableRowData, y: TaskTableRowData): Int = - Ordering.Long.compare(x.launchTime, y.launchTime) - } - case "Duration" => new Ordering[TaskTableRowData] { - override def compare(x: TaskTableRowData, y: TaskTableRowData): Int = - Ordering.Long.compare(x.duration, y.duration) - } - case "Scheduler Delay" => new Ordering[TaskTableRowData] { - override def compare(x: TaskTableRowData, y: TaskTableRowData): Int = - Ordering.Long.compare(x.schedulerDelay, y.schedulerDelay) - } - case "Task Deserialization Time" => new Ordering[TaskTableRowData] { - override def compare(x: TaskTableRowData, y: TaskTableRowData): Int = - Ordering.Long.compare(x.taskDeserializationTime, y.taskDeserializationTime) - } - case "GC Time" => new Ordering[TaskTableRowData] { - override def compare(x: TaskTableRowData, y: TaskTableRowData): Int = - Ordering.Long.compare(x.gcTime, y.gcTime) - } - case "Result Serialization Time" => new Ordering[TaskTableRowData] { - override def compare(x: TaskTableRowData, y: TaskTableRowData): Int = - Ordering.Long.compare(x.serializationTime, y.serializationTime) - } - case "Getting Result Time" => new Ordering[TaskTableRowData] { - override def compare(x: TaskTableRowData, y: TaskTableRowData): Int = - Ordering.Long.compare(x.gettingResultTime, y.gettingResultTime) - } - case "Peak Execution Memory" => new Ordering[TaskTableRowData] { - override def compare(x: TaskTableRowData, y: TaskTableRowData): Int = - Ordering.Long.compare(x.peakExecutionMemoryUsed, y.peakExecutionMemoryUsed) - } + val ordering: Ordering[TaskTableRowData] = sortColumn match { + case "Index" => Ordering.by(_.index) + case "ID" => Ordering.by(_.taskId) + case "Attempt" => Ordering.by(_.attempt) + case "Status" => Ordering.by(_.status) + case "Locality Level" => Ordering.by(_.taskLocality) + case "Executor ID / Host" => Ordering.by(_.executorIdAndHost) + case "Launch Time" => Ordering.by(_.launchTime) + case "Duration" => Ordering.by(_.duration) + case "Scheduler Delay" => Ordering.by(_.schedulerDelay) + case "Task Deserialization Time" => Ordering.by(_.taskDeserializationTime) + case "GC Time" => Ordering.by(_.gcTime) + case "Result Serialization Time" => Ordering.by(_.serializationTime) + case "Getting Result Time" => Ordering.by(_.gettingResultTime) + case "Peak Execution Memory" => Ordering.by(_.peakExecutionMemoryUsed) case "Accumulators" => if (hasAccumulators) { - new Ordering[TaskTableRowData] { - override def compare(x: TaskTableRowData, y: TaskTableRowData): Int = - Ordering.String.compare(x.accumulators.get, y.accumulators.get) - } + Ordering.by(_.accumulators.get) } else { throw new IllegalArgumentException( "Cannot sort by Accumulators because of no accumulators") } case "Input Size / Records" => if (hasInput) { - new Ordering[TaskTableRowData] { - override def compare(x: TaskTableRowData, y: TaskTableRowData): Int = - Ordering.Long.compare(x.input.get.inputSortable, y.input.get.inputSortable) - } + Ordering.by(_.input.get.inputSortable) } else { throw new IllegalArgumentException( "Cannot sort by Input Size / Records because of no inputs") } case "Output Size / Records" => if (hasOutput) { - new Ordering[TaskTableRowData] { - override def compare(x: TaskTableRowData, y: TaskTableRowData): Int = - Ordering.Long.compare(x.output.get.outputSortable, y.output.get.outputSortable) - } + Ordering.by(_.output.get.outputSortable) } else { throw new IllegalArgumentException( "Cannot sort by Output Size / Records because of no outputs") @@ -1140,33 +1089,21 @@ private[ui] class TaskDataSource( // ShuffleRead case "Shuffle Read Blocked Time" => if (hasShuffleRead) { - new Ordering[TaskTableRowData] { - override def compare(x: TaskTableRowData, y: TaskTableRowData): Int = - Ordering.Long.compare(x.shuffleRead.get.shuffleReadBlockedTimeSortable, - y.shuffleRead.get.shuffleReadBlockedTimeSortable) - } + Ordering.by(_.shuffleRead.get.shuffleReadBlockedTimeSortable) } else { throw new IllegalArgumentException( "Cannot sort by Shuffle Read Blocked Time because of no shuffle reads") } case "Shuffle Read Size / Records" => if (hasShuffleRead) { - new Ordering[TaskTableRowData] { - override def compare(x: TaskTableRowData, y: TaskTableRowData): Int = - Ordering.Long.compare(x.shuffleRead.get.shuffleReadSortable, - y.shuffleRead.get.shuffleReadSortable) - } + Ordering.by(_.shuffleRead.get.shuffleReadSortable) } else { throw new IllegalArgumentException( "Cannot sort by Shuffle Read Size / Records because of no shuffle reads") } case "Shuffle Remote Reads" => if (hasShuffleRead) { - new Ordering[TaskTableRowData] { - override def compare(x: TaskTableRowData, y: TaskTableRowData): Int = - Ordering.Long.compare(x.shuffleRead.get.shuffleReadRemoteSortable, - y.shuffleRead.get.shuffleReadRemoteSortable) - } + Ordering.by(_.shuffleRead.get.shuffleReadRemoteSortable) } else { throw new IllegalArgumentException( "Cannot sort by Shuffle Remote Reads because of no shuffle reads") @@ -1174,22 +1111,14 @@ private[ui] class TaskDataSource( // ShuffleWrite case "Write Time" => if (hasShuffleWrite) { - new Ordering[TaskTableRowData] { - override def compare(x: TaskTableRowData, y: TaskTableRowData): Int = - Ordering.Long.compare(x.shuffleWrite.get.writeTimeSortable, - y.shuffleWrite.get.writeTimeSortable) - } + Ordering.by(_.shuffleWrite.get.writeTimeSortable) } else { throw new IllegalArgumentException( "Cannot sort by Write Time because of no shuffle writes") } case "Shuffle Write Size / Records" => if (hasShuffleWrite) { - new Ordering[TaskTableRowData] { - override def compare(x: TaskTableRowData, y: TaskTableRowData): Int = - Ordering.Long.compare(x.shuffleWrite.get.shuffleWriteSortable, - y.shuffleWrite.get.shuffleWriteSortable) - } + Ordering.by(_.shuffleWrite.get.shuffleWriteSortable) } else { throw new IllegalArgumentException( "Cannot sort by Shuffle Write Size / Records because of no shuffle writes") @@ -1197,30 +1126,19 @@ private[ui] class TaskDataSource( // BytesSpilled case "Shuffle Spill (Memory)" => if (hasBytesSpilled) { - new Ordering[TaskTableRowData] { - override def compare(x: TaskTableRowData, y: TaskTableRowData): Int = - Ordering.Long.compare(x.bytesSpilled.get.memoryBytesSpilledSortable, - y.bytesSpilled.get.memoryBytesSpilledSortable) - } + Ordering.by(_.bytesSpilled.get.memoryBytesSpilledSortable) } else { throw new IllegalArgumentException( "Cannot sort by Shuffle Spill (Memory) because of no spills") } case "Shuffle Spill (Disk)" => if (hasBytesSpilled) { - new Ordering[TaskTableRowData] { - override def compare(x: TaskTableRowData, y: TaskTableRowData): Int = - Ordering.Long.compare(x.bytesSpilled.get.diskBytesSpilledSortable, - y.bytesSpilled.get.diskBytesSpilledSortable) - } + Ordering.by(_.bytesSpilled.get.diskBytesSpilledSortable) } else { throw new IllegalArgumentException( "Cannot sort by Shuffle Spill (Disk) because of no spills") } - case "Errors" => new Ordering[TaskTableRowData] { - override def compare(x: TaskTableRowData, y: TaskTableRowData): Int = - Ordering.String.compare(x.error, y.error) - } + case "Errors" => Ordering.by(_.error) case unknownColumn => throw new IllegalArgumentException(s"Unknown column: $unknownColumn") } if (desc) { diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala index 40a6762c28..9b9b4681ba 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala @@ -109,7 +109,6 @@ private[ui] class StageTableRowData( val stageId: Int, val attemptId: Int, val schedulingPool: String, - val description: String, val descriptionOption: Option[String], val submissionTime: Long, val formattedSubmissionTime: String, @@ -128,7 +127,7 @@ private[ui] class MissingStageTableRowData( stageInfo: StageInfo, stageId: Int, attemptId: Int) extends StageTableRowData( - stageInfo, None, stageId, attemptId, "", "", None, 0, "", -1, "", 0, "", 0, "", 0, "", 0, "") + stageInfo, None, stageId, attemptId, "", None, 0, "", -1, "", 0, "", 0, "", 0, "", 0, "") /** Page showing list of all ongoing and recently finished stages */ private[ui] class StagePagedTable( @@ -470,7 +469,6 @@ private[ui] class StageDataSource( s.stageId, s.attemptId, stageData.schedulingPool, - description.getOrElse(""), description, s.submissionTime.getOrElse(0), formattedSubmissionTime, @@ -491,43 +489,16 @@ private[ui] class StageDataSource( * Return Ordering according to sortColumn and desc */ private def ordering(sortColumn: String, desc: Boolean): Ordering[StageTableRowData] = { - val ordering = sortColumn match { - case "Stage Id" => new Ordering[StageTableRowData] { - override def compare(x: StageTableRowData, y: StageTableRowData): Int = - Ordering.Int.compare(x.stageId, y.stageId) - } - case "Pool Name" => new Ordering[StageTableRowData] { - override def compare(x: StageTableRowData, y: StageTableRowData): Int = - Ordering.String.compare(x.schedulingPool, y.schedulingPool) - } - case "Description" => new Ordering[StageTableRowData] { - override def compare(x: StageTableRowData, y: StageTableRowData): Int = - Ordering.String.compare(x.description, y.description) - } - case "Submitted" => new Ordering[StageTableRowData] { - override def compare(x: StageTableRowData, y: StageTableRowData): Int = - Ordering.Long.compare(x.submissionTime, y.submissionTime) - } - case "Duration" => new Ordering[StageTableRowData] { - override def compare(x: StageTableRowData, y: StageTableRowData): Int = - Ordering.Long.compare(x.duration, y.duration) - } - case "Input" => new Ordering[StageTableRowData] { - override def compare(x: StageTableRowData, y: StageTableRowData): Int = - Ordering.Long.compare(x.inputRead, y.inputRead) - } - case "Output" => new Ordering[StageTableRowData] { - override def compare(x: StageTableRowData, y: StageTableRowData): Int = - Ordering.Long.compare(x.outputWrite, y.outputWrite) - } - case "Shuffle Read" => new Ordering[StageTableRowData] { - override def compare(x: StageTableRowData, y: StageTableRowData): Int = - Ordering.Long.compare(x.shuffleRead, y.shuffleRead) - } - case "Shuffle Write" => new Ordering[StageTableRowData] { - override def compare(x: StageTableRowData, y: StageTableRowData): Int = - Ordering.Long.compare(x.shuffleWrite, y.shuffleWrite) - } + val ordering: Ordering[StageTableRowData] = sortColumn match { + case "Stage Id" => Ordering.by(_.stageId) + case "Pool Name" => Ordering.by(_.schedulingPool) + case "Description" => Ordering.by(x => (x.descriptionOption, x.stageInfo.name)) + case "Submitted" => Ordering.by(_.submissionTime) + case "Duration" => Ordering.by(_.duration) + case "Input" => Ordering.by(_.inputRead) + case "Output" => Ordering.by(_.outputWrite) + case "Shuffle Read" => Ordering.by(_.shuffleRead) + case "Shuffle Write" => Ordering.by(_.shuffleWrite) case "Tasks: Succeeded/Total" => throw new IllegalArgumentException(s"Unsortable column: $sortColumn") case unknownColumn => throw new IllegalArgumentException(s"Unknown column: $unknownColumn") diff --git a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala index 606d15d599..227e940c9c 100644 --- a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala @@ -197,27 +197,12 @@ private[ui] class BlockDataSource( * Return Ordering according to sortColumn and desc */ private def ordering(sortColumn: String, desc: Boolean): Ordering[BlockTableRowData] = { - val ordering = sortColumn match { - case "Block Name" => new Ordering[BlockTableRowData] { - override def compare(x: BlockTableRowData, y: BlockTableRowData): Int = - Ordering.String.compare(x.blockName, y.blockName) - } - case "Storage Level" => new Ordering[BlockTableRowData] { - override def compare(x: BlockTableRowData, y: BlockTableRowData): Int = - Ordering.String.compare(x.storageLevel, y.storageLevel) - } - case "Size in Memory" => new Ordering[BlockTableRowData] { - override def compare(x: BlockTableRowData, y: BlockTableRowData): Int = - Ordering.Long.compare(x.memoryUsed, y.memoryUsed) - } - case "Size on Disk" => new Ordering[BlockTableRowData] { - override def compare(x: BlockTableRowData, y: BlockTableRowData): Int = - Ordering.Long.compare(x.diskUsed, y.diskUsed) - } - case "Executors" => new Ordering[BlockTableRowData] { - override def compare(x: BlockTableRowData, y: BlockTableRowData): Int = - Ordering.String.compare(x.executors, y.executors) - } + val ordering: Ordering[BlockTableRowData] = sortColumn match { + case "Block Name" => Ordering.by(_.blockName) + case "Storage Level" => Ordering.by(_.storageLevel) + case "Size in Memory" => Ordering.by(_.memoryUsed) + case "Size on Disk" => Ordering.by(_.diskUsed) + case "Executors" => Ordering.by(_.executors) case unknownColumn => throw new IllegalArgumentException(s"Unknown column: $unknownColumn") } if (desc) { -- cgit v1.2.3