diff options
author | Patrick Wendell <pwendell@gmail.com> | 2013-04-29 10:10:14 -0700 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2013-04-29 11:13:43 -0700 |
commit | 224fbac0612d5c35259cc9f4963dcd4a65ecc832 (patch) | |
tree | 8c287179a6db88364bd5f5fa6d8fda5d8c7cde2f /core | |
parent | 1f20ef256715e5a84ba1661e235b6eda21a70b5b (diff) | |
download | spark-224fbac0612d5c35259cc9f4963dcd4a65ecc832.tar.gz spark-224fbac0612d5c35259cc9f4963dcd4a65ecc832.tar.bz2 spark-224fbac0612d5c35259cc9f4963dcd4a65ecc832.zip |
Spark-742: TaskMetrics should not employ per-record timing.
This patch does three things:
1. Makes TimedIterator a trait with two implementations (one a no-op)
2. Makes the default behavior to use the no-op implementation
3. Removes DelegateBlockFetchTracker. This is just cleanup, but it seems like
the trait doesn't really reduce complexity in any way.
In the future we can add other implementations, e.g. ones which perform sampling.
Diffstat (limited to 'core')
5 files changed, 37 insertions, 28 deletions
diff --git a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala index c27ed36406..83c22b1f14 100644 --- a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala +++ b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala @@ -4,8 +4,8 @@ import executor.{ShuffleReadMetrics, TaskMetrics} import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.HashMap -import spark.storage.{DelegateBlockFetchTracker, BlockManagerId} -import util.{CompletionIterator, TimedIterator} +import spark.storage.BlockManagerId +import util.{NoOpTimedIterator, SystemTimedIterator, CompletionIterator, TimedIterator} private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Logging { override def fetch[K, V](shuffleId: Int, reduceId: Int, metrics: TaskMetrics) = { @@ -49,17 +49,20 @@ private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Loggin } val blockFetcherItr = blockManager.getMultiple(blocksByAddress) - val itr = new TimedIterator(blockFetcherItr.flatMap(unpackBlock)) with DelegateBlockFetchTracker - itr.setDelegate(blockFetcherItr) + val itr = if (System.getProperty("per.record.shuffle.metrics", "false").toBoolean) { + new SystemTimedIterator(blockFetcherItr.flatMap(unpackBlock)) + } else { + new NoOpTimedIterator(blockFetcherItr.flatMap(unpackBlock)) + } CompletionIterator[(K,V), Iterator[(K,V)]](itr, { val shuffleMetrics = new ShuffleReadMetrics shuffleMetrics.shuffleReadMillis = itr.getNetMillis - shuffleMetrics.remoteFetchTime = itr.remoteFetchTime - shuffleMetrics.fetchWaitTime = itr.fetchWaitTime - shuffleMetrics.remoteBytesRead = itr.remoteBytesRead - shuffleMetrics.totalBlocksFetched = itr.totalBlocks - shuffleMetrics.localBlocksFetched = itr.numLocalBlocks - shuffleMetrics.remoteBlocksFetched = itr.numRemoteBlocks + shuffleMetrics.remoteFetchTime = blockFetcherItr.remoteFetchTime + shuffleMetrics.fetchWaitTime = blockFetcherItr.fetchWaitTime + 
shuffleMetrics.remoteBytesRead = blockFetcherItr.remoteBytesRead + shuffleMetrics.totalBlocksFetched = blockFetcherItr.totalBlocks + shuffleMetrics.localBlocksFetched = blockFetcherItr.numLocalBlocks + shuffleMetrics.remoteBlocksFetched = blockFetcherItr.numRemoteBlocks metrics.shuffleReadMetrics = Some(shuffleMetrics) }) } diff --git a/core/src/main/scala/spark/executor/TaskMetrics.scala b/core/src/main/scala/spark/executor/TaskMetrics.scala index 93bbb6b458..45f6d43971 100644 --- a/core/src/main/scala/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/spark/executor/TaskMetrics.scala @@ -51,7 +51,7 @@ class ShuffleReadMetrics extends Serializable { /** * Total time to read shuffle data */ - var shuffleReadMillis: Long = _ + var shuffleReadMillis: Option[Long] = _ /** * Total time that is spent blocked waiting for shuffle to fetch data diff --git a/core/src/main/scala/spark/storage/DelegateBlockFetchTracker.scala b/core/src/main/scala/spark/storage/DelegateBlockFetchTracker.scala deleted file mode 100644 index f6c28dce52..0000000000 --- a/core/src/main/scala/spark/storage/DelegateBlockFetchTracker.scala +++ /dev/null @@ -1,12 +0,0 @@ -package spark.storage - -private[spark] trait DelegateBlockFetchTracker extends BlockFetchTracker { - var delegate : BlockFetchTracker = _ - def setDelegate(d: BlockFetchTracker) {delegate = d} - def totalBlocks = delegate.totalBlocks - def numLocalBlocks = delegate.numLocalBlocks - def numRemoteBlocks = delegate.numRemoteBlocks - def remoteFetchTime = delegate.remoteFetchTime - def fetchWaitTime = delegate.fetchWaitTime - def remoteBytesRead = delegate.remoteBytesRead -} diff --git a/core/src/main/scala/spark/util/TimedIterator.scala b/core/src/main/scala/spark/util/TimedIterator.scala index 539b01f4ce..49f1276b4e 100644 --- a/core/src/main/scala/spark/util/TimedIterator.scala +++ b/core/src/main/scala/spark/util/TimedIterator.scala @@ -1,13 +1,21 @@ package spark.util /** - * A utility for tracking the total time an 
iterator takes to iterate through its elements. + * A utility for tracking the the time an iterator takes to iterate through its elements. + */ +trait TimedIterator { + def getNetMillis: Option[Long] + def getAverageTimePerItem: Option[Double] +} + +/** + * A TimedIterator which uses System.currentTimeMillis() on every call to next(). * * In general, this should only be used if you expect it to take a considerable amount of time * (eg. milliseconds) to get each element -- otherwise, the timing won't be very accurate, * and you are probably just adding more overhead */ -class TimedIterator[+A](val sub: Iterator[A]) extends Iterator[A] { +class SystemTimedIterator[+A](val sub: Iterator[A]) extends Iterator[A] with TimedIterator { private var netMillis = 0l private var nElems = 0 def hasNext = { @@ -26,7 +34,17 @@ class TimedIterator[+A](val sub: Iterator[A]) extends Iterator[A] { r } - def getNetMillis = netMillis - def getAverageTimePerItem = netMillis / nElems.toDouble + def getNetMillis = Some(netMillis) + def getAverageTimePerItem = Some(netMillis / nElems.toDouble) } + +/** + * A TimedIterator which doesn't perform any timing measurements. 
+ */ +class NoOpTimedIterator[+A](val sub: Iterator[A]) extends Iterator[A] with TimedIterator { + def hasNext = sub.hasNext + def next = sub.next + def getNetMillis = None + def getAverageTimePerItem = None +} diff --git a/core/src/test/scala/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/spark/scheduler/SparkListenerSuite.scala index 2f5af10e69..5ccab369db 100644 --- a/core/src/test/scala/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/spark/scheduler/SparkListenerSuite.scala @@ -57,7 +57,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc taskMetrics.shuffleReadMetrics should be ('defined) val sm = taskMetrics.shuffleReadMetrics.get sm.totalBlocksFetched should be > (0) - sm.shuffleReadMillis should be > (0l) + sm.shuffleReadMillis.get should be > (0l) sm.localBlocksFetched should be > (0) sm.remoteBlocksFetched should be (0) sm.remoteBytesRead should be (0l) |