Diffstat (limited to 'core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala')
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala | 104
1 file changed, 68 insertions(+), 36 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala b/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala
index e985b35ace..50bb645d97 100644
--- a/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala
@@ -17,71 +17,103 @@
package org.apache.spark.executor
+import org.apache.spark.{Accumulator, InternalAccumulator}
import org.apache.spark.annotation.DeveloperApi
/**
* :: DeveloperApi ::
- * Metrics pertaining to shuffle data read in a given task.
+ * A collection of accumulators that represent metrics about reading shuffle data.
+ * Operations are not thread-safe.
*/
@DeveloperApi
-class ShuffleReadMetrics extends Serializable {
+class ShuffleReadMetrics private (
+ _remoteBlocksFetched: Accumulator[Int],
+ _localBlocksFetched: Accumulator[Int],
+ _remoteBytesRead: Accumulator[Long],
+ _localBytesRead: Accumulator[Long],
+ _fetchWaitTime: Accumulator[Long],
+ _recordsRead: Accumulator[Long])
+ extends Serializable {
+
+ private[executor] def this(accumMap: Map[String, Accumulator[_]]) {
+ this(
+ TaskMetrics.getAccum[Int](accumMap, InternalAccumulator.shuffleRead.REMOTE_BLOCKS_FETCHED),
+ TaskMetrics.getAccum[Int](accumMap, InternalAccumulator.shuffleRead.LOCAL_BLOCKS_FETCHED),
+ TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.shuffleRead.REMOTE_BYTES_READ),
+ TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.shuffleRead.LOCAL_BYTES_READ),
+ TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.shuffleRead.FETCH_WAIT_TIME),
+ TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.shuffleRead.RECORDS_READ))
+ }
+
/**
- * Number of remote blocks fetched in this shuffle by this task
+ * Create a new [[ShuffleReadMetrics]] that is not associated with any particular task.
+ *
+ * This mainly exists for legacy reasons, because we use dummy [[ShuffleReadMetrics]] in
+ * many places only to merge their values together later. In the future, we should revisit
+ * whether this is needed.
+ *
+ * A better alternative is [[TaskMetrics.registerTempShuffleReadMetrics]] followed by
+ * [[TaskMetrics.mergeShuffleReadMetrics]].
*/
- private var _remoteBlocksFetched: Int = _
- def remoteBlocksFetched: Int = _remoteBlocksFetched
- private[spark] def incRemoteBlocksFetched(value: Int) = _remoteBlocksFetched += value
- private[spark] def decRemoteBlocksFetched(value: Int) = _remoteBlocksFetched -= value
+ private[spark] def this() {
+ this(InternalAccumulator.createShuffleReadAccums().map { a => (a.name.get, a) }.toMap)
+ }
/**
- * Number of local blocks fetched in this shuffle by this task
+ * Number of remote blocks fetched in this shuffle by this task.
*/
- private var _localBlocksFetched: Int = _
- def localBlocksFetched: Int = _localBlocksFetched
- private[spark] def incLocalBlocksFetched(value: Int) = _localBlocksFetched += value
- private[spark] def decLocalBlocksFetched(value: Int) = _localBlocksFetched -= value
+ def remoteBlocksFetched: Int = _remoteBlocksFetched.localValue
/**
- * Time the task spent waiting for remote shuffle blocks. This only includes the time
- * blocking on shuffle input data. For instance if block B is being fetched while the task is
- * still not finished processing block A, it is not considered to be blocking on block B.
+ * Number of local blocks fetched in this shuffle by this task.
*/
- private var _fetchWaitTime: Long = _
- def fetchWaitTime: Long = _fetchWaitTime
- private[spark] def incFetchWaitTime(value: Long) = _fetchWaitTime += value
- private[spark] def decFetchWaitTime(value: Long) = _fetchWaitTime -= value
+ def localBlocksFetched: Int = _localBlocksFetched.localValue
/**
- * Total number of remote bytes read from the shuffle by this task
+ * Total number of remote bytes read from the shuffle by this task.
*/
- private var _remoteBytesRead: Long = _
- def remoteBytesRead: Long = _remoteBytesRead
- private[spark] def incRemoteBytesRead(value: Long) = _remoteBytesRead += value
- private[spark] def decRemoteBytesRead(value: Long) = _remoteBytesRead -= value
+ def remoteBytesRead: Long = _remoteBytesRead.localValue
/**
* Shuffle data that was read from the local disk (as opposed to from a remote executor).
*/
- private var _localBytesRead: Long = _
- def localBytesRead: Long = _localBytesRead
- private[spark] def incLocalBytesRead(value: Long) = _localBytesRead += value
+ def localBytesRead: Long = _localBytesRead.localValue
/**
- * Total bytes fetched in the shuffle by this task (both remote and local).
+ * Time the task spent waiting for remote shuffle blocks. This only includes the time
+ * blocking on shuffle input data. For instance if block B is being fetched while the task is
+ * still not finished processing block A, it is not considered to be blocking on block B.
+ */
+ def fetchWaitTime: Long = _fetchWaitTime.localValue
+
+ /**
+ * Total number of records read from the shuffle by this task.
*/
- def totalBytesRead: Long = _remoteBytesRead + _localBytesRead
+ def recordsRead: Long = _recordsRead.localValue
/**
- * Number of blocks fetched in this shuffle by this task (remote or local)
+ * Total bytes fetched in the shuffle by this task (both remote and local).
*/
- def totalBlocksFetched: Int = _remoteBlocksFetched + _localBlocksFetched
+ def totalBytesRead: Long = remoteBytesRead + localBytesRead
/**
- * Total number of records read from the shuffle by this task
+ * Number of blocks fetched in this shuffle by this task (remote or local).
*/
- private var _recordsRead: Long = _
- def recordsRead: Long = _recordsRead
- private[spark] def incRecordsRead(value: Long) = _recordsRead += value
- private[spark] def decRecordsRead(value: Long) = _recordsRead -= value
+ def totalBlocksFetched: Int = remoteBlocksFetched + localBlocksFetched
+
+ private[spark] def incRemoteBlocksFetched(v: Int): Unit = _remoteBlocksFetched.add(v)
+ private[spark] def incLocalBlocksFetched(v: Int): Unit = _localBlocksFetched.add(v)
+ private[spark] def incRemoteBytesRead(v: Long): Unit = _remoteBytesRead.add(v)
+ private[spark] def incLocalBytesRead(v: Long): Unit = _localBytesRead.add(v)
+ private[spark] def incFetchWaitTime(v: Long): Unit = _fetchWaitTime.add(v)
+ private[spark] def incRecordsRead(v: Long): Unit = _recordsRead.add(v)
+
+ private[spark] def setRemoteBlocksFetched(v: Int): Unit = _remoteBlocksFetched.setValue(v)
+ private[spark] def setLocalBlocksFetched(v: Int): Unit = _localBlocksFetched.setValue(v)
+ private[spark] def setRemoteBytesRead(v: Long): Unit = _remoteBytesRead.setValue(v)
+ private[spark] def setLocalBytesRead(v: Long): Unit = _localBytesRead.setValue(v)
+ private[spark] def setFetchWaitTime(v: Long): Unit = _fetchWaitTime.setValue(v)
+ private[spark] def setRecordsRead(v: Long): Unit = _recordsRead.setValue(v)
+
}
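
Note on the accumMap constructor: it relies on TaskMetrics.getAccum to pull a typed
accumulator out of a name-keyed map. That helper is Spark-internal and not shown in
this diff. The self-contained sketch below illustrates the same lookup-and-cast idea;
Accum and lookupAccum are invented here for illustration and are not Spark APIs, and
the unchecked cast mirrors what such a helper presumably has to do to go from
Accumulator[_] to a concretely typed accumulator.

object AccumLookupSketch {
  // Stand-in for Spark's Accumulator; invented for this sketch only.
  final class Accum[T](var value: T) {
    def add(v: T)(implicit num: Numeric[T]): Unit = { value = num.plus(value, v) }
  }

  // Find the accumulator registered under `name`, failing loudly if it is
  // missing, then cast it to the expected value type (unchecked cast).
  def lookupAccum[T](accumMap: Map[String, Accum[_]], name: String): Accum[T] = {
    accumMap.getOrElse(name, sys.error(s"accumulator not registered: $name"))
      .asInstanceOf[Accum[T]]
  }

  def main(args: Array[String]): Unit = {
    val accums: Map[String, Accum[_]] =
      Map("shuffle.read.remoteBytesRead" -> new Accum[Long](0L))
    val remoteBytesRead = lookupAccum[Long](accums, "shuffle.read.remoteBytesRead")
    remoteBytesRead.add(4096L)
    println(remoteBytesRead.value)  // prints 4096
  }
}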
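
The scaladoc on the no-arg constructor points at the preferred per-task pattern:
register a temporary ShuffleReadMetrics for each reader, then merge them all at the
end of the task. A minimal sketch of that flow from a hypothetical Spark-internal
caller follows; registerTempShuffleReadMetrics and mergeShuffleReadMetrics are named
in the scaladoc above, but their exact signatures are assumed here, not taken from
this diff.

// Hypothetical Spark-internal caller, e.g. a shuffle reader. Assumes
// registerTempShuffleReadMetrics() returns a ShuffleReadMetrics and
// mergeShuffleReadMetrics() folds all temporary instances into the
// task-level metrics, per the scaladoc above.
def fetchAndCount(taskMetrics: TaskMetrics, blockSizes: Seq[Long]): Unit = {
  // One temp ShuffleReadMetrics per reader, so two shuffle dependencies
  // read in the same task never update the same accumulators.
  val readMetrics = taskMetrics.registerTempShuffleReadMetrics()
  blockSizes.foreach { size =>
    readMetrics.incRemoteBlocksFetched(1)
    readMetrics.incRemoteBytesRead(size)
  }
  readMetrics.incRecordsRead(blockSizes.size.toLong)
  // Called once at the end of the task; afterwards the task-level
  // ShuffleReadMetrics reflects the sum over all temporary instances.
  taskMetrics.mergeShuffleReadMetrics()
}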