From ec30188a2ad6325356a5ccfe85cefee6872e8646 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Sat, 9 Mar 2013 21:16:53 -0800 Subject: rename remoteFetchWaitTime to fetchWaitTime, since it also includes time from local fetches --- core/src/main/scala/spark/BlockStoreShuffleFetcher.scala | 2 +- core/src/main/scala/spark/executor/TaskMetrics.scala | 4 ++-- core/src/main/scala/spark/scheduler/SparkListener.scala | 4 ++-- core/src/main/scala/spark/storage/BlockFetchTracker.scala | 2 +- core/src/main/scala/spark/storage/BlockManager.scala | 6 +++--- core/src/main/scala/spark/storage/DelegateBlockFetchTracker.scala | 2 +- 6 files changed, 10 insertions(+), 10 deletions(-) (limited to 'core/src') diff --git a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala index 53b0389c3a..c27ed36406 100644 --- a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala +++ b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala @@ -55,7 +55,7 @@ private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Loggin val shuffleMetrics = new ShuffleReadMetrics shuffleMetrics.shuffleReadMillis = itr.getNetMillis shuffleMetrics.remoteFetchTime = itr.remoteFetchTime - shuffleMetrics.remoteFetchWaitTime = itr.remoteFetchWaitTime + shuffleMetrics.fetchWaitTime = itr.fetchWaitTime shuffleMetrics.remoteBytesRead = itr.remoteBytesRead shuffleMetrics.totalBlocksFetched = itr.totalBlocks shuffleMetrics.localBlocksFetched = itr.numLocalBlocks diff --git a/core/src/main/scala/spark/executor/TaskMetrics.scala b/core/src/main/scala/spark/executor/TaskMetrics.scala index b9c07830f5..93bbb6b458 100644 --- a/core/src/main/scala/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/spark/executor/TaskMetrics.scala @@ -54,9 +54,9 @@ class ShuffleReadMetrics extends Serializable { var shuffleReadMillis: Long = _ /** - * Total time that is spent blocked waiting for shuffle to fetch remote data + * Total time that is spent blocked waiting for shuffle to fetch data */ - var remoteFetchWaitTime: Long = _ + var fetchWaitTime: Long = _ /** * The total amount of time for all the shuffle fetches. This adds up time from overlapping diff --git a/core/src/main/scala/spark/scheduler/SparkListener.scala b/core/src/main/scala/spark/scheduler/SparkListener.scala index 21185227ab..a65140b145 100644 --- a/core/src/main/scala/spark/scheduler/SparkListener.scala +++ b/core/src/main/scala/spark/scheduler/SparkListener.scala @@ -31,7 +31,7 @@ class StatsReportListener extends SparkListener with Logging { showBytesDistribution("shuffle bytes written:",(_,metric) => metric.shuffleWriteMetrics.map{_.shuffleBytesWritten}) //fetch & io - showMillisDistribution("fetch wait time:",(_, metric) => metric.shuffleReadMetrics.map{_.remoteFetchWaitTime}) + showMillisDistribution("fetch wait time:",(_, metric) => metric.shuffleReadMetrics.map{_.fetchWaitTime}) showBytesDistribution("remote bytes read:", (_, metric) => metric.shuffleReadMetrics.map{_.remoteBytesRead}) showBytesDistribution("task result size:", (_, metric) => Some(metric.resultSize)) @@ -137,7 +137,7 @@ case class RuntimePercentage(executorPct: Double, fetchPct: Option[Double], othe object RuntimePercentage { def apply(totalTime: Long, metrics: TaskMetrics): RuntimePercentage = { val denom = totalTime.toDouble - val fetchTime = metrics.shuffleReadMetrics.map{_.remoteFetchWaitTime} + val fetchTime = metrics.shuffleReadMetrics.map{_.fetchWaitTime} val fetch = fetchTime.map{_ / denom} val exec = (metrics.executorRunTime - fetchTime.getOrElse(0l)) / denom val other = 1.0 - (exec + fetch.getOrElse(0d)) diff --git a/core/src/main/scala/spark/storage/BlockFetchTracker.scala b/core/src/main/scala/spark/storage/BlockFetchTracker.scala index ababb04305..993aece1f7 100644 --- a/core/src/main/scala/spark/storage/BlockFetchTracker.scala +++ b/core/src/main/scala/spark/storage/BlockFetchTracker.scala @@ -5,6 +5,6 @@ private[spark] trait BlockFetchTracker { def numLocalBlocks: Int def numRemoteBlocks: Int def remoteFetchTime : Long - def remoteFetchWaitTime: Long + def fetchWaitTime: Long def remoteBytesRead : Long } diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala index 5849045a55..254ce1a4a4 100644 --- a/core/src/main/scala/spark/storage/BlockManager.scala +++ b/core/src/main/scala/spark/storage/BlockManager.scala @@ -903,7 +903,7 @@ class BlockFetcherIterator( private var _remoteBytesRead = 0l private var _remoteFetchTime = 0l - private var _remoteFetchWaitTime = 0l + private var _fetchWaitTime = 0l if (blocksByAddress == null) { throw new IllegalArgumentException("BlocksByAddress is null") @@ -1046,7 +1046,7 @@ class BlockFetcherIterator( val startFetchWait = System.currentTimeMillis() val result = results.take() val stopFetchWait = System.currentTimeMillis() - _remoteFetchWaitTime += (stopFetchWait - startFetchWait) + _fetchWaitTime += (stopFetchWait - startFetchWait) bytesInFlight -= result.size while (!fetchRequests.isEmpty && (bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) { @@ -1061,7 +1061,7 @@ class BlockFetcherIterator( def numRemoteBlocks = remoteBlockIds.size def remoteFetchTime = _remoteFetchTime - def remoteFetchWaitTime = _remoteFetchWaitTime + def fetchWaitTime = _fetchWaitTime def remoteBytesRead = _remoteBytesRead diff --git a/core/src/main/scala/spark/storage/DelegateBlockFetchTracker.scala b/core/src/main/scala/spark/storage/DelegateBlockFetchTracker.scala index 5c491877ba..f6c28dce52 100644 --- a/core/src/main/scala/spark/storage/DelegateBlockFetchTracker.scala +++ b/core/src/main/scala/spark/storage/DelegateBlockFetchTracker.scala @@ -7,6 +7,6 @@ private[spark] trait DelegateBlockFetchTracker extends BlockFetchTracker { def numLocalBlocks = delegate.numLocalBlocks def numRemoteBlocks = delegate.numRemoteBlocks def remoteFetchTime = delegate.remoteFetchTime - def remoteFetchWaitTime = delegate.remoteFetchWaitTime + def fetchWaitTime = delegate.fetchWaitTime def remoteBytesRead = delegate.remoteBytesRead } -- cgit v1.2.3