 core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala                  | 20
 core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala |  2
 core/src/main/scala/org/apache/spark/util/JsonProtocol.scala                     |  5
 core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala          |  4
 core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala      |  6
 6 files changed, 27 insertions(+), 12 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index ac73288442..5d59e00636 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -75,7 +75,9 @@ class TaskMetrics extends Serializable {
/**
* If this task reads from shuffle output, metrics on getting shuffle data will be collected here
*/
- var shuffleReadMetrics: Option[ShuffleReadMetrics] = None
+ private var _shuffleReadMetrics: Option[ShuffleReadMetrics] = None
+
+ def shuffleReadMetrics = _shuffleReadMetrics
/**
* If this task writes to shuffle output, metrics on the written shuffle data will be collected
@@ -87,6 +89,22 @@ class TaskMetrics extends Serializable {
* Storage statuses of any blocks that have been updated as a result of this task.
*/
var updatedBlocks: Option[Seq[(BlockId, BlockStatus)]] = None
+
+ /** Adds the given ShuffleReadMetrics to any existing shuffle metrics for this task. */
+ def updateShuffleReadMetrics(newMetrics: ShuffleReadMetrics) = synchronized {
+ _shuffleReadMetrics match {
+ case Some(existingMetrics) =>
+ existingMetrics.shuffleFinishTime = math.max(
+ existingMetrics.shuffleFinishTime, newMetrics.shuffleFinishTime)
+ existingMetrics.fetchWaitTime += newMetrics.fetchWaitTime
+ existingMetrics.localBlocksFetched += newMetrics.localBlocksFetched
+ existingMetrics.remoteBlocksFetched += newMetrics.remoteBlocksFetched
+ existingMetrics.totalBlocksFetched += newMetrics.totalBlocksFetched
+ existingMetrics.remoteBytesRead += newMetrics.remoteBytesRead
+ case None =>
+ _shuffleReadMetrics = Some(newMetrics)
+ }
+ }
}
private[spark] object TaskMetrics {
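
The accessor change above makes _shuffleReadMetrics writable only through
updateShuffleReadMetrics, which merges rather than overwrites: additive
counters accumulate across calls, the finish time takes the max, and the
synchronized block guards concurrent updates to the same task's metrics.
A minimal standalone sketch of those semantics, using simplified stand-in
classes rather than Spark's real TaskMetrics API:

    object MergeSemanticsSketch {
      // Stand-in for org.apache.spark.executor.ShuffleReadMetrics (simplified).
      class ShuffleReadMetrics {
        var shuffleFinishTime: Long = 0L
        var fetchWaitTime: Long = 0L
        var localBlocksFetched: Int = 0
        var remoteBlocksFetched: Int = 0
        var totalBlocksFetched: Int = 0
        var remoteBytesRead: Long = 0L
      }

      class TaskMetrics {
        private var _shuffleReadMetrics: Option[ShuffleReadMetrics] = None
        def shuffleReadMetrics: Option[ShuffleReadMetrics] = _shuffleReadMetrics

        // Merge rather than overwrite, mirroring the patch above.
        def updateShuffleReadMetrics(newMetrics: ShuffleReadMetrics): Unit = synchronized {
          _shuffleReadMetrics match {
            case Some(m) =>
              m.shuffleFinishTime = math.max(m.shuffleFinishTime, newMetrics.shuffleFinishTime)
              m.fetchWaitTime += newMetrics.fetchWaitTime
              m.localBlocksFetched += newMetrics.localBlocksFetched
              m.remoteBlocksFetched += newMetrics.remoteBlocksFetched
              m.totalBlocksFetched += newMetrics.totalBlocksFetched
              m.remoteBytesRead += newMetrics.remoteBytesRead
            case None =>
              _shuffleReadMetrics = Some(newMetrics)
          }
        }
      }

      def main(args: Array[String]): Unit = {
        val tm = new TaskMetrics
        val first = new ShuffleReadMetrics
        first.remoteBytesRead = 1000L
        val second = new ShuffleReadMetrics
        second.remoteBytesRead = 500L
        tm.updateShuffleReadMetrics(first)   // installs the metrics
        tm.updateShuffleReadMetrics(second)  // merges into the existing ones
        assert(tm.shuffleReadMetrics.get.remoteBytesRead == 1500L)
      }
    }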
diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
index a932455776..3795994cd9 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
@@ -84,7 +84,7 @@ private[hash] object BlockStoreShuffleFetcher extends Logging {
shuffleMetrics.totalBlocksFetched = blockFetcherItr.totalBlocks
shuffleMetrics.localBlocksFetched = blockFetcherItr.numLocalBlocks
shuffleMetrics.remoteBlocksFetched = blockFetcherItr.numRemoteBlocks
- context.taskMetrics.shuffleReadMetrics = Some(shuffleMetrics)
+ context.taskMetrics.updateShuffleReadMetrics(shuffleMetrics)
})
new InterruptibleIterator[T](context, completionIter)
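
This one-line change matters for tasks with more than one shuffle dependency:
each fetch used to overwrite the metrics of the previous one, so only the last
fetch was reported. A hypothetical driver program (not part of this commit)
where a single reduce task reads shuffle output from two parents, triggering
two metric reports per task:

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._  // pair-RDD implicits on Spark 1.x

    object TwoShuffleReads {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext("local", "two-shuffle-reads")
        val a = sc.parallelize(1 to 100).map(i => (i % 10, i))
        val b = sc.parallelize(1 to 100).map(i => (i % 10, i * 2))
        // Each reduce task reads shuffle output from both parents, so
        // BlockStoreShuffleFetcher reports shuffle read metrics twice
        // for the same task; with this patch the two reports are merged.
        a.cogroup(b).count()
        sc.stop()
      }
    }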
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 47eb44b530..2ff8b25a56 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -527,8 +527,9 @@ private[spark] object JsonProtocol {
metrics.resultSerializationTime = (json \ "Result Serialization Time").extract[Long]
metrics.memoryBytesSpilled = (json \ "Memory Bytes Spilled").extract[Long]
metrics.diskBytesSpilled = (json \ "Disk Bytes Spilled").extract[Long]
- metrics.shuffleReadMetrics =
- Utils.jsonOption(json \ "Shuffle Read Metrics").map(shuffleReadMetricsFromJson)
+ Utils.jsonOption(json \ "Shuffle Read Metrics").map { shuffleReadMetrics =>
+ metrics.updateShuffleReadMetrics(shuffleReadMetricsFromJson(shuffleReadMetrics))
+ }
metrics.shuffleWriteMetrics =
Utils.jsonOption(json \ "Shuffle Write Metrics").map(shuffleWriteMetricsFromJson)
metrics.inputMetrics =
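
On deserialization a freshly constructed TaskMetrics holds None, so
updateShuffleReadMetrics simply installs the parsed value. (Note that the map
here is used purely for its side effect; foreach would state that intent more
directly.) For reference, a sketch of the None-on-missing-field pattern that
Utils.jsonOption provides, built on json4s, the library JsonProtocol uses;
illustrative only, not Spark source:

    import org.json4s._
    import org.json4s.jackson.JsonMethods._

    object JsonOptionSketch {
      // Essentially what Utils.jsonOption does: JNothing (field absent)
      // becomes None, anything else becomes Some.
      def jsonOption(json: JValue): Option[JValue] = json match {
        case JNothing => None
        case value => Some(value)
      }

      def main(args: Array[String]): Unit = {
        val json = parse("""{"Shuffle Read Metrics": {"Remote Bytes Read": 1000}}""")
        println(jsonOption(json \ "Shuffle Read Metrics").isDefined) // true
        println(jsonOption(json \ "No Such Field").isDefined)        // false
      }
    }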
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 71f48e295e..3b0b8e2f68 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -258,8 +258,8 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
if (stageInfo.rddInfos.exists(_.name == d4.name)) {
taskMetrics.shuffleReadMetrics should be ('defined)
val sm = taskMetrics.shuffleReadMetrics.get
- sm.totalBlocksFetched should be > (0)
- sm.localBlocksFetched should be > (0)
+ sm.totalBlocksFetched should be (128)
+ sm.localBlocksFetched should be (128)
sm.remoteBlocksFetched should be (0)
sm.remoteBytesRead should be (0l)
}
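
The assertions tighten from a lower bound to an exact count: with overwriting
fixed, the totals for this test job are deterministic, and 128 is presumably
the block count the suite's fixed partitioning produces. A small sketch of the
two ScalaTest matcher forms (illustrative only, ScalaTest of this era):

    import org.scalatest.{FunSuite, Matchers}

    class ExactCountSketch extends FunSuite with Matchers {
      test("lower bound vs exact value") {
        val totalBlocksFetched = 128
        totalBlocksFetched should be > 0   // old form: any positive count passes
        totalBlocksFetched should be (128) // new form: pins the exact count
      }
    }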
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index a855662480..b52f81877d 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -63,7 +63,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matchers
// finish this task, should get updated shuffleRead
shuffleReadMetrics.remoteBytesRead = 1000
- taskMetrics.shuffleReadMetrics = Some(shuffleReadMetrics)
+ taskMetrics.updateShuffleReadMetrics(shuffleReadMetrics)
var taskInfo = new TaskInfo(1234L, 0, 1, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false)
taskInfo.finishTime = 1
var task = new ShuffleMapTask(0, null, null, 0, null)
@@ -81,8 +81,6 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matchers
assert(listener.stageIdToData.size === 1)
// finish this task, should get updated duration
- shuffleReadMetrics.remoteBytesRead = 1000
- taskMetrics.shuffleReadMetrics = Some(shuffleReadMetrics)
taskInfo = new TaskInfo(1235L, 0, 1, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false)
taskInfo.finishTime = 1
task = new ShuffleMapTask(0, null, null, 0, null)
@@ -91,8 +89,6 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matchers
.shuffleRead === 2000)
// finish this task, should get updated duration
- shuffleReadMetrics.remoteBytesRead = 1000
- taskMetrics.shuffleReadMetrics = Some(shuffleReadMetrics)
taskInfo = new TaskInfo(1236L, 0, 2, 0L, "exe-2", "host1", TaskLocality.NODE_LOCAL, false)
taskInfo.finishTime = 1
task = new ShuffleMapTask(0, null, null, 0, null)
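
The deleted lines re-assigned the same ShuffleReadMetrics object before each
task-end event. Under the old overwrite semantics that was a harmless re-set;
with merge semantics, passing the same object again would double the counters
on every call, so the test now installs the metrics once and reuses the same
TaskMetrics across task-end events. A standalone demonstration with stand-in
classes (not Spark's):

    object DoubleCountSketch {
      class Metrics { var remoteBytesRead: Long = 0L }

      class Task {
        private var merged: Option[Metrics] = None
        def metrics: Option[Metrics] = merged
        // Same merge-or-install shape as updateShuffleReadMetrics.
        def update(m: Metrics): Unit = merged match {
          case Some(existing) => existing.remoteBytesRead += m.remoteBytesRead
          case None => merged = Some(m)
        }
      }

      def main(args: Array[String]): Unit = {
        val task = new Task
        val m = new Metrics
        m.remoteBytesRead = 1000L
        task.update(m)  // installs the object: 1000
        task.update(m)  // same object merged into itself: 2000, double-counted
        assert(task.metrics.get.remoteBytesRead == 2000L)
      }
    }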
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 058d314530..11f70a6090 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -518,7 +518,7 @@ class JsonProtocolSuite extends FunSuite {
sr.localBlocksFetched = e
sr.fetchWaitTime = a + d
sr.remoteBlocksFetched = f
- t.shuffleReadMetrics = Some(sr)
+ t.updateShuffleReadMetrics(sr)
}
sw.shuffleBytesWritten = a + b + c
sw.shuffleWriteTime = b + c + d