aboutsummaryrefslogtreecommitdiff
path: root/core/src/main
diff options
context:
space:
mode:
authorPatrick Wendell <pwendell@gmail.com>2013-04-29 11:32:07 -0700
committerPatrick Wendell <pwendell@gmail.com>2013-04-29 11:32:07 -0700
commit540be6b1544d26c7db79ec84a98fc6696c7c6434 (patch)
treeea72de3a8b8ff6de64d36e22333dfe76468ef9cc /core/src/main
parent224fbac0612d5c35259cc9f4963dcd4a65ecc832 (diff)
downloadspark-540be6b1544d26c7db79ec84a98fc6696c7c6434.tar.gz
spark-540be6b1544d26c7db79ec84a98fc6696c7c6434.tar.bz2
spark-540be6b1544d26c7db79ec84a98fc6696c7c6434.zip
Modified version of the fix which just removes all per-record tracking.
Diffstat (limited to 'core/src/main')
-rw-r--r--core/src/main/scala/spark/BlockStoreShuffleFetcher.scala9
-rw-r--r--core/src/main/scala/spark/executor/TaskMetrics.scala5
-rw-r--r--core/src/main/scala/spark/util/TimedIterator.scala50
3 files changed, 2 insertions, 62 deletions
diff --git a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
index 83c22b1f14..ce61d27448 100644
--- a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
+++ b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
@@ -5,7 +5,7 @@ import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
import spark.storage.BlockManagerId
-import util.{NoOpTimedIterator, SystemTimedIterator, CompletionIterator, TimedIterator}
+import util.CompletionIterator
private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Logging {
override def fetch[K, V](shuffleId: Int, reduceId: Int, metrics: TaskMetrics) = {
@@ -49,14 +49,9 @@ private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Loggin
}
val blockFetcherItr = blockManager.getMultiple(blocksByAddress)
- val itr = if (System.getProperty("per.record.shuffle.metrics", "false").toBoolean) {
- new SystemTimedIterator(blockFetcherItr.flatMap(unpackBlock))
- } else {
- new NoOpTimedIterator(blockFetcherItr.flatMap(unpackBlock))
- }
+ val itr = blockFetcherItr.flatMap(unpackBlock)
CompletionIterator[(K,V), Iterator[(K,V)]](itr, {
val shuffleMetrics = new ShuffleReadMetrics
- shuffleMetrics.shuffleReadMillis = itr.getNetMillis
shuffleMetrics.remoteFetchTime = blockFetcherItr.remoteFetchTime
shuffleMetrics.fetchWaitTime = blockFetcherItr.fetchWaitTime
shuffleMetrics.remoteBytesRead = blockFetcherItr.remoteBytesRead
diff --git a/core/src/main/scala/spark/executor/TaskMetrics.scala b/core/src/main/scala/spark/executor/TaskMetrics.scala
index 45f6d43971..a7c56c2371 100644
--- a/core/src/main/scala/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/spark/executor/TaskMetrics.scala
@@ -49,11 +49,6 @@ class ShuffleReadMetrics extends Serializable {
var localBlocksFetched: Int = _
/**
- * Total time to read shuffle data
- */
- var shuffleReadMillis: Option[Long] = _
-
- /**
* Total time that is spent blocked waiting for shuffle to fetch data
*/
var fetchWaitTime: Long = _
diff --git a/core/src/main/scala/spark/util/TimedIterator.scala b/core/src/main/scala/spark/util/TimedIterator.scala
deleted file mode 100644
index 49f1276b4e..0000000000
--- a/core/src/main/scala/spark/util/TimedIterator.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-package spark.util
-
-/**
- * A utility for tracking the the time an iterator takes to iterate through its elements.
- */
-trait TimedIterator {
- def getNetMillis: Option[Long]
- def getAverageTimePerItem: Option[Double]
-}
-
-/**
- * A TimedIterator which uses System.currentTimeMillis() on every call to next().
- *
- * In general, this should only be used if you expect it to take a considerable amount of time
- * (eg. milliseconds) to get each element -- otherwise, the timing won't be very accurate,
- * and you are probably just adding more overhead
- */
-class SystemTimedIterator[+A](val sub: Iterator[A]) extends Iterator[A] with TimedIterator {
- private var netMillis = 0l
- private var nElems = 0
- def hasNext = {
- val start = System.currentTimeMillis()
- val r = sub.hasNext
- val end = System.currentTimeMillis()
- netMillis += (end - start)
- r
- }
- def next = {
- val start = System.currentTimeMillis()
- val r = sub.next
- val end = System.currentTimeMillis()
- netMillis += (end - start)
- nElems += 1
- r
- }
-
- def getNetMillis = Some(netMillis)
- def getAverageTimePerItem = Some(netMillis / nElems.toDouble)
-
-}
-
-/**
- * A TimedIterator which doesn't perform any timing measurements.
- */
-class NoOpTimedIterator[+A](val sub: Iterator[A]) extends Iterator[A] with TimedIterator {
- def hasNext = sub.hasNext
- def next = sub.next
- def getNetMillis = None
- def getAverageTimePerItem = None
-}