aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Bozarth <ajbozart@us.ibm.com>2016-10-08 11:24:00 +0100
committerSean Owen <sowen@cloudera.com>2016-10-08 11:24:00 +0100
commit362ba4b6f8e8fc2355368742c5adced7573fec00 (patch)
tree0a341b105a8d8a2b49a71e11489d47893fd26a3f
parent471690f90f3bf29735faecd83d4671842c57b164 (diff)
downloadspark-362ba4b6f8e8fc2355368742c5adced7573fec00.tar.gz
spark-362ba4b6f8e8fc2355368742c5adced7573fec00.tar.bz2
spark-362ba4b6f8e8fc2355368742c5adced7573fec00.zip
[SPARK-17793][WEB UI] Sorting on the description on the Job or Stage page doesn’t always work
## What changes were proposed in this pull request? Added secondary sorting on stage name for the description column. This provide a clearer behavior in the common case where the Description column only comprises of Stage names instead of the option description value. ## How was this patch tested? manual testing and dev/run-tests Screenshots of sorting on both description and stage name as well as an example of both: ![screen shot 2016-10-04 at 1 09 39 pm](https://cloud.githubusercontent.com/assets/13952758/19135523/067b042e-8b1a-11e6-912e-e6371d006d21.png) ![screen shot 2016-10-04 at 1 09 51 pm](https://cloud.githubusercontent.com/assets/13952758/19135526/06960936-8b1a-11e6-85e9-8aaf694c5f7b.png) ![screen shot 2016-10-05 at 1 14 45 pm](https://cloud.githubusercontent.com/assets/13952758/19135525/069547da-8b1a-11e6-8692-6524c75c4c07.png) ![screen shot 2016-10-05 at 1 14 51 pm](https://cloud.githubusercontent.com/assets/13952758/19135524/0694b4d2-8b1a-11e6-92dc-c8aa514e4f62.png) ![screen shot 2016-10-05 at 4 42 52 pm](https://cloud.githubusercontent.com/assets/13952758/19135618/e232eafe-8b1a-11e6-88b3-ff0bbb26b7f8.png) Author: Alex Bozarth <ajbozart@us.ibm.com> Closes #15366 from ajbozarth/spark17793.
-rw-r--r--core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala25
-rw-r--r--core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala134
-rw-r--r--core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala51
-rw-r--r--core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala27
4 files changed, 49 insertions, 188 deletions
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
index 19bb41a141..f6713097b9 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
@@ -457,23 +457,11 @@ private[ui] class JobDataSource(
* Return Ordering according to sortColumn and desc
*/
private def ordering(sortColumn: String, desc: Boolean): Ordering[JobTableRowData] = {
- val ordering = sortColumn match {
- case "Job Id" | "Job Id (Job Group)" => new Ordering[JobTableRowData] {
- override def compare(x: JobTableRowData, y: JobTableRowData): Int =
- Ordering.Int.compare(x.jobData.jobId, y.jobData.jobId)
- }
- case "Description" => new Ordering[JobTableRowData] {
- override def compare(x: JobTableRowData, y: JobTableRowData): Int =
- Ordering.String.compare(x.lastStageDescription, y.lastStageDescription)
- }
- case "Submitted" => new Ordering[JobTableRowData] {
- override def compare(x: JobTableRowData, y: JobTableRowData): Int =
- Ordering.Long.compare(x.submissionTime, y.submissionTime)
- }
- case "Duration" => new Ordering[JobTableRowData] {
- override def compare(x: JobTableRowData, y: JobTableRowData): Int =
- Ordering.Long.compare(x.duration, y.duration)
- }
+ val ordering: Ordering[JobTableRowData] = sortColumn match {
+ case "Job Id" | "Job Id (Job Group)" => Ordering.by(_.jobData.jobId)
+ case "Description" => Ordering.by(x => (x.lastStageDescription, x.lastStageName))
+ case "Submitted" => Ordering.by(_.submissionTime)
+ case "Duration" => Ordering.by(_.duration)
case "Stages: Succeeded/Total" | "Tasks (for all stages): Succeeded/Total" =>
throw new IllegalArgumentException(s"Unsortable column: $sortColumn")
case unknownColumn => throw new IllegalArgumentException(s"Unknown column: $unknownColumn")
@@ -501,8 +489,7 @@ private[ui] class JobPagedTable(
sortColumn: String,
desc: Boolean
) extends PagedTable[JobTableRowData] {
- val parameterPath = UIUtils.prependBaseUri(basePath) + s"/$subPath/?" +
- parameterOtherTable.mkString("&")
+ val parameterPath = basePath + s"/$subPath/?" + parameterOtherTable.mkString("&")
override def tableId: String = jobTag + "-table"
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index c322ae0972..8c7cefe200 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -1050,89 +1050,38 @@ private[ui] class TaskDataSource(
* Return Ordering according to sortColumn and desc
*/
private def ordering(sortColumn: String, desc: Boolean): Ordering[TaskTableRowData] = {
- val ordering = sortColumn match {
- case "Index" => new Ordering[TaskTableRowData] {
- override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
- Ordering.Int.compare(x.index, y.index)
- }
- case "ID" => new Ordering[TaskTableRowData] {
- override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
- Ordering.Long.compare(x.taskId, y.taskId)
- }
- case "Attempt" => new Ordering[TaskTableRowData] {
- override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
- Ordering.Int.compare(x.attempt, y.attempt)
- }
- case "Status" => new Ordering[TaskTableRowData] {
- override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
- Ordering.String.compare(x.status, y.status)
- }
- case "Locality Level" => new Ordering[TaskTableRowData] {
- override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
- Ordering.String.compare(x.taskLocality, y.taskLocality)
- }
- case "Executor ID / Host" => new Ordering[TaskTableRowData] {
- override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
- Ordering.String.compare(x.executorIdAndHost, y.executorIdAndHost)
- }
- case "Launch Time" => new Ordering[TaskTableRowData] {
- override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
- Ordering.Long.compare(x.launchTime, y.launchTime)
- }
- case "Duration" => new Ordering[TaskTableRowData] {
- override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
- Ordering.Long.compare(x.duration, y.duration)
- }
- case "Scheduler Delay" => new Ordering[TaskTableRowData] {
- override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
- Ordering.Long.compare(x.schedulerDelay, y.schedulerDelay)
- }
- case "Task Deserialization Time" => new Ordering[TaskTableRowData] {
- override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
- Ordering.Long.compare(x.taskDeserializationTime, y.taskDeserializationTime)
- }
- case "GC Time" => new Ordering[TaskTableRowData] {
- override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
- Ordering.Long.compare(x.gcTime, y.gcTime)
- }
- case "Result Serialization Time" => new Ordering[TaskTableRowData] {
- override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
- Ordering.Long.compare(x.serializationTime, y.serializationTime)
- }
- case "Getting Result Time" => new Ordering[TaskTableRowData] {
- override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
- Ordering.Long.compare(x.gettingResultTime, y.gettingResultTime)
- }
- case "Peak Execution Memory" => new Ordering[TaskTableRowData] {
- override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
- Ordering.Long.compare(x.peakExecutionMemoryUsed, y.peakExecutionMemoryUsed)
- }
+ val ordering: Ordering[TaskTableRowData] = sortColumn match {
+ case "Index" => Ordering.by(_.index)
+ case "ID" => Ordering.by(_.taskId)
+ case "Attempt" => Ordering.by(_.attempt)
+ case "Status" => Ordering.by(_.status)
+ case "Locality Level" => Ordering.by(_.taskLocality)
+ case "Executor ID / Host" => Ordering.by(_.executorIdAndHost)
+ case "Launch Time" => Ordering.by(_.launchTime)
+ case "Duration" => Ordering.by(_.duration)
+ case "Scheduler Delay" => Ordering.by(_.schedulerDelay)
+ case "Task Deserialization Time" => Ordering.by(_.taskDeserializationTime)
+ case "GC Time" => Ordering.by(_.gcTime)
+ case "Result Serialization Time" => Ordering.by(_.serializationTime)
+ case "Getting Result Time" => Ordering.by(_.gettingResultTime)
+ case "Peak Execution Memory" => Ordering.by(_.peakExecutionMemoryUsed)
case "Accumulators" =>
if (hasAccumulators) {
- new Ordering[TaskTableRowData] {
- override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
- Ordering.String.compare(x.accumulators.get, y.accumulators.get)
- }
+ Ordering.by(_.accumulators.get)
} else {
throw new IllegalArgumentException(
"Cannot sort by Accumulators because of no accumulators")
}
case "Input Size / Records" =>
if (hasInput) {
- new Ordering[TaskTableRowData] {
- override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
- Ordering.Long.compare(x.input.get.inputSortable, y.input.get.inputSortable)
- }
+ Ordering.by(_.input.get.inputSortable)
} else {
throw new IllegalArgumentException(
"Cannot sort by Input Size / Records because of no inputs")
}
case "Output Size / Records" =>
if (hasOutput) {
- new Ordering[TaskTableRowData] {
- override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
- Ordering.Long.compare(x.output.get.outputSortable, y.output.get.outputSortable)
- }
+ Ordering.by(_.output.get.outputSortable)
} else {
throw new IllegalArgumentException(
"Cannot sort by Output Size / Records because of no outputs")
@@ -1140,33 +1089,21 @@ private[ui] class TaskDataSource(
// ShuffleRead
case "Shuffle Read Blocked Time" =>
if (hasShuffleRead) {
- new Ordering[TaskTableRowData] {
- override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
- Ordering.Long.compare(x.shuffleRead.get.shuffleReadBlockedTimeSortable,
- y.shuffleRead.get.shuffleReadBlockedTimeSortable)
- }
+ Ordering.by(_.shuffleRead.get.shuffleReadBlockedTimeSortable)
} else {
throw new IllegalArgumentException(
"Cannot sort by Shuffle Read Blocked Time because of no shuffle reads")
}
case "Shuffle Read Size / Records" =>
if (hasShuffleRead) {
- new Ordering[TaskTableRowData] {
- override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
- Ordering.Long.compare(x.shuffleRead.get.shuffleReadSortable,
- y.shuffleRead.get.shuffleReadSortable)
- }
+ Ordering.by(_.shuffleRead.get.shuffleReadSortable)
} else {
throw new IllegalArgumentException(
"Cannot sort by Shuffle Read Size / Records because of no shuffle reads")
}
case "Shuffle Remote Reads" =>
if (hasShuffleRead) {
- new Ordering[TaskTableRowData] {
- override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
- Ordering.Long.compare(x.shuffleRead.get.shuffleReadRemoteSortable,
- y.shuffleRead.get.shuffleReadRemoteSortable)
- }
+ Ordering.by(_.shuffleRead.get.shuffleReadRemoteSortable)
} else {
throw new IllegalArgumentException(
"Cannot sort by Shuffle Remote Reads because of no shuffle reads")
@@ -1174,22 +1111,14 @@ private[ui] class TaskDataSource(
// ShuffleWrite
case "Write Time" =>
if (hasShuffleWrite) {
- new Ordering[TaskTableRowData] {
- override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
- Ordering.Long.compare(x.shuffleWrite.get.writeTimeSortable,
- y.shuffleWrite.get.writeTimeSortable)
- }
+ Ordering.by(_.shuffleWrite.get.writeTimeSortable)
} else {
throw new IllegalArgumentException(
"Cannot sort by Write Time because of no shuffle writes")
}
case "Shuffle Write Size / Records" =>
if (hasShuffleWrite) {
- new Ordering[TaskTableRowData] {
- override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
- Ordering.Long.compare(x.shuffleWrite.get.shuffleWriteSortable,
- y.shuffleWrite.get.shuffleWriteSortable)
- }
+ Ordering.by(_.shuffleWrite.get.shuffleWriteSortable)
} else {
throw new IllegalArgumentException(
"Cannot sort by Shuffle Write Size / Records because of no shuffle writes")
@@ -1197,30 +1126,19 @@ private[ui] class TaskDataSource(
// BytesSpilled
case "Shuffle Spill (Memory)" =>
if (hasBytesSpilled) {
- new Ordering[TaskTableRowData] {
- override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
- Ordering.Long.compare(x.bytesSpilled.get.memoryBytesSpilledSortable,
- y.bytesSpilled.get.memoryBytesSpilledSortable)
- }
+ Ordering.by(_.bytesSpilled.get.memoryBytesSpilledSortable)
} else {
throw new IllegalArgumentException(
"Cannot sort by Shuffle Spill (Memory) because of no spills")
}
case "Shuffle Spill (Disk)" =>
if (hasBytesSpilled) {
- new Ordering[TaskTableRowData] {
- override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
- Ordering.Long.compare(x.bytesSpilled.get.diskBytesSpilledSortable,
- y.bytesSpilled.get.diskBytesSpilledSortable)
- }
+ Ordering.by(_.bytesSpilled.get.diskBytesSpilledSortable)
} else {
throw new IllegalArgumentException(
"Cannot sort by Shuffle Spill (Disk) because of no spills")
}
- case "Errors" => new Ordering[TaskTableRowData] {
- override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
- Ordering.String.compare(x.error, y.error)
- }
+ case "Errors" => Ordering.by(_.error)
case unknownColumn => throw new IllegalArgumentException(s"Unknown column: $unknownColumn")
}
if (desc) {
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index 40a6762c28..9b9b4681ba 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -109,7 +109,6 @@ private[ui] class StageTableRowData(
val stageId: Int,
val attemptId: Int,
val schedulingPool: String,
- val description: String,
val descriptionOption: Option[String],
val submissionTime: Long,
val formattedSubmissionTime: String,
@@ -128,7 +127,7 @@ private[ui] class MissingStageTableRowData(
stageInfo: StageInfo,
stageId: Int,
attemptId: Int) extends StageTableRowData(
- stageInfo, None, stageId, attemptId, "", "", None, 0, "", -1, "", 0, "", 0, "", 0, "", 0, "")
+ stageInfo, None, stageId, attemptId, "", None, 0, "", -1, "", 0, "", 0, "", 0, "", 0, "")
/** Page showing list of all ongoing and recently finished stages */
private[ui] class StagePagedTable(
@@ -470,7 +469,6 @@ private[ui] class StageDataSource(
s.stageId,
s.attemptId,
stageData.schedulingPool,
- description.getOrElse(""),
description,
s.submissionTime.getOrElse(0),
formattedSubmissionTime,
@@ -491,43 +489,16 @@ private[ui] class StageDataSource(
* Return Ordering according to sortColumn and desc
*/
private def ordering(sortColumn: String, desc: Boolean): Ordering[StageTableRowData] = {
- val ordering = sortColumn match {
- case "Stage Id" => new Ordering[StageTableRowData] {
- override def compare(x: StageTableRowData, y: StageTableRowData): Int =
- Ordering.Int.compare(x.stageId, y.stageId)
- }
- case "Pool Name" => new Ordering[StageTableRowData] {
- override def compare(x: StageTableRowData, y: StageTableRowData): Int =
- Ordering.String.compare(x.schedulingPool, y.schedulingPool)
- }
- case "Description" => new Ordering[StageTableRowData] {
- override def compare(x: StageTableRowData, y: StageTableRowData): Int =
- Ordering.String.compare(x.description, y.description)
- }
- case "Submitted" => new Ordering[StageTableRowData] {
- override def compare(x: StageTableRowData, y: StageTableRowData): Int =
- Ordering.Long.compare(x.submissionTime, y.submissionTime)
- }
- case "Duration" => new Ordering[StageTableRowData] {
- override def compare(x: StageTableRowData, y: StageTableRowData): Int =
- Ordering.Long.compare(x.duration, y.duration)
- }
- case "Input" => new Ordering[StageTableRowData] {
- override def compare(x: StageTableRowData, y: StageTableRowData): Int =
- Ordering.Long.compare(x.inputRead, y.inputRead)
- }
- case "Output" => new Ordering[StageTableRowData] {
- override def compare(x: StageTableRowData, y: StageTableRowData): Int =
- Ordering.Long.compare(x.outputWrite, y.outputWrite)
- }
- case "Shuffle Read" => new Ordering[StageTableRowData] {
- override def compare(x: StageTableRowData, y: StageTableRowData): Int =
- Ordering.Long.compare(x.shuffleRead, y.shuffleRead)
- }
- case "Shuffle Write" => new Ordering[StageTableRowData] {
- override def compare(x: StageTableRowData, y: StageTableRowData): Int =
- Ordering.Long.compare(x.shuffleWrite, y.shuffleWrite)
- }
+ val ordering: Ordering[StageTableRowData] = sortColumn match {
+ case "Stage Id" => Ordering.by(_.stageId)
+ case "Pool Name" => Ordering.by(_.schedulingPool)
+ case "Description" => Ordering.by(x => (x.descriptionOption, x.stageInfo.name))
+ case "Submitted" => Ordering.by(_.submissionTime)
+ case "Duration" => Ordering.by(_.duration)
+ case "Input" => Ordering.by(_.inputRead)
+ case "Output" => Ordering.by(_.outputWrite)
+ case "Shuffle Read" => Ordering.by(_.shuffleRead)
+ case "Shuffle Write" => Ordering.by(_.shuffleWrite)
case "Tasks: Succeeded/Total" =>
throw new IllegalArgumentException(s"Unsortable column: $sortColumn")
case unknownColumn => throw new IllegalArgumentException(s"Unknown column: $unknownColumn")
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
index 606d15d599..227e940c9c 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
@@ -197,27 +197,12 @@ private[ui] class BlockDataSource(
* Return Ordering according to sortColumn and desc
*/
private def ordering(sortColumn: String, desc: Boolean): Ordering[BlockTableRowData] = {
- val ordering = sortColumn match {
- case "Block Name" => new Ordering[BlockTableRowData] {
- override def compare(x: BlockTableRowData, y: BlockTableRowData): Int =
- Ordering.String.compare(x.blockName, y.blockName)
- }
- case "Storage Level" => new Ordering[BlockTableRowData] {
- override def compare(x: BlockTableRowData, y: BlockTableRowData): Int =
- Ordering.String.compare(x.storageLevel, y.storageLevel)
- }
- case "Size in Memory" => new Ordering[BlockTableRowData] {
- override def compare(x: BlockTableRowData, y: BlockTableRowData): Int =
- Ordering.Long.compare(x.memoryUsed, y.memoryUsed)
- }
- case "Size on Disk" => new Ordering[BlockTableRowData] {
- override def compare(x: BlockTableRowData, y: BlockTableRowData): Int =
- Ordering.Long.compare(x.diskUsed, y.diskUsed)
- }
- case "Executors" => new Ordering[BlockTableRowData] {
- override def compare(x: BlockTableRowData, y: BlockTableRowData): Int =
- Ordering.String.compare(x.executors, y.executors)
- }
+ val ordering: Ordering[BlockTableRowData] = sortColumn match {
+ case "Block Name" => Ordering.by(_.blockName)
+ case "Storage Level" => Ordering.by(_.storageLevel)
+ case "Size in Memory" => Ordering.by(_.memoryUsed)
+ case "Size on Disk" => Ordering.by(_.diskUsed)
+ case "Executors" => Ordering.by(_.executors)
case unknownColumn => throw new IllegalArgumentException(s"Unknown column: $unknownColumn")
}
if (desc) {