From 48e4f2ad141492d7dee579a1b7fb1ec49fefa2ae Mon Sep 17 00:00:00 2001 From: "wangda.tan" Date: Mon, 9 Dec 2013 00:02:59 +0800 Subject: SPARK-968, In stage UI, add an overview section that shows task stats grouped by executor id --- .../org/apache/spark/ui/jobs/ExecutorSummary.scala | 27 ++++++++ .../org/apache/spark/ui/jobs/ExecutorTable.scala | 73 ++++++++++++++++++++++ .../scala/org/apache/spark/ui/jobs/IndexPage.scala | 7 +++ .../apache/spark/ui/jobs/JobProgressListener.scala | 38 +++++++++++ 4 files changed, 145 insertions(+) create mode 100644 core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala create mode 100644 core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala (limited to 'core/src/main/scala/org/apache') diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala new file mode 100644 index 0000000000..f2ee12081c --- /dev/null +++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ui.jobs + +private[spark] class ExecutorSummary() { + var duration : Long = 0 + var totalTasks : Int = 0 + var failedTasks : Int = 0 + var succeedTasks : Int = 0 + var shuffleRead : Long = 0 + var shuffleWrite : Long = 0 +} diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala new file mode 100644 index 0000000000..c6823cd823 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.ui.jobs + + +import scala.xml.Node + +import org.apache.spark.scheduler.SchedulingMode + + +/** Page showing executor summary */ +private[spark] class ExecutorTable(val parent: JobProgressUI) { + + val listener = parent.listener + val dateFmt = parent.dateFmt + val isFairScheduler = listener.sc.getSchedulingMode == SchedulingMode.FAIR + + def toNodeSeq(): Seq[Node] = { + listener.synchronized { + executorTable() + } + } + + /** Special table which merges two header cells. */ + private def executorTable[T](): Seq[Node] = { + + + + + + + + + + + + {createExecutorTable()} + +
        <th>Executor ID</th>
        <th>Duration</th>
        <th>#Tasks</th>
        <th>#Failed Tasks</th>
        <th>#Succeed Tasks</th>
        <th>Shuffle Read</th>
        <th>Shuffle Write</th>
+ } + + private def createExecutorTable() : Seq[Node] = { + val executorIdToSummary = listener.executorIdToSummary + executorIdToSummary.toSeq.sortBy(_._1).map{ + case (k,v) => { + + {k} + {v.duration} ms + {v.totalTasks} + {v.failedTasks} + {v.succeedTasks} + {v.shuffleRead} + {v.shuffleWrite} + + } + } + } +} diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala index ca5a28625b..653a84b60f 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala @@ -45,6 +45,7 @@ private[spark] class IndexPage(parent: JobProgressUI) { val activeStagesTable = new StageTable(activeStages.sortBy(_.submissionTime).reverse, parent) val completedStagesTable = new StageTable(completedStages.sortBy(_.submissionTime).reverse, parent) val failedStagesTable = new StageTable(failedStages.sortBy(_.submissionTime).reverse, parent) + val executorTable = new ExecutorTable(parent) val pools = listener.sc.getAllPools val poolTable = new PoolTable(pools, listener) @@ -56,6 +57,10 @@ private[spark] class IndexPage(parent: JobProgressUI) { {parent.formatDuration(now - listener.sc.startTime)}
            <li><strong>Scheduling Mode:</strong> {parent.sc.getSchedulingMode}</li>
+           <li>
+             <strong>Executor Summary:</strong>
+             {listener.executorIdToSummary.size}
+           </li>
            <li>
              <a href="#active"><strong>Active Stages:</strong></a>
              {activeStages.size} @@ -77,6 +82,8 @@ private[spark] class IndexPage(parent: JobProgressUI) { } else { Seq() }} ++ +

        <h4>Executor Summary</h4> ++
+        executorTable.toNodeSeq++

        <h4 id="active">Active Stages ({activeStages.size})</h4> ++
        activeStagesTable.toNodeSeq++

        <h4 id="completed">Completed Stages ({completedStages.size})</h4>

    ++ diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index 6b854740d6..2635478592 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -57,6 +57,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList val stageIdToTasksFailed = HashMap[Int, Int]() val stageIdToTaskInfos = HashMap[Int, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]]() + val executorIdToSummary = HashMap[String, ExecutorSummary]() override def onJobStart(jobStart: SparkListenerJobStart) {} @@ -114,6 +115,9 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList sid, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]()) taskList += ((taskStart.taskInfo, None, None)) stageIdToTaskInfos(sid) = taskList + val executorSummary = executorIdToSummary.getOrElseUpdate(key = taskStart.taskInfo.executorId, + op = new ExecutorSummary()) + executorSummary.totalTasks += 1 } override def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult) @@ -123,9 +127,43 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList } override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized { + // update executor summary + val executorSummary = executorIdToSummary.get(taskEnd.taskInfo.executorId) + executorSummary match { + case Some(x) => { + // first update failed-task, succeed-task + taskEnd.reason match { + case e: ExceptionFailure => + x.failedTasks += 1 + case _ => + x.succeedTasks += 1 + } + + // update duration + x.duration += taskEnd.taskInfo.duration + + // update shuffle read/write + val shuffleRead = taskEnd.taskMetrics.shuffleReadMetrics + shuffleRead match { + case Some(s) => + x.shuffleRead += s.remoteBytesRead + case _ => {} + } + val shuffleWrite = taskEnd.taskMetrics.shuffleWriteMetrics + shuffleWrite match { + case Some(s) => { + x.shuffleWrite += s.shuffleBytesWritten + } + case _ => {} + } + } + case _ => {} + } + val sid = taskEnd.task.stageId val tasksActive = stageIdToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]()) tasksActive -= taskEnd.taskInfo + val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) = taskEnd.reason match { case e: ExceptionFailure => -- cgit v1.2.3 From 36060f4f50ead2632117bb12e8c5bc1fb4f91f1e Mon Sep 17 00:00:00 2001 From: "wangda.tan" Date: Tue, 17 Dec 2013 17:55:38 +0800 Subject: spark-898, changes according to review comments --- .../org/apache/spark/ui/exec/ExecutorsUI.scala | 39 +++++++++++++++++-- .../org/apache/spark/ui/jobs/ExecutorSummary.scala | 3 +- .../org/apache/spark/ui/jobs/ExecutorTable.scala | 40 ++++++++++--------- .../scala/org/apache/spark/ui/jobs/IndexPage.scala | 5 +-- .../apache/spark/ui/jobs/JobProgressListener.scala | 31 ++++++++------- .../scala/org/apache/spark/ui/jobs/StagePage.scala | 3 +- .../spark/ui/jobs/JobProgressListenerSuite.scala | 45 ++++++---------------- 7 files changed, 90 insertions(+), 76 deletions(-) (limited to 'core/src/main/scala/org/apache') diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala index e596690bc3..808bbe8c8f 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala @@ -56,7 +56,8 @@ 
private[spark] class ExecutorsUI(val sc: SparkContext) { val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize)).fold(0L)(_+_) val execHead = Seq("Executor ID", "Address", "RDD blocks", "Memory used", "Disk used", - "Active tasks", "Failed tasks", "Complete tasks", "Total tasks") + "Active tasks", "Failed tasks", "Complete tasks", "Total tasks", "Duration", "Shuffle Read", + "Shuffle Write") def execRow(kv: Seq[String]) = { @@ -73,6 +74,9 @@ private[spark] class ExecutorsUI(val sc: SparkContext) { {kv(7)} {kv(8)} {kv(9)} + {Utils.msDurationToString(kv(10).toLong)} + {Utils.bytesToString(kv(11).toLong)} + {Utils.bytesToString(kv(12).toLong)} } @@ -111,6 +115,9 @@ private[spark] class ExecutorsUI(val sc: SparkContext) { val failedTasks = listener.executorToTasksFailed.getOrElse(execId, 0) val completedTasks = listener.executorToTasksComplete.getOrElse(execId, 0) val totalTasks = activeTasks + failedTasks + completedTasks + val totalDuration = listener.executorToDuration.getOrElse(execId, 0) + val totalShuffleRead = listener.executorToShuffleRead.getOrElse(execId, 0) + val totalShuffleWrite = listener.executorToShuffleWrite.getOrElse(execId, 0) Seq( execId, @@ -122,7 +129,10 @@ private[spark] class ExecutorsUI(val sc: SparkContext) { activeTasks.toString, failedTasks.toString, completedTasks.toString, - totalTasks.toString + totalTasks.toString, + totalDuration.toString, + totalShuffleRead.toString, + totalShuffleWrite.toString ) } @@ -130,6 +140,9 @@ private[spark] class ExecutorsUI(val sc: SparkContext) { val executorToTasksActive = HashMap[String, HashSet[TaskInfo]]() val executorToTasksComplete = HashMap[String, Int]() val executorToTasksFailed = HashMap[String, Int]() + val executorToDuration = HashMap[String, Long]() + val executorToShuffleRead = HashMap[String, Long]() + val executorToShuffleWrite = HashMap[String, Long]() override def onTaskStart(taskStart: SparkListenerTaskStart) { val eid = taskStart.taskInfo.executorId @@ -137,9 +150,12 @@ private[spark] class ExecutorsUI(val sc: SparkContext) { activeTasks += taskStart.taskInfo } - override def onTaskEnd(taskEnd: SparkListenerTaskEnd) { + override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized { val eid = taskEnd.taskInfo.executorId val activeTasks = executorToTasksActive.getOrElseUpdate(eid, new HashSet[TaskInfo]()) + val newDuration = executorToDuration.getOrElse(eid, 0L) + taskEnd.taskInfo.duration + executorToDuration.put(eid, newDuration) + activeTasks -= taskEnd.taskInfo val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) = taskEnd.reason match { @@ -150,6 +166,23 @@ private[spark] class ExecutorsUI(val sc: SparkContext) { executorToTasksComplete(eid) = executorToTasksComplete.getOrElse(eid, 0) + 1 (None, Option(taskEnd.taskMetrics)) } + + // update shuffle read/write + val shuffleRead = taskEnd.taskMetrics.shuffleReadMetrics + shuffleRead match { + case Some(s) => + val newShuffleRead = executorToShuffleRead.getOrElse(eid, 0L) + s.remoteBytesRead + executorToShuffleRead.put(eid, newShuffleRead) + case _ => {} + } + val shuffleWrite = taskEnd.taskMetrics.shuffleWriteMetrics + shuffleWrite match { + case Some(s) => { + val newShuffleWrite = executorToShuffleWrite.getOrElse(eid, 0L) + s.shuffleBytesWritten + executorToShuffleWrite.put(eid, newShuffleWrite) + } + case _ => {} + } } } } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala index f2ee12081c..75c0dd2c7f 100644 
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala @@ -19,9 +19,8 @@ package org.apache.spark.ui.jobs private[spark] class ExecutorSummary() { var duration : Long = 0 - var totalTasks : Int = 0 var failedTasks : Int = 0 - var succeedTasks : Int = 0 + var succeededTasks : Int = 0 var shuffleRead : Long = 0 var shuffleWrite : Long = 0 } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala index c6823cd823..763d5a344b 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala @@ -17,14 +17,13 @@ package org.apache.spark.ui.jobs - import scala.xml.Node import org.apache.spark.scheduler.SchedulingMode - +import org.apache.spark.util.Utils /** Page showing executor summary */ -private[spark] class ExecutorTable(val parent: JobProgressUI) { +private[spark] class ExecutorTable(val parent: JobProgressUI, val stageId: Int) { val listener = parent.listener val dateFmt = parent.dateFmt @@ -42,9 +41,9 @@ private[spark] class ExecutorTable(val parent: JobProgressUI) { Executor ID Duration - #Tasks - #Failed Tasks - #Succeed Tasks + Total Tasks + Failed Tasks + Succeeded Tasks Shuffle Read Shuffle Write @@ -55,19 +54,24 @@ private[spark] class ExecutorTable(val parent: JobProgressUI) { } private def createExecutorTable() : Seq[Node] = { - val executorIdToSummary = listener.executorIdToSummary - executorIdToSummary.toSeq.sortBy(_._1).map{ - case (k,v) => { - - {k} - {v.duration} ms - {v.totalTasks} - {v.failedTasks} - {v.succeedTasks} - {v.shuffleRead} - {v.shuffleWrite} - + val executorIdToSummary = listener.stageIdToExecutorSummaries.get(stageId) + executorIdToSummary match { + case Some(x) => { + x.toSeq.sortBy(_._1).map{ + case (k,v) => { + + {k} + {parent.formatDuration(v.duration)} + {v.failedTasks + v.succeededTasks} + {v.failedTasks} + {v.succeededTasks} + {Utils.bytesToString(v.shuffleRead)} + {Utils.bytesToString(v.shuffleWrite)} + + } + } } + case _ => { Seq[Node]() } } } } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala index 653a84b60f..854afb665a 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala @@ -45,7 +45,6 @@ private[spark] class IndexPage(parent: JobProgressUI) { val activeStagesTable = new StageTable(activeStages.sortBy(_.submissionTime).reverse, parent) val completedStagesTable = new StageTable(completedStages.sortBy(_.submissionTime).reverse, parent) val failedStagesTable = new StageTable(failedStages.sortBy(_.submissionTime).reverse, parent) - val executorTable = new ExecutorTable(parent) val pools = listener.sc.getAllPools val poolTable = new PoolTable(pools, listener) @@ -59,7 +58,7 @@ private[spark] class IndexPage(parent: JobProgressUI) {
            <li><strong>Scheduling Mode:</strong> {parent.sc.getSchedulingMode}</li>
            <li>
              <strong>Executor Summary:</strong>
-             {listener.executorIdToSummary.size}
+             {listener.stageIdToExecutorSummaries.size}
            </li>
            <li>
              <a href="#active"><strong>Active Stages:</strong></a>
              {activeStages.size} @@ -82,8 +81,6 @@ private[spark] class IndexPage(parent: JobProgressUI) { } else { Seq() }} ++ -

        <h4>Executor Summary</h4> ++
-        executorTable.toNodeSeq++

        <h4 id="active">Active Stages ({activeStages.size})</h4> ++
        activeStagesTable.toNodeSeq++

        <h4 id="completed">Completed Stages ({completedStages.size})</h4>

    ++ diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index 2635478592..8c92ff19a6 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -57,7 +57,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList val stageIdToTasksFailed = HashMap[Int, Int]() val stageIdToTaskInfos = HashMap[Int, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]]() - val executorIdToSummary = HashMap[String, ExecutorSummary]() + val stageIdToExecutorSummaries = HashMap[Int, HashMap[String, ExecutorSummary]]() override def onJobStart(jobStart: SparkListenerJobStart) {} @@ -115,9 +115,6 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList sid, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]()) taskList += ((taskStart.taskInfo, None, None)) stageIdToTaskInfos(sid) = taskList - val executorSummary = executorIdToSummary.getOrElseUpdate(key = taskStart.taskInfo.executorId, - op = new ExecutorSummary()) - executorSummary.totalTasks += 1 } override def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult) @@ -127,32 +124,39 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList } override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized { - // update executor summary - val executorSummary = executorIdToSummary.get(taskEnd.taskInfo.executorId) + val sid = taskEnd.task.stageId + + // create executor summary map if necessary + val executorSummaryMap = stageIdToExecutorSummaries.getOrElseUpdate(key = sid, + op = new HashMap[String, ExecutorSummary]()) + executorSummaryMap.getOrElseUpdate(key = taskEnd.taskInfo.executorId, + op = new ExecutorSummary()) + + val executorSummary = executorSummaryMap.get(taskEnd.taskInfo.executorId) executorSummary match { - case Some(x) => { + case Some(y) => { // first update failed-task, succeed-task taskEnd.reason match { - case e: ExceptionFailure => - x.failedTasks += 1 + case Success => + y.succeededTasks += 1 case _ => - x.succeedTasks += 1 + y.failedTasks += 1 } // update duration - x.duration += taskEnd.taskInfo.duration + y.duration += taskEnd.taskInfo.duration // update shuffle read/write val shuffleRead = taskEnd.taskMetrics.shuffleReadMetrics shuffleRead match { case Some(s) => - x.shuffleRead += s.remoteBytesRead + y.shuffleRead += s.remoteBytesRead case _ => {} } val shuffleWrite = taskEnd.taskMetrics.shuffleWriteMetrics shuffleWrite match { case Some(s) => { - x.shuffleWrite += s.shuffleBytesWritten + y.shuffleWrite += s.shuffleBytesWritten } case _ => {} } @@ -160,7 +164,6 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList case _ => {} } - val sid = taskEnd.task.stageId val tasksActive = stageIdToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]()) tasksActive -= taskEnd.taskInfo diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 69f9446bab..c077613b1d 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -160,9 +160,10 @@ private[spark] class StagePage(parent: JobProgressUI) { def quantileRow(data: Seq[String]): Seq[Node] = {data.map(d => {d})} Some(listingTable(quantileHeaders, quantileRow, 
listings, fixedWidth = true)) } - + val executorTable = new ExecutorTable(parent, stageId) val content = summary ++ +

        <h4>Summary Metrics for Executors</h4> ++ executorTable.toNodeSeq() ++
        <h4>Summary Metrics for {numCompleted} Completed Tasks</h4> ++
        <div>{summaryTable.getOrElse("No tasks have reported metrics yet.")}</div> ++
        <h4>Tasks</h4>
    ++ taskTable diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala index 861d37a862..67a57a0e7f 100644 --- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala @@ -19,26 +19,19 @@ package org.apache.spark.ui.jobs import org.scalatest.FunSuite import org.apache.spark.scheduler._ -import org.apache.spark.SparkContext -import org.apache.spark.Success +import org.apache.spark.{LocalSparkContext, SparkContext, Success} import org.apache.spark.scheduler.SparkListenerTaskStart import org.apache.spark.executor.{ShuffleReadMetrics, TaskMetrics} -class JobProgressListenerSuite extends FunSuite { +class JobProgressListenerSuite extends FunSuite with LocalSparkContext { test("test executor id to summary") { - val sc = new SparkContext("local", "joblogger") + val sc = new SparkContext("local", "test") val listener = new JobProgressListener(sc) val taskMetrics = new TaskMetrics() val shuffleReadMetrics = new ShuffleReadMetrics() // nothing in it - assert(listener.executorIdToSummary.size == 0) - - // launched a task, should get an item in map - listener.onTaskStart(new SparkListenerTaskStart( - new ShuffleMapTask(0, null, null, 0, null), - new TaskInfo(1234L, 0, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL))) - assert(listener.executorIdToSummary.size == 1) + assert(listener.stageIdToExecutorSummaries.size == 0) // finish this task, should get updated shuffleRead shuffleReadMetrics.remoteBytesRead = 1000 @@ -47,20 +40,15 @@ class JobProgressListenerSuite extends FunSuite { taskInfo.finishTime = 1 listener.onTaskEnd(new SparkListenerTaskEnd( new ShuffleMapTask(0, null, null, 0, null), Success, taskInfo, taskMetrics)) - assert(listener.executorIdToSummary.getOrElse("exe-1", fail()).shuffleRead == 1000) + assert(listener.stageIdToExecutorSummaries.getOrElse(0, fail()).getOrElse("exe-1", fail()) + .shuffleRead == 1000) // finish a task with unknown executor-id, nothing should happen taskInfo = new TaskInfo(1234L, 0, 1000L, "exe-unknown", "host1", TaskLocality.NODE_LOCAL) taskInfo.finishTime = 1 listener.onTaskEnd(new SparkListenerTaskEnd( new ShuffleMapTask(0, null, null, 0, null), Success, taskInfo, taskMetrics)) - assert(listener.executorIdToSummary.size == 1) - - // launched a task - listener.onTaskStart(new SparkListenerTaskStart( - new ShuffleMapTask(0, null, null, 0, null), - new TaskInfo(1235L, 0, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL))) - assert(listener.executorIdToSummary.size == 1) + assert(listener.stageIdToExecutorSummaries.size == 1) // finish this task, should get updated duration shuffleReadMetrics.remoteBytesRead = 1000 @@ -69,13 +57,8 @@ class JobProgressListenerSuite extends FunSuite { taskInfo.finishTime = 1 listener.onTaskEnd(new SparkListenerTaskEnd( new ShuffleMapTask(0, null, null, 0, null), Success, taskInfo, taskMetrics)) - assert(listener.executorIdToSummary.getOrElse("exe-1", fail()).shuffleRead == 2000) - - // launched a task in another exec - listener.onTaskStart(new SparkListenerTaskStart( - new ShuffleMapTask(0, null, null, 0, null), - new TaskInfo(1236L, 0, 0L, "exe-2", "host1", TaskLocality.NODE_LOCAL))) - assert(listener.executorIdToSummary.size == 2) + assert(listener.stageIdToExecutorSummaries.getOrElse(0, fail()).getOrElse("exe-1", fail()) + .shuffleRead == 2000) // finish this task, should get updated duration 
shuffleReadMetrics.remoteBytesRead = 1000 @@ -84,13 +67,7 @@ class JobProgressListenerSuite extends FunSuite { taskInfo.finishTime = 1 listener.onTaskEnd(new SparkListenerTaskEnd( new ShuffleMapTask(0, null, null, 0, null), Success, taskInfo, taskMetrics)) - assert(listener.executorIdToSummary.getOrElse("exe-2", fail()).shuffleRead == 1000) - - // do finalize - sc.stop() - - // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.driver.port") - System.clearProperty("spark.hostPort") + assert(listener.stageIdToExecutorSummaries.getOrElse(0, fail()).getOrElse("exe-2", fail()) + .shuffleRead == 1000) } } -- cgit v1.2.3 From 59e53fa21caa202a57093c74ada128fca2be5bac Mon Sep 17 00:00:00 2001 From: "wangda.tan" Date: Tue, 17 Dec 2013 17:57:27 +0800 Subject: spark-968, changes for avoid a NPE --- .../org/apache/spark/ui/exec/ExecutorsUI.scala | 30 ++++++++++++---------- .../apache/spark/ui/jobs/JobProgressListener.scala | 24 +++++++++-------- 2 files changed, 29 insertions(+), 25 deletions(-) (limited to 'core/src/main/scala/org/apache') diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala index 808bbe8c8f..f62ae37466 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala @@ -150,7 +150,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) { activeTasks += taskStart.taskInfo } - override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized { + override def onTaskEnd(taskEnd: SparkListenerTaskEnd) { val eid = taskEnd.taskInfo.executorId val activeTasks = executorToTasksActive.getOrElseUpdate(eid, new HashSet[TaskInfo]()) val newDuration = executorToDuration.getOrElse(eid, 0L) + taskEnd.taskInfo.duration @@ -168,20 +168,22 @@ private[spark] class ExecutorsUI(val sc: SparkContext) { } // update shuffle read/write - val shuffleRead = taskEnd.taskMetrics.shuffleReadMetrics - shuffleRead match { - case Some(s) => - val newShuffleRead = executorToShuffleRead.getOrElse(eid, 0L) + s.remoteBytesRead - executorToShuffleRead.put(eid, newShuffleRead) - case _ => {} - } - val shuffleWrite = taskEnd.taskMetrics.shuffleWriteMetrics - shuffleWrite match { - case Some(s) => { - val newShuffleWrite = executorToShuffleWrite.getOrElse(eid, 0L) + s.shuffleBytesWritten - executorToShuffleWrite.put(eid, newShuffleWrite) + if (null != taskEnd.taskMetrics) { + val shuffleRead = taskEnd.taskMetrics.shuffleReadMetrics + shuffleRead match { + case Some(s) => + val newShuffleRead = executorToShuffleRead.getOrElse(eid, 0L) + s.remoteBytesRead + executorToShuffleRead.put(eid, newShuffleRead) + case _ => {} + } + val shuffleWrite = taskEnd.taskMetrics.shuffleWriteMetrics + shuffleWrite match { + case Some(s) => { + val newShuffleWrite = executorToShuffleWrite.getOrElse(eid, 0L) + s.shuffleBytesWritten + executorToShuffleWrite.put(eid, newShuffleWrite) + } + case _ => {} } - case _ => {} } } } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index 8c92ff19a6..64ce715993 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -147,18 +147,20 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList y.duration += taskEnd.taskInfo.duration // 
update shuffle read/write - val shuffleRead = taskEnd.taskMetrics.shuffleReadMetrics - shuffleRead match { - case Some(s) => - y.shuffleRead += s.remoteBytesRead - case _ => {} - } - val shuffleWrite = taskEnd.taskMetrics.shuffleWriteMetrics - shuffleWrite match { - case Some(s) => { - y.shuffleWrite += s.shuffleBytesWritten + if (null != taskEnd.taskMetrics) { + val shuffleRead = taskEnd.taskMetrics.shuffleReadMetrics + shuffleRead match { + case Some(s) => + y.shuffleRead += s.remoteBytesRead + case _ => {} + } + val shuffleWrite = taskEnd.taskMetrics.shuffleWriteMetrics + shuffleWrite match { + case Some(s) => { + y.shuffleWrite += s.shuffleBytesWritten + } + case _ => {} } - case _ => {} } } case _ => {} -- cgit v1.2.3 From c979eecdf6a11462595aba9d5b8fc942682cf85d Mon Sep 17 00:00:00 2001 From: "wangda.tan" Date: Sun, 22 Dec 2013 21:43:15 +0800 Subject: added changes according to comments from rxin --- .../org/apache/spark/ui/exec/ExecutorsUI.scala | 24 +++++++-------------- .../org/apache/spark/ui/jobs/ExecutorSummary.scala | 5 +++-- .../org/apache/spark/ui/jobs/ExecutorTable.scala | 4 ++-- .../scala/org/apache/spark/ui/jobs/IndexPage.scala | 4 ---- .../apache/spark/ui/jobs/JobProgressListener.scala | 25 +++++++--------------- .../scala/org/apache/spark/ui/jobs/StagePage.scala | 4 ++-- .../org/apache/spark/ui/jobs/StageTable.scala | 2 +- 7 files changed, 24 insertions(+), 44 deletions(-) (limited to 'core/src/main/scala/org/apache') diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala index f62ae37466..a31a7e1d58 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala @@ -56,7 +56,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) { val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize)).fold(0L)(_+_) val execHead = Seq("Executor ID", "Address", "RDD blocks", "Memory used", "Disk used", - "Active tasks", "Failed tasks", "Complete tasks", "Total tasks", "Duration", "Shuffle Read", + "Active tasks", "Failed tasks", "Complete tasks", "Total tasks", "Task Time", "Shuffle Read", "Shuffle Write") def execRow(kv: Seq[String]) = { @@ -169,21 +169,13 @@ private[spark] class ExecutorsUI(val sc: SparkContext) { // update shuffle read/write if (null != taskEnd.taskMetrics) { - val shuffleRead = taskEnd.taskMetrics.shuffleReadMetrics - shuffleRead match { - case Some(s) => - val newShuffleRead = executorToShuffleRead.getOrElse(eid, 0L) + s.remoteBytesRead - executorToShuffleRead.put(eid, newShuffleRead) - case _ => {} - } - val shuffleWrite = taskEnd.taskMetrics.shuffleWriteMetrics - shuffleWrite match { - case Some(s) => { - val newShuffleWrite = executorToShuffleWrite.getOrElse(eid, 0L) + s.shuffleBytesWritten - executorToShuffleWrite.put(eid, newShuffleWrite) - } - case _ => {} - } + taskEnd.taskMetrics.shuffleReadMetrics.foreach(shuffleRead => + executorToShuffleRead.put(eid, executorToShuffleRead.getOrElse(eid, 0L) + + shuffleRead.remoteBytesRead)) + + taskEnd.taskMetrics.shuffleWriteMetrics.foreach(shuffleWrite => + executorToShuffleWrite.put(eid, executorToShuffleWrite.getOrElse(eid, 0L) + + shuffleWrite.shuffleBytesWritten)) } } } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala index 75c0dd2c7f..3c53e88380 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala 
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala @@ -17,8 +17,9 @@ package org.apache.spark.ui.jobs -private[spark] class ExecutorSummary() { - var duration : Long = 0 +/** class for reporting aggregated metrics for each executors in stageUI */ +private[spark] class ExecutorSummary { + var taskTime : Long = 0 var failedTasks : Int = 0 var succeededTasks : Int = 0 var shuffleRead : Long = 0 diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala index 763d5a344b..0e9dd4a8c7 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala @@ -40,7 +40,7 @@ private[spark] class ExecutorTable(val parent: JobProgressUI, val stageId: Int) - + @@ -61,7 +61,7 @@ private[spark] class ExecutorTable(val parent: JobProgressUI, val stageId: Int) case (k,v) => { - + diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala index 854afb665a..ca5a28625b 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala @@ -56,10 +56,6 @@ private[spark] class IndexPage(parent: JobProgressUI) { {parent.formatDuration(now - listener.sc.startTime)}
            <li><strong>Scheduling Mode:</strong> {parent.sc.getSchedulingMode}</li>
-           <li>
-             <strong>Executor Summary:</strong>
-             {listener.stageIdToExecutorSummaries.size}
-           </li>
  • Active Stages: {activeStages.size} diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index 64ce715993..07a42f0503 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -144,23 +144,14 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList } // update duration - y.duration += taskEnd.taskInfo.duration - - // update shuffle read/write - if (null != taskEnd.taskMetrics) { - val shuffleRead = taskEnd.taskMetrics.shuffleReadMetrics - shuffleRead match { - case Some(s) => - y.shuffleRead += s.remoteBytesRead - case _ => {} - } - val shuffleWrite = taskEnd.taskMetrics.shuffleWriteMetrics - shuffleWrite match { - case Some(s) => { - y.shuffleWrite += s.shuffleBytesWritten - } - case _ => {} - } + y.taskTime += taskEnd.taskInfo.duration + + taskEnd.taskMetrics.shuffleReadMetrics.foreach { shuffleRead => + y.shuffleRead += shuffleRead.remoteBytesRead + } + + taskEnd.taskMetrics.shuffleWriteMetrics.foreach { shuffleWrite => + y.shuffleWrite += shuffleWrite.shuffleBytesWritten } } case _ => {} diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index c077613b1d..d8a6c9e2dc 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -66,7 +66,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
-             <strong>Total duration across all tasks: </strong>
+             <strong>Total task time across all tasks: </strong>
              {parent.formatDuration(listener.stageIdToTime.getOrElse(stageId, 0L) + activeTime)}
    • {if (hasShuffleRead) @@ -163,9 +163,9 @@ private[spark] class StagePage(parent: JobProgressUI) { val executorTable = new ExecutorTable(parent, stageId) val content = summary ++ -

      <h4>Summary Metrics for Executors</h4> ++ executorTable.toNodeSeq() ++
       <h4>Summary Metrics for {numCompleted} Completed Tasks</h4> ++
       <div>{summaryTable.getOrElse("No tasks have reported metrics yet.")}</div> ++
+      <h4>Aggregated Metrics by Executors</h4> ++ executorTable.toNodeSeq() ++
       <h4>Tasks</h4>
      ++ taskTable headerSparkPage(content, parent.sc, "Details for Stage %d".format(stageId), Stages) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala index 9ad6de3c6d..463d85dfd5 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala @@ -48,7 +48,7 @@ private[spark] class StageTable(val stages: Seq[StageInfo], val parent: JobProgr {if (isFairScheduler) {
  • } else {}} - + -- cgit v1.2.3 From 2f689ba97b437092bf52063cface12aa9ee09bf3 Mon Sep 17 00:00:00 2001 From: "wangda.tan" Date: Mon, 23 Dec 2013 15:03:45 +0800 Subject: SPARK-968, added executor address showing in aggregated metrics by executors table --- .../main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala | 13 +++++++++++++ 1 file changed, 13 insertions(+) (limited to 'core/src/main/scala/org/apache') diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala index 0e9dd4a8c7..0dd876480a 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala @@ -21,6 +21,7 @@ import scala.xml.Node import org.apache.spark.scheduler.SchedulingMode import org.apache.spark.util.Utils +import scala.collection.mutable /** Page showing executor summary */ private[spark] class ExecutorTable(val parent: JobProgressUI, val stageId: Int) { @@ -40,6 +41,7 @@ private[spark] class ExecutorTable(val parent: JobProgressUI, val stageId: Int)
    Executor IDDurationTask Time Total Tasks Failed Tasks Succeeded Tasks
    {k}{parent.formatDuration(v.duration)}{parent.formatDuration(v.taskTime)} {v.failedTasks + v.succeededTasks} {v.failedTasks} {v.succeededTasks} Pool NameDescription SubmittedDurationTask Time Tasks: Succeeded/Total Shuffle Read Shuffle Write
    + @@ -54,6 +56,16 @@ private[spark] class ExecutorTable(val parent: JobProgressUI, val stageId: Int) } private def createExecutorTable() : Seq[Node] = { + // make a executor-id -> address map + val executorIdToAddress = mutable.HashMap[String, String]() + val storageStatusList = parent.sc.getExecutorStorageStatus + for (statusId <- 0 until storageStatusList.size) { + val blockManagerId = parent.sc.getExecutorStorageStatus(statusId).blockManagerId + val address = blockManagerId.hostPort + val executorId = blockManagerId.executorId + executorIdToAddress.put(executorId, address) + } + val executorIdToSummary = listener.stageIdToExecutorSummaries.get(stageId) executorIdToSummary match { case Some(x) => { @@ -61,6 +73,7 @@ private[spark] class ExecutorTable(val parent: JobProgressUI, val stageId: Int) case (k,v) => { + -- cgit v1.2.3
    Executor IDAddress Task Time Total Tasks Failed Tasks
          <td>{k}</td>
          <td>{executorIdToAddress.getOrElse(k, "CANNOT FIND ADDRESS")}</td>
          <td>{parent.formatDuration(v.taskTime)}</td>
          <td>{v.failedTasks + v.succeededTasks}</td>
          <td>{v.failedTasks}</td>
-- cgit v1.2.3
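
Taken together, the series ends with JobProgressListener keeping a per-stage map of per-executor summaries (stageIdToExecutorSummaries): on each onTaskEnd it looks up or creates the ExecutorSummary for the task's executor, counts the task as succeeded or failed, and folds in the task's duration and shuffle read/write bytes (skipping metrics when taskMetrics is null, which is the NPE fix), while ExecutorsUI keeps its own job-wide executorTo* maps. The snippet below is a minimal, self-contained sketch of that aggregation pattern only; TaskResult and Summary are invented stand-ins for this illustration, not Spark classes.

    import scala.collection.mutable.HashMap

    object ExecutorSummarySketch {
      // Invented stand-in for the fields onTaskEnd reads from a finished task.
      case class TaskResult(stageId: Int, executorId: String, duration: Long,
                            succeeded: Boolean, shuffleRead: Long, shuffleWrite: Long)

      // Mirrors ExecutorSummary after the renames in this series (taskTime, succeededTasks).
      class Summary {
        var taskTime: Long = 0L
        var failedTasks: Int = 0
        var succeededTasks: Int = 0
        var shuffleRead: Long = 0L
        var shuffleWrite: Long = 0L
      }

      // stageId -> (executorId -> aggregated summary), like stageIdToExecutorSummaries.
      val stageIdToExecutorSummaries = HashMap[Int, HashMap[String, Summary]]()

      def onTaskEnd(t: TaskResult): Unit = {
        val byExecutor =
          stageIdToExecutorSummaries.getOrElseUpdate(t.stageId, HashMap[String, Summary]())
        val summary = byExecutor.getOrElseUpdate(t.executorId, new Summary)
        if (t.succeeded) summary.succeededTasks += 1 else summary.failedTasks += 1
        summary.taskTime += t.duration
        summary.shuffleRead += t.shuffleRead   // remoteBytesRead in the real listener
        summary.shuffleWrite += t.shuffleWrite // shuffleBytesWritten in the real listener
      }

      def main(args: Array[String]): Unit = {
        onTaskEnd(TaskResult(0, "exe-1", 120L, succeeded = true, 1000L, 0L))
        onTaskEnd(TaskResult(0, "exe-1", 80L, succeeded = false, 0L, 500L))
        val s = stageIdToExecutorSummaries(0)("exe-1")
        println(s"taskTime=${s.taskTime} ok=${s.succeededTasks} failed=${s.failedTasks}")
      }
    }

The two-level map is what lets StagePage render the per-stage "Aggregated Metrics by Executors" table, keyed first by stage and then by executor ID.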