author     Patrick Wendell <pwendell@gmail.com>  2013-04-29 10:10:14 -0700
committer  Patrick Wendell <pwendell@gmail.com>  2013-04-29 11:13:43 -0700
commit     224fbac0612d5c35259cc9f4963dcd4a65ecc832
tree       8c287179a6db88364bd5f5fa6d8fda5d8c7cde2f
parent     1f20ef256715e5a84ba1661e235b6eda21a70b5b
Spark-742: TaskMetrics should not employ per-record timing.
This patch does three things:
1. Makes TimedIterator a trait with two implementations (one of them a no-op).
2. Makes the no-op implementation the default behavior.
3. Removes DelegateBlockFetchTracker. This is just cleanup; the trait didn't seem to reduce complexity in any way.
In the future we can add other implementations, e.g. ones that perform sampling.
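The new default is gated by a JVM system property that BlockStoreShuffleFetcher reads (see the first hunk below). A minimal sketch of opting back into per-record timing; where best to set the property is left open here:

    // Hypothetical opt-in sketch: the property name comes from this patch.
    // Its default of "false" selects NoOpTimedIterator, so no
    // System.currentTimeMillis() call is made per record.
    System.setProperty("per.record.shuffle.metrics", "true")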
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/BlockStoreShuffleFetcher.scala          | 23
-rw-r--r--  core/src/main/scala/spark/executor/TaskMetrics.scala               |  2
-rw-r--r--  core/src/main/scala/spark/storage/DelegateBlockFetchTracker.scala  | 12
-rw-r--r--  core/src/main/scala/spark/util/TimedIterator.scala                 | 26
-rw-r--r--  core/src/test/scala/spark/scheduler/SparkListenerSuite.scala       |  2
5 files changed, 37 insertions(+), 28 deletions(-)
diff --git a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
index c27ed36406..83c22b1f14 100644
--- a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
+++ b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
@@ -4,8 +4,8 @@ import executor.{ShuffleReadMetrics, TaskMetrics}
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
-import spark.storage.{DelegateBlockFetchTracker, BlockManagerId}
-import util.{CompletionIterator, TimedIterator}
+import spark.storage.BlockManagerId
+import util.{NoOpTimedIterator, SystemTimedIterator, CompletionIterator, TimedIterator}
private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Logging {
override def fetch[K, V](shuffleId: Int, reduceId: Int, metrics: TaskMetrics) = {
@@ -49,17 +49,20 @@ private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Loggin
}
val blockFetcherItr = blockManager.getMultiple(blocksByAddress)
- val itr = new TimedIterator(blockFetcherItr.flatMap(unpackBlock)) with DelegateBlockFetchTracker
- itr.setDelegate(blockFetcherItr)
+ val itr = if (System.getProperty("per.record.shuffle.metrics", "false").toBoolean) {
+ new SystemTimedIterator(blockFetcherItr.flatMap(unpackBlock))
+ } else {
+ new NoOpTimedIterator(blockFetcherItr.flatMap(unpackBlock))
+ }
CompletionIterator[(K,V), Iterator[(K,V)]](itr, {
val shuffleMetrics = new ShuffleReadMetrics
shuffleMetrics.shuffleReadMillis = itr.getNetMillis
- shuffleMetrics.remoteFetchTime = itr.remoteFetchTime
- shuffleMetrics.fetchWaitTime = itr.fetchWaitTime
- shuffleMetrics.remoteBytesRead = itr.remoteBytesRead
- shuffleMetrics.totalBlocksFetched = itr.totalBlocks
- shuffleMetrics.localBlocksFetched = itr.numLocalBlocks
- shuffleMetrics.remoteBlocksFetched = itr.numRemoteBlocks
+ shuffleMetrics.remoteFetchTime = blockFetcherItr.remoteFetchTime
+ shuffleMetrics.fetchWaitTime = blockFetcherItr.fetchWaitTime
+ shuffleMetrics.remoteBytesRead = blockFetcherItr.remoteBytesRead
+ shuffleMetrics.totalBlocksFetched = blockFetcherItr.totalBlocks
+ shuffleMetrics.localBlocksFetched = blockFetcherItr.numLocalBlocks
+ shuffleMetrics.remoteBlocksFetched = blockFetcherItr.numRemoteBlocks
metrics.shuffleReadMetrics = Some(shuffleMetrics)
})
}
diff --git a/core/src/main/scala/spark/executor/TaskMetrics.scala b/core/src/main/scala/spark/executor/TaskMetrics.scala
index 93bbb6b458..45f6d43971 100644
--- a/core/src/main/scala/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/spark/executor/TaskMetrics.scala
@@ -51,7 +51,7 @@ class ShuffleReadMetrics extends Serializable {
/**
* Total time to read shuffle data
*/
- var shuffleReadMillis: Long = _
+ var shuffleReadMillis: Option[Long] = _
/**
* Total time that is spent blocked waiting for shuffle to fetch data
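Callers of this field now have to unwrap the Option. A minimal sketch, where `sm` is a hypothetical ShuffleReadMetrics instance pulled out of TaskMetrics:

    // Treat an absent measurement (the no-op default) as zero.
    val millis: Long = sm.shuffleReadMillis.getOrElse(0L)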
diff --git a/core/src/main/scala/spark/storage/DelegateBlockFetchTracker.scala b/core/src/main/scala/spark/storage/DelegateBlockFetchTracker.scala
deleted file mode 100644
index f6c28dce52..0000000000
--- a/core/src/main/scala/spark/storage/DelegateBlockFetchTracker.scala
+++ /dev/null
@@ -1,12 +0,0 @@
-package spark.storage
-
-private[spark] trait DelegateBlockFetchTracker extends BlockFetchTracker {
- var delegate : BlockFetchTracker = _
- def setDelegate(d: BlockFetchTracker) {delegate = d}
- def totalBlocks = delegate.totalBlocks
- def numLocalBlocks = delegate.numLocalBlocks
- def numRemoteBlocks = delegate.numRemoteBlocks
- def remoteFetchTime = delegate.remoteFetchTime
- def fetchWaitTime = delegate.fetchWaitTime
- def remoteBytesRead = delegate.remoteBytesRead
-}
diff --git a/core/src/main/scala/spark/util/TimedIterator.scala b/core/src/main/scala/spark/util/TimedIterator.scala
index 539b01f4ce..49f1276b4e 100644
--- a/core/src/main/scala/spark/util/TimedIterator.scala
+++ b/core/src/main/scala/spark/util/TimedIterator.scala
@@ -1,13 +1,21 @@
package spark.util
/**
- * A utility for tracking the total time an iterator takes to iterate through its elements.
+ * A utility for tracking the time an iterator takes to iterate through its elements.
+ */
+trait TimedIterator {
+ def getNetMillis: Option[Long]
+ def getAverageTimePerItem: Option[Double]
+}
+
+/**
+ * A TimedIterator which uses System.currentTimeMillis() on every call to next().
*
* In general, this should only be used if you expect it to take a considerable amount of time
* (e.g. milliseconds) to get each element -- otherwise, the timing won't be very accurate,
* and you are probably just adding more overhead.
*/
-class TimedIterator[+A](val sub: Iterator[A]) extends Iterator[A] {
+class SystemTimedIterator[+A](val sub: Iterator[A]) extends Iterator[A] with TimedIterator {
private var netMillis = 0l
private var nElems = 0
def hasNext = {
@@ -26,7 +34,17 @@ class TimedIterator[+A](val sub: Iterator[A]) extends Iterator[A] {
r
}
- def getNetMillis = netMillis
- def getAverageTimePerItem = netMillis / nElems.toDouble
+ def getNetMillis = Some(netMillis)
+ def getAverageTimePerItem = Some(netMillis / nElems.toDouble)
}
+
+/**
+ * A TimedIterator which doesn't perform any timing measurements.
+ */
+class NoOpTimedIterator[+A](val sub: Iterator[A]) extends Iterator[A] with TimedIterator {
+ def hasNext = sub.hasNext
+ def next = sub.next
+ def getNetMillis = None
+ def getAverageTimePerItem = None
+}
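To show the two implementations side by side, a hedged usage sketch (the slow source iterator is invented for the example):

    // Each next() on this source takes ~5 ms, so SystemTimedIterator's
    // per-call timing has something meaningful to measure.
    val slowSource = Iterator.fill(3) { Thread.sleep(5); 42 }
    val timed = new SystemTimedIterator(slowSource)
    timed.foreach(_ => ())            // drains the iterator, timing each hasNext/next
    timed.getNetMillis                // Some(total millis spent iterating)
    timed.getAverageTimePerItem       // Some(millis per element)

    val untimed = new NoOpTimedIterator(Iterator(1, 2, 3))
    untimed.foreach(_ => ())          // plain pass-through, no timing calls
    untimed.getNetMillis              // None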
diff --git a/core/src/test/scala/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/spark/scheduler/SparkListenerSuite.scala
index 2f5af10e69..5ccab369db 100644
--- a/core/src/test/scala/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/spark/scheduler/SparkListenerSuite.scala
@@ -57,7 +57,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
taskMetrics.shuffleReadMetrics should be ('defined)
val sm = taskMetrics.shuffleReadMetrics.get
sm.totalBlocksFetched should be > (0)
- sm.shuffleReadMillis should be > (0l)
+ sm.shuffleReadMillis.get should be > (0l)
sm.localBlocksFetched should be > (0)
sm.remoteBlocksFetched should be (0)
sm.remoteBytesRead should be (0l)