aboutsummaryrefslogtreecommitdiff
path: root/core/src
diff options
context:
space:
mode:
authorSandy Ryza <sandy@cloudera.com>2014-07-20 14:45:34 -0700
committerAaron Davidson <aaron@databricks.com>2014-07-20 14:45:34 -0700
commit9564f8548917f563930d5e87911a304bf206d26e (patch)
treeb86f56d0551ef57bd681ccd4c3f280668db78aef /core/src
parent1b10b8114a396f94fc82b0f3af1a5f66dfa0945d (diff)
downloadspark-9564f8548917f563930d5e87911a304bf206d26e.tar.gz
spark-9564f8548917f563930d5e87911a304bf206d26e.tar.bz2
spark-9564f8548917f563930d5e87911a304bf206d26e.zip
SPARK-2564. ShuffleReadMetrics.totalBlocksRead is redundant
Author: Sandy Ryza <sandy@cloudera.com> Closes #1474 from sryza/sandy-spark-2564 and squashes the following commits: 35b8388 [Sandy Ryza] Fix compile error on upmerge 7b985fb [Sandy Ryza] Fix test compile error 43f79e6 [Sandy Ryza] SPARK-2564. ShuffleReadMetrics.totalBlocksRead is redundant
Diffstat (limited to 'core/src')
-rw-r--r--core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala1
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/util/JsonProtocol.scala2
-rw-r--r--core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala3
5 files changed, 2 insertions, 11 deletions
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 5d59e00636..21fe643b8d 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -99,7 +99,6 @@ class TaskMetrics extends Serializable {
existingMetrics.fetchWaitTime += newMetrics.fetchWaitTime
existingMetrics.localBlocksFetched += newMetrics.localBlocksFetched
existingMetrics.remoteBlocksFetched += newMetrics.remoteBlocksFetched
- existingMetrics.totalBlocksFetched += newMetrics.totalBlocksFetched
existingMetrics.remoteBytesRead += newMetrics.remoteBytesRead
case None =>
_shuffleReadMetrics = Some(newMetrics)
@@ -149,7 +148,7 @@ class ShuffleReadMetrics extends Serializable {
/**
* Number of blocks fetched in this shuffle by this task (remote or local)
*/
- var totalBlocksFetched: Int = _
+ def totalBlocksFetched: Int = remoteBlocksFetched + localBlocksFetched
/**
* Number of remote blocks fetched in this shuffle by this task
diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
index 3795994cd9..9978882898 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
@@ -81,7 +81,6 @@ private[hash] object BlockStoreShuffleFetcher extends Logging {
shuffleMetrics.shuffleFinishTime = System.currentTimeMillis
shuffleMetrics.fetchWaitTime = blockFetcherItr.fetchWaitTime
shuffleMetrics.remoteBytesRead = blockFetcherItr.remoteBytesRead
- shuffleMetrics.totalBlocksFetched = blockFetcherItr.totalBlocks
shuffleMetrics.localBlocksFetched = blockFetcherItr.numLocalBlocks
shuffleMetrics.remoteBlocksFetched = blockFetcherItr.numRemoteBlocks
context.taskMetrics.updateShuffleReadMetrics(shuffleMetrics)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
index 2f0296c20f..69905a960a 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
@@ -46,7 +46,6 @@ import org.apache.spark.util.Utils
private[storage]
trait BlockFetcherIterator extends Iterator[(BlockId, Option[Iterator[Any]])] with Logging {
def initialize()
- def totalBlocks: Int
def numLocalBlocks: Int
def numRemoteBlocks: Int
def fetchWaitTime: Long
@@ -192,7 +191,7 @@ object BlockFetcherIterator {
}
}
logInfo("Getting " + _numBlocksToFetch + " non-empty blocks out of " +
- totalBlocks + " blocks")
+ (numLocal + numRemote) + " blocks")
remoteRequests
}
@@ -235,7 +234,6 @@ object BlockFetcherIterator {
logDebug("Got local blocks in " + Utils.getUsedTimeMs(startTime) + " ms")
}
- override def totalBlocks: Int = numLocal + numRemote
override def numLocalBlocks: Int = numLocal
override def numRemoteBlocks: Int = numRemote
override def fetchWaitTime: Long = _fetchWaitTime
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 2ff8b25a56..3448aaaf57 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -237,7 +237,6 @@ private[spark] object JsonProtocol {
def shuffleReadMetricsToJson(shuffleReadMetrics: ShuffleReadMetrics): JValue = {
("Shuffle Finish Time" -> shuffleReadMetrics.shuffleFinishTime) ~
- ("Total Blocks Fetched" -> shuffleReadMetrics.totalBlocksFetched) ~
("Remote Blocks Fetched" -> shuffleReadMetrics.remoteBlocksFetched) ~
("Local Blocks Fetched" -> shuffleReadMetrics.localBlocksFetched) ~
("Fetch Wait Time" -> shuffleReadMetrics.fetchWaitTime) ~
@@ -548,7 +547,6 @@ private[spark] object JsonProtocol {
def shuffleReadMetricsFromJson(json: JValue): ShuffleReadMetrics = {
val metrics = new ShuffleReadMetrics
metrics.shuffleFinishTime = (json \ "Shuffle Finish Time").extract[Long]
- metrics.totalBlocksFetched = (json \ "Total Blocks Fetched").extract[Int]
metrics.remoteBlocksFetched = (json \ "Remote Blocks Fetched").extract[Int]
metrics.localBlocksFetched = (json \ "Local Blocks Fetched").extract[Int]
metrics.fetchWaitTime = (json \ "Fetch Wait Time").extract[Long]
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 11f70a6090..9305b6d973 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -314,7 +314,6 @@ class JsonProtocolSuite extends FunSuite {
private def assertEquals(metrics1: ShuffleReadMetrics, metrics2: ShuffleReadMetrics) {
assert(metrics1.shuffleFinishTime === metrics2.shuffleFinishTime)
- assert(metrics1.totalBlocksFetched === metrics2.totalBlocksFetched)
assert(metrics1.remoteBlocksFetched === metrics2.remoteBlocksFetched)
assert(metrics1.localBlocksFetched === metrics2.localBlocksFetched)
assert(metrics1.fetchWaitTime === metrics2.fetchWaitTime)
@@ -513,7 +512,6 @@ class JsonProtocolSuite extends FunSuite {
} else {
val sr = new ShuffleReadMetrics
sr.shuffleFinishTime = b + c
- sr.totalBlocksFetched = e + f
sr.remoteBytesRead = b + d
sr.localBlocksFetched = e
sr.fetchWaitTime = a + d
@@ -584,7 +582,6 @@ class JsonProtocolSuite extends FunSuite {
| "Memory Bytes Spilled":800,"Disk Bytes Spilled":0,
| "Shuffle Read Metrics":{
| "Shuffle Finish Time":900,
- | "Total Blocks Fetched":1500,
| "Remote Blocks Fetched":800,
| "Local Blocks Fetched":700,
| "Fetch Wait Time":900,