path: root/core/src/test
author     Kay Ousterhout <kayousterhout@gmail.com>  2014-07-18 14:40:32 -0700
committer  Kay Ousterhout <kayousterhout@gmail.com>  2014-07-18 14:40:32 -0700
commit     7b971b91caeebda57f1506ffc4fd266a1b379290 (patch)
tree       cf7e04ce6243d83b370901999713b14e8afa9c3a /core/src/test
parent     7f1720813793e155743b58eae5228298e894b90d (diff)
download   spark-7b971b91caeebda57f1506ffc4fd266a1b379290.tar.gz
           spark-7b971b91caeebda57f1506ffc4fd266a1b379290.tar.bz2
           spark-7b971b91caeebda57f1506ffc4fd266a1b379290.zip
[SPARK-2571] Correctly report shuffle read metrics.
Currently, shuffle read metrics are incorrectly reported when stages have multiple
shuffle dependencies (they are set to be the metrics from just one of the shuffle
dependencies, rather than the accumulated metrics from all of the shuffle
dependencies). This fixes that problem, and should probably be back-ported to the
0.9 branch.

Thanks ryanra for discovering this problem!

cc rxin andrewor14

Author: Kay Ousterhout <kayousterhout@gmail.com>

Closes #1476 from kayousterhout/join_bug and squashes the following commits:

0203a16 [Kay Ousterhout] Fix broken unit tests.
f463c2e [Kay Ousterhout] [SPARK-2571] Correctly report shuffle read metrics.
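The change behind this diff is that TaskMetrics now accumulates shuffle read metrics
across all of a stage's shuffle dependencies via updateShuffleReadMetrics, instead of
overwriting them with the metrics of whichever dependency was assigned last. The
following is a minimal illustrative sketch of that accumulation, not Spark's actual
implementation; the field and method names are taken from the test diffs below, while
the class bodies themselves are assumptions.

// Illustrative sketch only (not Spark's actual implementation): an accumulating
// updateShuffleReadMetrics that merges per-dependency metrics into a running total.
class ShuffleReadMetrics extends Serializable {
  var remoteBlocksFetched: Int = 0
  var localBlocksFetched: Int = 0
  var fetchWaitTime: Long = 0L
  var remoteBytesRead: Long = 0L
  def totalBlocksFetched: Int = remoteBlocksFetched + localBlocksFetched
}

class TaskMetrics extends Serializable {
  var shuffleReadMetrics: Option[ShuffleReadMetrics] = None

  // Merge the metrics from one shuffle dependency into the running total
  // instead of replacing whatever was recorded before.
  def updateShuffleReadMetrics(newMetrics: ShuffleReadMetrics): Unit = {
    val merged = shuffleReadMetrics.getOrElse(new ShuffleReadMetrics)
    merged.remoteBlocksFetched += newMetrics.remoteBlocksFetched
    merged.localBlocksFetched += newMetrics.localBlocksFetched
    merged.fetchWaitTime += newMetrics.fetchWaitTime
    merged.remoteBytesRead += newMetrics.remoteBytesRead
    shuffleReadMetrics = Some(merged)
  }
}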
Diffstat (limited to 'core/src/test')
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala      | 4
-rw-r--r--  core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala  | 6
-rw-r--r--  core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala            | 2
3 files changed, 4 insertions, 8 deletions
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 71f48e295e..3b0b8e2f68 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -258,8 +258,8 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
         if (stageInfo.rddInfos.exists(_.name == d4.name)) {
           taskMetrics.shuffleReadMetrics should be ('defined)
           val sm = taskMetrics.shuffleReadMetrics.get
-          sm.totalBlocksFetched should be > (0)
-          sm.localBlocksFetched should be > (0)
+          sm.totalBlocksFetched should be (128)
+          sm.localBlocksFetched should be (128)
           sm.remoteBlocksFetched should be (0)
           sm.remoteBytesRead should be (0l)
         }
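With per-dependency metrics summed, the totals in the test above become deterministic,
which is why the assertions can move from "greater than zero" to an exact 128. Below is
a usage sketch against the illustrative classes sketched after the commit message; the
64 + 64 split per dependency is hypothetical and only shows how per-dependency counts
add up to a fixed total.

// Hypothetical numbers: two shuffle dependencies each contribute 64 local
// blocks, so the accumulated task-level totals are exact rather than "> 0".
val t = new TaskMetrics
val dep1 = new ShuffleReadMetrics
dep1.localBlocksFetched = 64
val dep2 = new ShuffleReadMetrics
dep2.localBlocksFetched = 64
t.updateShuffleReadMetrics(dep1)
t.updateShuffleReadMetrics(dep2)
assert(t.shuffleReadMetrics.get.totalBlocksFetched == 128)
assert(t.shuffleReadMetrics.get.localBlocksFetched == 128)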
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index a855662480..b52f81877d 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -63,7 +63,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     // finish this task, should get updated shuffleRead
     shuffleReadMetrics.remoteBytesRead = 1000
-    taskMetrics.shuffleReadMetrics = Some(shuffleReadMetrics)
+    taskMetrics.updateShuffleReadMetrics(shuffleReadMetrics)
     var taskInfo = new TaskInfo(1234L, 0, 1, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false)
     taskInfo.finishTime = 1
     var task = new ShuffleMapTask(0, null, null, 0, null)
@@ -81,8 +81,6 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     assert(listener.stageIdToData.size === 1)
     // finish this task, should get updated duration
-    shuffleReadMetrics.remoteBytesRead = 1000
-    taskMetrics.shuffleReadMetrics = Some(shuffleReadMetrics)
     taskInfo = new TaskInfo(1235L, 0, 1, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false)
     taskInfo.finishTime = 1
     task = new ShuffleMapTask(0, null, null, 0, null)
@@ -91,8 +89,6 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
       .shuffleRead === 2000)
     // finish this task, should get updated duration
-    shuffleReadMetrics.remoteBytesRead = 1000
-    taskMetrics.shuffleReadMetrics = Some(shuffleReadMetrics)
     taskInfo = new TaskInfo(1236L, 0, 2, 0L, "exe-2", "host1", TaskLocality.NODE_LOCAL, false)
     taskInfo.finishTime = 1
     task = new ShuffleMapTask(0, null, null, 0, null)
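The repeated metric re-assignments removed above were redundant when the field was
assigned directly; under the accumulating API, repeating the call with the same metrics
would inflate the reported value instead, which is presumably why the test now records
them only once. A small sketch of that hazard, again using the illustrative classes
sketched after the commit message:

// With accumulation, applying the same metrics twice doubles the total,
// whereas the old direct assignment simply overwrote it.
val tm = new TaskMetrics
val sr = new ShuffleReadMetrics
sr.remoteBytesRead = 1000L
tm.updateShuffleReadMetrics(sr)
tm.updateShuffleReadMetrics(sr)
// tm.shuffleReadMetrics.get.remoteBytesRead is now 2000, not 1000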
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 058d314530..11f70a6090 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -518,7 +518,7 @@ class JsonProtocolSuite extends FunSuite {
       sr.localBlocksFetched = e
       sr.fetchWaitTime = a + d
       sr.remoteBlocksFetched = f
-      t.shuffleReadMetrics = Some(sr)
+      t.updateShuffleReadMetrics(sr)
     }
     sw.shuffleBytesWritten = a + b + c
     sw.shuffleWriteTime = b + c + d