aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorKay Ousterhout <kayousterhout@gmail.com>2014-03-03 16:12:00 -0800
committerKay Ousterhout <kayousterhout@gmail.com>2014-03-03 16:12:00 -0800
commitb55cade853003d86356a50c6dba82210c8adb667 (patch)
treee9862ee43d3e0517ae69135f91a2832afcec02e9 /core
parent9d225a91043ac92a0e727ba281b10c250a945614 (diff)
downloadspark-b55cade853003d86356a50c6dba82210c8adb667.tar.gz
spark-b55cade853003d86356a50c6dba82210c8adb667.tar.bz2
spark-b55cade853003d86356a50c6dba82210c8adb667.zip
Remove the remoteFetchTime metric.
This metric is confusing: it adds up all of the time to fetch shuffle inputs, but fetches often happen in parallel, so remoteFetchTime can be much longer than the task execution time. @squito it looks like you added this metric -- do you have a use case for it? cc @shivaram -- I know you've looked at the shuffle performance a lot so chime in here if this metric has turned out to be useful for you! Author: Kay Ousterhout <kayousterhout@gmail.com> Closes #62 from kayousterhout/remove_fetch_variable and squashes the following commits: 43341eb [Kay Ousterhout] Remove the remoteFetchTime metric.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/BlockStoreShuffleFetcher.scala1
-rw-r--r--core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala7
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala1
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala4
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala1
5 files changed, 0 insertions, 14 deletions
diff --git a/core/src/main/scala/org/apache/spark/BlockStoreShuffleFetcher.scala b/core/src/main/scala/org/apache/spark/BlockStoreShuffleFetcher.scala
index 754b46a4c7..a67392441e 100644
--- a/core/src/main/scala/org/apache/spark/BlockStoreShuffleFetcher.scala
+++ b/core/src/main/scala/org/apache/spark/BlockStoreShuffleFetcher.scala
@@ -79,7 +79,6 @@ private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Loggin
val completionIter = CompletionIterator[T, Iterator[T]](itr, {
val shuffleMetrics = new ShuffleReadMetrics
shuffleMetrics.shuffleFinishTime = System.currentTimeMillis
- shuffleMetrics.remoteFetchTime = blockFetcherItr.remoteFetchTime
shuffleMetrics.fetchWaitTime = blockFetcherItr.fetchWaitTime
shuffleMetrics.remoteBytesRead = blockFetcherItr.remoteBytesRead
shuffleMetrics.totalBlocksFetched = blockFetcherItr.totalBlocks
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 455339943f..760458cb02 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -104,13 +104,6 @@ class ShuffleReadMetrics extends Serializable {
var fetchWaitTime: Long = _
/**
- * Total time spent fetching remote shuffle blocks. This aggregates the time spent fetching all
- * input blocks. Since block fetches are both pipelined and parallelized, this can
- * exceed fetchWaitTime and executorRunTime.
- */
- var remoteFetchTime: Long = _
-
- /**
* Total number of remote bytes read from the shuffle by this task
*/
var remoteBytesRead: Long = _
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
index 006e2a3335..80f9ec7d03 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -275,7 +275,6 @@ class JobLogger(val user: String, val logDirName: String)
" BLOCK_FETCHED_LOCAL=" + metrics.localBlocksFetched +
" BLOCK_FETCHED_REMOTE=" + metrics.remoteBlocksFetched +
" REMOTE_FETCH_WAIT_TIME=" + metrics.fetchWaitTime +
- " REMOTE_FETCH_TIME=" + metrics.remoteFetchTime +
" REMOTE_BYTES_READ=" + metrics.remoteBytesRead
case None => ""
}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
index fb50b45bd4..bcfc39146a 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
@@ -49,7 +49,6 @@ trait BlockFetcherIterator extends Iterator[(BlockId, Option[Iterator[Any]])] wi
def totalBlocks: Int
def numLocalBlocks: Int
def numRemoteBlocks: Int
- def remoteFetchTime: Long
def fetchWaitTime: Long
def remoteBytesRead: Long
}
@@ -79,7 +78,6 @@ object BlockFetcherIterator {
import blockManager._
private var _remoteBytesRead = 0L
- private var _remoteFetchTime = 0L
private var _fetchWaitTime = 0L
if (blocksByAddress == null) {
@@ -125,7 +123,6 @@ object BlockFetcherIterator {
future.onSuccess {
case Some(message) => {
val fetchDone = System.currentTimeMillis()
- _remoteFetchTime += fetchDone - fetchStart
val bufferMessage = message.asInstanceOf[BufferMessage]
val blockMessageArray = BlockMessageArray.fromBufferMessage(bufferMessage)
for (blockMessage <- blockMessageArray) {
@@ -241,7 +238,6 @@ object BlockFetcherIterator {
override def totalBlocks: Int = numLocal + numRemote
override def numLocalBlocks: Int = numLocal
override def numRemoteBlocks: Int = numRemote
- override def remoteFetchTime: Long = _remoteFetchTime
override def fetchWaitTime: Long = _fetchWaitTime
override def remoteBytesRead: Long = _remoteBytesRead
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 368c5154ea..7c4f2b4361 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -129,7 +129,6 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
sm.localBlocksFetched should be > (0)
sm.remoteBlocksFetched should be (0)
sm.remoteBytesRead should be (0l)
- sm.remoteFetchTime should be (0l)
}
}
}