From 224fbac0612d5c35259cc9f4963dcd4a65ecc832 Mon Sep 17 00:00:00 2001
From: Patrick Wendell
Date: Mon, 29 Apr 2013 10:10:14 -0700
Subject: Spark-742: TaskMetrics should not employ per-record timing.

This patch does three things:
1. Makes TimedIterator a trait with two implementations (one a no-op)
2. Makes the no-op implementation the default behavior
3. Removes DelegateBlockFetchTracker. This is just cleanup, since the
   trait doesn't really reduce complexity in any way.

In the future we can add other implementations, e.g. ones which perform
sampling.
---
 .../scala/spark/BlockStoreShuffleFetcher.scala     | 23 +++++++++++----------
 .../main/scala/spark/executor/TaskMetrics.scala    |  2 +-
 .../spark/storage/DelegateBlockFetchTracker.scala  | 12 ------------
 core/src/main/scala/spark/util/TimedIterator.scala | 26 ++++++++++++++++++----
 .../scala/spark/scheduler/SparkListenerSuite.scala |  2 +-
 5 files changed, 37 insertions(+), 28 deletions(-)
 delete mode 100644 core/src/main/scala/spark/storage/DelegateBlockFetchTracker.scala
(limited to 'core')

diff --git a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
index c27ed36406..83c22b1f14 100644
--- a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
+++ b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
@@ -4,8 +4,8 @@ import executor.{ShuffleReadMetrics, TaskMetrics}
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.HashMap
 
-import spark.storage.{DelegateBlockFetchTracker, BlockManagerId}
-import util.{CompletionIterator, TimedIterator}
+import spark.storage.BlockManagerId
+import util.{NoOpTimedIterator, SystemTimedIterator, CompletionIterator, TimedIterator}
 
 private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Logging {
   override def fetch[K, V](shuffleId: Int, reduceId: Int, metrics: TaskMetrics) = {
@@ -49,17 +49,20 @@ private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Loggin
     }
 
     val blockFetcherItr = blockManager.getMultiple(blocksByAddress)
-    val itr = new TimedIterator(blockFetcherItr.flatMap(unpackBlock)) with DelegateBlockFetchTracker
-    itr.setDelegate(blockFetcherItr)
+    val itr = if (System.getProperty("per.record.shuffle.metrics", "false").toBoolean) {
+      new SystemTimedIterator(blockFetcherItr.flatMap(unpackBlock))
+    } else {
+      new NoOpTimedIterator(blockFetcherItr.flatMap(unpackBlock))
+    }
     CompletionIterator[(K,V), Iterator[(K,V)]](itr, {
       val shuffleMetrics = new ShuffleReadMetrics
       shuffleMetrics.shuffleReadMillis = itr.getNetMillis
-      shuffleMetrics.remoteFetchTime = itr.remoteFetchTime
-      shuffleMetrics.fetchWaitTime = itr.fetchWaitTime
-      shuffleMetrics.remoteBytesRead = itr.remoteBytesRead
-      shuffleMetrics.totalBlocksFetched = itr.totalBlocks
-      shuffleMetrics.localBlocksFetched = itr.numLocalBlocks
-      shuffleMetrics.remoteBlocksFetched = itr.numRemoteBlocks
+      shuffleMetrics.remoteFetchTime = blockFetcherItr.remoteFetchTime
+      shuffleMetrics.fetchWaitTime = blockFetcherItr.fetchWaitTime
+      shuffleMetrics.remoteBytesRead = blockFetcherItr.remoteBytesRead
+      shuffleMetrics.totalBlocksFetched = blockFetcherItr.totalBlocks
+      shuffleMetrics.localBlocksFetched = blockFetcherItr.numLocalBlocks
+      shuffleMetrics.remoteBlocksFetched = blockFetcherItr.numRemoteBlocks
       metrics.shuffleReadMetrics = Some(shuffleMetrics)
     })
   }
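With this first commit, per-record timing becomes opt-in: the fetcher above consults a system property and falls back to the no-op iterator by default. As a minimal sketch (illustrative, not part of the patch), a user who wanted the old timed behavior back would set the property before the shuffle runs:

    // Opt back into per-record shuffle timing; the property name comes from
    // the diff above, and it defaults to "false" (the no-op iterator).
    System.setProperty("per.record.shuffle.metrics", "true")
    // BlockStoreShuffleFetcher will now construct a SystemTimedIterator
    // instead of a NoOpTimedIterator for this JVM's tasks.

Note that the second commit below removes this option again, along with all per-record tracking.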
diff --git a/core/src/main/scala/spark/executor/TaskMetrics.scala b/core/src/main/scala/spark/executor/TaskMetrics.scala
index 93bbb6b458..45f6d43971 100644
--- a/core/src/main/scala/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/spark/executor/TaskMetrics.scala
@@ -51,7 +51,7 @@ class ShuffleReadMetrics extends Serializable {
   /**
    * Total time to read shuffle data
    */
-  var shuffleReadMillis: Long = _
+  var shuffleReadMillis: Option[Long] = _
 
   /**
    * Total time that is spent blocked waiting for shuffle to fetch data
diff --git a/core/src/main/scala/spark/storage/DelegateBlockFetchTracker.scala b/core/src/main/scala/spark/storage/DelegateBlockFetchTracker.scala
deleted file mode 100644
index f6c28dce52..0000000000
--- a/core/src/main/scala/spark/storage/DelegateBlockFetchTracker.scala
+++ /dev/null
@@ -1,12 +0,0 @@
-package spark.storage
-
-private[spark] trait DelegateBlockFetchTracker extends BlockFetchTracker {
-  var delegate : BlockFetchTracker = _
-  def setDelegate(d: BlockFetchTracker) {delegate = d}
-  def totalBlocks = delegate.totalBlocks
-  def numLocalBlocks = delegate.numLocalBlocks
-  def numRemoteBlocks = delegate.numRemoteBlocks
-  def remoteFetchTime = delegate.remoteFetchTime
-  def fetchWaitTime = delegate.fetchWaitTime
-  def remoteBytesRead = delegate.remoteBytesRead
-}
diff --git a/core/src/main/scala/spark/util/TimedIterator.scala b/core/src/main/scala/spark/util/TimedIterator.scala
index 539b01f4ce..49f1276b4e 100644
--- a/core/src/main/scala/spark/util/TimedIterator.scala
+++ b/core/src/main/scala/spark/util/TimedIterator.scala
@@ -1,13 +1,21 @@
 package spark.util
 
 /**
- * A utility for tracking the total time an iterator takes to iterate through its elements.
+ * A utility for tracking the time an iterator takes to iterate through its elements.
+ */
+trait TimedIterator {
+  def getNetMillis: Option[Long]
+  def getAverageTimePerItem: Option[Double]
+}
+
+/**
+ * A TimedIterator which uses System.currentTimeMillis() on every call to next().
  *
  * In general, this should only be used if you expect it to take a considerable amount of time
  * (eg. milliseconds) to get each element -- otherwise, the timing won't be very accurate,
  * and you are probably just adding more overhead
  */
-class TimedIterator[+A](val sub: Iterator[A]) extends Iterator[A] {
+class SystemTimedIterator[+A](val sub: Iterator[A]) extends Iterator[A] with TimedIterator {
   private var netMillis = 0l
   private var nElems = 0
   def hasNext = {
@@ -26,7 +34,17 @@ class TimedIterator[+A](val sub: Iterator[A]) extends Iterator[A] {
     r
   }
 
-  def getNetMillis = netMillis
-  def getAverageTimePerItem = netMillis / nElems.toDouble
+  def getNetMillis = Some(netMillis)
+  def getAverageTimePerItem = Some(netMillis / nElems.toDouble)
 
 }
+
+/**
+ * A TimedIterator which doesn't perform any timing measurements.
+ */
+class NoOpTimedIterator[+A](val sub: Iterator[A]) extends Iterator[A] with TimedIterator {
+  def hasNext = sub.hasNext
+  def next = sub.next
+  def getNetMillis = None
+  def getAverageTimePerItem = None
+}
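The point of the trait split above is that consumers stay implementation-agnostic: they hold a TimedIterator and read its Option-valued accessors, which are None when timing was never collected. A minimal usage sketch, assuming the definitions from the diff above (the collectTimings flag is hypothetical):

    // Construction chooses real timing or a no-op; the consuming code is
    // identical either way.
    val collectTimings = false                       // hypothetical flag
    val data = Iterator(1, 2, 3)
    val itr: Iterator[Int] with TimedIterator =
      if (collectTimings) new SystemTimedIterator(data)
      else new NoOpTimedIterator(data)
    itr.foreach(_ => ())                             // drain the iterator
    val elapsed: Option[Long] = itr.getNetMillis     // Some(ms) or None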
diff --git a/core/src/test/scala/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/spark/scheduler/SparkListenerSuite.scala
index 2f5af10e69..5ccab369db 100644
--- a/core/src/test/scala/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/spark/scheduler/SparkListenerSuite.scala
@@ -57,7 +57,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
       taskMetrics.shuffleReadMetrics should be ('defined)
       val sm = taskMetrics.shuffleReadMetrics.get
       sm.totalBlocksFetched should be > (0)
-      sm.shuffleReadMillis should be > (0l)
+      sm.shuffleReadMillis.get should be > (0l)
       sm.localBlocksFetched should be > (0)
       sm.remoteBlocksFetched should be (0)
       sm.remoteBytesRead should be (0l)
--
cgit v1.2.3

From 540be6b1544d26c7db79ec84a98fc6696c7c6434 Mon Sep 17 00:00:00 2001
From: Patrick Wendell
Date: Mon, 29 Apr 2013 11:32:07 -0700
Subject: Modified version of the fix which just removes all per-record tracking.

---
 .../scala/spark/BlockStoreShuffleFetcher.scala     |  9 +---
 .../main/scala/spark/executor/TaskMetrics.scala    |  5 ---
 core/src/main/scala/spark/util/TimedIterator.scala | 50 ----------------------
 .../scala/spark/scheduler/SparkListenerSuite.scala |  1 -
 4 files changed, 2 insertions(+), 63 deletions(-)
 delete mode 100644 core/src/main/scala/spark/util/TimedIterator.scala
(limited to 'core')

diff --git a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
index 83c22b1f14..ce61d27448 100644
--- a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
+++ b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
@@ -5,7 +5,7 @@ import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.HashMap
 
 import spark.storage.BlockManagerId
-import util.{NoOpTimedIterator, SystemTimedIterator, CompletionIterator, TimedIterator}
+import util.CompletionIterator
 
 private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Logging {
   override def fetch[K, V](shuffleId: Int, reduceId: Int, metrics: TaskMetrics) = {
@@ -49,14 +49,9 @@ private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Loggin
     }
 
     val blockFetcherItr = blockManager.getMultiple(blocksByAddress)
-    val itr = if (System.getProperty("per.record.shuffle.metrics", "false").toBoolean) {
-      new SystemTimedIterator(blockFetcherItr.flatMap(unpackBlock))
-    } else {
-      new NoOpTimedIterator(blockFetcherItr.flatMap(unpackBlock))
-    }
+    val itr = blockFetcherItr.flatMap(unpackBlock)
     CompletionIterator[(K,V), Iterator[(K,V)]](itr, {
       val shuffleMetrics = new ShuffleReadMetrics
-      shuffleMetrics.shuffleReadMillis = itr.getNetMillis
       shuffleMetrics.remoteFetchTime = blockFetcherItr.remoteFetchTime
       shuffleMetrics.fetchWaitTime = blockFetcherItr.fetchWaitTime
       shuffleMetrics.remoteBytesRead = blockFetcherItr.remoteBytesRead
diff --git a/core/src/main/scala/spark/executor/TaskMetrics.scala b/core/src/main/scala/spark/executor/TaskMetrics.scala
index 45f6d43971..a7c56c2371 100644
--- a/core/src/main/scala/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/spark/executor/TaskMetrics.scala
@@ -48,11 +48,6 @@ class ShuffleReadMetrics extends Serializable {
    */
   var localBlocksFetched: Int = _
 
-  /**
-   * Total time to read shuffle data
-   */
-  var shuffleReadMillis: Option[Long] = _
-
   /**
    * Total time that is spent blocked waiting for shuffle to fetch data
    */
diff --git a/core/src/main/scala/spark/util/TimedIterator.scala b/core/src/main/scala/spark/util/TimedIterator.scala
deleted file mode 100644
index 49f1276b4e..0000000000
--- a/core/src/main/scala/spark/util/TimedIterator.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-package spark.util
-
-/**
- * A utility for tracking the time an iterator takes to iterate through its elements.
- */
-trait TimedIterator {
-  def getNetMillis: Option[Long]
-  def getAverageTimePerItem: Option[Double]
-}
-
-/**
- * A TimedIterator which uses System.currentTimeMillis() on every call to next().
- *
- * In general, this should only be used if you expect it to take a considerable amount of time
- * (eg. milliseconds) to get each element -- otherwise, the timing won't be very accurate,
- * and you are probably just adding more overhead
- */
-class SystemTimedIterator[+A](val sub: Iterator[A]) extends Iterator[A] with TimedIterator {
-  private var netMillis = 0l
-  private var nElems = 0
-  def hasNext = {
-    val start = System.currentTimeMillis()
-    val r = sub.hasNext
-    val end = System.currentTimeMillis()
-    netMillis += (end - start)
-    r
-  }
-  def next = {
-    val start = System.currentTimeMillis()
-    val r = sub.next
-    val end = System.currentTimeMillis()
-    netMillis += (end - start)
-    nElems += 1
-    r
-  }
-
-  def getNetMillis = Some(netMillis)
-  def getAverageTimePerItem = Some(netMillis / nElems.toDouble)
-
-}
-
-/**
- * A TimedIterator which doesn't perform any timing measurements.
- */
-class NoOpTimedIterator[+A](val sub: Iterator[A]) extends Iterator[A] with TimedIterator {
-  def hasNext = sub.hasNext
-  def next = sub.next
-  def getNetMillis = None
-  def getAverageTimePerItem = None
-}
diff --git a/core/src/test/scala/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/spark/scheduler/SparkListenerSuite.scala
index 5ccab369db..42a87d8b90 100644
--- a/core/src/test/scala/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/spark/scheduler/SparkListenerSuite.scala
@@ -57,7 +57,6 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
       taskMetrics.shuffleReadMetrics should be ('defined)
       val sm = taskMetrics.shuffleReadMetrics.get
       sm.totalBlocksFetched should be > (0)
-      sm.shuffleReadMillis.get should be > (0l)
       sm.localBlocksFetched should be > (0)
       sm.remoteBlocksFetched should be (0)
       sm.remoteBytesRead should be (0l)
--
cgit v1.2.3
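With this second commit, all shuffle metrics come straight from blockFetcherItr, assigned inside the callback that CompletionIterator runs once iteration finishes. Spark's actual CompletionIterator is not shown in this patch; the class below is only a sketch of that callback pattern:

    // Sketch: run `onComplete` exactly once, when the wrapped iterator is
    // exhausted. Illustrative only -- not Spark's CompletionIterator.
    class OnCompleteIterator[A](sub: Iterator[A], onComplete: () => Unit)
        extends Iterator[A] {
      private var fired = false
      def hasNext: Boolean = {
        val more = sub.hasNext
        if (!more && !fired) { fired = true; onComplete() }
        more
      }
      def next(): A = sub.next()
    }

Because the callback only fires when hasNext first reports exhaustion, the metrics are recorded once per fetch and nothing runs on the per-record path.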
From 016ce1fa9c9ebbe45559b1cbd95a3674510fe880 Mon Sep 17 00:00:00 2001
From: Patrick Wendell
Date: Mon, 29 Apr 2013 12:02:27 -0700
Subject: Using full package name for util

---
 core/src/main/scala/spark/BlockStoreShuffleFetcher.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
(limited to 'core')

diff --git a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
index ce61d27448..2987dbbe58 100644
--- a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
+++ b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
@@ -5,7 +5,7 @@ import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.HashMap
 
 import spark.storage.BlockManagerId
-import util.CompletionIterator
+import spark.util.CompletionIterator
 
 private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Logging {
   override def fetch[K, V](shuffleId: Int, reduceId: Int, metrics: TaskMetrics) = {
--
cgit v1.2.3
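The final one-line change is about Scala's nested packages: inside `package spark`, the bare `import util.CompletionIterator` resolves through the enclosing scope to spark.util, but that resolution can silently change if another package named util ever becomes visible at a closer nesting level. A sketch with a hypothetical layout (stand-in classes, not the real Spark sources):

    // Illustrative only -- stand-in classes, hypothetical package layout.
    package spark {
      package util { class CompletionIterator }

      package storage {
        import util.CompletionIterator      // today: resolves to spark.util
        // If a spark.storage.util package were added later, this same line
        // would resolve to it instead; `import spark.util.CompletionIterator`
        // pins the intended target.
        class Fetcher { def mk = new CompletionIterator }
      }
    }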