aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala21
1 files changed, 20 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 51425e599e..1b34ba9f03 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -28,7 +28,7 @@ import org.apache.commons.lang3.StringEscapeUtils
import org.apache.spark.{InternalAccumulator, SparkConf}
import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.scheduler.{AccumulableInfo, TaskInfo}
+import org.apache.spark.scheduler.{AccumulableInfo, TaskInfo, TaskLocality}
import org.apache.spark.ui._
import org.apache.spark.ui.jobs.UIData._
import org.apache.spark.util.{Utils, Distribution}
@@ -70,6 +70,21 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
private val displayPeakExecutionMemory = parent.conf.getBoolean("spark.sql.unsafe.enabled", true)
+ private def getLocalitySummaryString(stageData: StageUIData): String = {
+ val localities = stageData.taskData.values.map(_.taskInfo.taskLocality)
+ val localityCounts = localities.groupBy(identity).mapValues(_.size)
+ val localityNamesAndCounts = localityCounts.toSeq.map { case (locality, count) =>
+ val localityName = locality match {
+ case TaskLocality.PROCESS_LOCAL => "Process local"
+ case TaskLocality.NODE_LOCAL => "Node local"
+ case TaskLocality.RACK_LOCAL => "Rack local"
+ case TaskLocality.ANY => "Any"
+ }
+ s"$localityName: $count"
+ }
+ localityNamesAndCounts.sorted.mkString("; ")
+ }
+
def render(request: HttpServletRequest): Seq[Node] = {
progressListener.synchronized {
val parameterId = request.getParameter("id")
@@ -129,6 +144,10 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
<strong>Total Time Across All Tasks: </strong>
{UIUtils.formatDuration(stageData.executorRunTime)}
</li>
+ <li>
+ <strong>Locality Level Summary: </strong>
+ {getLocalitySummaryString(stageData)}
+ </li>
{if (stageData.hasInput) {
<li>
<strong>Input Size / Records: </strong>