aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorKay Ousterhout <kayousterhout@gmail.com>2013-11-21 16:54:23 -0800
committerKay Ousterhout <kayousterhout@gmail.com>2013-11-21 16:54:23 -0800
commitfc78f67da2fd28744e8119e28f4bb8a29926b3ad (patch)
tree6a8cd583c838f225eaf8ddd094ff025b1236eec6 /core
parent2fead510f74b962b293de4d724136c24a9825271 (diff)
downloadspark-fc78f67da2fd28744e8119e28f4bb8a29926b3ad.tar.gz
spark-fc78f67da2fd28744e8119e28f4bb8a29926b3ad.tar.bz2
spark-fc78f67da2fd28744e8119e28f4bb8a29926b3ad.zip
Added logging of scheduler delays to UI
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala33
1 files changed, 31 insertions, 2 deletions
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index fbd822867f..fc8c334cb5 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -60,11 +60,13 @@ private[spark] class StagePage(parent: JobProgressUI) {
var activeTime = 0L
listener.stageIdToTasksActive(stageId).foreach(activeTime += _.timeRunning(now))
+ val finishedTasks = listener.stageIdToTaskInfos(stageId).filter(_._1.finished)
+
val summary =
<div>
<ul class="unstyled">
<li>
- <strong>CPU time: </strong>
+ <strong>Total duration across all tasks: </strong>
{parent.formatDuration(listener.stageIdToTime.getOrElse(stageId, 0L) + activeTime)}
</li>
{if (hasShuffleRead)
@@ -104,6 +106,30 @@ private[spark] class StagePage(parent: JobProgressUI) {
val serviceQuantiles = "Duration" +: Distribution(serviceTimes).get.getQuantiles().map(
ms => parent.formatDuration(ms.toLong))
+ val gettingResultTimes = validTasks.map{case (info, metrics, exception) =>
+ if (info.gettingResultTime > 0) {
+ (info.finishTime - info.gettingResultTime).toDouble
+ } else {
+ 0.0
+ }
+ }
+ val gettingResultQuantiles = ("Time spent fetching task results" +:
+ Distribution(gettingResultTimes).get.getQuantiles().map(
+ millis => parent.formatDuration(millis.toLong)))
+ // The scheduler delay includes the network delay to send the task to the worker
+ // machine and to send back the result (but not the time to fetch the task result,
+ // if it needed to be fetched from the block manager on the worker).
+ val schedulerDelays = validTasks.map{case (info, metrics, exception) =>
+ if (info.gettingResultTime > 0) {
+ (info.gettingResultTime - info.launchTime).toDouble
+ } else {
+ (info.finishTime - info.launchTime).toDouble
+ }
+ }
+ val schedulerDelayQuantiles = ("Scheduler delay" +:
+ Distribution(schedulerDelays).get.getQuantiles().map(
+ millis => parent.formatDuration(millis.toLong)))
+
def getQuantileCols(data: Seq[Double]) =
Distribution(data).get.getQuantiles().map(d => Utils.bytesToString(d.toLong))
@@ -119,7 +145,10 @@ private[spark] class StagePage(parent: JobProgressUI) {
}
val shuffleWriteQuantiles = "Shuffle Write" +: getQuantileCols(shuffleWriteSizes)
- val listings: Seq[Seq[String]] = Seq(serviceQuantiles,
+ val listings: Seq[Seq[String]] = Seq(
+ serviceQuantiles,
+ gettingResultQuantiles,
+ schedulerDelayQuantiles,
if (hasShuffleRead) shuffleReadQuantiles else Nil,
if (hasShuffleWrite) shuffleWriteQuantiles else Nil)