diff options
author | Jean-Baptiste Onofré <jbonofre@apache.org> | 2015-11-12 15:46:21 -0800 |
---|---|---|
committer | Andrew Or <andrew@databricks.com> | 2015-11-12 15:46:21 -0800 |
commit | 74c30049a8bf9841eeca48f827572c2044912e21 (patch) | |
tree | 5def06d6758becb8897244ccdad2c6b0235e6e61 /core | |
parent | 380dfcc0dc865d361a97bb045a2ac546dacfdba9 (diff) | |
download | spark-74c30049a8bf9841eeca48f827572c2044912e21.tar.gz spark-74c30049a8bf9841eeca48f827572c2044912e21.tar.bz2 spark-74c30049a8bf9841eeca48f827572c2044912e21.zip |
[SPARK-2533] Add locality levels on stage summary view
Author: Jean-Baptiste Onofré <jbonofre@apache.org>
Closes #9487 from jbonofre/SPARK-2533-2.
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala | 21 |
1 files changed, 20 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 51425e599e..1b34ba9f03 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -28,7 +28,7 @@ import org.apache.commons.lang3.StringEscapeUtils import org.apache.spark.{InternalAccumulator, SparkConf} import org.apache.spark.executor.TaskMetrics -import org.apache.spark.scheduler.{AccumulableInfo, TaskInfo} +import org.apache.spark.scheduler.{AccumulableInfo, TaskInfo, TaskLocality} import org.apache.spark.ui._ import org.apache.spark.ui.jobs.UIData._ import org.apache.spark.util.{Utils, Distribution} @@ -70,6 +70,21 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { private val displayPeakExecutionMemory = parent.conf.getBoolean("spark.sql.unsafe.enabled", true) + private def getLocalitySummaryString(stageData: StageUIData): String = { + val localities = stageData.taskData.values.map(_.taskInfo.taskLocality) + val localityCounts = localities.groupBy(identity).mapValues(_.size) + val localityNamesAndCounts = localityCounts.toSeq.map { case (locality, count) => + val localityName = locality match { + case TaskLocality.PROCESS_LOCAL => "Process local" + case TaskLocality.NODE_LOCAL => "Node local" + case TaskLocality.RACK_LOCAL => "Rack local" + case TaskLocality.ANY => "Any" + } + s"$localityName: $count" + } + localityNamesAndCounts.sorted.mkString("; ") + } + def render(request: HttpServletRequest): Seq[Node] = { progressListener.synchronized { val parameterId = request.getParameter("id") @@ -129,6 +144,10 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { <strong>Total Time Across All Tasks: </strong> {UIUtils.formatDuration(stageData.executorRunTime)} </li> + <li> + <strong>Locality Level Summary: </strong> + {getLocalitySummaryString(stageData)} + </li> {if (stageData.hasInput) { <li> <strong>Input Size / Records: </strong> |