aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorJean-Baptiste Onofré <jbonofre@apache.org>2015-11-12 15:46:21 -0800
committerAndrew Or <andrew@databricks.com>2015-11-12 15:46:21 -0800
commit74c30049a8bf9841eeca48f827572c2044912e21 (patch)
tree5def06d6758becb8897244ccdad2c6b0235e6e61 /core
parent380dfcc0dc865d361a97bb045a2ac546dacfdba9 (diff)
downloadspark-74c30049a8bf9841eeca48f827572c2044912e21.tar.gz
spark-74c30049a8bf9841eeca48f827572c2044912e21.tar.bz2
spark-74c30049a8bf9841eeca48f827572c2044912e21.zip
[SPARK-2533] Add locality levels on stage summary view
Author: Jean-Baptiste Onofré <jbonofre@apache.org> Closes #9487 from jbonofre/SPARK-2533-2.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala21
1 files changed, 20 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 51425e599e..1b34ba9f03 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -28,7 +28,7 @@ import org.apache.commons.lang3.StringEscapeUtils
import org.apache.spark.{InternalAccumulator, SparkConf}
import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.scheduler.{AccumulableInfo, TaskInfo}
+import org.apache.spark.scheduler.{AccumulableInfo, TaskInfo, TaskLocality}
import org.apache.spark.ui._
import org.apache.spark.ui.jobs.UIData._
import org.apache.spark.util.{Utils, Distribution}
@@ -70,6 +70,21 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
private val displayPeakExecutionMemory = parent.conf.getBoolean("spark.sql.unsafe.enabled", true)
+ private def getLocalitySummaryString(stageData: StageUIData): String = {
+ val localities = stageData.taskData.values.map(_.taskInfo.taskLocality)
+ val localityCounts = localities.groupBy(identity).mapValues(_.size)
+ val localityNamesAndCounts = localityCounts.toSeq.map { case (locality, count) =>
+ val localityName = locality match {
+ case TaskLocality.PROCESS_LOCAL => "Process local"
+ case TaskLocality.NODE_LOCAL => "Node local"
+ case TaskLocality.RACK_LOCAL => "Rack local"
+ case TaskLocality.ANY => "Any"
+ }
+ s"$localityName: $count"
+ }
+ localityNamesAndCounts.sorted.mkString("; ")
+ }
+
def render(request: HttpServletRequest): Seq[Node] = {
progressListener.synchronized {
val parameterId = request.getParameter("id")
@@ -129,6 +144,10 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
<strong>Total Time Across All Tasks: </strong>
{UIUtils.formatDuration(stageData.executorRunTime)}
</li>
+ <li>
+ <strong>Locality Level Summary: </strong>
+ {getLocalitySummaryString(stageData)}
+ </li>
{if (stageData.hasInput) {
<li>
<strong>Input Size / Records: </strong>