aboutsummaryrefslogtreecommitdiff
path: root/core/src
diff options
context:
space:
mode:
authorImran Rashid <imran@quantifind.com>2013-03-09 21:16:53 -0800
committerImran Rashid <imran@quantifind.com>2013-03-09 21:16:53 -0800
commitec30188a2ad6325356a5ccfe85cefee6872e8646 (patch)
treeee9dd6b056632107f0ffc2db914e439bace461cf /core/src
parent9f0dc829cbaa9aa5011fb917010d13ea5e0a19d7 (diff)
downloadspark-ec30188a2ad6325356a5ccfe85cefee6872e8646.tar.gz
spark-ec30188a2ad6325356a5ccfe85cefee6872e8646.tar.bz2
spark-ec30188a2ad6325356a5ccfe85cefee6872e8646.zip
rename remoteFetchWaitTime to fetchWaitTime, since it also includes time from local fetches
Diffstat (limited to 'core/src')
-rw-r--r--core/src/main/scala/spark/BlockStoreShuffleFetcher.scala2
-rw-r--r--core/src/main/scala/spark/executor/TaskMetrics.scala4
-rw-r--r--core/src/main/scala/spark/scheduler/SparkListener.scala4
-rw-r--r--core/src/main/scala/spark/storage/BlockFetchTracker.scala2
-rw-r--r--core/src/main/scala/spark/storage/BlockManager.scala6
-rw-r--r--core/src/main/scala/spark/storage/DelegateBlockFetchTracker.scala2
6 files changed, 10 insertions, 10 deletions
diff --git a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
index 53b0389c3a..c27ed36406 100644
--- a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
+++ b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
@@ -55,7 +55,7 @@ private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Loggin
val shuffleMetrics = new ShuffleReadMetrics
shuffleMetrics.shuffleReadMillis = itr.getNetMillis
shuffleMetrics.remoteFetchTime = itr.remoteFetchTime
- shuffleMetrics.remoteFetchWaitTime = itr.remoteFetchWaitTime
+ shuffleMetrics.fetchWaitTime = itr.fetchWaitTime
shuffleMetrics.remoteBytesRead = itr.remoteBytesRead
shuffleMetrics.totalBlocksFetched = itr.totalBlocks
shuffleMetrics.localBlocksFetched = itr.numLocalBlocks
diff --git a/core/src/main/scala/spark/executor/TaskMetrics.scala b/core/src/main/scala/spark/executor/TaskMetrics.scala
index b9c07830f5..93bbb6b458 100644
--- a/core/src/main/scala/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/spark/executor/TaskMetrics.scala
@@ -54,9 +54,9 @@ class ShuffleReadMetrics extends Serializable {
var shuffleReadMillis: Long = _
/**
- * Total time that is spent blocked waiting for shuffle to fetch remote data
+ * Total time that is spent blocked waiting for shuffle to fetch data
*/
- var remoteFetchWaitTime: Long = _
+ var fetchWaitTime: Long = _
/**
* The total amount of time for all the shuffle fetches. This adds up time from overlapping
diff --git a/core/src/main/scala/spark/scheduler/SparkListener.scala b/core/src/main/scala/spark/scheduler/SparkListener.scala
index 21185227ab..a65140b145 100644
--- a/core/src/main/scala/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/spark/scheduler/SparkListener.scala
@@ -31,7 +31,7 @@ class StatsReportListener extends SparkListener with Logging {
showBytesDistribution("shuffle bytes written:",(_,metric) => metric.shuffleWriteMetrics.map{_.shuffleBytesWritten})
//fetch & io
- showMillisDistribution("fetch wait time:",(_, metric) => metric.shuffleReadMetrics.map{_.remoteFetchWaitTime})
+ showMillisDistribution("fetch wait time:",(_, metric) => metric.shuffleReadMetrics.map{_.fetchWaitTime})
showBytesDistribution("remote bytes read:", (_, metric) => metric.shuffleReadMetrics.map{_.remoteBytesRead})
showBytesDistribution("task result size:", (_, metric) => Some(metric.resultSize))
@@ -137,7 +137,7 @@ case class RuntimePercentage(executorPct: Double, fetchPct: Option[Double], othe
object RuntimePercentage {
def apply(totalTime: Long, metrics: TaskMetrics): RuntimePercentage = {
val denom = totalTime.toDouble
- val fetchTime = metrics.shuffleReadMetrics.map{_.remoteFetchWaitTime}
+ val fetchTime = metrics.shuffleReadMetrics.map{_.fetchWaitTime}
val fetch = fetchTime.map{_ / denom}
val exec = (metrics.executorRunTime - fetchTime.getOrElse(0l)) / denom
val other = 1.0 - (exec + fetch.getOrElse(0d))
diff --git a/core/src/main/scala/spark/storage/BlockFetchTracker.scala b/core/src/main/scala/spark/storage/BlockFetchTracker.scala
index ababb04305..993aece1f7 100644
--- a/core/src/main/scala/spark/storage/BlockFetchTracker.scala
+++ b/core/src/main/scala/spark/storage/BlockFetchTracker.scala
@@ -5,6 +5,6 @@ private[spark] trait BlockFetchTracker {
def numLocalBlocks: Int
def numRemoteBlocks: Int
def remoteFetchTime : Long
- def remoteFetchWaitTime: Long
+ def fetchWaitTime: Long
def remoteBytesRead : Long
}
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index 5849045a55..254ce1a4a4 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -903,7 +903,7 @@ class BlockFetcherIterator(
private var _remoteBytesRead = 0l
private var _remoteFetchTime = 0l
- private var _remoteFetchWaitTime = 0l
+ private var _fetchWaitTime = 0l
if (blocksByAddress == null) {
throw new IllegalArgumentException("BlocksByAddress is null")
@@ -1046,7 +1046,7 @@ class BlockFetcherIterator(
val startFetchWait = System.currentTimeMillis()
val result = results.take()
val stopFetchWait = System.currentTimeMillis()
- _remoteFetchWaitTime += (stopFetchWait - startFetchWait)
+ _fetchWaitTime += (stopFetchWait - startFetchWait)
bytesInFlight -= result.size
while (!fetchRequests.isEmpty &&
(bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) {
@@ -1061,7 +1061,7 @@ class BlockFetcherIterator(
def numRemoteBlocks = remoteBlockIds.size
def remoteFetchTime = _remoteFetchTime
- def remoteFetchWaitTime = _remoteFetchWaitTime
+ def fetchWaitTime = _fetchWaitTime
def remoteBytesRead = _remoteBytesRead
diff --git a/core/src/main/scala/spark/storage/DelegateBlockFetchTracker.scala b/core/src/main/scala/spark/storage/DelegateBlockFetchTracker.scala
index 5c491877ba..f6c28dce52 100644
--- a/core/src/main/scala/spark/storage/DelegateBlockFetchTracker.scala
+++ b/core/src/main/scala/spark/storage/DelegateBlockFetchTracker.scala
@@ -7,6 +7,6 @@ private[spark] trait DelegateBlockFetchTracker extends BlockFetchTracker {
def numLocalBlocks = delegate.numLocalBlocks
def numRemoteBlocks = delegate.numRemoteBlocks
def remoteFetchTime = delegate.remoteFetchTime
- def remoteFetchWaitTime = delegate.remoteFetchWaitTime
+ def fetchWaitTime = delegate.fetchWaitTime
def remoteBytesRead = delegate.remoteBytesRead
}