about summary refs log tree commit diff
path: root/core
diff options
context:
space:
mode:
authorSandy Ryza <sandy@cloudera.com>2014-08-17 22:39:06 -0700
committerPatrick Wendell <pwendell@gmail.com>2014-08-17 22:39:06 -0700
commitdf652ea02a3e42d987419308ef14874300347373 (patch)
tree91401e1f9ca6b0298c62c7d3fc7c137d42c8cfd8 /core
parent5173f3c40f6b64f224f11364e038953826013895 (diff)
downloadspark-df652ea02a3e42d987419308ef14874300347373.tar.gz
spark-df652ea02a3e42d987419308ef14874300347373.tar.bz2
spark-df652ea02a3e42d987419308ef14874300347373.zip
SPARK-2900. aggregate inputBytes per stage
Author: Sandy Ryza <sandy@cloudera.com>
Closes #1826 from sryza/sandy-spark-2900 and squashes the following commits:
43f9091 [Sandy Ryza] SPARK-2900
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala6
-rw-r--r--core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala9
2 files changed, 14 insertions, 1 deletion
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index a3e9566832..74cd637d88 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -200,6 +200,12 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
stageData.shuffleReadBytes += shuffleReadDelta
execSummary.shuffleRead += shuffleReadDelta
+ val inputBytesDelta =
+ (taskMetrics.inputMetrics.map(_.bytesRead).getOrElse(0L)
+ - oldMetrics.flatMap(_.inputMetrics).map(_.bytesRead).getOrElse(0L))
+ stageData.inputBytes += inputBytesDelta
+ execSummary.inputBytes += inputBytesDelta
+
val diskSpillDelta =
taskMetrics.diskBytesSpilled - oldMetrics.map(_.diskBytesSpilled).getOrElse(0L)
stageData.diskBytesSpilled += diskSpillDelta
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index f5ba31c309..147ec0bc52 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -22,7 +22,7 @@ import org.scalatest.Matchers
import org.apache.spark._
import org.apache.spark.{LocalSparkContext, SparkConf, Success}
-import org.apache.spark.executor.{ShuffleWriteMetrics, ShuffleReadMetrics, TaskMetrics}
+import org.apache.spark.executor._
import org.apache.spark.scheduler._
import org.apache.spark.util.Utils
@@ -150,6 +150,9 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
taskMetrics.executorRunTime = base + 4
taskMetrics.diskBytesSpilled = base + 5
taskMetrics.memoryBytesSpilled = base + 6
+ val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
+ taskMetrics.inputMetrics = Some(inputMetrics)
+ inputMetrics.bytesRead = base + 7
taskMetrics
}
@@ -182,6 +185,8 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
assert(stage1Data.diskBytesSpilled == 205)
assert(stage0Data.memoryBytesSpilled == 112)
assert(stage1Data.memoryBytesSpilled == 206)
+ assert(stage0Data.inputBytes == 114)
+ assert(stage1Data.inputBytes == 207)
assert(stage0Data.taskData.get(1234L).get.taskMetrics.get.shuffleReadMetrics.get
.totalBlocksFetched == 2)
assert(stage0Data.taskData.get(1235L).get.taskMetrics.get.shuffleReadMetrics.get
@@ -208,6 +213,8 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
assert(stage1Data.diskBytesSpilled == 610)
assert(stage0Data.memoryBytesSpilled == 412)
assert(stage1Data.memoryBytesSpilled == 612)
+ assert(stage0Data.inputBytes == 414)
+ assert(stage1Data.inputBytes == 614)
assert(stage0Data.taskData.get(1234L).get.taskMetrics.get.shuffleReadMetrics.get
.totalBlocksFetched == 302)
assert(stage1Data.taskData.get(1237L).get.taskMetrics.get.shuffleReadMetrics.get