diff options
author | Patrick Wendell <pwendell@gmail.com> | 2013-04-29 11:32:07 -0700 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2013-04-29 11:32:07 -0700 |
commit | 540be6b1544d26c7db79ec84a98fc6696c7c6434 (patch) | |
tree | ea72de3a8b8ff6de64d36e22333dfe76468ef9cc | |
parent | 224fbac0612d5c35259cc9f4963dcd4a65ecc832 (diff) | |
download | spark-540be6b1544d26c7db79ec84a98fc6696c7c6434.tar.gz spark-540be6b1544d26c7db79ec84a98fc6696c7c6434.tar.bz2 spark-540be6b1544d26c7db79ec84a98fc6696c7c6434.zip |
Modified version of the fix which just removes all per-record tracking.
4 files changed, 2 insertions, 63 deletions
diff --git a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala index 83c22b1f14..ce61d27448 100644 --- a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala +++ b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala @@ -5,7 +5,7 @@ import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.HashMap import spark.storage.BlockManagerId -import util.{NoOpTimedIterator, SystemTimedIterator, CompletionIterator, TimedIterator} +import util.CompletionIterator private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Logging { override def fetch[K, V](shuffleId: Int, reduceId: Int, metrics: TaskMetrics) = { @@ -49,14 +49,9 @@ private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Loggin } val blockFetcherItr = blockManager.getMultiple(blocksByAddress) - val itr = if (System.getProperty("per.record.shuffle.metrics", "false").toBoolean) { - new SystemTimedIterator(blockFetcherItr.flatMap(unpackBlock)) - } else { - new NoOpTimedIterator(blockFetcherItr.flatMap(unpackBlock)) - } + val itr = blockFetcherItr.flatMap(unpackBlock) CompletionIterator[(K,V), Iterator[(K,V)]](itr, { val shuffleMetrics = new ShuffleReadMetrics - shuffleMetrics.shuffleReadMillis = itr.getNetMillis shuffleMetrics.remoteFetchTime = blockFetcherItr.remoteFetchTime shuffleMetrics.fetchWaitTime = blockFetcherItr.fetchWaitTime shuffleMetrics.remoteBytesRead = blockFetcherItr.remoteBytesRead diff --git a/core/src/main/scala/spark/executor/TaskMetrics.scala b/core/src/main/scala/spark/executor/TaskMetrics.scala index 45f6d43971..a7c56c2371 100644 --- a/core/src/main/scala/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/spark/executor/TaskMetrics.scala @@ -49,11 +49,6 @@ class ShuffleReadMetrics extends Serializable { var localBlocksFetched: Int = _ /** - * Total time to read shuffle data - */ - var shuffleReadMillis: Option[Long] = _ - - /** * Total time that is spent blocked waiting for shuffle to fetch data */ var fetchWaitTime: Long = _ diff --git a/core/src/main/scala/spark/util/TimedIterator.scala b/core/src/main/scala/spark/util/TimedIterator.scala deleted file mode 100644 index 49f1276b4e..0000000000 --- a/core/src/main/scala/spark/util/TimedIterator.scala +++ /dev/null @@ -1,50 +0,0 @@ -package spark.util - -/** - * A utility for tracking the the time an iterator takes to iterate through its elements. - */ -trait TimedIterator { - def getNetMillis: Option[Long] - def getAverageTimePerItem: Option[Double] -} - -/** - * A TimedIterator which uses System.currentTimeMillis() on every call to next(). - * - * In general, this should only be used if you expect it to take a considerable amount of time - * (eg. milliseconds) to get each element -- otherwise, the timing won't be very accurate, - * and you are probably just adding more overhead - */ -class SystemTimedIterator[+A](val sub: Iterator[A]) extends Iterator[A] with TimedIterator { - private var netMillis = 0l - private var nElems = 0 - def hasNext = { - val start = System.currentTimeMillis() - val r = sub.hasNext - val end = System.currentTimeMillis() - netMillis += (end - start) - r - } - def next = { - val start = System.currentTimeMillis() - val r = sub.next - val end = System.currentTimeMillis() - netMillis += (end - start) - nElems += 1 - r - } - - def getNetMillis = Some(netMillis) - def getAverageTimePerItem = Some(netMillis / nElems.toDouble) - -} - -/** - * A TimedIterator which doesn't perform any timing measurements. - */ -class NoOpTimedIterator[+A](val sub: Iterator[A]) extends Iterator[A] with TimedIterator { - def hasNext = sub.hasNext - def next = sub.next - def getNetMillis = None - def getAverageTimePerItem = None -} diff --git a/core/src/test/scala/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/spark/scheduler/SparkListenerSuite.scala index 5ccab369db..42a87d8b90 100644 --- a/core/src/test/scala/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/spark/scheduler/SparkListenerSuite.scala @@ -57,7 +57,6 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc taskMetrics.shuffleReadMetrics should be ('defined) val sm = taskMetrics.shuffleReadMetrics.get sm.totalBlocksFetched should be > (0) - sm.shuffleReadMillis.get should be > (0l) sm.localBlocksFetched should be > (0) sm.remoteBlocksFetched should be (0) sm.remoteBytesRead should be (0l) |